Updated Branches: refs/heads/trunk 6adf52c94 -> 10777f2cc
add OTC tracing support for replica nodes too Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10777f2c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10777f2c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10777f2c Branch: refs/heads/trunk Commit: 10777f2cc8d0ab51fd760427a6f11a1d41aee596 Parents: 6adf52c Author: Jonathan Ellis <[email protected]> Authored: Thu Oct 25 10:05:35 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Oct 25 10:05:35 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ReadVerbHandler.java | 1 + .../cassandra/net/OutboundTcpConnection.java | 5 ++- .../cassandra/service/RangeSliceVerbHandler.java | 1 + .../org/apache/cassandra/tracing/TraceState.java | 4 +- src/java/org/apache/cassandra/tracing/Tracing.java | 38 +++++++++++--- 5 files changed, 39 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java index c0814a1..d4af0be 100644 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java @@ -27,6 +27,7 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; public class ReadVerbHandler implements IVerbHandler<ReadCommand> 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1093f70..f79e4a0 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -170,8 +171,10 @@ public class OutboundTcpConnection extends Thread byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER); if (sessionBytes != null) { - Tracing.instance().continueExistingSession(UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes))); + UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); + Tracing.instance().continueExistingSession(sessionId); logger.debug("Sending message to {}", poolReference.endPoint()); + Tracing.instance().maybeStopNonlocalSession(sessionId); } write(qm.message, qm.id, qm.timestamp, out, targetVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index fd26760..8727005 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table; import org.apache.cassandra.net.IVerbHandler; import 
org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index ae0fa20..e305471 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -36,14 +36,16 @@ public class TraceState public final InetAddress coordinator; public final Stopwatch watch; public final ByteBuffer sessionIdBytes; + public final boolean isLocallyOwned; - public TraceState(InetAddress coordinator, UUID sessionId) + public TraceState(InetAddress coordinator, UUID sessionId, boolean locallyOwned) { assert coordinator != null; assert sessionId != null; this.coordinator = coordinator; this.sessionId = sessionId; + this.isLocallyOwned = locallyOwned; sessionIdBytes = ByteBufferUtil.bytes(sessionId); watch = new Stopwatch(); watch.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index df49960..f2f8869 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -80,8 +80,7 @@ public class Tracing private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>(); - /** sessions that were initiated on this node */ - private final Map<UUID, TraceState> initiatedSessions = new ConcurrentHashMap<UUID, TraceState>(); + private final Map<UUID, 
TraceState> sessions = new ConcurrentHashMap<UUID, TraceState>(); public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value) { @@ -153,13 +152,30 @@ public class Tracing { assert state.get() == null; - TraceState ts = new TraceState(localAddress, sessionId); + TraceState ts = new TraceState(localAddress, sessionId, true); state.set(ts); - initiatedSessions.put(sessionId, ts); + sessions.put(sessionId, ts); return sessionId; } + /** + * Removes the state data but does not log it as complete. + * For use by replica nodes, after replying to the master. + * + * Note: checking that the session exists is the job of the caller. + */ + public void maybeStopNonlocalSession(UUID sessionId) + { + TraceState state = sessions.get(sessionId); + assert state != null; + if (!state.isLocallyOwned) + sessions.remove(state.sessionId); + } + + /** + * Stop the session and record it as complete. Called by coordinator when request is complete. + */ public void stopSession() { TraceState state = this.state.get(); @@ -186,7 +202,7 @@ public class Tracing } }); - initiatedSessions.remove(state.sessionId); + sessions.remove(state.sessionId); this.state.set(null); } } @@ -249,14 +265,20 @@ public class Tracing assert sessionBytes.length == 16; UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - TraceState ts = initiatedSessions.get(sessionId); + TraceState ts = sessions.get(sessionId); if (ts == null) - ts = new TraceState(message.from, sessionId); + { + ts = new TraceState(message.from, sessionId, false); + sessions.put(sessionId, ts); + } state.set(ts); } + /** + * Activate @param sessionId representing a session we've already seen + */ public void continueExistingSession(UUID sessionId) { - state.set(initiatedSessions.get(sessionId)); + state.set(sessions.get(sessionId)); } }
