Count entire coordinated request against timeout

Patch by Geoffrey Yu; reviewed by Tyler Hobbs for CASSANDRA-12256


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa83c942
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa83c942
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa83c942

Branch: refs/heads/trunk
Commit: aa83c942a51323d4a38bc023979ba70801c875b3
Parents: e83f9e6
Author: Geoffrey Yu <[email protected]>
Authored: Tue Aug 16 14:35:11 2016 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Tue Aug 16 14:35:11 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   9 +
 .../cassandra/auth/CassandraAuthorizer.java     |  11 +-
 .../cassandra/auth/CassandraRoleManager.java    |   3 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   3 +-
 .../cassandra/batchlog/BatchlogManager.java     |   6 +-
 .../batchlog/LegacyBatchlogMigrator.java        |   5 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |   3 +-
 .../CustomPayloadMirroringQueryHandler.java     |  15 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |   9 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  44 ++---
 .../statements/AuthenticationStatement.java     |   2 +-
 .../cql3/statements/AuthorizationStatement.java |   2 +-
 .../cql3/statements/BatchStatement.java         |  33 ++--
 .../cql3/statements/DropIndexStatement.java     |   2 +-
 .../cql3/statements/ModificationStatement.java  |  54 +++---
 .../statements/SchemaAlteringStatement.java     |   2 +-
 .../cql3/statements/SelectStatement.java        |  29 +--
 .../cql3/statements/TruncateStatement.java      |   2 +-
 .../cassandra/cql3/statements/UseStatement.java |   4 +-
 .../db/CounterMutationVerbHandler.java          |   3 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   4 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |   4 +-
 .../db/SinglePartitionReadCommand.java          |   8 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../apache/cassandra/db/view/TableViews.java    |   3 +-
 .../apache/cassandra/db/view/ViewBuilder.java   |   2 +-
 .../locator/AbstractReplicationStrategy.java    |  15 +-
 .../apache/cassandra/repair/RepairRunnable.java |   2 +-
 .../cassandra/service/AbstractReadExecutor.java |  28 +--
 .../service/AbstractWriteResponseHandler.java   |  10 +-
 .../service/BatchlogResponseHandler.java        |   4 +-
 .../apache/cassandra/service/DataResolver.java  |  14 +-
 .../DatacenterSyncWriteResponseHandler.java     |   5 +-
 .../service/DatacenterWriteResponseHandler.java |   5 +-
 .../apache/cassandra/service/ReadCallback.java  |  23 ++-
 .../apache/cassandra/service/StorageProxy.java  | 183 ++++++++++---------
 .../cassandra/service/WriteResponseHandler.java |  13 +-
 .../service/pager/AbstractQueryPager.java       |   4 +-
 .../service/pager/AggregationQueryPager.java    |  39 ++--
 .../service/pager/MultiPartitionPager.java      |  12 +-
 .../cassandra/service/pager/QueryPager.java     |   4 +-
 .../cassandra/service/pager/QueryPagers.java    |   5 +-
 .../service/paxos/AbstractPaxosCallback.java    |   7 +-
 .../service/paxos/PrepareCallback.java          |   4 +-
 .../service/paxos/ProposeCallback.java          |   4 +-
 .../cassandra/thrift/CassandraServer.java       |  98 ++++++----
 .../cassandra/tracing/TraceStateImpl.java       |   2 +-
 .../org/apache/cassandra/transport/Message.java |   5 +-
 .../transport/messages/AuthResponse.java        |   2 +-
 .../transport/messages/BatchMessage.java        |   4 +-
 .../transport/messages/CredentialsMessage.java  |   2 +-
 .../transport/messages/ExecuteMessage.java      |   4 +-
 .../transport/messages/OptionsMessage.java      |   2 +-
 .../transport/messages/PrepareMessage.java      |   2 +-
 .../transport/messages/QueryMessage.java        |   4 +-
 .../transport/messages/RegisterMessage.java     |   2 +-
 .../transport/messages/StartupMessage.java      |   2 +-
 .../cassandra/cql3/PstmtPersistenceTest.java    |   2 +-
 .../cassandra/service/DataResolverTest.java     |  26 +--
 .../cassandra/transport/MessagePayloadTest.java |  19 +-
 61 files changed, 468 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 700dd48..5fbcc6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Count full coordinated request against timeout (CASSANDRA-12256)
  * Allow TTL with null value on insert and update (CASSANDRA-12216)
  * Make decommission operation resumable (CASSANDRA-12008)
  * Add support to one-way targeted repair (CASSANDRA-9876)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9cfc58b..d25be0d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -62,6 +62,15 @@ New features
 
 Upgrading
 ---------
+    - Request timeouts in cassandra.yaml (read_request_timeout_in_ms, etc) now apply to the
+      "full" request time on the coordinator.  Previously, they only covered the time from
+      when the coordinator sent a message to a replica until the time that the replica
+      responded.  Additionally, the previous behavior was to reset the timeout when performing
+      a read repair, making a second read to fix a short read, and when subranges were read
+      as part of a range scan or secondary index query.  In 3.10 and higher, the timeout
+      is no longer reset for these "subqueries".  The entire request must complete within
+      the specified timeout.  As a consequence, your timeouts may need to be adjusted
+      to account for this.  See CASSANDRA-12256 for more details.
     - Logs written to stdout are now consistent with logs written to files.
       Time is now local (it was UTC on the console and local in files). Date, thread, file
       and line info where added to stdout. (see CASSANDRA-12004)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java 
b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index a6c11d2..65ee7ec 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -201,7 +201,8 @@ public class CassandraAuthorizer implements IAuthorizer
                                                   Attributes.none());
         QueryProcessor.instance.processBatch(batch,
                                              QueryState.forInternalCalls(),
-                                             
BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT));
+                                             
BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT),
+                                             System.nanoTime());
 
     }
 
