Count entire coordinated request against timeout Patch by Geoffrey Yu; reviewed by Tyler Hobbs for CASSANDRA-12256
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa83c942 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa83c942 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa83c942 Branch: refs/heads/trunk Commit: aa83c942a51323d4a38bc023979ba70801c875b3 Parents: e83f9e6 Author: Geoffrey Yu <[email protected]> Authored: Tue Aug 16 14:35:11 2016 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Tue Aug 16 14:35:11 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 9 + .../cassandra/auth/CassandraAuthorizer.java | 11 +- .../cassandra/auth/CassandraRoleManager.java | 3 +- .../cassandra/auth/PasswordAuthenticator.java | 3 +- .../cassandra/batchlog/BatchlogManager.java | 6 +- .../batchlog/LegacyBatchlogMigrator.java | 5 +- .../org/apache/cassandra/cql3/CQLStatement.java | 3 +- .../CustomPayloadMirroringQueryHandler.java | 15 +- .../org/apache/cassandra/cql3/QueryHandler.java | 9 +- .../apache/cassandra/cql3/QueryProcessor.java | 44 ++--- .../statements/AuthenticationStatement.java | 2 +- .../cql3/statements/AuthorizationStatement.java | 2 +- .../cql3/statements/BatchStatement.java | 33 ++-- .../cql3/statements/DropIndexStatement.java | 2 +- .../cql3/statements/ModificationStatement.java | 54 +++--- .../statements/SchemaAlteringStatement.java | 2 +- .../cql3/statements/SelectStatement.java | 29 +-- .../cql3/statements/TruncateStatement.java | 2 +- .../cassandra/cql3/statements/UseStatement.java | 4 +- .../db/CounterMutationVerbHandler.java | 3 +- .../cassandra/db/PartitionRangeReadCommand.java | 4 +- src/java/org/apache/cassandra/db/ReadQuery.java | 4 +- .../db/SinglePartitionReadCommand.java | 8 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../apache/cassandra/db/view/TableViews.java | 3 +- .../apache/cassandra/db/view/ViewBuilder.java | 2 +- .../locator/AbstractReplicationStrategy.java | 15 +- .../apache/cassandra/repair/RepairRunnable.java | 2 +- .../cassandra/service/AbstractReadExecutor.java | 28 +-- .../service/AbstractWriteResponseHandler.java | 10 +- .../service/BatchlogResponseHandler.java | 4 +- .../apache/cassandra/service/DataResolver.java | 14 +- .../DatacenterSyncWriteResponseHandler.java | 5 +- .../service/DatacenterWriteResponseHandler.java | 5 +- .../apache/cassandra/service/ReadCallback.java | 23 ++- .../apache/cassandra/service/StorageProxy.java | 183 ++++++++++--------- .../cassandra/service/WriteResponseHandler.java | 13 +- .../service/pager/AbstractQueryPager.java | 4 +- .../service/pager/AggregationQueryPager.java | 39 ++-- .../service/pager/MultiPartitionPager.java | 12 +- .../cassandra/service/pager/QueryPager.java | 4 +- .../cassandra/service/pager/QueryPagers.java | 5 +- .../service/paxos/AbstractPaxosCallback.java | 7 +- .../service/paxos/PrepareCallback.java | 4 +- .../service/paxos/ProposeCallback.java | 4 +- .../cassandra/thrift/CassandraServer.java | 98 ++++++---- .../cassandra/tracing/TraceStateImpl.java | 2 +- .../org/apache/cassandra/transport/Message.java | 5 +- .../transport/messages/AuthResponse.java | 2 +- .../transport/messages/BatchMessage.java | 4 +- .../transport/messages/CredentialsMessage.java | 2 +- .../transport/messages/ExecuteMessage.java | 4 +- .../transport/messages/OptionsMessage.java | 2 +- .../transport/messages/PrepareMessage.java | 2 +- .../transport/messages/QueryMessage.java | 4 +- .../transport/messages/RegisterMessage.java | 2 +- .../transport/messages/StartupMessage.java | 2 +- .../cassandra/cql3/PstmtPersistenceTest.java | 2 +- .../cassandra/service/DataResolverTest.java | 26 +-- .../cassandra/transport/MessagePayloadTest.java | 19 +- 61 files changed, 468 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 700dd48..5fbcc6b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Count full coordinated request against timeout (CASSANDRA-12256) * Allow TTL with null value on insert and update (CASSANDRA-12216) * Make decommission operation resumable (CASSANDRA-12008) * Add support to one-way targeted repair (CASSANDRA-9876) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 9cfc58b..d25be0d 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -62,6 +62,15 @@ New features Upgrading --------- + - Request timeouts in cassandra.yaml (read_request_timeout_in_ms, etc) now apply to the + "full" request time on the coordinator. Previously, they only covered the time from + when the coordinator sent a message to a replica until the time that the replica + responded. Additionally, the previous behavior was to reset the timeout when performing + a read repair, making a second read to fix a short read, and when subranges were read + as part of a range scan or secondary index query. In 3.10 and higher, the timeout + is no longer reset for these "subqueries". The entire request must complete within + the specified timeout. As a consequence, your timeouts may need to be adjusted + to account for this. See CASSANDRA-12256 for more details. - Logs written to stdout are now consistent with logs written to files. Time is now local (it was UTC on the console and local in files). Date, thread, file and line info where added to stdout. (see CASSANDRA-12004) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java index a6c11d2..65ee7ec 100644 --- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java +++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java @@ -201,7 +201,8 @@ public class CassandraAuthorizer implements IAuthorizer Attributes.none()); QueryProcessor.instance.processBatch(batch, QueryState.forInternalCalls(), - BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT)); + BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT), + System.nanoTime()); } @@ -218,7 +219,7 @@ public class CassandraAuthorizer implements IAuthorizer SelectStatement statement = Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) == null ? authorizeRoleStatement : legacyAuthorizeRoleStatement; - ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options) ; + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime()); UntypedResultSet result = UntypedResultSet.create(rows.result); if (!result.isEmpty() && result.one().has(PERMISSIONS)) @@ -428,12 +429,14 @@ public class CassandraAuthorizer implements IAuthorizer QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("username"), row.getBytes("resource"), - serializer.serialize(filteredPerms)))); + serializer.serialize(filteredPerms))), + System.nanoTime()); indexStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("resource"), - row.getBytes("username")))); + row.getBytes("username"))), + System.nanoTime()); } logger.info("Completed conversion of legacy permissions"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraRoleManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java index 826e89d..f2a2cfb 100644 --- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java +++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java @@ -496,7 +496,8 @@ public class CassandraRoleManager implements IRoleManager ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(consistencyForRole(name), - Collections.singletonList(ByteBufferUtil.bytes(name)))); + Collections.singletonList(ByteBufferUtil.bytes(name))), + System.nanoTime()); if (rows.result.isEmpty()) return NULL_ROLE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java index 74eb10d..0f79cd2 100644 --- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java +++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java @@ -121,7 +121,8 @@ public class PasswordAuthenticator implements IAuthenticator ResultMessage.Rows rows = authenticationStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(consistencyForRole(username), - Lists.newArrayList(ByteBufferUtil.bytes(username)))); + Lists.newArrayList(ByteBufferUtil.bytes(username))), + System.nanoTime()); // If either a non-existent role name was supplied, or no credentials // were found for that role we don't want to cache the result so we throw http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 0bc9185..ffff235 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -442,7 +442,7 @@ public class BatchlogManager implements BatchlogManagerMBean if (liveEndpoints.isEmpty()) return null; - ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints); + ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime()); MessageOut<Mutation> message = mutation.createMessage(); for (InetAddress endpoint : liveEndpoints) MessagingService.instance().sendRR(message, endpoint, handler, false); @@ -465,9 +465,9 @@ public class BatchlogManager implements BatchlogManagerMBean { private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); - ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints) + ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints, long queryStartNanoTime) { - super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); + super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); undelivered.addAll(writeEndpoints); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java index 3a8bf83..1a70f9f 100644 --- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java +++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java @@ -137,14 +137,15 @@ public final class LegacyBatchlogMigrator } } - public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) + public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime) { AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Keyspace.open(SystemKeyspace.NAME), null, - WriteType.SIMPLE); + WriteType.SIMPLE, + queryStartNanoTime); Mutation mutation = getRemoveMutation(uuid); for (InetAddress target : endpoints) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CQLStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java index 02292ad..901ecd4 100644 --- a/src/java/org/apache/cassandra/cql3/CQLStatement.java +++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java @@ -50,8 +50,9 @@ public interface CQLStatement * * @param state the current query state * @param options options for this query (consistency, variables, pageSize, ...) + * @param queryStartNanoTime the timestamp returned by System.nanoTime() when this statement was received */ - public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException; + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException; /** * Variant of execute used for internal query against the system tables, and thus only query the local node. http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java index 02a6df9..643c54b 100644 --- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java @@ -38,9 +38,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler public ResultMessage process(String query, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) { - ResultMessage result = queryProcessor.process(query, state, options, customPayload); + ResultMessage result = queryProcessor.process(query, state, options, customPayload, queryStartNanoTime); result.setCustomPayload(customPayload); return result; } @@ -65,9 +66,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) { - ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload); + ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload, queryStartNanoTime); result.setCustomPayload(customPayload); return result; } @@ -75,9 +77,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) { - ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload); + ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload, queryStartNanoTime); result.setCustomPayload(customPayload); return result; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java index 3c11c0e..2108d4c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -33,7 +33,8 @@ public interface QueryHandler ResultMessage process(String query, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException; ResultMessage.Prepared prepare(String query, QueryState state, @@ -46,10 +47,12 @@ public interface QueryHandler ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException; ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, - Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 899b36d..47462e4 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -203,7 +203,7 @@ public class QueryProcessor implements QueryHandler } } - public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options) + public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { logger.trace("Process {} @CL.{}", statement, options.getConsistency()); @@ -211,26 +211,26 @@ public class QueryProcessor implements QueryHandler statement.checkAccess(clientState); statement.validate(clientState); - ResultMessage result = statement.execute(queryState, options); + ResultMessage result = statement.execute(queryState, options, queryStartNanoTime); return result == null ? new ResultMessage.Void() : result; } - public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState) + public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList())); + return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()), queryStartNanoTime); } public ResultMessage process(String query, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) - throws RequestExecutionException, RequestValidationException + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return process(query, state, options); + return process(query, state, options, queryStartNanoTime); } - public ResultMessage process(String queryString, QueryState queryState, QueryOptions options) + public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState()); @@ -242,7 +242,7 @@ public class QueryProcessor implements QueryHandler if (!queryState.getClientState().isInternal) metrics.regularStatementsExecuted.inc(); - return processStatement(prepared, queryState, options); + return processStatement(prepared, queryState, options, queryStartNanoTime); } public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException @@ -257,7 +257,7 @@ public class QueryProcessor implements QueryHandler public static UntypedResultSet process(String query, ConsistencyLevel cl, List<ByteBuffer> values) throws RequestExecutionException { - ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values)); + ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values), System.nanoTime()); if (result instanceof ResultMessage.Rows) return UntypedResultSet.create(((ResultMessage.Rows)result).result); else @@ -319,7 +319,7 @@ public class QueryProcessor implements QueryHandler try { ParsedStatement.Prepared prepared = prepareInternal(query); - ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values, cl)); + ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values, cl), System.nanoTime()); if (result instanceof ResultMessage.Rows) return UntypedResultSet.create(((ResultMessage.Rows)result).result); else @@ -362,12 +362,12 @@ public class QueryProcessor implements QueryHandler * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare * cases. */ - public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values) + public static UntypedResultSet executeInternalWithNow(int nowInSec, long queryStartNanoTime, String query, Object... values) { ParsedStatement.Prepared prepared = prepareInternal(query); assert prepared.statement instanceof SelectStatement; SelectStatement select = (SelectStatement)prepared.statement; - ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec); + ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec, queryStartNanoTime); assert result instanceof ResultMessage.Rows; return UntypedResultSet.create(((ResultMessage.Rows)result).result); } @@ -480,13 +480,14 @@ public class QueryProcessor implements QueryHandler public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return processPrepared(statement, state, options); + return processPrepared(statement, state, options, queryStartNanoTime); } - public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options) + public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> variables = options.getValues(); @@ -506,26 +507,27 @@ public class QueryProcessor implements QueryHandler } metrics.preparedStatementsExecuted.inc(); - return processStatement(statement, queryState, options); + return processStatement(statement, queryState, options, queryStartNanoTime); } public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return processBatch(statement, state, options); + return processBatch(statement, state, options, queryStartNanoTime); } - public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options) + public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { ClientState clientState = queryState.getClientState(); batch.checkAccess(clientState); batch.validate(); batch.validate(clientState); - return batch.execute(queryState, options); + return batch.execute(queryState, options, queryStartNanoTime); } public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java index 151e4f0..0283009 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java @@ -41,7 +41,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements return 0; } - public ResultMessage execute(QueryState state, QueryOptions options) + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { return execute(state.getClientState()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java index 098e22c..83081c8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java @@ -42,7 +42,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements return 0; } - public ResultMessage execute(QueryState state, QueryOptions options) + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { return execute(state.getClientState()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 14638e2..ae64e7a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -217,7 +217,7 @@ public class BatchStatement implements CQLStatement return statements; } - private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now) + private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { Set<String> tablesWithZeroGcGs = null; @@ -233,7 +233,7 @@ public class BatchStatement implements CQLStatement } QueryOptions statementOptions = options.forStatement(i); long timestamp = attrs.getTimestamp(now, statementOptions); - statement.addUpdates(collector, statementOptions, local, timestamp); + statement.addUpdates(collector, statementOptions, local, timestamp, queryStartNanoTime); } if (tablesWithZeroGcGs != null) @@ -335,17 +335,17 @@ public class BatchStatement implements CQLStatement } - public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException + public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options)); + return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options), queryStartNanoTime); } - public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException + public ResultMessage execute(QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - return execute(queryState, options, false, options.getTimestamp(queryState)); + return execute(queryState, options, false, options.getTimestamp(queryState), queryStartNanoTime); } - private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now) + private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { if (options.getConsistency() == null) @@ -354,13 +354,13 @@ public class BatchStatement implements CQLStatement throw new InvalidRequestException("Invalid empty serial consistency level"); if (hasConditions) - return executeWithConditions(options, queryState); + return executeWithConditions(options, queryState, queryStartNanoTime); - executeWithoutConditions(getMutations(options, local, now), options.getConsistency()); + executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); return new ResultMessage.Void(); } - private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException + private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { if (mutations.isEmpty()) return; @@ -369,10 +369,10 @@ public class BatchStatement implements CQLStatement verifyBatchType(mutations); boolean mutateAtomic = (isLogged() && mutations.size() > 1); - StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); + StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime); } - private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state) + private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { Pair<CQL3CasRequest, Set<ColumnDefinition>> p = makeCasRequest(options, state); @@ -388,7 +388,8 @@ public class BatchStatement implements CQLStatement casRequest, options.getSerialConsistency(), options.getConsistency(), - state.getClientState())) + state.getClientState(), + queryStartNanoTime)) { return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0))); } @@ -447,13 +448,13 @@ public class BatchStatement implements CQLStatement if (hasConditions) return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState); - executeInternalWithoutCondition(queryState, options); + executeInternalWithoutCondition(queryState, options, System.nanoTime()); return new ResultMessage.Void(); } - private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException + private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { - for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp())) + for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryStartNanoTime)) mutation.apply(); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java index 85f5f0d..70a86db 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java @@ -64,7 +64,7 @@ public class DropIndexStatement extends SchemaAlteringStatement } @Override - public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException { Event.SchemaChange ce = announceMigration(false); return ce == null ? null : new ResultMessage.SchemaChange(ce); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 822664a..d32a689 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -345,7 +345,8 @@ public abstract class ModificationStatement implements CQLStatement ClusteringIndexFilter filter, DataLimits limits, boolean local, - ConsistencyLevel cl) + ConsistencyLevel cl, + long queryStartNanoTime) { if (!requiresRead()) return null; @@ -381,7 +382,7 @@ public abstract class ModificationStatement implements CQLStatement } } - try (PartitionIterator iter = group.execute(cl, null)) + try (PartitionIterator iter = group.execute(cl, null, queryStartNanoTime)) { return asMaterializedMap(iter); } @@ -405,18 +406,18 @@ public abstract class ModificationStatement implements CQLStatement return !conditions.isEmpty(); } - public ResultMessage execute(QueryState queryState, QueryOptions options) + public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { if (options.getConsistency() == null) throw new InvalidRequestException("Invalid empty consistency level"); return hasConditions() - ? executeWithCondition(queryState, options) - : executeWithoutCondition(queryState, options); + ? executeWithCondition(queryState, options, queryStartNanoTime) + : executeWithoutCondition(queryState, options, queryStartNanoTime); } - private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options) + private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { ConsistencyLevel cl = options.getConsistency(); @@ -425,14 +426,14 @@ public abstract class ModificationStatement implements CQLStatement else cl.validateForWrite(cfm.ksName); - Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState)); + Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryStartNanoTime); if (!mutations.isEmpty()) - StorageProxy.mutateWithTriggers(mutations, cl, false); + StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime); return null; } - public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options) + public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { CQL3CasRequest request = makeCasRequest(queryState, options); @@ -443,7 +444,8 @@ public abstract class ModificationStatement implements CQLStatement request, options.getSerialConsistency(), options.getConsistency(), - queryState.getClientState())) + queryState.getClientState(), + queryStartNanoTime)) { return new ResultMessage.Rows(buildCasResultSet(result, options)); } @@ -561,12 +563,12 @@ public abstract class ModificationStatement implements CQLStatement { return hasConditions() ? executeInternalWithCondition(queryState, options) - : executeInternalWithoutCondition(queryState, options); + : executeInternalWithoutCondition(queryState, options, System.nanoTime()); } - public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException + public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { - for (IMutation mutation : getMutations(options, true, queryState.getTimestamp())) + for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryStartNanoTime)) mutation.apply(); return null; } @@ -612,10 +614,10 @@ public abstract class ModificationStatement implements CQLStatement * * @return list of the mutations */ - private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now) + private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime) { UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1); - addUpdates(collector, options, local, now); + addUpdates(collector, options, local, now, queryStartNanoTime); collector.validateIndexedColumns(); return collector.toMutations(); @@ -624,7 +626,8 @@ public abstract class ModificationStatement implements CQLStatement final void addUpdates(UpdatesCollector collector, QueryOptions options, boolean local, - long now) + long now, + long queryStartNanoTime) { List<ByteBuffer> keys = buildPartitionKeyNames(options); @@ -643,7 +646,8 @@ public abstract class ModificationStatement implements CQLStatement options, DataLimits.NONE, local, - now); + now, + queryStartNanoTime); for (ByteBuffer key : keys) { ThriftValidation.validateKey(cfm, key); @@ -659,7 +663,7 @@ public abstract class ModificationStatement implements CQLStatement { NavigableSet<Clustering> clusterings = createClustering(options); - UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now); + UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime); for (ByteBuffer key : keys) { @@ -703,7 +707,8 @@ public abstract class ModificationStatement implements CQLStatement NavigableSet<Clustering> clusterings, QueryOptions options, boolean local, - long now) + long now, + long queryStartNanoTime) { if (clusterings.contains(Clustering.STATIC_CLUSTERING)) return makeUpdateParameters(keys, @@ -711,14 +716,16 @@ public abstract class ModificationStatement implements CQLStatement options, DataLimits.cqlLimits(1), local, - now); + now, + queryStartNanoTime); return makeUpdateParameters(keys, new ClusteringIndexNamesFilter(clusterings, false), options, DataLimits.NONE, local, - now); + now, + queryStartNanoTime); } private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys, @@ -726,10 +733,11 @@ public abstract class ModificationStatement implements CQLStatement QueryOptions options, DataLimits limits, boolean local, - long now) + long now, + long queryStartNanoTime) { // Some lists operation requires reading - Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency()); + Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime); return new UpdateParameters(cfm, updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index 10c004c..139c566 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@ -86,7 +86,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL */ public abstract Event.SchemaChange announceMigration(boolean isLocalOnly) throws RequestValidationException; - public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException { // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing // extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 7cd2be0..6afaa20 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -228,7 +228,7 @@ public class SelectStatement implements CQLStatement // Nothing to do, all validation has been done by RawStatement.prepare() } - public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException + public ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { ConsistencyLevel cl = options.getConsistency(); checkNotNull(cl, "Invalid empty consistency level"); @@ -242,11 +242,11 @@ public class SelectStatement implements CQLStatement ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize); if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize))) - return execute(query, options, state, nowInSec, userLimit); + return execute(query, options, state, nowInSec, userLimit, queryStartNanoTime); QueryPager pager = getPager(query, options); - return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit); + return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit, queryStartNanoTime); } public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException @@ -270,9 +270,9 @@ public class SelectStatement implements CQLStatement QueryOptions options, QueryState state, int nowInSec, - int userLimit) throws RequestValidationException, RequestExecutionException + int userLimit, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { - try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState())) + try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState(), queryStartNanoTime)) { return processResults(data, options, nowInSec, userLimit); } @@ -308,7 +308,7 @@ public class SelectStatement implements CQLStatement return pager.state(); } - public abstract PartitionIterator fetchPage(int pageSize); + public abstract PartitionIterator fetchPage(int pageSize, long queryStartNanoTime); public static class NormalPager extends Pager { @@ -322,9 +322,9 @@ public class SelectStatement implements CQLStatement this.clientState = clientState; } - public PartitionIterator fetchPage(int pageSize) + public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime) { - return pager.fetchPage(pageSize, consistency, clientState); + return pager.fetchPage(pageSize, consistency, clientState, queryStartNanoTime); } } @@ -338,7 +338,7 @@ public class SelectStatement implements CQLStatement this.executionController = executionController; } - public PartitionIterator fetchPage(int pageSize) + public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime) { return pager.fetchPageInternal(pageSize, executionController); } @@ -349,7 +349,8 @@ public class SelectStatement implements CQLStatement QueryOptions options, int pageSize, int nowInSec, - int userLimit) throws RequestValidationException, RequestExecutionException + int userLimit, + long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { if (aggregationSpec != null) { @@ -370,7 +371,7 @@ public class SelectStatement implements CQLStatement + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); ResultMessage.Rows msg; - try (PartitionIterator page = pager.fetchPage(pageSize)) + try (PartitionIterator page = pager.fetchPage(pageSize, queryStartNanoTime)) { msg = processResults(page, options, nowInSec, userLimit); } @@ -400,10 +401,10 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - return executeInternal(state, options, FBUtilities.nowInSeconds()); + return executeInternal(state, options, FBUtilities.nowInSeconds(), System.nanoTime()); } - public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException + public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { int userLimit = getLimit(options); int userPerPartitionLimit = getPerPartitionLimit(options); @@ -423,7 +424,7 @@ public class SelectStatement implements CQLStatement { QueryPager pager = getPager(query, options); - return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit); + return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit, queryStartNanoTime); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index fa3c0f3..1478efd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@ -59,7 +59,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); } - public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException, TruncateException + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws InvalidRequestException, TruncateException { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/UseStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java index fe3d518..02a678a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java @@ -53,7 +53,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement { } - public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws InvalidRequestException { state.getClientState().setKeyspace(keyspace); return new ResultMessage.SetKeyspace(keyspace); @@ -63,6 +63,6 @@ public class UseStatement extends ParsedStatement implements CQLStatement { // In production, internal queries are exclusively on the system keyspace and 'use' is thus useless // but for some unit tests we need to set the keyspace (e.g. for tests with DROP INDEX) - return execute(state, options); + return execute(state, options, System.nanoTime()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index d1b1fa2..bd273e4 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -33,6 +33,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> public void doVerb(final MessageIn<CounterMutation> message, final int id) { + long queryStartNanoTime = System.nanoTime(); final CounterMutation cm = message.payload; logger.trace("Applying forwarded {}", cm); @@ -50,6 +51,6 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> { MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from); } - }); + }, queryStartNanoTime); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 439dc30..9e7a9d0 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -179,9 +179,9 @@ public class PartitionRangeReadCommand extends ReadCommand return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException { - return StorageProxy.getRangeSlice(this, consistency); + return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime); } public QueryPager getPager(PagingState pagingState, int protocolVersion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/ReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java index 39b0662..d74834c 100644 --- a/src/java/org/apache/cassandra/db/ReadQuery.java +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -40,7 +40,7 @@ public interface ReadQuery return ReadExecutionController.empty(); } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException { return EmptyIterators.partition(); } @@ -94,7 +94,7 @@ public interface ReadQuery * * @return the result of the query. */ - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException; + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException; /** * Execute the query for internal queries (that is, it basically executes the query locally). http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index c6cbdb4..cea8c0c 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -343,9 +343,9 @@ public class SinglePartitionReadCommand extends ReadCommand clusteringIndexFilter); } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException { - return StorageProxy.read(Group.one(this), consistency, clientState); + return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime); } public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion) @@ -983,9 +983,9 @@ public class SinglePartitionReadCommand extends ReadCommand return new Group(Collections.singletonList(command), command.limits()); } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException { - return StorageProxy.read(this, consistency, clientState); + return StorageProxy.read(this, consistency, clientState, queryStartNanoTime); } public int nowInSec() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 2083d54..def21bf 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1131,7 +1131,7 @@ public final class SystemKeyspace public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId); + UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), String.format(req, PAXOS), key.getKey(), metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java index e4cdde3..a57a949 100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -126,6 +126,7 @@ public class TableViews extends AbstractCollection<View> // Read modified rows int nowInSec = FBUtilities.nowInSeconds(); + long queryStartNanoTime = System.nanoTime(); SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec); if (command == null) return; @@ -142,7 +143,7 @@ public class TableViews extends AbstractCollection<View> Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); if (!mutations.isEmpty()) - StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete); + StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete, queryStartNanoTime); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 4bd95d4..8ce3d9f 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -91,7 +91,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (!mutations.isEmpty()) { AtomicLong noBase = new AtomicLong(Long.MAX_VALUE); - StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); + StorageProxy.mutateMV(key.getKey(), mutations, true, noBase, System.nanoTime()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index c90c6a1..038ac9f 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -129,21 +129,22 @@ public abstract class AbstractReplicationStrategy public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, - ConsistencyLevel consistency_level, - Runnable callback, - WriteType writeType) + Collection<InetAddress> pendingEndpoints, + ConsistencyLevel consistency_level, + Runnable callback, + WriteType writeType, + long queryStartNanoTime) { if (consistency_level.isDatacenterLocal()) { // block for in this context will be localnodes block. - return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } - return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } private Keyspace getKeyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index b69d8ce..a34401a 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -410,7 +410,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes)); - ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime()); UntypedResultSet result = UntypedResultSet.create(rows.result); for (UntypedResultSet.Row r : result) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index cae1f1a..7aa926e 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -63,11 +63,11 @@ public abstract class AbstractReadExecutor protected final ReadCallback handler; protected final TraceState traceState; - AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) + AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) { this.command = command; this.targetReplicas = targetReplicas; - this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas); + this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime); this.traceState = Tracing.instance.get(); // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes @@ -148,7 +148,7 @@ public abstract class AbstractReadExecutor /** * @return an executor appropriate for the configured speculative read policy */ - public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException + public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException { Keyspace keyspace = Keyspace.open(command.metadata().ksName); List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey()); @@ -175,14 +175,14 @@ public abstract class AbstractReadExecutor if (retry.equals(SpeculativeRetryParam.NONE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM || consistencyLevel.blockFor(keyspace) == allReplicas.size()) - return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas); + return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); if (targetReplicas.size() == allReplicas.size()) { // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas); + return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. @@ -203,16 +203,16 @@ public abstract class AbstractReadExecutor targetReplicas.add(extraReplica); if (retry.equals(SpeculativeRetryParam.ALWAYS)) - return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas); + return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); else // PERCENTILE or CUSTOM. - return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas); + return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); } public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor { - public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) + public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) { - super(keyspace, command, consistencyLevel, targetReplicas); + super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); } public void executeAsync() @@ -242,9 +242,10 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, - List<InetAddress> targetReplicas) + List<InetAddress> targetReplicas, + long queryStartNanoTime) { - super(keyspace, command, consistencyLevel, targetReplicas); + super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); this.cfs = cfs; } @@ -314,9 +315,10 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, - List<InetAddress> targetReplicas) + List<InetAddress> targetReplicas, + long queryStartNanoTime) { - super(keyspace, command, consistencyLevel, targetReplicas); + super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); this.cfs = cfs; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 19b3de0..f412515 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -41,7 +41,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private final SimpleCondition condition = new SimpleCondition(); protected final Keyspace keyspace; - protected final long start; protected final Collection<InetAddress> naturalEndpoints; public final ConsistencyLevel consistencyLevel; protected final Runnable callback; @@ -50,24 +49,27 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; + private final long queryStartNanoTime; /** * @param callback A callback to be called when the write is successful. + * @param queryStartNanoTime */ protected AbstractWriteResponseHandler(Keyspace keyspace, Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback, - WriteType writeType) + WriteType writeType, + long queryStartNanoTime) { this.keyspace = keyspace; this.pendingEndpoints = pendingEndpoints; - this.start = System.nanoTime(); this.consistencyLevel = consistencyLevel; this.naturalEndpoints = naturalEndpoints; this.callback = callback; this.writeType = writeType; + this.queryStartNanoTime = queryStartNanoTime; } public void get() throws WriteTimeoutException, WriteFailureException @@ -76,7 +78,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW ? DatabaseDescriptor.getCounterWriteRpcTimeout() : DatabaseDescriptor.getWriteRpcTimeout(); - long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - start); + long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime); boolean success; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index ac44923..3b31794 100644 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -33,9 +33,9 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish"); - public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup) + public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime) { - super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType); + super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType, queryStartNanoTime); this.wrapped = wrapped; this.requiredBeforeFinish = requiredBeforeFinish; this.cleanup = cleanup; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index b9ae933..be8eca1 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -43,10 +43,12 @@ import org.apache.cassandra.utils.FBUtilities; public class DataResolver extends ResponseResolver { private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + private final long queryStartNanoTime; - public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) { super(keyspace, command, consistency, maxResponseCount); + this.queryStartNanoTime = queryStartNanoTime; } public PartitionIterator getData() @@ -88,7 +90,7 @@ public class DataResolver extends ResponseResolver if (!command.limits().isUnlimited()) { for (int i = 0; i < results.size(); i++) - results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter))); + results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime))); } return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener); @@ -385,12 +387,14 @@ public class DataResolver extends ResponseResolver private final InetAddress source; private final DataLimits.Counter counter; private final DataLimits.Counter postReconciliationCounter; + private final long queryStartNanoTime; - private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter) + private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime) { this.source = source; this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount(); this.postReconciliationCounter = postReconciliationCounter; + this.queryStartNanoTime = queryStartNanoTime; } @Override @@ -503,8 +507,8 @@ public class DataResolver extends ResponseResolver private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand) { - DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1); - ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source)); + DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime); + ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime); if (StorageProxy.canDoLocalRequest(source)) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index b095c7f..9584611 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -46,10 +46,11 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, - WriteType writeType) + WriteType writeType, + long queryStartNanoTime) { // Response is been managed by the map so make it 1 for the superclass. - super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType); + super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime); assert consistencyLevel == ConsistencyLevel.EACH_QUORUM; NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
