Repository: cassandra
Updated Branches:
  refs/heads/trunk f3399a292 -> 814bd325e
Record client ip address in tracing sessions

patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for CASSANDRA-8162

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/814bd325
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/814bd325
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/814bd325

Branch: refs/heads/trunk
Commit: 814bd325e55879d93f86c97189893581208556f5
Parents: f3399a2
Author: Stefania Alborghetti <[email protected]>
Authored: Tue Mar 10 23:11:41 2015 -0700
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Mar 10 23:13:42 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 src/java/org/apache/cassandra/service/ClientState.java | 11 ++++++-----
 src/java/org/apache/cassandra/service/QueryState.java | 8 ++++++++
 .../org/apache/cassandra/thrift/ThriftClientState.java | 4 ++--
 .../apache/cassandra/thrift/ThriftSessionManager.java | 3 ++-
 .../org/apache/cassandra/tracing/TraceKeyspace.java | 13 +++++++++++--
 src/java/org/apache/cassandra/tracing/Tracing.java | 7 ++++++-
 .../cassandra/transport/messages/BatchMessage.java | 2 +-
 .../cassandra/transport/messages/ExecuteMessage.java | 2 +-
 .../cassandra/transport/messages/PrepareMessage.java | 2 +-
 .../cassandra/transport/messages/QueryMessage.java | 2 +-
 11 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3053362..d6ba737 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Record client ip address in tracing sessions (CASSANDRA-8162)
  * Indicate partition key columns in response metadata for prepared statements (CASSANDRA-7660)
  * Merge UUIDType and TimeUUIDType parse logic
(CASSANDRA-8759) http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 1bc6e9d..e2df4ff 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.HashSet; @@ -89,7 +90,7 @@ public class ClientState { try { - handler = (QueryHandler)FBUtilities.construct(customHandlerClass, "QueryHandler"); + handler = FBUtilities.construct(customHandlerClass, "QueryHandler"); logger.info("Using {} as query handler for native protocol queries (as requested with -Dcassandra.custom_query_handler_class)", customHandlerClass); } catch (Exception e) @@ -106,7 +107,7 @@ public class ClientState public final boolean isInternal; // The remote address of the client - null for internal clients. 
- private final SocketAddress remoteAddress; + private final InetSocketAddress remoteAddress; // The biggest timestamp that was returned by getTimestamp/assigned to a query private final AtomicLong lastTimestampMicros = new AtomicLong(0); @@ -120,7 +121,7 @@ public class ClientState this.remoteAddress = null; } - protected ClientState(SocketAddress remoteAddress) + protected ClientState(InetSocketAddress remoteAddress) { this.isInternal = false; this.remoteAddress = remoteAddress; @@ -141,7 +142,7 @@ public class ClientState */ public static ClientState forExternalCalls(SocketAddress remoteAddress) { - return new ClientState(remoteAddress); + return new ClientState((InetSocketAddress)remoteAddress); } /** @@ -181,7 +182,7 @@ public class ClientState return cqlQueryHandler; } - public SocketAddress getRemoteAddress() + public InetSocketAddress getRemoteAddress() { return remoteAddress; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index e5c21f4..af31f47 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service; +import java.net.InetAddress; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -86,4 +87,11 @@ public class QueryState Tracing.instance.newSession(session); } } + + public InetAddress getClientAddress() + { + return clientState.isInternal + ? 
null + : clientState.getRemoteAddress().getAddress(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/thrift/ThriftClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftClientState.java b/src/java/org/apache/cassandra/thrift/ThriftClientState.java index 319169f..6a3c50f 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftClientState.java +++ b/src/java/org/apache/cassandra/thrift/ThriftClientState.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.thrift; -import java.net.SocketAddress; +import java.net.InetSocketAddress; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ClientState; @@ -34,7 +34,7 @@ public class ThriftClientState extends ClientState { private final QueryState queryState; - public ThriftClientState(SocketAddress remoteAddress) + public ThriftClientState(InetSocketAddress remoteAddress) { super(remoteAddress); this.queryState = new QueryState(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java index ed3df6d..2703b52 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java +++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.thrift; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -57,7 +58,7 @@ public class ThriftSessionManager ThriftClientState cState = activeSocketSessions.get(socket); if (cState == null) { - cState = new ThriftClientState(socket); + cState = new 
ThriftClientState((InetSocketAddress)socket); activeSocketSessions.put(socket, cState); } return cState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 392eb42..f66269d 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.tracing; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; @@ -44,6 +45,7 @@ public final class TraceKeyspace "CREATE TABLE %s (" + "session_id uuid," + "command text," + + "client inet," + "coordinator inet," + "duration int," + "parameters map<text, text>," @@ -75,13 +77,20 @@ public final class TraceKeyspace return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables); } - static Mutation makeStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt, String command, int ttl) + static Mutation makeStartSessionMutation(ByteBuffer sessionId, + InetAddress client, + Map<String, String> parameters, + String request, + long startedAt, + String command, + int ttl) { Mutation mutation = new Mutation(NAME, sessionId); ColumnFamily cells = mutation.addOrGet(TraceKeyspace.Sessions); CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl); - adder.add("coordinator", FBUtilities.getBroadcastAddress()) + adder.add("client", client) + .add("coordinator", FBUtilities.getBroadcastAddress()) .add("request", request) .add("started_at", new Date(startedAt)) .add("command", command); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 12a943e..9b33444 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -197,6 +197,11 @@ public class Tracing public TraceState begin(final String request, final Map<String, String> parameters) { + return begin(request, null, parameters); + } + + public TraceState begin(final String request, final InetAddress client, final Map<String, String> parameters) + { assert isTracing(); final TraceState state = this.state.get(); @@ -209,7 +214,7 @@ public class Tracing { public void run() { - mutateWithCatch(TraceKeyspace.makeStartSessionMutation(sessionId, parameters, request, startedAt, command, ttl)); + mutateWithCatch(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl)); } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index b68c291..64b0826 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -167,7 +167,7 @@ public class BatchMessage extends Message.Request { state.createTracingSession(); // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. 
- Tracing.instance.begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap()); + Tracing.instance.begin("Execute batch of CQL3 queries", state.getClientAddress(), Collections.<String, String>emptyMap()); } QueryHandler handler = ClientState.getCQLQueryHandler(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 324ae00..815935a 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -128,7 +128,7 @@ public class ExecuteMessage extends Message.Request builder.put("page_size", Integer.toString(options.getPageSize())); // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. 
- Tracing.instance.begin("Execute CQL3 prepared query", builder.build()); + Tracing.instance.begin("Execute CQL3 prepared query", state.getClientAddress(), builder.build()); } Message.Response response = handler.processPrepared(statement, state, options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index e5970c4..1db63c3 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -71,7 +71,7 @@ public class PrepareMessage extends Message.Request if (state.traceNextQuery()) { state.createTracingSession(); - Tracing.instance.begin("Preparing CQL3 query", ImmutableMap.of("query", query)); + Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query)); } Message.Response response = state.getClientState().getCQLQueryHandler().prepare(query, state); http://git-wip-us.apache.org/repos/asf/cassandra/blob/814bd325/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index cb35fdb..96accb4 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -112,7 +112,7 @@ public class QueryMessage extends Message.Request if (options.getPageSize() > 0) builder.put("page_size", Integer.toString(options.getPageSize())); - Tracing.instance.begin("Execute CQL3 query", builder.build()); + 
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build()); } Message.Response response = state.getClientState().getCQLQueryHandler().process(query, state, options);