@@ -218,7 +219,7 @@ public class CassandraAuthorizer implements IAuthorizer
         SelectStatement statement = 
Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) == null
                                     ? authorizeRoleStatement
                                     : legacyAuthorizeRoleStatement;
-        ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options) ;
+        ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
         UntypedResultSet result = UntypedResultSet.create(rows.result);
 
         if (!result.isEmpty() && result.one().has(PERMISSIONS))
@@ -428,12 +429,14 @@ public class CassandraAuthorizer implements IAuthorizer
                                             
QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
                                                                           
Lists.newArrayList(row.getBytes("username"),
                                                                                
              row.getBytes("resource"),
-                                                                               
              serializer.serialize(filteredPerms))));
+                                                                               
              serializer.serialize(filteredPerms))),
+                                            System.nanoTime());
 
                     indexStatement.execute(QueryState.forInternalCalls(),
                                            
QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
                                                                          
Lists.newArrayList(row.getBytes("resource"),
-                                                                               
             row.getBytes("username"))));
+                                                                               
             row.getBytes("username"))),
+                                           System.nanoTime());
 
                 }
                 logger.info("Completed conversion of legacy permissions");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java 
b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 826e89d..f2a2cfb 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -496,7 +496,8 @@ public class CassandraRoleManager implements IRoleManager
         ResultMessage.Rows rows =
             statement.execute(QueryState.forInternalCalls(),
                               
QueryOptions.forInternalCalls(consistencyForRole(name),
-                                                            
Collections.singletonList(ByteBufferUtil.bytes(name))));
+                                                            
Collections.singletonList(ByteBufferUtil.bytes(name))),
+                              System.nanoTime());
         if (rows.result.isEmpty())
             return NULL_ROLE;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java 
b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 74eb10d..0f79cd2 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -121,7 +121,8 @@ public class PasswordAuthenticator implements IAuthenticator
             ResultMessage.Rows rows =
                 authenticationStatement.execute(QueryState.forInternalCalls(),
                                                 
QueryOptions.forInternalCalls(consistencyForRole(username),
-                                                                              
Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                              
Lists.newArrayList(ByteBufferUtil.bytes(username))),
+                                                System.nanoTime());
 
             // If either a non-existent role name was supplied, or no 
credentials
             // were found for that role we don't want to cache the result so 
we throw

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java 
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 0bc9185..ffff235 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -442,7 +442,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             if (liveEndpoints.isEmpty())
                 return null;
 
-            ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(liveEndpoints);
+            ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
             for (InetAddress endpoint : liveEndpoints)
                 MessagingService.instance().sendRR(message, endpoint, handler, 
false);
@@ -465,9 +465,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             private final Set<InetAddress> undelivered = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints, 
long queryStartNanoTime)
             {
-                super(writeEndpoints, Collections.<InetAddress>emptySet(), 
null, null, null, WriteType.UNLOGGED_BATCH);
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), 
null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
                 undelivered.addAll(writeEndpoints);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java 
b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
index 3a8bf83..1a70f9f 100644
--- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -137,14 +137,15 @@ public final class LegacyBatchlogMigrator
         }
     }
 
