Repository: cassandra
Updated Branches:
  refs/heads/trunk 78ca3447c -> a9ec46a61


List clients by protocol versions `nodetool clientstats --by-protocol`

patch by Dinesh Joshi; reviewed by jasobrown for CASSANDRA-14335


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9ec46a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9ec46a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9ec46a6

Branch: refs/heads/trunk
Commit: a9ec46a613ae5602ced004935c9954638e83e735
Parents: 78ca344
Author: Dinesh A. Joshi <dinesh.jo...@apple.com>
Authored: Mon Apr 30 17:18:44 2018 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Fri May 4 11:19:44 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../cassandra/service/CassandraDaemon.java      |   6 +
 .../service/NativeTransportService.java         |  17 +++
 .../cassandra/service/StorageService.java       |   7 ++
 .../cassandra/service/StorageServiceMBean.java  |   4 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   6 +
 .../cassandra/tools/nodetool/ClientStats.java   |  41 +++++++
 .../transport/ProtocolVersionTracker.java       | 109 ++++++++++++++++++
 .../org/apache/cassandra/transport/Server.java  |  32 ++++++
 .../transport/ProtocolVersionTrackerTest.java   | 115 +++++++++++++++++++
 10 files changed, 339 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 628d0af..25c237f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,9 +1,10 @@
 4.0
  * Replace deprecated junit.framework.Assert usages with org.junit.Assert 
(CASSANDRA-14431)
  * cassandra-stress throws NPE if insert section isn't specified in user 
profile (CASSSANDRA-14426)
+ * List clients by protocol versions `nodetool clientstats --by-protocol` 
(CASSANDRA-14335)
  * Improve LatencyMetrics performance by reducing write path processing 
(CASSANDRA-14281)
  * Add network authz (CASSANDRA-13985)
- * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDAR-14389)
+ * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDRA-14389)
  * nodetool listsnapshots is missing local system keyspace snapshots 
(CASSANDRA-14381)
  * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
  * Rename nodetool --with-port to --print-port to disambiguate from --port 
(CASSANDRA-14392)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5ca3844..7e7649d 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -679,6 +679,10 @@ public class CassandraDaemon
         instance.activate();
     }
 
+    /**
+     * Delegates to the native transport service to clear its connection history.
+     */
+    public void clearConnectionHistory()
+    {
+        nativeTransportService.clearConnectionHistory();
+    }
+
     private void exitOrFail(int code, String message)
     {
         exitOrFail(code, message, null);
@@ -731,5 +735,7 @@ public class CassandraDaemon
          * Returns whether the server is currently running.
          */
         public boolean isRunning();
+
+        /**
+         * Clears the history of client connections kept by this server.
+         */
+        public void clearConnectionHistory();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/NativeTransportService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java 
b/src/java/org/apache/cassandra/service/NativeTransportService.java
index d70e56e..39b334e 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -148,6 +148,17 @@ public class NativeTransportService
             }
             return result;
         });
+
+        ClientMetrics.instance.addGauge("clientsByProtocolVersion", () ->
+        {
+            List<Map<String, String>> result = new ArrayList<>();
+            for (Server server : servers)
+            {
+                result.addAll(server.getClientsByProtocolVersion());
+            }
+            return result;
+        });
+
         AuthMetrics.init();
 
         initialized = true;
@@ -225,4 +236,10 @@ public class NativeTransportService
     {
         return servers;
     }
+
+    /**
+     * Asks every server in {@code servers} to clear its connection history.
+     */
+    public void clearConnectionHistory()
+    {
+        servers.forEach(Server::clearConnectionHistory);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 0f114dd..c94f603 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5380,4 +5380,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
         logger.info("Updated hinted_handoff_throttle_in_kb to {}", 
throttleInKB);
     }
