add tracing to OutboundTcpConnection
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6adf52c9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6adf52c9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6adf52c9 Branch: refs/heads/trunk Commit: 6adf52c94d5c819bd1b4c38be9cdc234d40aa1fd Parents: ec9af99 Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 24 21:16:29 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Oct 25 09:50:19 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ReadVerbHandler.java | 2 +- .../cassandra/db/RowMutationVerbHandler.java | 2 +- .../apache/cassandra/db/TruncateVerbHandler.java | 2 +- .../org/apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/net/OutboundTcpConnection.java | 18 +++++++++++--- .../cassandra/service/IndexScanVerbHandler.java | 2 +- .../cassandra/service/RangeSliceVerbHandler.java | 2 +- .../cassandra/service/SnapshotVerbHandler.java | 3 +- .../org/apache/cassandra/service/StorageProxy.java | 2 + src/java/org/apache/cassandra/tracing/Tracing.java | 6 ++++- 10 files changed, 28 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java index 7477993..c0814a1 100644 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java @@ -49,7 +49,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand> MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE, getResponse(command, row), ReadResponse.serializer); - logger.debug("Sending response to {}", message.from); + logger.debug("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply, id, message.from); } catch (IOException ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java index a8cdfdc..842c539 100644 --- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java @@ -54,7 +54,7 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation> rm.apply(); WriteResponse response = new WriteResponse(); - logger.debug("Sending response to {}", replyTo); + logger.debug("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(response.createMessage(), id, replyTo); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/TruncateVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java index ff9448f..ea9ef14 100644 --- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java +++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java @@ -49,7 +49,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation> logger.debug("Truncate operation succeeded at this host"); TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true); - logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.from }); + logger.debug("{} applied. Enqueuing response to {}@{} ", new Object[]{ t, id, message.from }); MessagingService.instance().sendReply(response.createMessage(), id, message.from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 46ac6dd..10de977 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -692,7 +692,7 @@ public final class MessagingService implements MessagingServiceMBean public void receive(MessageIn message, String id, long timestamp) { Tracing.instance().initializeFromMessage(message); - logger.debug("Request received from {}", message.from); + logger.debug("Messsage received from {}", message.from); message = SinkManager.processInboundMessage(message, id); if (message == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index e52b4cc..1093f70 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -23,6 +23,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; +import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -30,7 +31,9 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; import org.xerial.snappy.SnappyOutputStream; import org.apache.cassandra.config.Config; @@ -164,6 +167,13 @@ public class OutboundTcpConnection extends Thread { try { + byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER); + if (sessionBytes != null) + { + Tracing.instance().continueExistingSession(UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes))); + logger.debug("Sending message to {}", poolReference.endPoint()); + } + write(qm.message, qm.id, qm.timestamp, out, targetVersion); completed++; if (active.peek() == null) @@ -227,8 +237,8 @@ public class OutboundTcpConnection extends Thread } catch (IOException e) { - if (logger.isDebugEnabled()) - logger.debug("exception closing connection to " + poolReference.endPoint(), e); + if (logger.isTraceEnabled()) + logger.trace("exception closing connection to " + poolReference.endPoint(), e); } out = null; socket = null; @@ -270,7 +280,7 @@ public class OutboundTcpConnection extends Thread if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version) { - logger.debug("Detected higher max version {} (using {}); will reconnect when queued messages are done", + logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done", maxTargetVersion, targetVersion); MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion)); softCloseSocket(); @@ -281,7 +291,7 @@ public class OutboundTcpConnection extends Thread if (shouldCompressConnection()) { out.flush(); - logger.debug("Upgrading OutputStream to be compressed"); + logger.trace("Upgrading OutputStream to be compressed"); out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream()))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java index 774e046..fa4aabb 100644 --- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java +++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java @@ -44,7 +44,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand> command.index_clause.count, ThriftValidation.asIFilter(command.predicate, cfs.getComparator())); RangeSliceReply reply = new RangeSliceReply(rows); - logger.debug("Sending response to {}", message.from); + logger.debug("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply.createMessage(), id, message.from); } catch (Exception ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index fbbf88a..fd26760 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -55,7 +55,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand> throw new RuntimeException("Cannot service reads while bootstrapping!"); } RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload)); - logger.debug("Sending response to {}", message.from); + logger.debug("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply.createMessage(), id, message.from); } catch (Exception ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java index 2d51590..f0f814d 100644 --- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java +++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java @@ -38,8 +38,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> Table.open(command.keyspace).clearSnapshot(command.snapshot_name); else Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name); - if (logger.isDebugEnabled()) - logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from); + logger.debug("Enqueuing response to snapshot request {} to {} ", command.snapshot_name, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 57deaad..b08f5b8 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -170,6 +170,7 @@ public class StorageProxy implements StorageProxyMBean public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { + logger.debug("Determining replicas for mutation"); logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -247,6 +248,7 @@ public class StorageProxy implements StorageProxyMBean public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { + logger.debug("Determining replicas for atomic batch"); long startTime = System.nanoTime(); logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index c158b10..df49960 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -248,11 +248,15 @@ public class Tracing } assert sessionBytes.length == 16; - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState ts = initiatedSessions.get(sessionId); if (ts == null) ts = new TraceState(message.from, sessionId); state.set(ts); } + + public void continueExistingSession(UUID sessionId) + { + state.set(initiatedSessions.get(sessionId)); + } }