-    public static void asyncRemoveFromBatchlog(Collection<InetAddress> 
endpoints, UUID uuid)
+    public static void asyncRemoveFromBatchlog(Collection<InetAddress> 
endpoints, UUID uuid, long queryStartNanoTime)
     {
         AbstractWriteResponseHandler<IMutation> handler = new 
WriteResponseHandler<>(endpoints,
                                                                                
      Collections.<InetAddress>emptyList(),
                                                                                
      ConsistencyLevel.ANY,
                                                                                
      Keyspace.open(SystemKeyspace.NAME),
                                                                                
      null,
-                                                                               
      WriteType.SIMPLE);
+                                                                               
      WriteType.SIMPLE,
+                                                                               
      queryStartNanoTime);
         Mutation mutation = getRemoveMutation(uuid);
 
         for (InetAddress target : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java 
b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 02292ad..901ecd4 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -50,8 +50,9 @@ public interface CQLStatement
      *
      * @param state the current query state
      * @param options options for this query (consistency, variables, pageSize, ...)
+     * @param queryStartNanoTime the timestamp returned by System.nanoTime() when this statement was received
      */
-    public ResultMessage execute(QueryState state, QueryOptions options) 
throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws RequestValidationException, 
RequestExecutionException;
 
     /**
      * Variant of execute used for internal query against the system tables, 
and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java 
b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
index 02a6df9..643c54b 100644
--- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
@@ -38,9 +38,10 @@ public class CustomPayloadMirroringQueryHandler implements 
QueryHandler
     public ResultMessage process(String query,
                                  QueryState state,
                                  QueryOptions options,
-                                 Map<String, ByteBuffer> customPayload)
+                                 Map<String, ByteBuffer> customPayload,
+                                 long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.process(query, state, options, 
customPayload);
+        ResultMessage result = queryProcessor.process(query, state, options, 
customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }
@@ -65,9 +66,10 @@ public class CustomPayloadMirroringQueryHandler implements 
QueryHandler
     public ResultMessage processPrepared(CQLStatement statement,
                                          QueryState state,
                                          QueryOptions options,
-                                         Map<String, ByteBuffer> customPayload)
+                                         Map<String, ByteBuffer> customPayload,
+                                         long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.processPrepared(statement, 
state, options, customPayload);
+        ResultMessage result = queryProcessor.processPrepared(statement, 
state, options, customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }
@@ -75,9 +77,10 @@ public class CustomPayloadMirroringQueryHandler implements 
QueryHandler
     public ResultMessage processBatch(BatchStatement statement,
                                       QueryState state,
                                       BatchQueryOptions options,
-                                      Map<String, ByteBuffer> customPayload)
+                                      Map<String, ByteBuffer> customPayload,
+                                      long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.processBatch(statement, state, 
options, customPayload);
+        ResultMessage result = queryProcessor.processBatch(statement, state, 
options, customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java 
b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 3c11c0e..2108d4c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -33,7 +33,8 @@ public interface QueryHandler
     ResultMessage process(String query,
                           QueryState state,
                           QueryOptions options,
-                          Map<String, ByteBuffer> customPayload) throws 
RequestExecutionException, RequestValidationException;
+                          Map<String, ByteBuffer> customPayload,
+                          long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException;
 
     ResultMessage.Prepared prepare(String query,
                                    QueryState state,
@@ -46,10 +47,12 @@ public interface QueryHandler
     ResultMessage processPrepared(CQLStatement statement,
                                   QueryState state,
                                   QueryOptions options,
-                                  Map<String, ByteBuffer> customPayload) 
throws RequestExecutionException, RequestValidationException;
+                                  Map<String, ByteBuffer> customPayload,
+                                  long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException;
 
     ResultMessage processBatch(BatchStatement statement,
                                QueryState state,
                                BatchQueryOptions options,
-                               Map<String, ByteBuffer> customPayload) throws 
RequestExecutionException, RequestValidationException;
+                               Map<String, ByteBuffer> customPayload,
+                               long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 899b36d..47462e4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -203,7 +203,7 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
-    public ResultMessage processStatement(CQLStatement statement, QueryState 
queryState, QueryOptions options)
+    public ResultMessage processStatement(CQLStatement statement, QueryState 
queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("Process {} @CL.{}", statement, options.getConsistency());
@@ -211,26 +211,26 @@ public class QueryProcessor implements QueryHandler
         statement.checkAccess(clientState);
         statement.validate(clientState);
 
-        ResultMessage result = statement.execute(queryState, options);
+        ResultMessage result = statement.execute(queryState, options, 
queryStartNanoTime);
         return result == null ? new ResultMessage.Void() : result;
     }
 
-    public static ResultMessage process(String queryString, ConsistencyLevel 
cl, QueryState queryState)
+    public static ResultMessage process(String queryString, ConsistencyLevel 
cl, QueryState queryState, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
-        return instance.process(queryString, queryState, 
QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
+        return instance.process(queryString, queryState, 
QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()), 
queryStartNanoTime);
     }
 
     public ResultMessage process(String query,
                                  QueryState state,
                                  QueryOptions options,
-                                 Map<String, ByteBuffer> customPayload)
-                                         throws RequestExecutionException, 
RequestValidationException
+                                 Map<String, ByteBuffer> customPayload,
+                                 long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException
     {
-        return process(query, state, options);
+        return process(query, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage process(String queryString, QueryState queryState, 
QueryOptions options)
+    public ResultMessage process(String queryString, QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ParsedStatement.Prepared p = getStatement(queryString, 
queryState.getClientState());
@@ -242,7 +242,7 @@ public class QueryProcessor implements QueryHandler
         if (!queryState.getClientState().isInternal)
             metrics.regularStatementsExecuted.inc();
 
-        return processStatement(prepared, queryState, options);
+        return processStatement(prepared, queryState, options, 
queryStartNanoTime);
     }
 
     public static ParsedStatement.Prepared parseStatement(String queryStr, 
QueryState queryState) throws RequestValidationException
@@ -257,7 +257,7 @@ public class QueryProcessor implements QueryHandler
 
     public static UntypedResultSet process(String query, ConsistencyLevel cl, 
List<ByteBuffer> values) throws RequestExecutionException
     {
-        ResultMessage result = instance.process(query, 
QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values));
+        ResultMessage result = instance.process(query, 
QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values), 
System.nanoTime());
         if (result instanceof ResultMessage.Rows)
             return 
UntypedResultSet.create(((ResultMessage.Rows)result).result);
         else
@@ -319,7 +319,7 @@ public class QueryProcessor implements QueryHandler
         try
         {
             ParsedStatement.Prepared prepared = prepareInternal(query);
-            ResultMessage result = prepared.statement.execute(state, 
makeInternalOptions(prepared, values, cl));
+            ResultMessage result = prepared.statement.execute(state, 
makeInternalOptions(prepared, values, cl), System.nanoTime());
             if (result instanceof ResultMessage.Rows)
                 return 
UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else
@@ -362,12 +362,12 @@ public class QueryProcessor implements QueryHandler
      * Note that this only make sense for Selects so this only accept SELECT 
statements and is only useful in rare
      * cases.
      */
-    public static UntypedResultSet executeInternalWithNow(int nowInSec, String 
query, Object... values)
+    public static UntypedResultSet executeInternalWithNow(int nowInSec, long 
queryStartNanoTime, String query, Object... values)
     {
         ParsedStatement.Prepared prepared = prepareInternal(query);
         assert prepared.statement instanceof SelectStatement;
         SelectStatement select = (SelectStatement)prepared.statement;
-        ResultMessage result = select.executeInternal(internalQueryState(), 
makeInternalOptions(prepared, values), nowInSec);
+        ResultMessage result = select.executeInternal(internalQueryState(), 
makeInternalOptions(prepared, values), nowInSec, queryStartNanoTime);
         assert result instanceof ResultMessage.Rows;
         return UntypedResultSet.create(((ResultMessage.Rows)result).result);
     }
@@ -480,13 +480,14 @@ public class QueryProcessor implements QueryHandler
     public ResultMessage processPrepared(CQLStatement statement,
                                          QueryState state,
                                          QueryOptions options,
-                                         Map<String, ByteBuffer> customPayload)
+                                         Map<String, ByteBuffer> customPayload,
+                                         long queryStartNanoTime)
                                                  throws 