+
+    /**
+     * Clears the client connection history by delegating to the daemon, then logs an
+     * info-level confirmation.
+     */
+    @Override
+    public void clearConnectionHistory()
+    {
+        daemon.clearConnectionHistory();
+        logger.info("Cleared connection history");
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 62a73de..20b7400 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -672,4 +672,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      * @return true if the node successfully starts resuming. (this does not 
mean bootstrap streaming was success.)
      */
     public boolean resumeBootstrap();
+
+
+    /**
+     * Clears the history of clients that have connected in the past.
+     */
+    void clearConnectionHistory();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 49b6563..4fdb563 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1520,6 +1520,7 @@ public class NodeProbe implements AutoCloseable
                 case "connections": // List<Map<String,String>> - list of all 
native connections and their properties
                 case "connectedNativeClients": // number of connected native 
clients
                 case "connectedNativeClientsByUser": // number of native 
clients by username
+                case "clientsByProtocolVersion": // List<Map<String,String>> - clients seen per protocol version and when each was last seen
                     return JMX.newMBeanProxy(mbeanServerConn,
                             new 
ObjectName("org.apache.cassandra.metrics:type=Client,name=" + metricName),
                             
CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();
@@ -1668,6 +1669,11 @@ public class NodeProbe implements AutoCloseable
     {
         msProxy.reloadSslCertificates();
     }
+
+    /**
+     * Clears the node's client connection history by calling {@code clearConnectionHistory()}
+     * on {@code ssProxy}.
+     */
+    public void clearConnectionHistory()
+    {
+        ssProxy.clearConnectionHistory();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java 
b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
index 0469074..9b4ada8 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.tools.nodetool;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,9 +36,48 @@ public class ClientStats extends NodeToolCmd
     @Option(title = "list_connections", name = "--all", description = "Lists 
all connections")
     private boolean listConnections = false;
 
+    @Option(title = "by_protocol", name = "--by-protocol", description = 
"Lists last 100 client connections with oldest protocol version")
+    private boolean oldestProtocolConnections = false;
+
+    @Option(title = "clear_history", name = "--clear-history", description = 
"Clear the history of connected clients")
+    private boolean clearConnectionHistory = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
+        if (clearConnectionHistory)
+        {
+            System.out.println("Clearing history");
+            probe.clearConnectionHistory();
+            return;
+        }
+
+        if (oldestProtocolConnections)
+        {
+            SimpleDateFormat sdf = new SimpleDateFormat("MMM dd, yyyy 
HH:mm:ss");
+
+            System.out.println("Clients by protocol version");
+            System.out.println("");
+
+            List<Map<String, String>> clients = (List<Map<String, String>>) 
probe.getClientMetric("clientsByProtocolVersion");
+
+            if (!clients.isEmpty())
+            {
+                TableBuilder table = new TableBuilder();
+                table.add("Protocol-Version", "IP-Address", "Last-Seen");
+
+                for (Map<String, String> client : clients)
+                {
+                    table.add(client.get("protocolVersion"), 
client.get("inetAddress"), sdf.format(new 
Date(Long.valueOf(client.get("lastSeenTime")))));
+                }
+
+                table.printTo(System.out);
+                System.out.println();
+            }
+
+            return;
+        }
+
         if (listConnections)
         {
             List<Map<String, String>> clients = (List<Map<String, String>>) 
probe.getClientMetric("connections");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java 
b/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java
new file mode 100644
index 0000000..a2468bd
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersionTracker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.utils.Clock;
+
+/**
+ * Records, for each {@link ProtocolVersion}, the client addresses that have connected together with
+ * the time each address was last seen.
+ * <p>
+ * One Caffeine cache is kept per protocol version, each built with {@code maximumSize(capacity)}
+ * ({@link #DEFAULT_MAX_CAPACITY} unless another capacity is given). Which entries are dropped once a
+ * cache grows past that size is decided by Caffeine's eviction policy, so the retained entries are
+ * not guaranteed to be exactly the most recent connections.
+ */
+public class ProtocolVersionTracker
+{
+    public static final int DEFAULT_MAX_CAPACITY = 100;
+
+    @VisibleForTesting
+    final EnumMap<ProtocolVersion, LoadingCache<InetAddress, Long>> clientsByProto;
+
+    /**
+     * Creates a tracker whose per-version caches are bounded to {@link #DEFAULT_MAX_CAPACITY} entries.
+     */
+    public ProtocolVersionTracker()
+    {
+        this(DEFAULT_MAX_CAPACITY);
+    }
+
+    /**
+     * Creates a tracker with one cache for every value of {@link ProtocolVersion}.
+     *
+     * @param capacity maximum size passed to each per-version cache
+     */
+    public ProtocolVersionTracker(final int capacity)
+    {
+        this.clientsByProto = new EnumMap<>(ProtocolVersion.class);
+
+        for (ProtocolVersion version : ProtocolVersion.values())
+        {
+            // This class only ever writes with put(); the loader supplies "now" if a caller uses get() instead.
+            clientsByProto.put(version, Caffeine.newBuilder()
+                                                .maximumSize(capacity)
+                                                .build(key -> Clock.instance.currentTimeMillis()));
+        }
+    }
+
+    /**
+     * Stores the current time as the last-seen time of {@code addr} under {@code version},
+     * replacing any earlier value for the same address. Does nothing if either argument is null.
+     *
+     * @param addr    address of the connecting client
+     * @param version protocol version used by the connection
+     */
+    void addConnection(final InetAddress addr, final ProtocolVersion version)
+    {
+        if (addr == null || version == null)
+            return;
+
+        clientsByProto.get(version).put(addr, Clock.instance.currentTimeMillis());
+    }
+
+    /**
+     * Takes an immutable copy of every per-version cache.
+     *
+     * @return a map with one entry per {@link ProtocolVersion} value, in the order of
+     *         {@code ProtocolVersion.values()}; versions with no recorded clients map to an empty set
+     */
+    public LinkedHashMap<ProtocolVersion, ImmutableSet<ClientIPAndTime>> getAll()
+    {
+        LinkedHashMap<ProtocolVersion, ImmutableSet<ClientIPAndTime>> result = new LinkedHashMap<>();
+        for (ProtocolVersion version : ProtocolVersion.values())
+        {
+            // Build the immutable set directly rather than collecting into a throwaway HashSet first.
+            ImmutableSet.Builder<ClientIPAndTime> clients = ImmutableSet.builder();
+            for (Map.Entry<InetAddress, Long> seen : clientsByProto.get(version).asMap().entrySet())
+                clients.add(new ClientIPAndTime(seen.getKey(), seen.getValue()));
+
+            result.put(version, clients.build());
+        }
+        return result;
+    }
+
+    /**
+     * Invalidates every entry of every per-version cache.
+     */
+    public void clear()
+    {
+        for (LoadingCache<InetAddress, Long> clients : clientsByProto.values())
+            clients.invalidateAll();
+    }
+
+    /**
+     * Immutable pair of a client address and the time it was last seen, as returned by
+     * {@code Clock.instance.currentTimeMillis()}.
+     */
+    public static class ClientIPAndTime
+    {
+        final InetAddress inetAddress;
+        final long lastSeen;
+
+        /**
+         * @param inetAddress client address; must not be null
+         * @param lastSeen    last-seen time for that address
+         * @throws NullPointerException if {@code inetAddress} is null
+         */
+        public ClientIPAndTime(final InetAddress inetAddress, final long lastSeen)
+        {
+            this.inetAddress = Preconditions.checkNotNull(inetAddress);
+            this.lastSeen = lastSeen;
+        }
+
+        // Instances are handed out inside sets, so give them value semantics.
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+                return true;
+            if (!(o instanceof ClientIPAndTime))
+                return false;
+
+            ClientIPAndTime that = (ClientIPAndTime) o;
+            return lastSeen == that.lastSeen && inetAddress.equals(that.inetAddress);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return 31 * inetAddress.hashCode() + Long.hashCode(lastSeen);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ClientIPAndTime{" +
+                   "inetAddress=" + inetAddress +
+                   ", lastSeen=" + lastSeen +
+                   '}';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index 7aade66..a7cfdfb 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -206,6 +207,32 @@ public class Server implements CassandraDaemon.Server
         return result;
     }
 
+    /**
+     * Flattens the contents of the connection tracker's protocol version tracker into one map per
+     * recorded client, keyed by {@code protocolVersion}, {@code inetAddress} and {@code lastSeenTime}.
+     *
+     * @return one immutable map per (protocol version, client) pair, grouped by protocol version
+     */
+    public List<Map<String, String>> getClientsByProtocolVersion()
+    {
+        List<Map<String, String>> rows = new ArrayList<>();
+        LinkedHashMap<ProtocolVersion, ImmutableSet<ProtocolVersionTracker.ClientIPAndTime>> tracked = connectionTracker.protoTracker.getAll();
+
+        tracked.forEach((version, clients) -> {
+            for (ProtocolVersionTracker.ClientIPAndTime seen : clients)
+            {
+                rows.add(ImmutableMap.of("protocolVersion", version.toString(),
+                                         "inetAddress", seen.inetAddress.toString(),
+                                         "lastSeenTime", String.valueOf(seen.lastSeen)));
+            }
+        });
+        return rows;
+    }
+
+    /**
+     * Clears everything recorded by the connection tracker's protocol version tracker.
+     */
+    @Override
+    public void clearConnectionHistory()
+    {
+        connectionTracker.protoTracker.clear();
+    }
+
     private void close()
     {
         // Close opened connections
@@ -282,6 +309,7 @@ public class Server implements CassandraDaemon.Server
         // TODO: should we be using the GlobalEventExecutor or defining our 
own?
         public final ChannelGroup allChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
         private final EnumMap<Event.Type, ChannelGroup> groups = new 
EnumMap<>(Event.Type.class);
+        private final ProtocolVersionTracker protoTracker = new 
ProtocolVersionTracker();
 
         public ConnectionTracker()
         {
@@ -292,6 +320,9 @@ public class Server implements CassandraDaemon.Server
         public void addConnection(Channel ch, Connection connection)
         {
             allChannels.add(ch);
+
+            // Record the client's address and protocol version; remote addresses that are not
+            // InetSocketAddress instances are not tracked.
+            if (ch.remoteAddress() instanceof InetSocketAddress)
+                protoTracker.addConnection(((InetSocketAddress) 
ch.remoteAddress()).getAddress(), connection.getVersion());
         }
 
         public void register(Event.Type type, Channel ch)
@@ -334,6 +365,7 @@ public class Server implements CassandraDaemon.Server
             }
             return result;
         }
+
     }
 
     private static class Initializer extends ChannelInitializer<Channel>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9ec46a6/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java 
b/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java
new file mode 100644
index 0000000..6808c0a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTrackerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import static 
org.apache.cassandra.transport.ProtocolVersionTracker.ClientIPAndTime;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link ProtocolVersionTracker}: recording connections, refreshing the last-seen
+ * time of a known client, and clearing the recorded history.
+ */
+public class ProtocolVersionTrackerTest
+{
+    @Test
+    public void addConnection_shouldUpdateSetToLatestTimestamp() throws UnknownHostException, InterruptedException
+    {
+        ProtocolVersionTracker pvt = new ProtocolVersionTracker();
+        // 127.0.1.1 is also one of the ten mock addresses below, so the V4 set stays at 10 entries.
+        final InetAddress client = InetAddress.getByName("127.0.1.1");
+        pvt.addConnection(client, ProtocolVersion.V4);
+
+        for (InetAddress addr : getMockConnections(10))
+        {
+            pvt.addConnection(addr, ProtocolVersion.V4);
+        }
+
+        ImmutableSet<ClientIPAndTime> clientIPAndTimes1 = pvt.getAll().get(ProtocolVersion.V4);
+        assertEquals(10, clientIPAndTimes1.size());
+
+        // Let the clock advance so the second sighting gets a strictly later timestamp.
+        Thread.sleep(10);
+
+        pvt.addConnection(client, ProtocolVersion.V4);
+        ImmutableSet<ClientIPAndTime> clientIPAndTimes2 = pvt.getAll().get(ProtocolVersion.V4);
+        assertEquals(10, clientIPAndTimes2.size());
+
+        long ls1 = clientIPAndTimes1.stream().filter(c -> c.inetAddress.equals(client)).findFirst().get().lastSeen;
+        long ls2 = clientIPAndTimes2.stream().filter(c -> c.inetAddress.equals(client)).findFirst().get().lastSeen;
+
+        assertTrue(ls2 > ls1);
+    }
+
+    @Test
+    public void addConnection_validConnection_Succeeds()
+    {
+        ProtocolVersionTracker pvt = new ProtocolVersionTracker();
+
+        for (InetAddress addr : getMockConnections(10))
+        {
+            pvt.addConnection(addr, ProtocolVersion.V4);
+        }
+
+        for (InetAddress addr : getMockConnections(7))
+        {
+            pvt.addConnection(addr, ProtocolVersion.V3);
+        }
+
+        // getAll() reports every protocol version, including those with no recorded clients.
+        assertEquals(ProtocolVersion.values().length, pvt.getAll().size());
+        assertEquals(0, pvt.getAll().get(ProtocolVersion.V2).size());
+        assertEquals(7, pvt.getAll().get(ProtocolVersion.V3).size());
+        assertEquals(10, pvt.getAll().get(ProtocolVersion.V4).size());
+    }
+
+    @Test
+    public void clear()
+    {
+        ProtocolVersionTracker pvt = new ProtocolVersionTracker();
+
+        for (InetAddress addr : getMockConnections(7))
+        {
+            pvt.addConnection(addr, ProtocolVersion.V3);
+        }
+
+        assertEquals(7, pvt.getAll().get(ProtocolVersion.V3).size());
+        pvt.clear();
+
+        assertEquals(0, pvt.getAll().get(ProtocolVersion.V3).size());
+    }
+
+    /**
+     * Builds the addresses 127.0.1.0 through 127.0.1.(num - 1).
+     *
+     * @param num number of addresses to create
+     * @return the addresses, in ascending order of the last octet
+     * @throws AssertionError if an address cannot be resolved, so that a broken fixture fails the
+     *                        test instead of silently putting a null into the list
+     */
+    private List<InetAddress> getMockConnections(int num)
+    {
+        return IntStream.range(0, num).mapToObj(n -> {
+            try
+            {
+                return InetAddress.getByName("127.0.1." + n);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new AssertionError("Could not create mock address 127.0.1." + n, e);
+            }
+        }).collect(Collectors.toList());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to