http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index b1b7b10..2309e87 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -35,9 +35,10 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, - WriteType writeType) + WriteType writeType, + long queryStartNanoTime) { - super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType); + super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime); assert consistencyLevel.isDatacenterLocal(); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 47eacdf..3f1ff3c 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -52,7 +52,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> public final ResponseResolver resolver; private final SimpleCondition condition = new SimpleCondition(); - private final long start; + private final long queryStartNanoTime; final int blockfor; final List<InetAddress> endpoints; private final ReadCommand command; @@ -69,24 +69,25 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> /** * Constructor when response count has to be calculated and blocked for. */ - public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints) + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, long queryStartNanoTime) { this(resolver, consistencyLevel, consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)), command, Keyspace.open(command.metadata().ksName), - filteredEndpoints); + filteredEndpoints, + queryStartNanoTime); } - public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints) + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints, long queryStartNanoTime) { this.command = command; this.keyspace = keyspace; this.blockfor = blockfor; this.consistencyLevel = consistencyLevel; this.resolver = resolver; - this.start = System.nanoTime(); + this.queryStartNanoTime = queryStartNanoTime; this.endpoints = endpoints; // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897) assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size(); @@ -97,7 +98,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> public boolean await(long timePastStart, TimeUnit unit) { - long time = unit.toNanos(timePastStart) - (System.nanoTime() - start); + long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime); try { return condition.await(time, TimeUnit.NANOSECONDS); @@ -138,7 +139,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve(); if (logger.isTraceEnabled()) - logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime)); return result; } @@ -163,7 +164,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> TraceState traceState = Tracing.instance.get(); if (traceState != null) traceState.trace("Initiating read-repair"); - StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState)); + StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState, queryStartNanoTime)); } } } @@ -210,10 +211,12 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> private class AsyncRepairRunner implements Runnable { private final TraceState traceState; + private final long queryStartNanoTime; - public AsyncRepairRunner(TraceState traceState) + public AsyncRepairRunner(TraceState traceState, long queryStartNanoTime) { this.traceState = traceState; + this.queryStartNanoTime = queryStartNanoTime; } public void run() @@ -236,7 +239,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> ReadRepairMetrics.repairedBackground.mark(); - final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size()); + final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime); AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); for (InetAddress endpoint : endpoints) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9283c04..9bf90dc 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -223,10 +223,11 @@ public class StorageProxy implements StorageProxyMBean CASRequest request, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, - ClientState state) + ClientState state, + long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { - final long start = System.nanoTime(); + final long startTimeForMetrics = System.nanoTime(); int contentions = 0; try { @@ -236,14 +237,14 @@ public class StorageProxy implements StorageProxyMBean CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (System.nanoTime() - start < timeout) + while (System.nanoTime() - queryStartNanoTime < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); + final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); final UUID ballot = pair.left; contentions += pair.right; @@ -253,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; FilteredPartition current; - try (RowIterator rowIter = readOne(readCommand, readConsistency)) + try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime)) { current = FilteredPartition.create(rowIter); } @@ -281,9 +282,9 @@ public class StorageProxy implements StorageProxyMBean Commit proposal = Commit.newProposal(ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) + if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime)) { - commitPaxos(proposal, consistencyForCommit, true); + commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); Tracing.trace("CAS successful"); return null; } @@ -318,7 +319,7 @@ public class StorageProxy implements StorageProxyMBean { if(contentions > 0) casWriteMetrics.contention.update(contentions); - final long latency = System.nanoTime() - start; + final long latency = System.nanoTime() - startTimeForMetrics; casWriteMetrics.addNano(latency); writeMetricsMap.get(consistencyForPaxos).addNano(latency); } @@ -373,7 +374,7 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. */ - private static Pair<UUID, Integer> beginAndRepairPaxos(long start, + private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, CFMetaData metadata, List<InetAddress> liveEndpoints, @@ -388,7 +389,7 @@ public class StorageProxy implements StorageProxyMBean PrepareCallback summary = null; int contentions = 0; - while (System.nanoTime() - start < timeout) + while (System.nanoTime() - queryStartNanoTime < timeout) { // We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected // already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater that the most recently known @@ -403,7 +404,7 @@ public class StorageProxy implements StorageProxyMBean // prepare Tracing.trace("Preparing {}", ballot); Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos); + summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime); if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); @@ -426,11 +427,11 @@ public class StorageProxy implements StorageProxyMBean else casReadMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); - if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) + if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime)) { try { - commitPaxos(refreshedInProgress, consistencyForCommit, false); + commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime); } catch (WriteTimeoutException e) { @@ -481,10 +482,10 @@ public class StorageProxy implements StorageProxyMBean MessagingService.instance().sendOneWay(message, target); } - private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos) + private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime) throws WriteTimeoutException { - PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos); + PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); for (InetAddress target : endpoints) MessagingService.instance().sendRR(message, target, callback); @@ -492,10 +493,10 @@ public class StorageProxy implements StorageProxyMBean return callback; } - private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel) + private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws WriteTimeoutException { - ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel); + ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); for (InetAddress target : endpoints) MessagingService.instance().sendRR(message, target, callback); @@ -511,7 +512,7 @@ public class StorageProxy implements StorageProxyMBean return false; } - private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint) throws WriteTimeoutException + private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws WriteTimeoutException { boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); @@ -524,7 +525,7 @@ public class StorageProxy implements StorageProxyMBean if (shouldBlock) { AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime); } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); @@ -597,8 +598,9 @@ public class StorageProxy implements StorageProxyMBean * * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation + * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ - public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level) + public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); @@ -613,12 +615,12 @@ public class StorageProxy implements StorageProxyMBean { if (mutation instanceof CounterMutation) { - responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter)); + responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime)); } else { WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH; - responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt)); + responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime)); } } @@ -728,8 +730,9 @@ public class StorageProxy implements StorageProxyMBean * @param mutations the mutations to be applied across the replicas * @param writeCommitLog if commitlog should be written * @param baseComplete time from epoch in ms that the local base mutation was(or will be) completed + * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ - public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete) + public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for mutation"); @@ -791,7 +794,8 @@ public class StorageProxy implements StorageProxyMBean Collections.singletonList(pairedEndpoint.get()), baseComplete, WriteType.BATCH, - cleanup)); + cleanup, + queryStartNanoTime)); } } else @@ -834,7 +838,8 @@ public class StorageProxy implements StorageProxyMBean @SuppressWarnings("unchecked") public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, - boolean mutateAtomically) + boolean mutateAtomically, + long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException { Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); @@ -844,13 +849,13 @@ public class StorageProxy implements StorageProxyMBean .updatesAffectView(mutations, true); if (augmented != null) - mutateAtomically(augmented, consistencyLevel, updatesView); + mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime); else { if (mutateAtomically || updatesView) - mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView); + mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView, queryStartNanoTime); else - mutate(mutations, consistencyLevel); + mutate(mutations, consistencyLevel, queryStartNanoTime); } } @@ -863,10 +868,12 @@ public class StorageProxy implements StorageProxyMBean * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog + * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level, - boolean requireQuorumForRemove) + boolean requireQuorumForRemove, + long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for atomic batch"); @@ -894,7 +901,7 @@ public class StorageProxy implements StorageProxyMBean final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -903,14 +910,15 @@ public class StorageProxy implements StorageProxyMBean consistency_level, batchConsistencyLevel, WriteType.BATCH, - cleanup); + cleanup, + queryStartNanoTime); // exit early if we can't fulfill the CL at this time. wrapper.handler.assureSufficientLiveNodes(); wrappers.add(wrapper); } // write to the batchlog - syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID); + syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime); // now actually perform the writes and wait for them to complete syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION); @@ -950,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean return replica.equals(FBUtilities.getBroadcastAddress()); } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid) + private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all, @@ -958,7 +966,8 @@ public class StorageProxy implements StorageProxyMBean endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, Keyspace.open(SystemKeyspace.NAME), null, - WriteType.BATCH_LOG); + WriteType.BATCH_LOG, + queryStartNanoTime); Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); @@ -987,13 +996,13 @@ public class StorageProxy implements StorageProxyMBean } } - private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid) + private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime) { if (!endpoints.current.isEmpty()) asyncRemoveFromBatchlog(endpoints.current, uuid); if (!endpoints.legacy.isEmpty()) - LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid); + LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime); } private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) @@ -1054,14 +1063,15 @@ public class StorageProxy implements StorageProxyMBean * given the list of write endpoints (either standardWritePerformer for * standard writes or counterWritePerformer for counter writes). * @param callback an optional callback to be run if and when the write is - * successful. + * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation, - ConsistencyLevel consistency_level, - String localDataCenter, - WritePerformer performer, - Runnable callback, - WriteType writeType) + ConsistencyLevel consistency_level, + String localDataCenter, + WritePerformer performer, + Runnable callback, + WriteType writeType, + long queryStartNanoTime) throws UnavailableException, OverloadedException { String keyspaceName = mutation.getKeyspaceName(); @@ -1071,7 +1081,7 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); @@ -1085,7 +1095,8 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level, ConsistencyLevel batchConsistencyLevel, WriteType writeType, - BatchlogResponseHandler.BatchlogCleanup cleanup) + BatchlogResponseHandler.BatchlogCleanup cleanup, + long queryStartNanoTime) { Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); @@ -1093,8 +1104,8 @@ public class StorageProxy implements StorageProxyMBean Token tk = mutation.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); - BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup); + AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime); + BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime); return new WriteResponseHandlerWrapper(batchHandler, mutation); } @@ -1108,7 +1119,8 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> naturalEndpoints, AtomicLong baseComplete, WriteType writeType, - BatchlogResponseHandler.BatchlogCleanup cleanup) + BatchlogResponseHandler.BatchlogCleanup cleanup, + long queryStartNanoTime) { Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); @@ -1118,8 +1130,8 @@ public class StorageProxy implements StorageProxyMBean AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> { long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get()); viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS); - }, writeType); - BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup); + }, writeType, queryStartNanoTime); + BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime); return new WriteResponseHandlerWrapper(batchHandler, mutation); } @@ -1400,13 +1412,13 @@ public class StorageProxy implements StorageProxyMBean * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. */ - public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException + public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException { InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); if (endpoint.equals(FBUtilities.getBroadcastAddress())) { - return applyCounterMutationOnCoordinator(cm, localDataCenter); + return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); } else { @@ -1417,10 +1429,10 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes(); + rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes(); // Forward the actual update to the chosen leader replica - AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER); + AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime); Tracing.trace("Enqueuing counter update to {}", endpoint); MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false); @@ -1467,18 +1479,18 @@ public class StorageProxy implements StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. - public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) + public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime) throws UnavailableException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER); + return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) - public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) + public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER); + return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime); } private static Runnable counterWriteTask(final IMutation mutation, @@ -1512,31 +1524,31 @@ public class StorageProxy implements StorageProxyMBean return true; } - public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) + public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { - return readOne(command, consistencyLevel, null); + return readOne(command, consistencyLevel, null, queryStartNanoTime); } - public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state) + public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { - return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command); + return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state, queryStartNanoTime), command); } - public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel) + public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { // When using serial CL, the ClientState should be provided assert !consistencyLevel.isSerialConsistency(); - return read(group, consistencyLevel, null); + return read(group, consistencyLevel, null, queryStartNanoTime); } /** * Performs the actual reading of a row out of the StorageService, fetching * a specific set of column names from a given column family. */ - public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state) + public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands)) @@ -1547,11 +1559,11 @@ public class StorageProxy implements StorageProxyMBean } return consistencyLevel.isSerialConsistency() - ? readWithPaxos(group, consistencyLevel, state) - : readRegular(group, consistencyLevel); + ? readWithPaxos(group, consistencyLevel, state, queryStartNanoTime) + : readRegular(group, consistencyLevel, queryStartNanoTime); } - private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state) + private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { assert state != null; @@ -1591,7 +1603,7 @@ public class StorageProxy implements StorageProxyMBean throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); } - result = fetchRows(group.commands, consistencyForCommitOrFetch); + result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime); } catch (UnavailableException e) { @@ -1627,13 +1639,13 @@ public class StorageProxy implements StorageProxyMBean } @SuppressWarnings("resource") - private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel) + private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, ReadFailureException, ReadTimeoutException { long start = System.nanoTime(); try { - PartitionIterator result = fetchRows(group.commands, consistencyLevel); + PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime); // If we have more than one command, then despite each read command honoring the limit, the total result // might not honor it and so we should enforce it if (group.commands.size() > 1) @@ -1680,14 +1692,14 @@ public class StorageProxy implements StorageProxyMBean * 4. If the digests (if any) match the data return the data * 5. else carry out read repair by getting data from all the nodes. */ - private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel) + private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, ReadFailureException, ReadTimeoutException { int cmdCount = commands.size(); SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount]; for (int i = 0; i < cmdCount; i++) - reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel); + reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel, queryStartNanoTime); for (int i = 0; i < cmdCount; i++) reads[i].doInitialQueries(); @@ -1717,15 +1729,17 @@ public class StorageProxy implements StorageProxyMBean private final SinglePartitionReadCommand command; private final AbstractReadExecutor executor; private final ConsistencyLevel consistency; + private final long queryStartNanoTime; private PartitionIterator result; private ReadCallback repairHandler; - SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency) + SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency, long queryStartNanoTime) { this.command = command; - this.executor = AbstractReadExecutor.getReadExecutor(command, consistency); + this.executor = AbstractReadExecutor.getReadExecutor(command, consistency, queryStartNanoTime); this.consistency = consistency; + this.queryStartNanoTime = queryStartNanoTime; } boolean isDone() @@ -1757,13 +1771,14 @@ public class StorageProxy implements StorageProxyMBean // Do a full data read to resolve the correct response (and repair node that need be) Keyspace keyspace = Keyspace.open(command.metadata().ksName); - DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size()); + DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime); repairHandler = new ReadCallback(resolver, ConsistencyLevel.ALL, executor.getContactedReplicas().size(), command, keyspace, - executor.handler.endpoints); + executor.handler.endpoints, + queryStartNanoTime); for (InetAddress endpoint : executor.getContactedReplicas()) { @@ -2052,6 +2067,7 @@ public class StorageProxy implements StorageProxyMBean private final ConsistencyLevel consistency; private final long startTime; + private final long queryStartNanoTime; private DataLimits.Counter counter; private PartitionIterator sentQueryIterator; @@ -2061,7 +2077,7 @@ public class StorageProxy implements StorageProxyMBean private int liveReturned; private int rangesQueried; - public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency) + public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime) { this.command = command; this.concurrencyFactor = concurrencyFactor; @@ -2070,6 +2086,7 @@ public class StorageProxy implements StorageProxyMBean this.totalRangeCount = ranges.rangeCount(); this.consistency = consistency; this.keyspace = keyspace; + this.queryStartNanoTime = queryStartNanoTime; } public RowIterator computeNext() @@ -2145,12 +2162,12 @@ public class StorageProxy implements StorageProxyMBean { PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst); - DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size()); + DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime); int blockFor = consistency.blockFor(keyspace); int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); - ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints); + ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime); handler.assureSufficientLiveNodes(); @@ -2204,7 +2221,7 @@ public class StorageProxy implements StorageProxyMBean } @SuppressWarnings("resource") - public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel) + public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) { Tracing.trace("Computing ranges to query"); @@ -2225,7 +2242,7 @@ public class StorageProxy implements StorageProxyMBean // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. - return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec()); + return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec()); } public Map<String, List<String>> getSchemaVersions() @@ -2485,9 +2502,9 @@ public class StorageProxy implements StorageProxyMBean */ private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation> { - public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup) + public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime) { - super(writeHandler, i, cleanup); + super(writeHandler, i, cleanup, queryStartNanoTime); viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 1dc03e0..46e4e93 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -47,20 +47,21 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, - WriteType writeType) + WriteType writeType, + long queryStartNanoTime) { - super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType); + super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime); responses = totalBlockFor(); } - public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback) + public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime) { - this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType); + this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime); } - public WriteResponseHandler(InetAddress endpoint, WriteType writeType) + public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime) { - this(endpoint, writeType, null); + this(endpoint, writeType, null, queryStartNanoTime); } public void response(MessageIn<T> m) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 01a56c4..d9b3632 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -55,7 +55,7 @@ abstract class AbstractQueryPager implements QueryPager return command.executionController(); } - public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) { if (isExhausted()) return EmptyIterators.partition(); @@ -63,7 +63,7 @@ abstract class AbstractQueryPager implements QueryPager pageSize = Math.min(pageSize, remaining); Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager); + return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager); } public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java index 5483d15..f9a8cda 100644 --- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java @@ -52,12 +52,13 @@ public final class AggregationQueryPager implements QueryPager @Override public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, - ClientState clientState) + ClientState clientState, + long queryStartNanoTime) { if (limits.isGroupByLimit()) - return new GroupByPartitionIterator(pageSize, consistency, clientState); + return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime); - return new AggregationPartitionIterator(pageSize, consistency, clientState); + return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime); } @Override @@ -70,9 +71,9 @@ public final class AggregationQueryPager implements QueryPager public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) { if (limits.isGroupByLimit()) - return new GroupByPartitionIterator(pageSize, executionController); + return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime()); - return new AggregationPartitionIterator(pageSize, executionController); + return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime()); } @Override @@ -152,28 +153,34 @@ public final class AggregationQueryPager implements QueryPager */ private int initialMaxRemaining; + private long queryStartNanoTime; + public GroupByPartitionIterator(int pageSize, ConsistencyLevel consistency, - ClientState clientState) + ClientState clientState, + long queryStartNanoTime) { - this(pageSize, consistency, clientState, null); + this(pageSize, consistency, clientState, null, queryStartNanoTime); } public GroupByPartitionIterator(int pageSize, - ReadExecutionController executionController) + ReadExecutionController executionController, + long queryStartNanoTime) { - this(pageSize, null, null, executionController); + this(pageSize, null, null, executionController, queryStartNanoTime); } private GroupByPartitionIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, - ReadExecutionController executionController) + ReadExecutionController executionController, + long queryStartNanoTime) { this.pageSize = handlePagingOff(pageSize); this.consistency = consistency; this.clientState = clientState; this.executionController = executionController; + this.queryStartNanoTime = queryStartNanoTime; } private int handlePagingOff(int pageSize) @@ -280,7 +287,7 @@ public final class AggregationQueryPager implements QueryPager */ private final PartitionIterator fetchSubPage(int subPageSize) { - return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState) + return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime) : subPager.fetchPageInternal(subPageSize, executionController); } @@ -393,15 +400,17 @@ public final class AggregationQueryPager implements QueryPager { public AggregationPartitionIterator(int pageSize, ConsistencyLevel consistency, - ClientState clientState) + ClientState clientState, + long queryStartNanoTime) { - super(pageSize, consistency, clientState); + super(pageSize, consistency, clientState, queryStartNanoTime); } public AggregationPartitionIterator(int pageSize, - ReadExecutionController executionController) + ReadExecutionController executionController, + long queryStartNanoTime) { - super(pageSize, executionController); + super(pageSize, executionController, queryStartNanoTime); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 9670f28..75cc71f 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -142,17 +142,17 @@ public class MultiPartitionPager implements QueryPager } @SuppressWarnings("resource") // iter closed via countingIter - public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { int toQuery = Math.min(remaining, pageSize); - return new PagersIterator(toQuery, consistency, clientState, null); + return new PagersIterator(toQuery, consistency, clientState, null, queryStartNanoTime); } @SuppressWarnings("resource") // iter closed via countingIter public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException { int toQuery = Math.min(remaining, pageSize); - return new PagersIterator(toQuery, null, null, executionController); + return new PagersIterator(toQuery, null, null, executionController, System.nanoTime()); } private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator @@ -160,6 +160,7 @@ public class MultiPartitionPager implements QueryPager private final int pageSize; private PartitionIterator result; private boolean closed; + private final long queryStartNanoTime; // For "normal" queries private final ConsistencyLevel consistency; @@ -171,12 +172,13 @@ public class MultiPartitionPager implements QueryPager private int pagerMaxRemaining; private int counted; - public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController) + public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController, long queryStartNanoTime) { this.pageSize = pageSize; this.consistency = consistency; this.clientState = clientState; this.executionController = executionController; + this.queryStartNanoTime = queryStartNanoTime; } protected RowIterator computeNext() @@ -205,7 +207,7 @@ public class MultiPartitionPager implements QueryPager int toQuery = pageSize - counted; result = consistency == null ? pagers[current].fetchPageInternal(toQuery, executionController) - : pagers[current].fetchPage(toQuery, consistency, clientState); + : pagers[current].fetchPage(toQuery, consistency, clientState, queryStartNanoTime); } return result.next(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java index edd2a55..5d23997 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java @@ -54,7 +54,7 @@ public interface QueryPager return ReadExecutionController.empty(); } - public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { return EmptyIterators.partition(); } @@ -94,7 +94,7 @@ public interface QueryPager * {@code consistency} is a serial consistency. * @return the page of result. */ - public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException; + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException; /** * Starts a new read operation. http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index 02b5de2..7fb4e70 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -45,7 +45,8 @@ public class QueryPagers ClientState state, final int pageSize, int nowInSec, - boolean isForThrift) throws RequestValidationException, RequestExecutionException + boolean isForThrift, + long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter); final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION); @@ -53,7 +54,7 @@ public class QueryPagers int count = 0; while (!pager.isExhausted()) { - try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state)) + try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime)) { DataLimits.Counter counter = limits.newCounter(nowInSec, true); PartitionIterators.consume(counter.applyTo(iter)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java index 37defde..90bfc5d 100644 --- a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java @@ -35,12 +35,14 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T> protected final CountDownLatch latch; protected final int targets; private final ConsistencyLevel consistency; + private final long queryStartNanoTime; - public AbstractPaxosCallback(int targets, ConsistencyLevel consistency) + public AbstractPaxosCallback(int targets, ConsistencyLevel consistency, long queryStartNanoTime) { this.targets = targets; this.consistency = consistency; latch = new CountDownLatch(targets); + this.queryStartNanoTime = queryStartNanoTime; } public boolean isLatencyForSnitch() @@ -57,7 +59,8 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T> { try { - if (!latch.await(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS)) + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - queryStartNanoTime); + if (!latch.await(timeout, TimeUnit.NANOSECONDS)) throw new WriteTimeoutException(WriteType.CAS, consistency, getResponseCount(), targets); } catch (InterruptedException ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 544403a..5915eab 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -49,9 +49,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); - public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency) + public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime) { - super(targets, consistency); + super(targets, consistency, queryStartNanoTime); // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected mostRecentCommit = Commit.emptyCommit(key, metadata); mostRecentInProgressCommit = Commit.emptyCommit(key, metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java index b0bd163..c9cb1f0 100644 --- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java @@ -50,9 +50,9 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean> private final int requiredAccepts; private final boolean failFast; - public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency) + public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency, long queryStartNanoTime) { - super(totalTargets, consistency); + super(totalTargets, consistency, queryStartNanoTime); this.requiredAccepts = requiredTargets; this.failFast = failFast; }