RequestExecutionException, RequestValidationException
     {
-        return processPrepared(statement, state, options);
+        return processPrepared(statement, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage processPrepared(CQLStatement statement, QueryState 
queryState, QueryOptions options)
+    public ResultMessage processPrepared(CQLStatement statement, QueryState 
queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> variables = options.getValues();
@@ -506,26 +507,27 @@ public class QueryProcessor implements QueryHandler
         }
 
         metrics.preparedStatementsExecuted.inc();
-        return processStatement(statement, queryState, options);
+        return processStatement(statement, queryState, options, 
queryStartNanoTime);
     }
 
     public ResultMessage processBatch(BatchStatement statement,
                                       QueryState state,
                                       BatchQueryOptions options,
-                                      Map<String, ByteBuffer> customPayload)
+                                      Map<String, ByteBuffer> customPayload,
+                                      long queryStartNanoTime)
                                               throws 
RequestExecutionException, RequestValidationException
     {
-        return processBatch(statement, state, options);
+        return processBatch(statement, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage processBatch(BatchStatement batch, QueryState 
queryState, BatchQueryOptions options)
+    public ResultMessage processBatch(BatchStatement batch, QueryState 
queryState, BatchQueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ClientState clientState = queryState.getClientState();
         batch.checkAccess(clientState);
         batch.validate();
         batch.validate(clientState);
-        return batch.execute(queryState, options);
+        return batch.execute(queryState, options, queryStartNanoTime);
     }
 
     public static ParsedStatement.Prepared getStatement(String queryStr, 
ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 151e4f0..0283009 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -41,7 +41,7 @@ public abstract class AuthenticationStatement extends 
ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options)
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index 098e22c..83081c8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -42,7 +42,7 @@ public abstract class AuthorizationStatement extends 
ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options)
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime)
     throws RequestValidationException, RequestExecutionException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 14638e2..ae64e7a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -217,7 +217,7 @@ public class BatchStatement implements CQLStatement
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions 
options, boolean local, long now)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions 
options, boolean local, long now, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         Set<String> tablesWithZeroGcGs = null;
@@ -233,7 +233,7 @@ public class BatchStatement implements CQLStatement
             }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
-            statement.addUpdates(collector, statementOptions, local, 
timestamp);
+            statement.addUpdates(collector, statementOptions, local, 
timestamp, queryStartNanoTime);
         }
 
         if (tablesWithZeroGcGs != null)
@@ -335,17 +335,17 @@ public class BatchStatement implements CQLStatement
     }
 
 
-    public ResultMessage execute(QueryState queryState, QueryOptions options) 
throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, QueryOptions options, 
long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
     {
-        return execute(queryState, 
BatchQueryOptions.withoutPerStatementVariables(options));
+        return execute(queryState, 
BatchQueryOptions.withoutPerStatementVariables(options), queryStartNanoTime);
     }
 
-    public ResultMessage execute(QueryState queryState, BatchQueryOptions 
options) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
     {
-        return execute(queryState, options, false, 
options.getTimestamp(queryState));
+        return execute(queryState, options, false, 
options.getTimestamp(queryState), queryStartNanoTime);
     }
 
-    private ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, boolean local, long now)
+    private ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, boolean local, long now, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -354,13 +354,13 @@ public class BatchStatement implements CQLStatement
             throw new InvalidRequestException("Invalid empty serial 
consistency level");
 
         if (hasConditions)
-            return executeWithConditions(options, queryState);
+            return executeWithConditions(options, queryState, 
queryStartNanoTime);
 
-        executeWithoutConditions(getMutations(options, local, now), 
options.getConsistency());
+        executeWithoutConditions(getMutations(options, local, now, 
queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
         return new ResultMessage.Void();
     }
 
-    private void executeWithoutConditions(Collection<? extends IMutation> 
mutations, ConsistencyLevel cl) throws RequestExecutionException, 
RequestValidationException
+    private void executeWithoutConditions(Collection<? extends IMutation> 
mutations, ConsistencyLevel cl, long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException
     {
         if (mutations.isEmpty())
             return;
@@ -369,10 +369,10 @@ public class BatchStatement implements CQLStatement
         verifyBatchType(mutations);
 
         boolean mutateAtomic = (isLogged() && mutations.size() > 1);
-        StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
+        StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, 
queryStartNanoTime);
     }
 
-    private ResultMessage executeWithConditions(BatchQueryOptions options, 
QueryState state)
+    private ResultMessage executeWithConditions(BatchQueryOptions options, 
QueryState state, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         Pair<CQL3CasRequest, Set<ColumnDefinition>> p = 
makeCasRequest(options, state);
@@ -388,7 +388,8 @@ public class BatchStatement implements CQLStatement
                                                    casRequest,
                                                    
options.getSerialConsistency(),
                                                    options.getConsistency(),
-                                                   state.getClientState()))
+                                                   state.getClientState(),
+                                                   queryStartNanoTime))
         {
             return new 
ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, 
result, columnsWithConditions, true, options.forStatement(0)));
         }
@@ -447,13 +448,13 @@ public class BatchStatement implements CQLStatement
         if (hasConditions)
             return 
executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options),
 queryState);
 
-        executeInternalWithoutCondition(queryState, options);
+        executeInternalWithoutCondition(queryState, options, 
System.nanoTime());
         return new ResultMessage.Void();
     }
 
-    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options) throws RequestValidationException, 
RequestExecutionException
+    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : 
getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, 
queryState.getTimestamp()))
+        for (IMutation mutation : 
getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, 
queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 85f5f0d..70a86db 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -64,7 +64,7 @@ public class DropIndexStatement extends 
SchemaAlteringStatement
     }
 
     @Override
