Repository: cassandra Updated Branches: refs/heads/trunk e83f9e69e -> aa83c942a
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index a189000..910f334 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -85,7 +85,7 @@ public class CassandraServer implements Cassandra.Iface return ThriftSessionManager.instance.currentSession(); } - protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState) + protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { try @@ -93,7 +93,7 @@ public class CassandraServer implements Cassandra.Iface schedule(DatabaseDescriptor.getReadRpcTimeout()); try { - return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState); + return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState, queryStartNanoTime); } finally { @@ -257,10 +257,10 @@ public class CassandraServer implements Cassandra.Iface : result; } - private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState) + private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { - try (PartitionIterator results = read(commands, consistency_level, cState)) + try (PartitionIterator results = read(commands, consistency_level, cState, queryStartNanoTime)) { Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<>(); while (results.hasNext()) @@ -278,6 +278,7 @@ public class CassandraServer implements Cassandra.Iface public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -296,7 +297,7 @@ public class CassandraServer implements Cassandra.Iface ClientState cState = state(); String keyspace = cState.getKeyspace(); state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT); - List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState); + List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime); return result == null ? Collections.<ColumnOrSuperColumn>emptyList() : result; } catch (RequestValidationException e) @@ -315,15 +316,17 @@ public class CassandraServer implements Cassandra.Iface int nowInSec, SlicePredicate predicate, ConsistencyLevel consistency_level, - ClientState cState) + ClientState cState, + long queryStartNanoTime) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { - return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState).get(key); + return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).get(key); } public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { List<String> keysList = Lists.newArrayList(); @@ -345,7 +348,7 @@ public class CassandraServer implements Cassandra.Iface ClientState cState = state(); String keyspace = cState.getKeyspace(); cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT); - return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState); + return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime); } catch (RequestValidationException e) { @@ -541,7 +544,8 @@ public class CassandraServer implements Cassandra.Iface int nowInSec, SlicePredicate predicate, ConsistencyLevel consistency_level, - ClientState cState) + ClientState cState, + long queryStartNanoTime) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family); @@ -563,12 +567,13 @@ public class CassandraServer implements Cassandra.Iface commands.add(SinglePartitionReadCommand.create(true, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, dk, filter)); } - return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState); + return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState, queryStartNanoTime); } public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -643,7 +648,7 @@ public class CassandraServer implements Cassandra.Iface DecoratedKey dk = metadata.decorateKey(key); SinglePartitionReadCommand command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter); - try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState), command)) + try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState, queryStartNanoTime), command)) { if (!result.hasNext()) throw new NotFoundException(); @@ -672,6 +677,7 @@ public class CassandraServer implements Cassandra.Iface public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -695,7 +701,7 @@ public class CassandraServer implements Cassandra.Iface int nowInSec = FBUtilities.nowInSeconds(); if (predicate.column_names != null) - return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState).size(); + return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).size(); int pageSize; // request by page if this is a large row @@ -742,7 +748,8 @@ public class CassandraServer implements Cassandra.Iface cState, pageSize, nowInSec, - true); + true, + queryStartNanoTime); } catch (IllegalArgumentException e) { @@ -766,6 +773,7 @@ public class CassandraServer implements Cassandra.Iface public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { List<String> keysList = Lists.newArrayList(); @@ -797,7 +805,8 @@ public class CassandraServer implements Cassandra.Iface FBUtilities.nowInSeconds(), predicate, consistency_level, - cState); + cState, + queryStartNanoTime); for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet()) counts.put(cf.getKey(), cf.getValue().size()); @@ -833,7 +842,7 @@ public class CassandraServer implements Cassandra.Iface return column.ttl; } - private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) + private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level, long queryStartNanoTime) throws RequestValidationException, UnavailableException, TimedOutException { ThriftClientState cState = state(); @@ -870,12 +879,13 @@ public class CassandraServer implements Cassandra.Iface { throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage()); } - doInsert(consistency_level, Collections.singletonList(mutation)); + doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime); } public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -891,7 +901,7 @@ public class CassandraServer implements Cassandra.Iface try { - internal_insert(key, column_parent, column, consistency_level); + internal_insert(key, column_parent, column, consistency_level, queryStartNanoTime); } catch (RequestValidationException e) { @@ -911,6 +921,7 @@ public class CassandraServer implements Cassandra.Iface ConsistencyLevel commit_consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { ImmutableMap.Builder<String,String> builder = ImmutableMap.builder(); @@ -964,7 +975,8 @@ public class CassandraServer implements Cassandra.Iface new ThriftCASRequest(toLegacyCells(metadata, expected, nowInSec), partitionUpdates, nowInSec), ThriftConversion.fromThrift(serial_consistency_level), ThriftConversion.fromThrift(commit_consistency_level), - cState)) + cState, + queryStartNanoTime)) { return result == null ? new CASResult(true) @@ -1276,6 +1288,7 @@ public class CassandraServer implements Cassandra.Iface public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = Maps.newLinkedHashMap(); @@ -1294,7 +1307,7 @@ public class CassandraServer implements Cassandra.Iface try { - doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true)); + doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true), queryStartNanoTime); } catch (RequestValidationException e) { @@ -1309,6 +1322,7 @@ public class CassandraServer implements Cassandra.Iface public void atomic_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = Maps.newLinkedHashMap(); @@ -1327,7 +1341,7 @@ public class CassandraServer implements Cassandra.Iface try { - doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true); + doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true, queryStartNanoTime); } catch (RequestValidationException e) { @@ -1339,7 +1353,7 @@ public class CassandraServer implements Cassandra.Iface } } - private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp) + private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp, long queryStartNanoTime) throws RequestValidationException, UnavailableException, TimedOutException { ThriftClientState cState = state(); @@ -1386,14 +1400,15 @@ public class CassandraServer implements Cassandra.Iface org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update); if (isCommutativeOp) - doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level)))); + doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime); else - doInsert(consistency_level, Collections.singletonList(mutation)); + doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime); } public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -1409,7 +1424,7 @@ public class CassandraServer implements Cassandra.Iface try { - internal_remove(key, column_path, timestamp, consistency_level, false); + internal_remove(key, column_path, timestamp, consistency_level, false, queryStartNanoTime); } catch (RequestValidationException e) { @@ -1421,13 +1436,13 @@ public class CassandraServer implements Cassandra.Iface } } - private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) + private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, long queryStartNanoTime) throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException { - doInsert(consistency_level, mutations, false); + doInsert(consistency_level, mutations, false, queryStartNanoTime); } - private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically) + private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically, long queryStartNanoTime) throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException { org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level); @@ -1442,7 +1457,7 @@ public class CassandraServer implements Cassandra.Iface schedule(timeout); try { - StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically); + StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically, queryStartNanoTime); } catch (RequestExecutionException e) { @@ -1480,6 +1495,7 @@ public class CassandraServer implements Cassandra.Iface public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of( @@ -1541,7 +1557,7 @@ public class CassandraServer implements Cassandra.Iface limits, new DataRange(bounds, filter), Optional.empty()); - try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) + try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime)) { assert results != null; return thriftifyKeySlices(results, column_parent, limits.perPartitionCount()); @@ -1569,6 +1585,7 @@ public class CassandraServer implements Cassandra.Iface public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("column_family", column_family, @@ -1635,7 +1652,7 @@ public class CassandraServer implements Cassandra.Iface limits, new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true), Optional.empty()); - try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) + try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime)) { return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount()); } @@ -1684,6 +1701,7 @@ public class CassandraServer implements Cassandra.Iface public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(), @@ -1734,7 +1752,7 @@ public class CassandraServer implements Cassandra.Iface // further lookups. cmd.maybeValidateIndex(); - try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) + try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime)) { return thriftifyKeySlices(results, column_parent, limits.perPartitionCount()); } @@ -2139,6 +2157,7 @@ public class CassandraServer implements Cassandra.Iface public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(), @@ -2183,7 +2202,7 @@ public class CassandraServer implements Cassandra.Iface PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell)); org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update); - doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level)))); + doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime); } catch (MarshalException|UnknownColumnException e) { @@ -2203,6 +2222,7 @@ public class CassandraServer implements Cassandra.Iface public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key), @@ -2217,7 +2237,7 @@ public class CassandraServer implements Cassandra.Iface try { - internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true); + internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true, queryStartNanoTime); } catch (RequestValidationException e) { @@ -2296,6 +2316,7 @@ public class CassandraServer implements Cassandra.Iface { try { + long queryStartNanoTime = System.nanoTime(); String queryString = uncompress(query, compression); if (startSessionIfRequested()) { @@ -2313,7 +2334,8 @@ public class CassandraServer implements Cassandra.Iface cState.getQueryState(), QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList()), - null).toThriftResult(); + null, + queryStartNanoTime).toThriftResult(); } catch (RequestExecutionException e) { @@ -2359,6 +2381,7 @@ public class CassandraServer implements Cassandra.Iface public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel) throws TException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. @@ -2384,7 +2407,8 @@ public class CassandraServer implements Cassandra.Iface return ClientState.getCQLQueryHandler().processPrepared(prepared.statement, cState.getQueryState(), QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables), - null).toThriftResult(); + null, + queryStartNanoTime).toThriftResult(); } catch (RequestExecutionException e) { @@ -2404,6 +2428,7 @@ public class CassandraServer implements Cassandra.Iface public List<ColumnOrSuperColumn> get_multi_slice(MultiSliceRequest request) throws InvalidRequestException, UnavailableException, TimedOutException { + long queryStartNanoTime = System.nanoTime(); if (startSessionIfRequested()) { Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(request.key), @@ -2457,7 +2482,8 @@ public class CassandraServer implements Cassandra.Iface false, limits.perPartitionCount(), consistencyLevel, - cState).entrySet().iterator().next().getValue(); + cState, + queryStartNanoTime).entrySet().iterator().next().getValue(); } catch (RequestValidationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java index e20f99b..fe78e64 100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@ -116,7 +116,7 @@ public class TraceStateImpl extends TraceState { try { - StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY); + StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime()); } catch (OverloadedException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 66e0014..2463306 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -210,7 +210,7 @@ public abstract class Message throw new IllegalArgumentException(); } - public abstract Response execute(QueryState queryState); + public abstract Response execute(QueryState queryState, long queryStartNanoTime); public void setTracingRequested() { @@ -501,6 +501,7 @@ public abstract class Message final Response response; final ServerConnection connection; + long queryStartNanoTime = System.nanoTime(); try { @@ -512,7 +513,7 @@ public abstract class Message QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); logger.trace("Received: {}, v={}", request, connection.getVersion()); - response = request.execute(qstate); + response = request.execute(qstate, queryStartNanoTime); response.setStreamId(request.getStreamId()); response.setWarnings(ClientWarn.instance.getWarnings()); response.attach(connection); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/AuthResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java index 8b3e866..e90f740 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java @@ -68,7 +68,7 @@ public class AuthResponse extends Message.Request } @Override - public Response execute(QueryState queryState) + public Response execute(QueryState queryState, long queryStartNanoTime) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 9d1047f..6675565 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -147,7 +147,7 @@ public class BatchMessage extends Message.Request this.options = options; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { try { @@ -214,7 +214,7 @@ public class BatchMessage extends Message.Request // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor // (and no value would be really correct, so we prefer passing a clearly wrong one). BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none()); - Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload()); + Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime); if (tracingId != null) response.setTracingId(tracingId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java index 4c51cce..4ecaffd 100644 --- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java @@ -71,7 +71,7 @@ public class CredentialsMessage extends Message.Request this.credentials = credentials; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index a5348a4..088f278 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -87,7 +87,7 @@ public class ExecuteMessage extends Message.Request this.options = options; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { try { @@ -143,7 +143,7 @@ public class ExecuteMessage extends Message.Request // Some custom QueryHandlers are interested by the bound names. We provide them this information // by wrapping the QueryOptions. QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames); - Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload()); + Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime); if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java index 2f6e3da..4e95342 100644 --- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java @@ -56,7 +56,7 @@ public class OptionsMessage extends Message.Request super(Message.Type.OPTIONS); } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { List<String> cqlVersions = new ArrayList<String>(); cqlVersions.add(QueryProcessor.CQL_VERSION.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index f54d1d9..f5192de 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -58,7 +58,7 @@ public class PrepareMessage extends Message.Request this.query = query; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 3b48d52..2bd5efc 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -82,7 +82,7 @@ public class QueryMessage extends Message.Request this.options = options; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { try { @@ -112,7 +112,7 @@ public class QueryMessage extends Message.Request Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build()); } - Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload()); + Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime); if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java index 928e676..c8e48b0 100644 --- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java @@ -62,7 +62,7 @@ public class RegisterMessage extends Message.Request this.eventTypes = eventTypes; } - public Response execute(QueryState state) + public Response execute(QueryState state, long queryStartNanoTime) { assert connection instanceof ServerConnection; Connection.Tracker tracker = connection.getTracker(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 04d8e62..8966aeb 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -62,7 +62,7 @@ public class StartupMessage extends Message.Request this.options = options; } - public Message.Response execute(QueryState state) + public Message.Response execute(QueryState state, long queryStartNanoTime) { String cqlVersion = options.get(CQL_VERSION); if (cqlVersion == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java index 17e426b..6a075c9 100644 --- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -125,6 +125,6 @@ public class PstmtPersistenceTest extends CQLTester { ParsedStatement.Prepared prepared = handler.getPrepared(stmtId); Assert.assertNotNull(prepared); - handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap()); + handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index 3fee5f9..156bd66 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -132,7 +132,7 @@ public class DataResolverTest @Test public void testResolveNewerSingleRow() throws UnknownHostException { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") .add("c1", "v1") @@ -161,7 +161,7 @@ public class DataResolverTest @Test public void testResolveDisjointSingleRow() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") .add("c1", "v1") @@ -196,7 +196,7 @@ public class DataResolverTest public void testResolveDisjointMultipleRows() throws UnknownHostException { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") .add("c1", "v1") @@ -242,7 +242,7 @@ public class DataResolverTest @Test public void testResolveDisjointMultipleRowsWithRangeTombstones() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime()); RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec); RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec); @@ -322,7 +322,7 @@ public class DataResolverTest @Test public void testResolveWithOneEmpty() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") .add("c2", "v2") @@ -349,7 +349,7 @@ public class DataResolverTest @Test public void testResolveWithBothEmpty() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false))); resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false))); @@ -364,7 +364,7 @@ public class DataResolverTest @Test public void testResolveDeleted() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); // one response with columns timestamped before a delete in another response InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") @@ -389,7 +389,7 @@ public class DataResolverTest @Test public void testResolveMultipleDeleted() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime()); // deletes and columns with interleaved timestamp, with out of order return sequence InetAddress peer1 = peer(); resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec))); @@ -471,7 +471,7 @@ public class DataResolverTest */ private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2) { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime()); InetAddress peer1 = peer(); InetAddress peer2 = peer(); @@ -562,7 +562,7 @@ public class DataResolverTest public void testResolveComplexDelete() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime()); long[] ts = {100, 200}; @@ -611,7 +611,7 @@ public class DataResolverTest { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime()); long[] ts = {100, 200}; @@ -654,7 +654,7 @@ public class DataResolverTest public void testResolveNewCollection() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime()); long[] ts = {100, 200}; @@ -700,7 +700,7 @@ public class DataResolverTest public void testResolveNewCollectionOverwritingDeleted() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime()); long[] ts = {100, 200}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index 260c507..f8a77e1 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -316,12 +316,13 @@ public class MessagePayloadTest extends CQLTester public ResultMessage process(String query, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) - throws RequestExecutionException, RequestValidationException + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) + throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload; - ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload); + ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload, queryStartNanoTime); if (customPayload != null) { result.setCustomPayload(responsePayload); @@ -333,12 +334,13 @@ public class MessagePayloadTest extends CQLTester public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, - Map<String, ByteBuffer> customPayload) + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload; - ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload); + ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload, queryStartNanoTime); if (customPayload != null) { result.setCustomPayload(responsePayload); @@ -350,12 +352,13 @@ public class MessagePayloadTest extends CQLTester public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, - Map<String, ByteBuffer> customPayload) - throws RequestExecutionException, RequestValidationException + Map<String, ByteBuffer> customPayload, + long queryStartNanoTime) + throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload; - ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload); + ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload, queryStartNanoTime); if (customPayload != null) { result.setCustomPayload(responsePayload);
