http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 382e834..06f25bf 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -17,10 +17,17 @@ */ package org.apache.cassandra.transport.messages; +import java.util.UUID; + +import com.google.common.collect.ImmutableMap; import org.jboss.netty.buffer.ChannelBuffer; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.*; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; public class PrepareMessage extends Message.Request { @@ -51,16 +58,38 @@ public class PrepareMessage extends Message.Request return codec.encode(this); } - public Message.Response execute() + public Message.Response execute(QueryState state) { try { - return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState(), false); + UUID tracingId = null; + if (isTracingRequested()) + { + tracingId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()); + state.prepareTracingSession(tracingId); + } + + if (state.traceNextQuery()) + { + state.createTracingSession(); + Tracing.instance().begin("Preparing CQL3 query", ImmutableMap.of("query", query)); + } + + Message.Response response = QueryProcessor.prepare(query, state.getClientState(), false); + + if (tracingId != null) + response.setTracingId(tracingId); + + return response; } catch (Exception e) { return ErrorMessage.fromException(e); } + finally + { + Tracing.instance().stopSession(); + } } @Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 5223528..872782d 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -17,13 +17,20 @@ */ package org.apache.cassandra.transport.messages; +import java.util.UUID; + +import com.google.common.collect.ImmutableMap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.*; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; /** * A CQL query @@ -61,11 +68,29 @@ public class QueryMessage extends Message.Request return codec.encode(this); } - public Message.Response execute() + public Message.Response execute(QueryState state) { try { - return QueryProcessor.process(query, consistency, ((ServerConnection)connection).clientState()); + UUID tracingId = null; + if (isTracingRequested()) + { + tracingId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()); + state.prepareTracingSession(tracingId); + } + + if (state.traceNextQuery()) + { + state.createTracingSession(); + Tracing.instance().begin("Execute CQL3 query", ImmutableMap.of("query", query)); + } + + Message.Response response = QueryProcessor.process(query, consistency, state); + + if (tracingId != null) + response.setTracingId(tracingId); + + return response; } catch (Exception e) { @@ -73,6 +98,10 @@ public class QueryMessage extends Message.Request logger.error("Unexpected error during query", e); return ErrorMessage.fromException(e); } + finally + { + Tracing.instance().stopSession(); + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java index 656e03c..9e46e92 100644 --- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java @@ -23,6 +23,7 @@ import java.util.List; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; +import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.*; public class RegisterMessage extends Message.Request @@ -57,7 +58,7 @@ public class RegisterMessage extends Message.Request this.eventTypes = eventTypes; } - public Response execute() + public Response execute(QueryState state) { assert connection instanceof ServerConnection; Connection.Tracker tracker = ((ServerConnection)connection).getTracker(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 7e4db6e..e781517 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -24,6 +24,8 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.*; import org.apache.cassandra.utils.SemanticVersion; @@ -64,19 +66,17 @@ public class StartupMessage extends Message.Request return codec.encode(this); } - public Message.Response execute() + public Message.Response execute(QueryState state) { try { - assert connection instanceof ServerConnection; - ServerConnection c = (ServerConnection)connection; - + ClientState cState = state.getClientState(); String cqlVersion = options.get(CQL_VERSION); if (cqlVersion == null) throw new ProtocolException("Missing value CQL_VERSION in STARTUP message"); - c.clientState().setCQLVersion(cqlVersion); - if (c.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) + cState.setCQLVersion(cqlVersion); + if (cState.getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) throw new ProtocolException(String.format("CQL version %s is not support by the binary protocol (supported version are >= 3.0.0)", cqlVersion)); if (options.containsKey(COMPRESSION)) @@ -94,7 +94,7 @@ public class StartupMessage extends Message.Request } } - if (c.clientState().isLogged()) + if (cState.isLogged()) return new ReadyMessage(); else return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