-    public ResultMessage execute(QueryState state, QueryOptions options) 
throws RequestValidationException
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws RequestValidationException
     {
         Event.SchemaChange ce = announceMigration(false);
         return ce == null ? null : new ResultMessage.SchemaChange(ce);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 822664a..d32a689 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -345,7 +345,8 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                            
ClusteringIndexFilter filter,
                                                            DataLimits limits,
                                                            boolean local,
-                                                           ConsistencyLevel cl)
+                                                           ConsistencyLevel cl,
+                                                           long 
queryStartNanoTime)
     {
         if (!requiresRead())
             return null;
@@ -381,7 +382,7 @@ public abstract class ModificationStatement implements 
CQLStatement
             }
         }
 
-        try (PartitionIterator iter = group.execute(cl, null))
+        try (PartitionIterator iter = group.execute(cl, null, 
queryStartNanoTime))
         {
             return asMaterializedMap(iter);
         }
@@ -405,18 +406,18 @@ public abstract class ModificationStatement implements 
CQLStatement
         return !conditions.isEmpty();
     }
 
-    public ResultMessage execute(QueryState queryState, QueryOptions options)
+    public ResultMessage execute(QueryState queryState, QueryOptions options, 
long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency 
level");
 
         return hasConditions()
-             ? executeWithCondition(queryState, options)
-             : executeWithoutCondition(queryState, options);
+             ? executeWithCondition(queryState, options, queryStartNanoTime)
+             : executeWithoutCondition(queryState, options, 
queryStartNanoTime);
     }
 
-    private ResultMessage executeWithoutCondition(QueryState queryState, 
QueryOptions options)
+    private ResultMessage executeWithoutCondition(QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
@@ -425,14 +426,14 @@ public abstract class ModificationStatement implements 
CQLStatement
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState));
+        Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState), queryStartNanoTime);
         if (!mutations.isEmpty())
-            StorageProxy.mutateWithTriggers(mutations, cl, false);
+            StorageProxy.mutateWithTriggers(mutations, cl, false, 
queryStartNanoTime);
 
         return null;
     }
 
-    public ResultMessage executeWithCondition(QueryState queryState, 
QueryOptions options)
+    public ResultMessage executeWithCondition(QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         CQL3CasRequest request = makeCasRequest(queryState, options);
@@ -443,7 +444,8 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                    request,
                                                    
options.getSerialConsistency(),
                                                    options.getConsistency(),
-                                                   
queryState.getClientState()))
+                                                   queryState.getClientState(),
+                                                   queryStartNanoTime))
         {
             return new ResultMessage.Rows(buildCasResultSet(result, options));
         }
@@ -561,12 +563,12 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         return hasConditions()
                ? executeInternalWithCondition(queryState, options)
-               : executeInternalWithoutCondition(queryState, options);
+               : executeInternalWithoutCondition(queryState, options, 
System.nanoTime());
     }
 
-    public ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options) throws RequestValidationException, 
RequestExecutionException
+    public ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(options, true, 
queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(options, true, 
queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }
@@ -612,10 +614,10 @@ public abstract class ModificationStatement implements 
CQLStatement
      *
      * @return list of the mutations
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long now, long queryStartNanoTime)
     {
         UpdatesCollector collector = new 
UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
-        addUpdates(collector, options, local, now);
+        addUpdates(collector, options, local, now, queryStartNanoTime);
         collector.validateIndexedColumns();
 
         return collector.toMutations();
@@ -624,7 +626,8 @@ public abstract class ModificationStatement implements 
CQLStatement
     final void addUpdates(UpdatesCollector collector,
                           QueryOptions options,
                           boolean local,
-                          long now)
+                          long now,
+                          long queryStartNanoTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
 
@@ -643,7 +646,8 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                            options,
                                                            DataLimits.NONE,
                                                            local,
-                                                           now);
+                                                           now,
+                                                           queryStartNanoTime);
             for (ByteBuffer key : keys)
             {
                 ThriftValidation.validateKey(cfm, key);
@@ -659,7 +663,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         {
             NavigableSet<Clustering> clusterings = createClustering(options);
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
options, local, now);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
options, local, now, queryStartNanoTime);
 
             for (ByteBuffer key : keys)
             {
@@ -703,7 +707,8 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                   NavigableSet<Clustering> 
clusterings,
                                                   QueryOptions options,
                                                   boolean local,
-                                                  long now)
+                                                  long now,
+                                                  long queryStartNanoTime)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
             return makeUpdateParameters(keys,
@@ -711,14 +716,16 @@ public abstract class ModificationStatement implements 
CQLStatement
                                         options,
                                         DataLimits.cqlLimits(1),
                                         local,
-                                        now);
+                                        now,
+                                        queryStartNanoTime);
 
         return makeUpdateParameters(keys,
                                     new 
ClusteringIndexNamesFilter(clusterings, false),
                                     options,
                                     DataLimits.NONE,
                                     local,
-                                    now);
+                                    now,
+                                    queryStartNanoTime);
     }
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
@@ -726,10 +733,11 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                   QueryOptions options,
                                                   DataLimits limits,
                                                   boolean local,
-                                                  long now)
+                                                  long now,
+                                                  long queryStartNanoTime)
     {
         // Some lists operation requires reading
-        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, 
limits, local, options.getConsistency());
+        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, 
limits, local, options.getConsistency(), queryStartNanoTime);
         return new UpdateParameters(cfm, updatedColumns(), options, 
getTimestamp(now, options), getTimeToLive(options), lists);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 10c004c..139c566 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -86,7 +86,7 @@ public abstract class SchemaAlteringStatement extends 
CFStatement implements CQL
      */
     public abstract Event.SchemaChange announceMigration(boolean isLocalOnly) 
throws RequestValidationException;
 
