Repository: cassandra Updated Branches: refs/heads/trunk 78ca3447c -> a9ec46a61
List clients by protocol versions `nodetool clientstats --by-protocol` patch by Dinesh Joshi; reviewed by jasobrown for CASSANDRA-14335 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9ec46a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9ec46a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9ec46a6 Branch: refs/heads/trunk Commit: a9ec46a613ae5602ced004935c9954638e83e735 Parents: 78ca344 Author: Dinesh A. Joshi <dinesh.jo...@apple.com> Authored: Mon Apr 30 17:18:44 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Fri May 4 11:19:44 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../cassandra/service/CassandraDaemon.java | 6 + .../service/NativeTransportService.java | 17 +++ .../cassandra/service/StorageService.java | 7 ++ .../cassandra/service/StorageServiceMBean.java | 4 + .../org/apache/cassandra/tools/NodeProbe.java | 6 + .../cassandra/tools/nodetool/ClientStats.java | 41 +++++++ .../transport/ProtocolVersionTracker.java | 109 ++++++++++++++++++ .../org/apache/cassandra/transport/Server.java | 32 ++++++ .../transport/ProtocolVersionTrackerTest.java | 115 +++++++++++++++++++ 10 files changed, 339 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 628d0af..25c237f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,9 +1,10 @@ 4.0 * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431) * cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426) + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335) * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281) * Add network authz (CASSANDRA-13985) - * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDAR-14389) + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389) * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381) * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402) * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 5ca3844..7e7649d 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -679,6 +679,10 @@ public class CassandraDaemon instance.activate(); } + public void clearConnectionHistory() { + nativeTransportService.clearConnectionHistory(); + } + private void exitOrFail(int code, String message) { exitOrFail(code, message, null); @@ -731,5 +735,7 @@ public class CassandraDaemon * Returns whether the server is currently running. */ public boolean isRunning(); + + public void clearConnectionHistory(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/NativeTransportService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java index d70e56e..39b334e 100644 --- a/src/java/org/apache/cassandra/service/NativeTransportService.java +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@ -148,6 +148,17 @@ public class NativeTransportService } return result; }); + + ClientMetrics.instance.addGauge("clientsByProtocolVersion", () -> + { + List<Map<String, String>> result = new ArrayList<>(); + for (Server server : servers) + { + result.addAll(server.getClientsByProtocolVersion()); + } + return result; + }); + AuthMetrics.init(); initialized = true; @@ -225,4 +236,10 @@ public class NativeTransportService { return servers; } + + public void clearConnectionHistory() + { + for (Server server : servers) + server.clearConnectionHistory(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0f114dd..c94f603 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5380,4 +5380,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB); } + + @Override + public void clearConnectionHistory() + { + daemon.clearConnectionHistory(); + logger.info("Cleared connection history"); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 62a73de..20b7400 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -672,4 +672,8 @@ public interface StorageServiceMBean extends NotificationEmitter * @return true if the node successfully starts resuming. (this does not mean bootstrap streaming was success.) */ public boolean resumeBootstrap(); + + + /** Clears the history of clients that have connected in the past **/ + void clearConnectionHistory(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 49b6563..4fdb563 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1520,6 +1520,7 @@ public class NodeProbe implements AutoCloseable case "connections": // List<Map<String,String>> - list of all native connections and their properties case "connectedNativeClients": // number of connected native clients case "connectedNativeClientsByUser": // number of native clients by username + case "clientsByProtocolVersion": // number of native clients by username return JMX.newMBeanProxy(mbeanServerConn, new ObjectName("org.apache.cassandra.metrics:type=Client,name=" + metricName), CassandraMetricsRegistry.JmxGaugeMBean.class).getValue(); @@ -1668,6 +1669,11 @@ public class NodeProbe implements AutoCloseable { msProxy.reloadSslCertificates(); } + + public void clearConnectionHistory() + { + ssProxy.clearConnectionHistory(); + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java index 0469074..9b4ada8 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.tools.nodetool; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,9 +36,48 @@ public class ClientStats extends NodeToolCmd @Option(title = "list_connections", name = "--all", description = "Lists all connections") private boolean listConnections = false; + @Option(title = "by_protocol", name = "--by-protocol", description = "Lists last 100 client connections with oldest protocol version") + private boolean oldestProtocolConnections = false; + + @Option(title = "clear_history", name = "--clear-history", description = "Clear the history of connected clients") + private boolean clearConnectionHistory = false; + @Override public void execute(NodeProbe probe) { + if (clearConnectionHistory) + { + System.out.println("Clearing history"); + probe.clearConnectionHistory(); + return; + } + + if (oldestProtocolConnections) + { + SimpleDateFormat sdf = new SimpleDateFormat("MMM dd, yyyy HH:mm:ss"); + + System.out.println("Clients by protocol version"); + System.out.println(""); + + List<Map<String, String>> clients = (List<Map<String, String>>) probe.getClientMetric("clientsByProtocolVersion"); + + if (!clients.isEmpty()) + { + TableBuilder table = new TableBuilder(); + table.add("Protocol-Version", "IP-Address", "Last-Seen"); + + for (Map<String, String> client : clients) + { + table.add(client.get("protocolVersion"), client.get("inetAddress"), sdf.format(new Date(Long.valueOf(client.get("lastSeenTime"))))); + } + + table.printTo(System.out); + System.out.println(); + } + + return; + } + if (listConnections) { List<Map<String, String>> clients = (List<Map<String, String>>) probe.getClientMetric("connections"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java b/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java new file mode 100644 index 0000000..a2468bd --- /dev/null +++ b/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.net.InetAddress; +import java.util.EnumMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.utils.Clock; + +/** + * This class tracks the last 100 connections per protocol version + */ +public class ProtocolVersionTracker +{ + public static final int DEFAULT_MAX_CAPACITY = 100; + + @VisibleForTesting + final EnumMap<ProtocolVersion, LoadingCache<InetAddress, Long>> clientsByProto; + + public ProtocolVersionTracker() + { + this(DEFAULT_MAX_CAPACITY); + } + + public ProtocolVersionTracker(final int capacity) + { + this.clientsByProto = new EnumMap<>(ProtocolVersion.class); + + for (ProtocolVersion version : ProtocolVersion.values()) + { + clientsByProto.put(version, Caffeine.newBuilder().maximumSize(capacity) + .build(key -> Clock.instance.currentTimeMillis())); + } + } + + void addConnection(final InetAddress addr, final ProtocolVersion version) + { + if (addr == null || version == null) return; + + LoadingCache<InetAddress, Long> clients = clientsByProto.get(version); + clients.put(addr, Clock.instance.currentTimeMillis()); + } + + public LinkedHashMap<ProtocolVersion, ImmutableSet<ClientIPAndTime>> getAll() + { + LinkedHashMap<ProtocolVersion, ImmutableSet<ClientIPAndTime>> result = new LinkedHashMap<>(); + for (ProtocolVersion version : ProtocolVersion.values()) + { + result.put(version, ImmutableSet.copyOf(clientsByProto.get(version).asMap().entrySet().stream() + .map(e -> new ClientIPAndTime(e.getKey(), e.getValue())).collect(Collectors.toSet()))); + } + return result; + } + + public void clear() + { + for (Map.Entry<ProtocolVersion, LoadingCache<InetAddress, Long>> entry : clientsByProto.entrySet()) + { + entry.getValue().invalidateAll(); + } + } + + public static class ClientIPAndTime + { + final InetAddress inetAddress; + final long lastSeen; + + public ClientIPAndTime(final InetAddress inetAddress, final long lastSeen) + { + Preconditions.checkNotNull(inetAddress); + this.inetAddress = inetAddress; + this.lastSeen = lastSeen; + } + + @Override + public String toString() + { + return "ClientIPAndTime{" + + "inetAddress=" + inetAddress + + ", lastSeen=" + lastSeen + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 7aade66..a7cfdfb 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -206,6 +207,32 @@ public class Server implements CassandraDaemon.Server return result; } + public List<Map<String, String>> getClientsByProtocolVersion() { + LinkedHashMap<ProtocolVersion, ImmutableSet<ProtocolVersionTracker.ClientIPAndTime>> all = connectionTracker.protoTracker.getAll(); + List<Map<String, String>> result = new ArrayList<>(); + + for (Map.Entry<ProtocolVersion, ImmutableSet<ProtocolVersionTracker.ClientIPAndTime>> entry : all.entrySet()) + { + ProtocolVersion protoVersion = entry.getKey(); + + for (ProtocolVersionTracker.ClientIPAndTime client : entry.getValue()) + { + result.add(new ImmutableMap.Builder<String, String>() + .put("protocolVersion", protoVersion.toString()) + .put("inetAddress", client.inetAddress.toString()) + .put("lastSeenTime", String.valueOf(client.lastSeen)) + .build()); + } + } + return result; + } + + @Override + public void clearConnectionHistory() + { + connectionTracker.protoTracker.clear(); + } + private void close() { // Close opened connections @@ -282,6 +309,7 @@ public class Server implements CassandraDaemon.Server // TODO: should we be using the GlobalEventExecutor or defining our own? public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class); + private final ProtocolVersionTracker protoTracker = new ProtocolVersionTracker(); public ConnectionTracker() { @@ -292,6 +320,9 @@ public class Server implements CassandraDaemon.Server public void addConnection(Channel ch, Connection connection) { allChannels.add(ch); + + if (ch.remoteAddress() instanceof InetSocketAddress) + protoTracker.addConnection(((InetSocketAddress) ch.remoteAddress()).getAddress(), connection.getVersion()); } public void register(Event.Type type, Channel ch) @@ -334,6 +365,7 @@ public class Server implements CassandraDaemon.Server } return result; } + } private static class Initializer extends ChannelInitializer<Channel> http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java b/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java new file mode 100644 index 0000000..6808c0a --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import static org.apache.cassandra.transport.ProtocolVersionTracker.ClientIPAndTime; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ProtocolVersionTrackerTest +{ + @Test + public void addConnection_shouldUpdateSetToLatestTimestamp() throws UnknownHostException, InterruptedException + { + ProtocolVersionTracker pvt = new ProtocolVersionTracker(); + final InetAddress client = InetAddress.getByName("127.0.1.1"); + pvt.addConnection(client, ProtocolVersion.V4); + + for(InetAddress addr : getMockConnections(10)) + { + pvt.addConnection(addr, ProtocolVersion.V4); + } + + ImmutableSet<ClientIPAndTime> clientIPAndTimes1 = pvt.getAll().get(ProtocolVersion.V4); + assertEquals(10, clientIPAndTimes1.size()); + + Thread.sleep(10); + + pvt.addConnection(client, ProtocolVersion.V4); + ImmutableSet<ClientIPAndTime> clientIPAndTimes2 = pvt.getAll().get(ProtocolVersion.V4); + assertEquals(10, clientIPAndTimes2.size()); + + long ls1 = clientIPAndTimes1.stream().filter(c -> c.inetAddress.equals(client)).findFirst().get().lastSeen; + long ls2 = clientIPAndTimes2.stream().filter(c -> c.inetAddress.equals(client)).findFirst().get().lastSeen; + + assertTrue(ls2 > ls1); + } + + @Test + public void addConnection_validConnection_Succeeds() + { + ProtocolVersionTracker pvt = new ProtocolVersionTracker(); + + for(InetAddress addr : getMockConnections(10)) + { + pvt.addConnection(addr, ProtocolVersion.V4); + } + + for(InetAddress addr : getMockConnections(7)) + { + pvt.addConnection(addr, ProtocolVersion.V3); + } + + assertEquals(5, pvt.getAll().size()); + assertEquals(0, pvt.getAll().get(ProtocolVersion.V2).size()); + assertEquals(7, pvt.getAll().get(ProtocolVersion.V3).size()); + assertEquals(10, pvt.getAll().get(ProtocolVersion.V4).size()); + } + + @Test + public void clear() + { + ProtocolVersionTracker pvt = new ProtocolVersionTracker(); + + for(InetAddress addr : getMockConnections(7)) + { + pvt.addConnection(addr, ProtocolVersion.V3); + } + + assertEquals(7, pvt.getAll().get(ProtocolVersion.V3).size()); + pvt.clear(); + + assertEquals(0, pvt.getAll().get(ProtocolVersion.V3).size()); + } + + /* Helper */ + private List<InetAddress> getMockConnections(int num) + { + return IntStream.range(0, num).mapToObj(n -> { + try + { + return InetAddress.getByName("127.0.1." + n); + } + catch (UnknownHostException e) + { + e.printStackTrace(); + } + return null; + }).collect(Collectors.toList()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org