-    public ResultMessage execute(QueryState state, QueryOptions options) 
throws RequestValidationException
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws RequestValidationException
     {
         // If an IF [NOT] EXISTS clause was used, this may not result in an 
actual schema change.  To avoid doing
         // extra work in the drivers to handle schema changes, we return an 
empty message in this case. (CASSANDRA-7600)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7cd2be0..6afaa20 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -228,7 +228,7 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by 
RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(QueryState state, QueryOptions options) 
throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(QueryState state, QueryOptions options, 
long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
         checkNotNull(cl, "Invalid empty consistency level");
@@ -242,11 +242,11 @@ public class SelectStatement implements CQLStatement
         ReadQuery query = getQuery(options, nowInSec, userLimit, 
userPerPartitionLimit, pageSize);
 
         if (aggregationSpec == null && (pageSize <= 0 || 
(query.limits().count() <= pageSize)))
-            return execute(query, options, state, nowInSec, userLimit);
+            return execute(query, options, state, nowInSec, userLimit, 
queryStartNanoTime);
 
         QueryPager pager = getPager(query, options);
 
-        return execute(Pager.forDistributedQuery(pager, cl, 
state.getClientState()), options, pageSize, nowInSec, userLimit);
+        return execute(Pager.forDistributedQuery(pager, cl, 
state.getClientState()), options, pageSize, nowInSec, userLimit, 
queryStartNanoTime);
     }
 
     public ReadQuery getQuery(QueryOptions options, int nowInSec) throws 
RequestValidationException
@@ -270,9 +270,9 @@ public class SelectStatement implements CQLStatement
                                        QueryOptions options,
                                        QueryState state,
                                        int nowInSec,
-                                       int userLimit) throws 
RequestValidationException, RequestExecutionException
+                                       int userLimit, long queryStartNanoTime) 
throws RequestValidationException, RequestExecutionException
     {
-        try (PartitionIterator data = query.execute(options.getConsistency(), 
state.getClientState()))
+        try (PartitionIterator data = query.execute(options.getConsistency(), 
state.getClientState(), queryStartNanoTime))
         {
             return processResults(data, options, nowInSec, userLimit);
         }
@@ -308,7 +308,7 @@ public class SelectStatement implements CQLStatement
             return pager.state();
         }
 
-        public abstract PartitionIterator fetchPage(int pageSize);
+        public abstract PartitionIterator fetchPage(int pageSize, long 
queryStartNanoTime);
 
         public static class NormalPager extends Pager
         {
@@ -322,9 +322,9 @@ public class SelectStatement implements CQLStatement
                 this.clientState = clientState;
             }
 
-            public PartitionIterator fetchPage(int pageSize)
+            public PartitionIterator fetchPage(int pageSize, long 
queryStartNanoTime)
             {
-                return pager.fetchPage(pageSize, consistency, clientState);
+                return pager.fetchPage(pageSize, consistency, clientState, 
queryStartNanoTime);
             }
         }
 
@@ -338,7 +338,7 @@ public class SelectStatement implements CQLStatement
                 this.executionController = executionController;
             }
 
-            public PartitionIterator fetchPage(int pageSize)
+            public PartitionIterator fetchPage(int pageSize, long 
queryStartNanoTime)
             {
                 return pager.fetchPageInternal(pageSize, executionController);
             }
@@ -349,7 +349,8 @@ public class SelectStatement implements CQLStatement
                                        QueryOptions options,
                                        int pageSize,
                                        int nowInSec,
-                                       int userLimit) throws 
RequestValidationException, RequestExecutionException
+                                       int userLimit,
+                                       long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
     {
         if (aggregationSpec != null)
         {
@@ -370,7 +371,7 @@ public class SelectStatement implements CQLStatement
                   + " you must either remove the ORDER BY or the IN and sort 
client side, or disable paging for this query");
 
         ResultMessage.Rows msg;
-        try (PartitionIterator page = pager.fetchPage(pageSize))
+        try (PartitionIterator page = pager.fetchPage(pageSize, 
queryStartNanoTime))
         {
             msg = processResults(page, options, nowInSec, userLimit);
         }
@@ -400,10 +401,10 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options) throws RequestExecutionException, RequestValidationException
     {
-        return executeInternal(state, options, FBUtilities.nowInSeconds());
+        return executeInternal(state, options, FBUtilities.nowInSeconds(), 
System.nanoTime());
     }
 
-    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options, int nowInSec) throws RequestExecutionException, 
RequestValidationException
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options, int nowInSec, long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException
     {
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
@@ -423,7 +424,7 @@ public class SelectStatement implements CQLStatement
             {
                 QueryPager pager = getPager(query, options);
 
-                return execute(Pager.forInternalQuery(pager, 
executionController), options, pageSize, nowInSec, userLimit);
+                return execute(Pager.forInternalQuery(pager, 
executionController), options, pageSize, nowInSec, userLimit, 
queryStartNanoTime);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index fa3c0f3..1478efd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -59,7 +59,7 @@ public class TruncateStatement extends CFStatement implements 
CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options) 
throws InvalidRequestException, TruncateException
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index fe3d518..02a678a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -53,7 +53,7 @@ public class UseStatement extends ParsedStatement implements 
CQLStatement
     {
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options) 
throws InvalidRequestException
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);
@@ -63,6 +63,6 @@ public class UseStatement extends ParsedStatement implements 
CQLStatement
     {
         // In production, internal queries are exclusively on the system 
keyspace and 'use' is thus useless
         // but for some unit tests we need to set the keyspace (e.g. for tests 
with DROP INDEX)
-        return execute(state, options);
+        return execute(state, options, System.nanoTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d1b1fa2..bd273e4 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -33,6 +33,7 @@ public class CounterMutationVerbHandler implements 
IVerbHandler<CounterMutation>
 
     public void doVerb(final MessageIn<CounterMutation> message, final int id)
     {
+        long queryStartNanoTime = System.nanoTime();
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
@@ -50,6 +51,6 @@ public class CounterMutationVerbHandler implements 
IVerbHandler<CounterMutation>
             {
                 
MessagingService.instance().sendReply(WriteResponse.createMessage(), id, 
message.from);
             }
-        });
+        }, queryStartNanoTime);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 439dc30..9e7a9d0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -179,9 +179,9 @@ public class PartitionRangeReadCommand extends ReadCommand
         return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
     }
 
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState) throws RequestExecutionException
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState, long queryStartNanoTime) throws RequestExecutionException
     {
-        return StorageProxy.getRangeSlice(this, consistency);
+        return StorageProxy.getRangeSlice(this, consistency, 
queryStartNanoTime);
     }
 
     public QueryPager getPager(PagingState pagingState, int protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java 
b/src/java/org/apache/cassandra/db/ReadQuery.java
index 39b0662..d74834c 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -40,7 +40,7 @@ public interface ReadQuery
             return ReadExecutionController.empty();
         }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState) throws RequestExecutionException
+        public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState, long queryStartNanoTime) throws 
RequestExecutionException
         {
             return EmptyIterators.partition();
         }
@@ -94,7 +94,7 @@ public interface ReadQuery
      *
      * @return the result of the query.
      */
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState) throws RequestExecutionException;
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState, long queryStartNanoTime) throws RequestExecutionException;
 
     /**
      * Execute the query for internal queries (that is, it basically executes 
the query locally).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c6cbdb4..cea8c0c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -343,9 +343,9 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               clusteringIndexFilter);
     }
 
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState) throws RequestExecutionException
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState, long queryStartNanoTime) throws RequestExecutionException
     {
-        return StorageProxy.read(Group.one(this), consistency, clientState);
+        return StorageProxy.read(Group.one(this), consistency, clientState, 
queryStartNanoTime);
     }
 
     public SinglePartitionPager getPager(PagingState pagingState, int 
protocolVersion)
@@ -983,9 +983,9 @@ public class SinglePartitionReadCommand extends ReadCommand
             return new Group(Collections.singletonList(command), 
command.limits());
         }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState) throws RequestExecutionException
+        public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState, long queryStartNanoTime) throws 
RequestExecutionException
         {
-            return StorageProxy.read(this, consistency, clientState);
+            return StorageProxy.read(this, consistency, clientState, 
queryStartNanoTime);
         }
 
         public int nowInSec()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2083d54..def21bf 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1131,7 +1131,7 @@ public final class SystemKeyspace
     public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData 
metadata, int nowInSec)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = 
QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), 
key.getKey(), metadata.cfId);
+        UntypedResultSet results = 
QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), 
String.format(req, PAXOS), key.getKey(), metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java 
b/src/java/org/apache/cassandra/db/view/TableViews.java
index e4cdde3..a57a949 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -126,6 +126,7 @@ public class TableViews extends AbstractCollection<View>
 
         // Read modified rows
         int nowInSec = FBUtilities.nowInSeconds();
+        long queryStartNanoTime = System.nanoTime();
         SinglePartitionReadCommand command = readExistingRowsCommand(update, 
views, nowInSec);
         if (command == null)
             return;
@@ -142,7 +143,7 @@ public class TableViews extends AbstractCollection<View>
         
Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime()
 - start, TimeUnit.NANOSECONDS);
 
         if (!mutations.isEmpty())
-            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, 
writeCommitLog, baseComplete);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, 
writeCommitLog, baseComplete, queryStartNanoTime);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 4bd95d4..8ce3d9f 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -91,7 +91,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         if (!mutations.isEmpty())
         {
             AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
-            StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
+            StorageProxy.mutateMV(key.getKey(), mutations, true, noBase, 
System.nanoTime());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c90c6a1..038ac9f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -129,21 +129,22 @@ public abstract class AbstractReplicationStrategy
     public abstract List<InetAddress> calculateNaturalEndpoints(Token 
searchToken, TokenMetadata tokenMetadata);
 
     public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
-                                                                
Collection<InetAddress> pendingEndpoints,
-                                                                
ConsistencyLevel consistency_level,
-                                                                Runnable 
callback,
-                                                                WriteType 
writeType)
+                                                                       
Collection<InetAddress> pendingEndpoints,
+                                                                       
ConsistencyLevel consistency_level,
+                                                                       
Runnable callback,
+                                                                       
WriteType writeType,
+                                                                       long 
queryStartNanoTime)
     {
         if (consistency_level.isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, 
queryStartNanoTime);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this 
instanceof NetworkTopologyStrategy))
         {
-            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, 
queryStartNanoTime);
         }
-        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, 
consistency_level, getKeyspace(), callback, writeType);
+        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, 
consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
     }
 
     private Keyspace getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index b69d8ce..a34401a 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -410,7 +410,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                     QueryOptions options = 
QueryOptions.forInternalCalls(ConsistencyLevel.ONE, 
Lists.newArrayList(sessionIdBytes,
                                                                                
                                   tminBytes,
                                                                                
                                   tmaxBytes));
-                    ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options);
+                    ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
                     UntypedResultSet result = 
UntypedResultSet.create(rows.result);
 
                     for (UntypedResultSet.Row r : result)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index cae1f1a..7aa926e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -63,11 +63,11 @@ public abstract class AbstractReadExecutor
     protected final ReadCallback handler;
     protected final TraceState traceState;
 
-    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long 
queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        this.handler = new ReadCallback(new DigestResolver(keyspace, command, 
consistencyLevel, targetReplicas.size()), consistencyLevel, command, 
targetReplicas);
+        this.handler = new ReadCallback(new DigestResolver(keyspace, command, 
consistencyLevel, targetReplicas.size()), consistencyLevel, command, 
targetReplicas, queryStartNanoTime);
         this.traceState = Tracing.instance.get();
 
         // Set the digest version (if we request some digests). This is the 
smallest version amongst all our target replicas since new nodes
@@ -148,7 +148,7 @@ public abstract class AbstractReadExecutor
     /**
      * @return an executor appropriate for the configured speculative read 
policy
      */
-    public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel 
consistencyLevel) throws UnavailableException
+    public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().ksName);
         List<InetAddress> allReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
@@ -175,14 +175,14 @@ public abstract class AbstractReadExecutor
         if (retry.equals(SpeculativeRetryParam.NONE)
             || consistencyLevel == ConsistencyLevel.EACH_QUORUM
             || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, command, 
consistencyLevel, targetReplicas);
+            return new NeverSpeculatingReadExecutor(keyspace, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
 
         if (targetReplicas.size() == allReplicas.size())
         {
             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
             // We are going to contact every node anyway, so ask for 2 full 
data requests instead of 1, for redundancy
             // (same amount of requests in total, but we turn 1 digest request 
into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -203,16 +203,16 @@ public abstract class AbstractReadExecutor
         targetReplicas.add(extraReplica);
 
         if (retry.equals(SpeculativeRetryParam.ALWAYS))
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
     }
 
     public static class NeverSpeculatingReadExecutor extends 
AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand 
command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand 
command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, 
long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
         }
 
         public void executeAsync()
@@ -242,9 +242,10 @@ public abstract class AbstractReadExecutor
                                        ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
-                                       List<InetAddress> targetReplicas)
+                                       List<InetAddress> targetReplicas,
+                                       long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
             this.cfs = cfs;
         }
 
@@ -314,9 +315,10 @@ public abstract class AbstractReadExecutor
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
-                                             List<InetAddress> targetReplicas)
+                                             List<InetAddress> targetReplicas,
+                                             long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
             this.cfs = cfs;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 19b3de0..f412515 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -41,7 +41,6 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
-    protected final long start;
     protected final Collection<InetAddress> naturalEndpoints;
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
@@ -50,24 +49,27 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
         = 
AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
     private volatile int failures = 0;
+    private final long queryStartNanoTime;
 
     /**
      * @param callback A callback to be called when the write is successful.
+     * @param queryStartNanoTime
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
                                            Collection<InetAddress> 
naturalEndpoints,
                                            Collection<InetAddress> 
pendingEndpoints,
                                            ConsistencyLevel consistencyLevel,
                                            Runnable callback,
-                                           WriteType writeType)
+                                           WriteType writeType,
+                                           long queryStartNanoTime)
     {
         this.keyspace = keyspace;
         this.pendingEndpoints = pendingEndpoints;
-        this.start = System.nanoTime();
         this.consistencyLevel = consistencyLevel;
         this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
         this.writeType = writeType;
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public void get() throws WriteTimeoutException, WriteFailureException
@@ -76,7 +78,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
                             ? DatabaseDescriptor.getCounterWriteRpcTimeout()
                             : DatabaseDescriptor.getWriteRpcTimeout();
 
-        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - 
(System.nanoTime() - start);
+        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - 
(System.nanoTime() - queryStartNanoTime);
 
         boolean success;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java 
b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ac44923..3b31794 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -33,9 +33,9 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> 
requiredBeforeFinishUpdater
             = 
AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, 
"requiredBeforeFinish");
 
-    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, 
int requiredBeforeFinish, BatchlogCleanup cleanup)
+    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, 
int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.keyspace, wrapped.naturalEndpoints, 
wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, 
wrapped.writeType);
+        super(wrapped.keyspace, wrapped.naturalEndpoints, 
wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, 
wrapped.writeType, queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index b9ae933..be8eca1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -43,10 +43,12 @@ import org.apache.cassandra.utils.FBUtilities;
 public class DataResolver extends ResponseResolver
 {
     private final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
+    private final long queryStartNanoTime;
 
-    public DataResolver(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistency, int maxResponseCount)
+    public DataResolver(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
     {
         super(keyspace, command, consistency, maxResponseCount);
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public PartitionIterator getData()
@@ -88,7 +90,7 @@ public class DataResolver extends ResponseResolver
         if (!command.limits().isUnlimited())
         {
             for (int i = 0; i < results.size(); i++)
-                results.set(i, Transformation.apply(results.get(i), new 
ShortReadProtection(sources[i], resultCounter)));
+                results.set(i, Transformation.apply(results.get(i), new 
ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
         }
 
         return UnfilteredPartitionIterators.mergeAndFilter(results, 
command.nowInSec(), listener);
@@ -385,12 +387,14 @@ public class DataResolver extends ResponseResolver
         private final InetAddress source;
         private final DataLimits.Counter counter;
         private final DataLimits.Counter postReconciliationCounter;
+        private final long queryStartNanoTime;
 
-        private ShortReadProtection(InetAddress source, DataLimits.Counter 
postReconciliationCounter)
+        private ShortReadProtection(InetAddress source, DataLimits.Counter 
postReconciliationCounter, long queryStartNanoTime)
         {
             this.source = source;
             this.counter = command.limits().newCounter(command.nowInSec(), 
false).onlyCount();
             this.postReconciliationCounter = postReconciliationCounter;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         @Override
@@ -503,8 +507,8 @@ public class DataResolver extends ResponseResolver
 
             private UnfilteredRowIterator 
doShortReadRetry(SinglePartitionReadCommand retryCommand)
             {
-                DataResolver resolver = new DataResolver(keyspace, 
retryCommand, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                DataResolver resolver = new DataResolver(keyspace, 
retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+                ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), 
queryStartNanoTime);
                 if (StorageProxy.canDoLocalRequest(source))
                       
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index b095c7f..9584611 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -46,10 +46,11 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
                                               ConsistencyLevel 
consistencyLevel,
                                               Keyspace keyspace,
                                               Runnable callback,
-                                              WriteType writeType)
+                                              WriteType writeType,
+                                              long queryStartNanoTime)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, 
callback, writeType);
+        super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, 
callback, writeType, queryStartNanoTime);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();

Reply via email to