This is an automated email from the ASF dual-hosted git repository.

jeetkundoug pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41ea72ba CASSANDRASC-144 Introduces sidecar endpoint to vend client 
connection stats (#135)
41ea72ba is described below

commit 41ea72bafb37f971ecf2649587595da74de51556
Author: Arjun Ashok <[email protected]>
AuthorDate: Fri Oct 25 12:37:25 2024 -0700

    CASSANDRASC-144 Introduces sidecar endpoint to vend client connection stats 
(#135)
---
 adapters/base/build.gradle                         |   1 +
 .../sidecar/adapters/base/CassandraAdapter.java    |   7 +
 .../adapters/base/CassandraMetricsOperations.java  | 104 +++++++++++
 .../adapters/base/db/ConnectedClientStats.java     |  87 ++++++++++
 .../db/ConnectedClientStatsDatabaseAccessor.java   |  64 +++++++
 .../base/db/ConnectedClientStatsSummary.java       |  56 ++++++
 .../base/db/schema/ConnectedClientsSchema.java     |  91 ++++++++++
 .../adapters/base/db/ConnectedClientStatsTest.java | 130 ++++++++++++++
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   2 +
 .../request/ConnectedClientStatsRequest.java       |  31 ++--
 .../response/ConnectedClientStatsResponse.java     |  75 ++++++++
 .../response/data/ClientConnectionEntry.java       | 166 ++++++++++++++++++
 .../cassandra/sidecar/client/RequestContext.java   |  12 ++
 .../cassandra/sidecar/client/SidecarClient.java    |  13 ++
 .../sidecar/client/SidecarClientTest.java          |  42 +++++
 .../sidecar/common/server/ICassandraAdapter.java   |   6 +
 .../sidecar/common/server/MetricsOperations.java   |  19 +-
 .../sidecar/db/DataObjectMappingException.java     |   0
 .../cassandra/sidecar/db/DatabaseAccessor.java     |   6 +-
 .../sidecar/db/schema/AbstractSchema.java          |  81 +++++++++
 .../cassandra/sidecar/db/schema/TableSchema.java   |   0
 .../SidecarSchemaModificationException.java        |   0
 .../sidecar/cluster/CassandraAdapterDelegate.java  |   8 +
 .../sidecar/db/RestoreJobDatabaseAccessor.java     |   5 +-
 .../sidecar/db/RestoreRangeDatabaseAccessor.java   |   5 +-
 .../sidecar/db/RestoreSliceDatabaseAccessor.java   |   5 +-
 .../cassandra/sidecar/db/schema/SidecarSchema.java |   2 +-
 .../cassandra/sidecar/metrics/SchemaMetrics.java   |   3 +-
 .../cassandra/sidecar/metrics/ServerMetrics.java   |   4 +-
 .../cassandra/sidecar/routes/AbstractHandler.java  |  23 +++
 .../routes/ConnectedClientStatsHandler.java        |  72 ++++++++
 .../cassandra/sidecar/server/MainModule.java       |   5 +
 ...ConnectedClientStatsHandlerIntegrationTest.java | 193 +++++++++++++++++++++
 .../routes/ConnectedClientStatsHandlerTest.java    | 169 ++++++++++++++++++
 34 files changed, 1457 insertions(+), 30 deletions(-)

diff --git a/adapters/base/build.gradle b/adapters/base/build.gradle
index c34ae261..006b3f28 100644
--- a/adapters/base/build.gradle
+++ b/adapters/base/build.gradle
@@ -62,6 +62,7 @@ dependencies {
     compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
     implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
 
+    testImplementation('com.datastax.cassandra:cassandra-driver-core:3.11.3')
     testImplementation 
"org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
     testImplementation 
"org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
     testImplementation "org.assertj:assertj-core:3.24.2"
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index fb61054b..55a7b99b 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -34,6 +34,7 @@ import 
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.server.ClusterMembershipOperations;
 import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.common.server.TableOperations;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
@@ -169,6 +170,12 @@ public class CassandraAdapter implements ICassandraAdapter
         return new CassandraStorageOperations(jmxClient, dnsResolver);
     }
 
+    @Override
+    public MetricsOperations metricsOperations()
+    {
+        return new CassandraMetricsOperations(cqlSessionProvider); // built anew on every call; nothing is cached here
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
new file mode 100644
index 00000000..cc3ff9fc
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStats;
+import 
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsDatabaseAccessor;
+import 
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsSummary;
+import 
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link MetricsOperations} implementation that serves connected-client stats by querying through a
+ * {@link ConnectedClientStatsDatabaseAccessor} bound to {@link ConnectedClientsSchema}.
+ *
+ * <p>{@link #connectedClientStats(boolean)} takes one of two paths:
+ * <ul>
+ *   <li>summary: wraps {@code dbAccessor.summary()} in a response whose connection list is {@code null};</li>
+ *   <li>details, see {@link #connectedClientDetails()}: converts every element of {@code dbAccessor.stats()}
+ *       into a {@link ClientConnectionEntry}, then derives the total and the per-user counts from that list.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): the details path groups with {@code Collectors.groupingBy}, which throws
+ * {@code NullPointerException} for a {@code null} classifier key. Confirm that
+ * {@code ClientConnectionEntry.username()} can never return {@code null} for a row of the queried table.
+ */
+public class CassandraMetricsOperations implements MetricsOperations
+{
+    private final ConnectedClientStatsDatabaseAccessor dbAccessor;
+
+    /**
+     * Creates a new instance with the provided {@link CQLSessionProvider}
+     *
+     * @param session provider handed to a newly created {@link ConnectedClientStatsDatabaseAccessor}, together
+     *                with a new {@link ConnectedClientsSchema}
+     */
+    public CassandraMetricsOperations(CQLSessionProvider session)
+    {
+        this.dbAccessor = new ConnectedClientStatsDatabaseAccessor(session, 
new ConnectedClientsSchema());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @param summaryOnly {@code true} to return only the total and the per-user counts; {@code false} to also
+     *                    return one entry per connection
+     */
+    @Override
+    public ConnectedClientStatsResponse connectedClientStats(boolean 
summaryOnly)
+    {
+        if (summaryOnly)
+        {
+            return connectedClientSummary();
+        }
+        return connectedClientDetails();
+    }
+
+    public ConnectedClientStatsResponse connectedClientDetails()
+    {
+        List<ClientConnectionEntry> entries = 
statsToEntries(dbAccessor.stats());
+        Map<String, Long> connectionsByUser = 
entries.stream().collect(Collectors.groupingBy(ClientConnectionEntry::username,
+                                                                               
              Collectors.counting()));
+        long totalConnectedClients = entries.size(); // one entry per mapped stat, so the size is the total
+        return new ConnectedClientStatsResponse(entries, 
totalConnectedClients, connectionsByUser);
+    }
+
+    private ConnectedClientStatsResponse connectedClientSummary()
+    {
+        ConnectedClientStatsSummary summary = dbAccessor.summary();
+        return new ConnectedClientStatsResponse(null, 
summary.totalConnectedClients, summary.connectionsByUser);
+    }
+
+    private List<ClientConnectionEntry> 
statsToEntries(Stream<ConnectedClientStats> stats)
+    {
+        return stats.map(CassandraMetricsOperations::statToEntry)
+                    .collect(Collectors.toList());
+    }
+
+    private static @NotNull ClientConnectionEntry 
statToEntry(ConnectedClientStats stat)
+    {
+        // Note: We explicitly use constructor params based object creation 
instead of builder in order to optimize the
+        // number of potential objects created for each row of the table 
queried, specifically since we know this can be large
+        return new ClientConnectionEntry(stat.address,
+                                         stat.port,
+                                         stat.sslEnabled,
+                                         stat.sslCipherSuite,
+                                         stat.sslProtocol,
+                                         stat.protocolVersion,
+                                         stat.username,
+                                         stat.requestCount,
+                                         stat.driverName,
+                                         stat.driverVersion,
+                                         stat.keyspaceName,
+                                         stat.clientOptions,
+                                         stat.authenticationMode,
+                                         stat.authenticationMetadata);
+    }
+}
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
new file mode 100644
index 00000000..6142ad62
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.sidecar.db.DataObjectMappingException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Representation of the connected clients metadata: a value object whose final fields are populated
+ * from a single {@link Row}.
+ *
+ * <p>The constructor reads {@code address}, {@code port}, {@code hostname}, {@code username},
+ * {@code connection_stage}, {@code protocol_version}, {@code driver_name}, {@code driver_version},
+ * {@code ssl_enabled}, {@code ssl_protocol}, {@code ssl_cipher_suite} and {@code request_count}
+ * unconditionally. {@code keyspace_name}, {@code authentication_mode}, {@code authentication_metadata}
+ * and {@code client_options} are read only when the row column definitions contain them; otherwise the
+ * matching field is {@code null}.
+ *
+ * <p>{@code protocol_version} is read as an int and stored as its decimal string.
+ *
+ * <p>NOTE(review): {@link #getMapFieldIfExists(Row, String)} returns a raw {@code Map}, and both
+ * {@code get...IfExists} helpers are public instance methods invoked from the constructor. Presumably
+ * they could be private static with a {@code Map<String, String>} return type; confirm that no external
+ * caller depends on the current signatures.
+ */
+public class ConnectedClientStats
+{
+    public final String address;
+    public final int port;
+    public final String hostname;
+    public final String username;
+    public final String connectionStage;
+    public final String protocolVersion;
+    public final String driverName;
+    public final String driverVersion;
+    public final boolean sslEnabled;
+    public final String sslProtocol;
+    public final String sslCipherSuite;
+    public final long requestCount;
+    public final Map<String, String> clientOptions;
+    public final String keyspaceName;
+    public final String authenticationMode;
+    public final Map<String, String> authenticationMetadata;
+
+    public static ConnectedClientStats from(@NotNull Row row) throws 
DataObjectMappingException
+    {
+        return new ConnectedClientStats(row); // plain delegation to the constructor
+    }
+
+    public ConnectedClientStats(@NotNull Row row)
+    {
+        this.address = row.getInet("address").getHostAddress();
+        this.port = row.getInt("port");
+        this.hostname = row.getString("hostname");
+        this.username = row.getString("username");
+        this.connectionStage = row.getString("connection_stage");
+        this.protocolVersion = 
Integer.toString(row.getInt("protocol_version"));
+        this.driverName = row.getString("driver_name");
+        this.driverVersion = row.getString("driver_version");
+        this.sslEnabled = row.getBool("ssl_enabled");
+        this.sslProtocol = row.getString("ssl_protocol");
+        this.sslCipherSuite = row.getString("ssl_cipher_suite");
+        this.requestCount = row.getLong("request_count");
+        this.keyspaceName = getStringFieldIfExists(row, "keyspace_name"); // optional columns from here on
+        this.authenticationMode = getStringFieldIfExists(row, 
"authentication_mode");
+        this.authenticationMetadata = getMapFieldIfExists(row, 
"authentication_metadata");
+        this.clientOptions = getMapFieldIfExists(row, "client_options");
+    }
+
+    public String getStringFieldIfExists(@NotNull Row row, String fieldName)
+    {
+        return (row.getColumnDefinitions().contains(fieldName)) ? 
row.getString(fieldName) : null;
+    }
+
+    public Map getMapFieldIfExists(@NotNull Row row, String fieldName)
+    {
+        if ((row.getColumnDefinitions().contains(fieldName)))
+        {
+            return row.getMap(fieldName, String.class, String.class);
+        }
+        return null; // column absent from this row
+    }
+}
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
new file mode 100644
index 00000000..38cf1354
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import 
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.DatabaseAccessor;
+
+/**
+ * DataAccessor implementation to read client connection stats from the table represented in
+ * {@link ConnectedClientsSchema}.
+ *
+ * <p>Both query methods call {@code tableSchema.prepareStatements(session())} before binding, so
+ * statement preparation is requested on every invocation.
+ */
+public class ConnectedClientStatsDatabaseAccessor extends 
DatabaseAccessor<ConnectedClientsSchema>
+{
+    public ConnectedClientStatsDatabaseAccessor(CQLSessionProvider 
sessionProvider, ConnectedClientsSchema tableSchema)
+    {
+        super(tableSchema, sessionProvider);
+    }
+
+    /**
+     * Query for a summary of the client connection stats
+     *
+     * <p>Binds the statement from {@code tableSchema.connectionsByUser()} with no values, executes it, and
+     * converts the whole result set with {@link ConnectedClientStatsSummary#from}.
+     *
+     * @return {@link ConnectedClientStatsSummary} with total connections and counts grouped by user
+     */
+    public ConnectedClientStatsSummary summary()
+    {
+        tableSchema.prepareStatements(session());
+        BoundStatement statement = tableSchema.connectionsByUser().bind();
+        ResultSet resultSet = execute(statement);
+        return ConnectedClientStatsSummary.from(resultSet);
+    }
+
+    /**
+     * Query for all the client connection stats with an entry per connection
+     *
+     * <p>The returned stream is sequential and is built over the result set spliterator; each row is
+     * converted with {@link ConnectedClientStats#from} as the stream is consumed.
+     *
+     * @return {@link ConnectedClientStats} for each connection
+     */
+    public Stream<ConnectedClientStats> stats()
+    {
+        tableSchema.prepareStatements(session());
+        BoundStatement statement = tableSchema.stats().bind();
+        ResultSet resultSet = execute(statement);
+        return StreamSupport.stream(resultSet.spliterator(), false)
+                            .map(ConnectedClientStats::from);
+    }
+}
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
new file mode 100644
index 00000000..c59da807
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.sidecar.db.DataObjectMappingException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Representation of a summary of client connections stats: the total number of connections and the
+ * number of connections per user.
+ *
+ * <p>{@link #from(ResultSet)} reads every row of the result set, using the {@code username} column as
+ * the map key and the {@code connection_count} column as the value. The total is the sum of those
+ * values, each narrowed with {@code Math.toIntExact}, which throws {@code ArithmeticException} when a
+ * count does not fit in an int. {@code Collectors.toMap} throws {@code IllegalStateException} if two
+ * rows carry the same {@code username}.
+ *
+ * <p>The constructor stores the given map by reference; it is not copied.
+ */
+public class ConnectedClientStatsSummary
+{
+    public final int totalConnectedClients;
+    public final Map<String, Long> connectionsByUser;
+
+    public static ConnectedClientStatsSummary from(@NotNull ResultSet 
resultSet) throws DataObjectMappingException
+    {
+
+        Map<String, Long> resultMap = 
StreamSupport.stream(resultSet.spliterator(), false)
+                                                             
.collect(Collectors.toMap(r -> r.getString("username"),
+                                                                         r -> 
r.getLong("connection_count")));
+        int totalConnections = 
resultMap.values().stream().mapToInt(Math::toIntExact).sum();
+
+        return new ConnectedClientStatsSummary(resultMap, totalConnections);
+    }
+
+    public ConnectedClientStatsSummary(Map<String, Long> connectionsByUser, 
int totalConnections)
+    {
+        this.totalConnectedClients = totalConnections;
+        this.connectionsByUser = connectionsByUser;
+    }
+
+
+}
+
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
new file mode 100644
index 00000000..63701438
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.sidecar.db.schema.TableSchema;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Holds the prepared statements for operations related to client connection stats retrieved from
+ * the {@code clients} virtual table in the {@code system_views} keyspace.
+ *
+ * <p>{@link #stats()} and {@link #connectionsByUser()} return fields that are only assigned by
+ * {@link #prepareStatements(Session)}, so they are {@code null} until that method has run.
+ *
+ * <p>NOTE(review): the connections-by-user query selects {@code username} next to {@code COUNT(*)}
+ * without a {@code GROUP BY} clause. Confirm that the server returns one row per user, as the
+ * consumer of {@code connection_count} appears to expect, rather than a single aggregated row.
+ */
+public class ConnectedClientsSchema extends TableSchema
+{
+    private static final String TABLE_NAME = "clients";
+    private static final String KEYSPACE_NAME = "system_views";
+
+    private PreparedStatement statsStatement;
+    private PreparedStatement connectionsByUserStatement;
+
+    @Override
+    protected String keyspaceName()
+    {
+        return KEYSPACE_NAME;
+    }
+
+    public void prepareStatements(@NotNull Session session)
+    {
+        statsStatement = prepare(statsStatement, session, statsStatement());
+        connectionsByUserStatement = prepare(connectionsByUserStatement, 
session, selectConnectionsByUserStatement());
+    }
+
+    @Override
+    protected boolean initializeInternal(@NotNull Session session)
+    {
+        prepareStatements(session);
+        return true; // always reports success once prepareStatements returns
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return null; // this schema supplies no creation statement
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_NAME;
+    }
+
+    public PreparedStatement stats()
+    {
+        return statsStatement;
+    }
+
+    public PreparedStatement connectionsByUser()
+    {
+        return connectionsByUserStatement;
+    }
+
+    private String statsStatement()
+    {
+        return String.format("SELECT * FROM %s.%s;", KEYSPACE_NAME, 
TABLE_NAME);
+    }
+
+    static String selectConnectionsByUserStatement()
+    {
+        return String.format("SELECT username, COUNT(*) AS connection_count " +
+                             "FROM %s.%s;",
+                             KEYSPACE_NAME,
+                             TABLE_NAME);
+    }
+}
diff --git 
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
new file mode 100644
index 00000000..fdb6b86b
--- /dev/null
+++ 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.Row;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for ConnectedClientStats transformed from the cassandra driver results
+ *
+ * <p>Both tests build a {@link ConnectedClientStats} from a mocked {@link Row}. {@code setupMockData}
+ * stubs every column getter the constructor calls and controls what {@code ColumnDefinitions.contains}
+ * reports: either every column is present, or {@code keyspace_name}, {@code authentication_mode},
+ * {@code authentication_metadata} and {@code client_options} are reported as absent.
+ */
+public class ConnectedClientStatsTest
+{
+    private static final String TEST_SPIFFE_IDENTITY = 
"spiffe://test.cassandra.apache.org/unitTest/mtls";
+    Row mockRow = mock(Row.class);
+
+    @Test
+    public void connectedClientStatsTest()
+    {
+        setupMockData(mockRow, false);
+        ConnectedClientStats stats = new ConnectedClientStats(mockRow);
+        assertThat(stats).isNotNull();
+        assertThat(stats.authenticationMetadata).isNotNull();
+        assertThat(stats.authenticationMetadata.keySet()).contains("identity");
+        
assertThat(stats.authenticationMetadata.get("identity")).isEqualTo(TEST_SPIFFE_IDENTITY);
+        assertThat(stats.clientOptions).isNotNull();
+        assertThat(stats.clientOptions.get("CQL_VERSION")).isEqualTo("3.4.6");
+    }
+
+    @Test
+    public void connectedClientStatsMissingFieldsTest()
+    {
+        setupMockData(mockRow, true);
+        ConnectedClientStats stats = new ConnectedClientStats(mockRow);
+        assertThat(stats).isNotNull();
+        assertThat(stats.keyspaceName).isNull();
+        assertThat(stats.authenticationMode).isNull();
+        assertThat(stats.authenticationMetadata).isNull();
+        assertThat(stats.clientOptions).isNull();
+    }
+
+    private void setupMockData(Row mockRow, boolean isMissingFields)
+    {
+        ColumnDefinitions mockColumnDefinitions = 
mock(ColumnDefinitions.class);
+        when(mockRow.getColumnDefinitions()).thenReturn(mockColumnDefinitions);
+
+        String ks;
+        String authMode;
+        Map<String, String> authMetadata = null;
+        Map<String, String> clientOptions = null;
+
+        // Case where the fields introduced in newer C* versions are not 
present
+        if (isMissingFields)
+        {
+            ks = null;
+            authMode = null;
+            
when(mockRow.getColumnDefinitions().contains(anyString())).thenAnswer(i -> {
+                String input = i.getArgument(0, String.class);
+                return !("keyspace_name".equals(input)
+                         || "authentication_mode".equals(input)
+                         || "authentication_metadata".equals(input)
+                         || "client_options".equals(input));
+            });
+        }
+        else
+        {
+            ks = "test";
+            authMode = "password";
+            authMetadata = new HashMap<String, String>()
+            {
+                {
+                put("identity", TEST_SPIFFE_IDENTITY);
+                }
+            };
+            clientOptions = new HashMap<String, String>()
+            {
+                {
+                put("CQL_VERSION", "3.4.6");
+                put("DRIVER_NAME", "DataStax Python Driver");
+                put("DRIVER_VERSION", "3.25.0");
+                }
+            };
+            
when(mockRow.getColumnDefinitions().contains(anyString())).thenReturn(true);
+        }
+
+        
when(mockRow.getInet("address")).thenReturn(InetAddress.getLoopbackAddress());
+        when(mockRow.getInt("port")).thenReturn(0);
+        when(mockRow.getString("hostname")).thenReturn("localhost");
+        when(mockRow.getString("username")).thenReturn("u1");
+        when(mockRow.getString("connection_stage")).thenReturn("test");
+        when(mockRow.getInt("protocol_version")).thenReturn(5);
+        when(mockRow.getString("driver_name")).thenReturn("TestDriver");
+        when(mockRow.getString("driver_version")).thenReturn("TestVersion");
+        when(mockRow.getBool("ssl_enabled")).thenReturn(false);
+        when(mockRow.getString("ssl_protocol")).thenReturn("");
+        when(mockRow.getString("ssl_cipher_suite")).thenReturn("");
+        when(mockRow.getLong("request_count")).thenReturn(10L);
+        when(mockRow.getString("keyspace_name")).thenReturn(ks); // optional-column stubs follow; null in the missing-fields case
+        when(mockRow.getMap("authentication_metadata", String.class, 
String.class)).thenReturn(authMetadata);
+        when(mockRow.getString("authentication_mode")).thenReturn(authMode);
+        when(mockRow.getMap("client_options", String.class, 
String.class)).thenReturn(clientOptions);
+
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 12fd1f79..ec4c7b7e 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -109,6 +109,8 @@ public final class ApiEndpointsV1
     public static final String ABORT_RESTORE_JOB_ROUTE = RESTORE_JOB_ROUTE + 
ABORT;
     public static final String RESTORE_JOB_PROGRESS_ROUTE = RESTORE_JOB_ROUTE 
+ PROGRESS;
 
+    public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + 
CASSANDRA + "/stats/connected-clients";
+
     private ApiEndpointsV1()
     {
         throw new IllegalStateException(getClass() + " is a constants 
container and shall not be instantiated");
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
similarity index 53%
copy from 
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
copy to 
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
index 8d884831..7d8d7b98 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
@@ -16,25 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.db.schema;
+package org.apache.cassandra.sidecar.common.request;
 
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.Metadata;
-import org.jetbrains.annotations.NotNull;
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 
 /**
- * Abstract base schema class for table schema
+ * Represents a request to get the stats related to the clients connected to the node
  */
-public abstract class TableSchema extends AbstractSchema
+public class ConnectedClientStatsRequest extends 
JsonRequest<ConnectedClientStatsResponse>
 {
-    protected abstract String tableName();
+    /**
+     * Constructs a request to retrieve connected-client stats from the Cassandra node
+     */
+    public ConnectedClientStatsRequest()
+    {
+        super(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE);
+    }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    protected boolean exists(@NotNull Metadata metadata)
+    public HttpMethod method()
     {
-        KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceName());
-        if (ksMetadata == null)
-            return false;
-        return ksMetadata.getTable(tableName()) != null;
+        return HttpMethod.GET;
     }
+
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
new file mode 100644
index 00000000..fb243187
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+
+/**
+ * Response class for the ConnectedClientStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ConnectedClientStatsResponse
+{
+
+    @JsonProperty("clientConnections")
+    public List<ClientConnectionEntry> clientConnections()
+    {
+        return clientConnections;
+    }
+
+    @JsonProperty("totalConnectedClients")
+    public long totalConnectedClients()
+    {
+        return totalConnectedClients;
+    }
+
+    @JsonProperty("connectionsByUser")
+    public Map<String, Long> connectionsByUser()
+    {
+        return connectionsByUser;
+    }
+
+    private final List<ClientConnectionEntry> clientConnections;
+    private final long totalConnectedClients;
+    private final Map<String, Long> connectionsByUser;
+
+
+    /**
+     * Constructs a new {@link ConnectedClientStatsResponse}.
+     *
+     * @param clientConnections list of client connection stats
+     * @param totalConnectedClients total count of the connected clients
+     * @param connectionsByUser mapping of user to the number of connections
+     */
+    @JsonCreator
+    public ConnectedClientStatsResponse(@JsonProperty("clientConnections") 
List<ClientConnectionEntry> clientConnections,
+                                        @JsonProperty("totalConnectedClients") 
long totalConnectedClients,
+                                        @JsonProperty("connectionsByUser") 
Map<String, Long> connectionsByUser)
+    {
+        this.clientConnections = clientConnections;
+        this.totalConnectedClients = totalConnectedClients;
+        this.connectionsByUser = connectionsByUser;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
new file mode 100644
index 00000000..afb4c824
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Class representing an entry of the Client Connection Stats
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ClientConnectionEntry
+{
+    @JsonProperty("address")
+    public String address()
+    {
+        return address;
+    }
+
+    @JsonProperty("port")
+    public int port()
+    {
+        return port;
+    }
+
+    @JsonProperty("sslEnabled")
+    public Boolean sslEnabled()
+    {
+        return sslEnabled;
+    }
+
+    @JsonProperty("sslCipherSuite")
+    public String sslCipherSuite()
+    {
+        return sslCipherSuite;
+    }
+
+    @JsonProperty("sslProtocol")
+    public String sslProtocol()
+    {
+        return sslProtocol;
+    }
+
+    @JsonProperty("protocolVersion")
+    public String protocolVersion()
+    {
+        return protocolVersion;
+    }
+
+    @JsonProperty("username")
+    public String username()
+    {
+        return username;
+    }
+
+    @JsonProperty("requestCount")
+    public long requestCount()
+    {
+        return requestCount;
+    }
+
+    @JsonProperty("driverName")
+    public String driverName()
+    {
+        return driverName;
+    }
+
+    @JsonProperty("driverVersion")
+    public String driverVersion()
+    {
+        return driverVersion;
+    }
+    @JsonProperty("keyspaceName")
+    public String keyspaceName()
+    {
+        return keyspaceName;
+    }
+
+    @JsonProperty("clientOptions")
+    public Map<String, String> clientOptions()
+    {
+        return clientOptions;
+    }
+
+    @JsonProperty("authenticationMode")
+    public String authenticationMode()
+    {
+        return authenticationMode;
+    }
+
+    @JsonProperty("authenticationMetadata")
+    public Map<String, String> authenticationMetadata()
+    {
+        return authenticationMetadata;
+    }
+
+
+    private final String address;
+    private final int port;
+    private final Boolean sslEnabled;
+    private final String sslCipherSuite;
+    private final String sslProtocol;
+    private final String protocolVersion;
+    private final String username;
+    private final long requestCount;
+    private final String driverName;
+    private final String driverVersion;
+
+    private final Map<String, String> clientOptions;
+    private final String keyspaceName;
+    private final String authenticationMode;
+    private final Map<String, String> authenticationMetadata;
+
+
+    @JsonCreator
+    public ClientConnectionEntry(@NotNull @JsonProperty("address") String 
address,
+                                 @NotNull @JsonProperty("port") int port,
+                                 @NotNull @JsonProperty("sslEnabled") boolean 
sslEnabled,
+                                 @JsonProperty("sslCipherSuite") String 
sslCipherSuite,
+                                 @JsonProperty("sslProtocol") String 
sslProtocol,
+                                 @NotNull @JsonProperty("protocolVersion") 
String protocolVersion,
+                                 @NotNull @JsonProperty("username") String 
username,
+                                 @NotNull @JsonProperty("requestCount") long 
requestCount,
+                                 @NotNull @JsonProperty("driverName") String 
driverName,
+                                 @NotNull @JsonProperty("driverVersion") 
String driverVersion,
+                                 @JsonProperty("keyspaceName") String 
keyspaceName,
+                                 @JsonProperty("clientOptions") Map<String, 
String> clientOptions,
+                                 @JsonProperty("authenticationMode") String 
authMode,
+                                 @JsonProperty("authenticationMetadata") 
Map<String, String> authMetadata)
+    {
+        this.address = address;
+        this.port = port;
+        this.sslEnabled = sslEnabled;
+        this.sslCipherSuite = sslCipherSuite;
+        this.sslProtocol = sslProtocol;
+        this.protocolVersion = protocolVersion;
+        this.username = username;
+        this.requestCount = requestCount;
+        this.driverName = driverName;
+        this.driverVersion = driverVersion;
+        this.keyspaceName = keyspaceName;
+        this.clientOptions = clientOptions;
+        this.authenticationMode = authMode;
+        this.authenticationMetadata = authMetadata;
+    }
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 61120d5f..96082a61 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -31,6 +31,7 @@ import 
org.apache.cassandra.sidecar.common.request.CassandraJmxHealthRequest;
 import 
org.apache.cassandra.sidecar.common.request.CassandraNativeHealthRequest;
 import 
org.apache.cassandra.sidecar.common.request.CleanSSTableUploadSessionRequest;
 import org.apache.cassandra.sidecar.common.request.ClearSnapshotRequest;
+import org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest;
 import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest;
 import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
@@ -474,6 +475,17 @@ public class RequestContext
             return request(new UploadSSTableRequest(keyspace, tableName, 
uploadId, component, digest, filename));
         }
 
+        /**
+         * Sets the {@code request} to be a {@link 
ConnectedClientStatsRequest} and returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder connectedClientStatsRequest()
+        {
+            return request(new ConnectedClientStatsRequest());
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an
          * {@link 
org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} 
configured with
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index f9f12643..1925e0ad 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -48,6 +48,7 @@ import 
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayloa
 import org.apache.cassandra.sidecar.common.request.data.Digest;
 import 
org.apache.cassandra.sidecar.common.request.data.RestoreJobProgressRequestParams;
 import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -580,6 +581,18 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                             .build());
     }
 
+    /**
+     * Executes the connected client stats request using the default retry 
policy and configured selection policy
+     *
+     * @return a completable future of the connected client stats
+     */
+    public CompletableFuture<ConnectedClientStatsResponse> 
connectedClientStats()
+    {
+        return executeRequestAsync(requestBuilder()
+                                   .connectedClientStatsRequest()
+                                   .build());
+    }
+
     /**
      * Returns a copy of the request builder with the default parameters 
configured for the client.
      *
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index d576f713..8ac0bb3a 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.sidecar.common.request.Request;
 import 
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
 import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
 import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -75,6 +76,7 @@ import 
org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
 import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
 import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.response.data.RingEntry;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
@@ -1398,6 +1400,46 @@ abstract class SidecarClientTest
                                });
     }
 
+    @Test
+    public void testConnectedClientStats() throws Exception
+    {
+        String connectedClientStatsResponseAsString = 
"{\"clientConnections\":[{\"address\":\"127.0.0.1\",\"port\":54628" +
+                                                      
",\"sslEnabled\":false,\"sslCipherSuite\":\"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256\""
 +
+                                                      
",\"sslProtocol\":\"TLSv1.2\",\"protocolVersion\":\"5\",\"username\":\"anonymous\""
 +
+                                                      
",\"requestCount\":39,\"driverName\":\"DataStax Java Driver\"" +
+                                                      
",\"driverVersion\":\"3.11.3\",\"keyspaceName\":\"test\"" +
+                                                      
",\"authenticationMode\":\"MutualTls\"" +
+                                                      
",\"authenticationMetadata\":{\"identity\":\"spiffe://test.cassandra.apache.org/unitTest/mtls\"}"
 +
+                                                      
",\"clientOptions\":{\"CQL_VERSION\":\"3.4.6\",\"DRIVER_NAME\":\"DataStax 
Python Driver\"" +
+                                                      
",\"DRIVER_VERSION\":\"3.25.0\"}}],\"totalConnectedClients\":1" +
+                                                      
",\"connectionsByUser\":{\"anonymous\":1}}";
+
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(connectedClientStatsResponseAsString);
+        enqueue(response);
+        ConnectedClientStatsResponse result = 
client.connectedClientStats().get();
+
+        assertThat(result).isNotNull();
+        assertThat(result.clientConnections()).isNotNull().hasSize(1);
+        assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1);
+        
assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous");
+        ClientConnectionEntry entry = 
result.clientConnections().iterator().next();
+        assertThat(entry.address()).isEqualTo("127.0.0.1");
+        assertThat(entry.port()).isEqualTo(54628);
+        assertThat(entry.sslEnabled()).isEqualTo(false);
+        
assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");
+        assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2");
+        assertThat(entry.protocolVersion()).isEqualTo("5");
+        assertThat(entry.username()).isEqualTo("anonymous");
+        assertThat(entry.requestCount()).isEqualTo(39);
+        assertThat(entry.driverName()).isEqualTo("DataStax Java Driver");
+        assertThat(entry.driverVersion()).isEqualTo("3.11.3");
+        assertThat(entry.keyspaceName()).isEqualTo("test");
+        assertThat(entry.authenticationMode()).isEqualTo("MutualTls");
+        assertThat(entry.authenticationMetadata()).containsKey("identity");
+        assertThat(entry.clientOptions()).containsKeys("CQL_VERSION", 
"DRIVER_NAME", "DRIVER_VERSION");
+        validateResponseServed(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE);
+    }
+
     private void enqueue(MockResponse response)
     {
         for (MockWebServer server : servers)
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
index 8bb87a72..4db86f49 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
@@ -82,6 +82,12 @@ public interface ICassandraAdapter
      */
     StorageOperations storageOperations();
 
+    /**
+     * @return the {@link MetricsOperations} implementation for the Cassandra 
cluster
+     */
+    MetricsOperations metricsOperations();
+
+
     /**
      * @return the {@link ClusterMembershipOperations} implementation for 
handling cluster membership operations
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
similarity index 59%
copy from 
server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
copy to 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
index 8b532890..ef7a5f95 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
@@ -16,15 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.db;
+package org.apache.cassandra.sidecar.common.server;
+
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 
 /**
- * Exception thrown during mapping a data object into/from database record
+ * An interface that defines interactions with the metrics system in Cassandra.
  */
-public class DataObjectMappingException extends RuntimeException
+public interface MetricsOperations
 {
-    public DataObjectMappingException(String message, Throwable cause)
-    {
-        super(message, cause);
-    }
+    /**
+     * Retrieve the connected client stats metrics from the cluster
+     * @param summaryOnly boolean parameter to list connection summary only
+     * @return the requested client stats, in full or summary
+     */
+    ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
+
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
similarity index 100%
rename from 
server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
rename to 
server-common/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
similarity index 89%
rename from 
server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
rename to 
server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
index d435bde7..37c1a1f3 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
@@ -25,7 +25,6 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
-import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.db.schema.TableSchema;
 import org.jetbrains.annotations.NotNull;
 
@@ -36,17 +35,14 @@ import org.jetbrains.annotations.NotNull;
  */
 public abstract class DatabaseAccessor<T extends TableSchema>
 {
-    public final SidecarSchema sidecarSchema;
     public final CQLSessionProvider cqlSessionProvider;
     protected final T tableSchema;
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    protected DatabaseAccessor(SidecarSchema sidecarSchema,
-                               T tableSchema,
+    protected DatabaseAccessor(T tableSchema,
                                CQLSessionProvider sessionProvider)
     {
-        this.sidecarSchema = sidecarSchema;
         this.tableSchema = tableSchema;
         this.cqlSessionProvider = sessionProvider;
     }
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
new file mode 100644
index 00000000..7301e7b4
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import 
org.apache.cassandra.sidecar.exceptions.SidecarSchemaModificationException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Abstract schema
+ */
+public abstract class AbstractSchema
+{
+    protected Logger logger = LoggerFactory.getLogger(this.getClass());
+    private boolean initialized = false;
+
+    public synchronized boolean initialize(@NotNull Session session)
+    {
+        initialized = initialized || initializeInternal(session);
+        return initialized;
+    }
+
+    protected PreparedStatement prepare(PreparedStatement cached, Session 
session, String cqlLiteral)
+    {
+        return cached == null ? session.prepare(cqlLiteral) : cached;
+    }
+
+    protected boolean initializeInternal(@NotNull Session session)
+    {
+        if (!exists(session.getCluster().getMetadata()))
+        {
+            try
+            {
+                ResultSet res = session.execute(createSchemaStatement());
+                if (!res.getExecutionInfo().isSchemaInAgreement())
+                {
+                    logger.warn("Schema is not yet in agreement.");
+                    return false;
+                }
+            }
+            catch (Exception exception)
+            {
+                String schemaName = this.getClass().getSimpleName();
+                throw new SidecarSchemaModificationException("Failed to modify 
schema for " + schemaName, exception);
+            }
+        }
+
+        prepareStatements(session);
+        return true;
+    }
+
+    protected abstract String keyspaceName();
+
+    protected abstract void prepareStatements(@NotNull Session session);
+
+    protected abstract boolean exists(@NotNull Metadata metadata);
+
+    protected abstract String createSchemaStatement();
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java 
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
similarity index 100%
rename from 
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
rename to 
server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
similarity index 100%
rename from 
server/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
rename to 
server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index cc19b1bf..c1de10cf 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -50,6 +50,7 @@ import 
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.server.ClusterMembershipOperations;
 import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.common.server.TableOperations;
 import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
@@ -402,6 +403,13 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         return fromAdapter(ICassandraAdapter::storageOperations);
     }
 
+    @Nullable
+    @Override
+    public MetricsOperations metricsOperations()
+    {
+        return fromAdapter(ICassandraAdapter::metricsOperations);
+    }
+
     @Nullable
     @Override
     public ClusterMembershipOperations clusterMembershipOperations()
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index fd942c67..b75fb132 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -55,13 +55,16 @@ import org.jetbrains.annotations.Nullable;
 public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor<RestoreJobsSchema>
 {
     private static final ObjectMapper MAPPER = new ObjectMapper();
+    public final SidecarSchema sidecarSchema;
 
     @Inject
     public RestoreJobDatabaseAccessor(SidecarSchema sidecarSchema,
                                       RestoreJobsSchema restoreJobsSchema,
                                       CQLSessionProvider cqlSessionProvider)
     {
-        super(sidecarSchema, restoreJobsSchema, cqlSessionProvider);
+        super(restoreJobsSchema, cqlSessionProvider);
+        this.sidecarSchema = sidecarSchema;
+
     }
 
     public RestoreJob create(CreateRestoreJobRequestPayload payload, 
QualifiedTableName qualifiedTableName)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
index 31c5a2da..516611a4 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
@@ -38,12 +38,15 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 @Singleton
 public class RestoreRangeDatabaseAccessor extends 
DatabaseAccessor<RestoreRangesSchema>
 {
+    private final SidecarSchema sidecarSchema;
+
     @Inject
     protected RestoreRangeDatabaseAccessor(SidecarSchema sidecarSchema,
                                            RestoreRangesSchema tableSchema,
                                            CQLSessionProvider sessionProvider)
     {
-        super(sidecarSchema, tableSchema, sessionProvider);
+        super(tableSchema, sessionProvider);
+        this.sidecarSchema = sidecarSchema;
     }
 
     public RestoreRange create(RestoreRange range)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index 21c97a3f..640bd427 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -40,12 +40,15 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 @Singleton
 public class RestoreSliceDatabaseAccessor extends 
DatabaseAccessor<RestoreSlicesSchema>
 {
+    private final SidecarSchema sidecarSchema;
+
     @Inject
     protected RestoreSliceDatabaseAccessor(SidecarSchema sidecarSchema,
                                            RestoreSlicesSchema 
restoreSlicesSchema,
                                            CQLSessionProvider 
cqlSessionProvider)
     {
-        super(sidecarSchema, restoreSlicesSchema, cqlSessionProvider);
+        super(restoreSlicesSchema, cqlSessionProvider);
+        this.sidecarSchema = sidecarSchema;
     }
 
     public RestoreSlice create(RestoreSlice slice)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
index 0a7825fc..a5f22e4c 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
@@ -39,7 +39,7 @@ import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
 
 /**
- * Encapsulates all {@link AbstractSchema} related operations for features 
provided by Sidecar
+ * Encapsulates all schema related operations for features provided by Sidecar
  */
 public class SidecarSchema
 {
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
index 16b107f8..127b099d 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
@@ -19,11 +19,12 @@
 package org.apache.cassandra.sidecar.metrics;
 
 import com.codahale.metrics.MetricRegistry;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 
 import static org.apache.cassandra.sidecar.metrics.ServerMetrics.SERVER_PREFIX;
 
 /**
- * Tracks metrics for {@link 
org.apache.cassandra.sidecar.db.schema.SidecarSchema} and other schema related 
handling
+ * Tracks metrics for {@link SidecarSchema} and other schema related handling
  * done by Sidecar
  */
 public class SchemaMetrics
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
index 2e491b7f..6b986abf 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.metrics;
 
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+
 import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX;
 
 /**
@@ -43,7 +45,7 @@ public interface ServerMetrics
     RestoreMetrics restore();
 
     /**
-     * @return metrics related to {@link 
org.apache.cassandra.sidecar.db.schema.SidecarSchema} that are tracked.
+     * @return metrics related to {@link SidecarSchema} that are tracked.
      */
     SchemaMetrics schema();
 
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
index 976256a0..0c3aa763 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.routes;
 
 import java.util.NoSuchElementException;
+import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +32,9 @@ import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.handler.HttpException;
 import 
org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 import 
org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException;
@@ -40,6 +43,7 @@ import 
org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
 import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
 
 
@@ -98,6 +102,25 @@ public abstract class AbstractHandler<T> implements 
Handler<RoutingContext>
         }
     }
 
+    protected void ifMetricsOpsAvailable(RoutingContext context,
+                                         String host,
+                                         Consumer<MetricsOperations> 
ifAvailable)
+    {
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+        MetricsOperations operations = delegate.metricsOperations();
+        if (operations == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+         ifAvailable.accept(operations);
+    }
+
     /**
      * Extracts the request object from the {@code context}.
      *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
new file mode 100644
index 00000000..dc59fba7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Handler for retrieving stats for connected clients
+ */
+public class ConnectedClientStatsHandler extends AbstractHandler<Void>
+{
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools   executor pools for blocking executions
+     */
+    @Inject
+    protected ConnectedClientStatsHandler(InstanceMetadataFetcher 
metadataFetcher, ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+
+        ifMetricsOpsAvailable(context, host, operations -> {
+            boolean summaryOnly = parseBooleanQueryParam(httpRequest, 
"summary", true);
+
+            executorPools.service()
+                         .executeBlocking(() -> 
operations.connectedClientStats(summaryOnly))
+                         .onSuccess(context::json)
+                         .onFailure(cause -> processFailure(cause, context, 
host, remoteAddress, request));
+        });
+    }
+
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 095e910a..a2f6fc47 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -86,6 +86,7 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
 import org.apache.cassandra.sidecar.routes.CassandraHealthHandler;
+import org.apache.cassandra.sidecar.routes.ConnectedClientStatsHandler;
 import org.apache.cassandra.sidecar.routes.DiskSpaceProtectionHandler;
 import org.apache.cassandra.sidecar.routes.FileStreamHandler;
 import org.apache.cassandra.sidecar.routes.GossipInfoHandler;
@@ -210,6 +211,7 @@ public class MainModule extends AbstractModule
                               AbortRestoreJobHandler abortRestoreJobHandler,
                               CreateRestoreSliceHandler 
createRestoreSliceHandler,
                               RestoreJobProgressHandler 
restoreJobProgressHandler,
+                              ConnectedClientStatsHandler 
connectedClientStatsHandler,
                               ErrorHandler errorHandler)
     {
         Router router = Router.router(vertx);
@@ -294,6 +296,9 @@ public class MainModule extends AbstractModule
         router.get(ApiEndpointsV1.RING_ROUTE)
               .handler(ringHandler);
 
+        router.get(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE)
+              .handler(connectedClientStatsHandler);
+
         router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE)
               .handler(ringHandler);
 
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
new file mode 100644
index 00000000..0a592bf6
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the connected-client stats endpoint against a Cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class ConnectedClientStatsHandlerIntegrationTest extends 
IntegrationTestBase
+{
+    private static final int DEFAULT_CONNECTION_COUNT = 2;
+
+    @CassandraIntegrationTest
+    void retrieveClientStatsDefault(VertxTestContext context)
+    throws Exception
+    {
+        Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", true);
+        String testRoute = "/api/v1/cassandra/stats/connected-clients";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(response -> {
+                      assertClientStatsResponse(response, expectedParams);
+                      context.completeNow();
+                  }));
+        });
+    }
+
+    @CassandraIntegrationTest
+    void retrieveClientStatsListConnections(VertxTestContext context)
+    throws Exception
+    {
+
+        Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", false);
+        String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(response -> {
+                      assertClientStatsResponse(response, expectedParams);
+                      context.completeNow();
+                  }));
+        });
+    }
+
+    @CassandraIntegrationTest
+    void retrieveClientStatsListConnectionsWithKeyspace(VertxTestContext 
context)
+    throws Exception
+    {
+        createTestKeyspace();
+        Session session = maybeGetSession();
+        session.execute("USE " + TEST_KEYSPACE);
+
+        Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", false);
+        String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(response -> {
+                      assertClientStatsResponse(response, expectedParams, 4, 
true);
+                      context.completeNow();
+                  }));
+        });
+    }
+
+    @CassandraIntegrationTest
+    void retrieveClientStatsMultipleConnections(VertxTestContext context)
+    throws Exception
+    {
+        // Creates an additional connection pair
+        createTestKeyspace();
+        Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", false);
+        String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(response -> {
+                      assertClientStatsResponse(response, expectedParams, 4);
+                      context.completeNow();
+                  }));
+        });
+    }
+
+    /**
+     * Expects unrecognized params to be ignored, and an invalid value for the {@code summary} parameter
+     * to default to {@code true}, so that a bad request does not trigger the heavyweight query.
+     */
+    @CassandraIntegrationTest
+    void retrieveClientStatsInvalidParamaterValue(VertxTestContext context)
+    throws Exception
+    {
+        Map<String, Boolean> expectedParams = new HashMap<>();
+        expectedParams.put("summary", true);
+        String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=123&bad-arg=xyz";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(response -> {
+                      assertClientStatsResponse(response, expectedParams);
+                      context.completeNow();
+                  }));
+        });
+    }
+
+    void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String, 
Boolean> params)
+    {
+        assertClientStatsResponse(response, params, DEFAULT_CONNECTION_COUNT);
+    }
+
+    void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String, 
Boolean> params, int expectedConnections)
+    {
+        assertClientStatsResponse(response, params, expectedConnections, 
false);
+    }
+    void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String, 
Boolean> params, int expectedConnections, boolean usingKeyspace)
+    {
+        boolean isSummary = params.get("summary");
+
+        logger.info("Response:" + response.bodyAsString());
+        ConnectedClientStatsResponse clientStats = 
response.bodyAsJson(ConnectedClientStatsResponse.class);
+        assertThat(clientStats).isNotNull();
+        assertThat(clientStats.connectionsByUser()).isNotEmpty();
+        assertThat(clientStats.connectionsByUser()).containsKey("anonymous");
+        
assertThat(clientStats.totalConnectedClients()).isEqualTo(expectedConnections);
+
+        List<ClientConnectionEntry> stats = clientStats.clientConnections();
+        if (isSummary)
+        {
+            assertThat(stats).isNull();
+        }
+        else
+        {
+            assertThat(stats.size()).isEqualTo(expectedConnections);
+            for (ClientConnectionEntry stat : stats)
+            {
+                assertThat(stat.address()).contains("127.0.0.1");
+                assertThat(stat.sslEnabled()).isEqualTo(false);
+                assertThat(stat.driverName()).isEqualTo("DataStax Java 
Driver");
+                assertThat(stat.driverVersion()).isNotNull();
+                assertThat(stat.username()).isEqualTo("anonymous");
+                if 
(sidecarTestContext.version.isGreaterThan(SimpleCassandraVersion.create("4.0.0")))
+                {
+                    assertThat(stat.clientOptions()).isNotNull();
+                    
assertThat(stat.clientOptions().containsKey("CQL_VERSION")).isTrue();
+                }
+            }
+
+            // TODO: Add validations for fields in trunk once dtest jars can advance beyond the TCM commit
+            if (usingKeyspace
+               && 
sidecarTestContext.version.compareTo(SimpleCassandraVersion.create("5.0.0")) >= 
0)
+            {
+                
assertThat(stats.stream().map(ClientConnectionEntry::keyspaceName).collect(Collectors.toSet())).contains(TEST_KEYSPACE);
+            }
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
new file mode 100644
index 00000000..395e0359
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ConnectedClientStatsHandler} class
+ */
+@ExtendWith(VertxExtension.class)
+public class ConnectedClientStatsHandlerTest
+{
+
+    private static final int EXPECTED_TOTAL_CLIENTS = 10;
+    private static final Map<String, Long> EXPECTED_CONNECTIONS_BY_USER = new 
HashMap<String, Long>()
+    {
+        {
+        put("u1", 5L);
+        put("u2", 5L);
+        }
+    };
+    private static final List<ClientConnectionEntry> EXPECTED_STATS = 
Arrays.asList(
+    new ClientConnectionEntry("1", 0, false, null, null, "5", "u1", 5,
+                              "name", "version", "keyspace1", 
Collections.emptyMap(), null, Collections.emptyMap()),
+    new ClientConnectionEntry("1", 0, false, null, null, "5", "u1", 5,
+                              "name", "version", "keyspace1", 
Collections.emptyMap(), null, Collections.emptyMap()));
+    static final Logger LOGGER = 
LoggerFactory.getLogger(ConnectedClientStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new 
ConnectedClientStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+
+        server = injector.getInstance(Server.class);
+        vertx = injector.getInstance(Vertx.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testHandlerStats(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/connected-clients";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  ConnectedClientStatsResponse statsResponse = 
response.bodyAsJson(ConnectedClientStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.totalConnectedClients()).isEqualTo(EXPECTED_TOTAL_CLIENTS);
+                  
assertThat(statsResponse.connectionsByUser()).hasSize(EXPECTED_CONNECTIONS_BY_USER.size());
+                  context.completeNow();
+              }));
+    }
+
+    static class ConnectedClientStatsTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public InstancesConfig instanceConfig()
+        {
+            ConnectedClientStatsResponse summaryResponse = new 
ConnectedClientStatsResponse(Collections.emptyList(),
+                                                                               
             EXPECTED_TOTAL_CLIENTS,
+                                                                               
             EXPECTED_CONNECTIONS_BY_USER);
+            ConnectedClientStatsResponse statsResponse = new 
ConnectedClientStatsResponse(EXPECTED_STATS,
+                                                                               
           EXPECTED_TOTAL_CLIENTS,
+                                                                               
           EXPECTED_CONNECTIONS_BY_USER);
+
+            final int instanceId = 100;
+            final String host = "127.0.0.1";
+            final InstanceMetadata instanceMetadata = 
mock(InstanceMetadata.class);
+            when(instanceMetadata.host()).thenReturn(host);
+            when(instanceMetadata.port()).thenReturn(9042);
+            when(instanceMetadata.id()).thenReturn(instanceId);
+            when(instanceMetadata.stagingDir()).thenReturn("");
+
+            CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class);
+            MetricsOperations mockMetricsOperations = 
mock(MetricsOperations.class);
+            
when(mockMetricsOperations.connectedClientStats(true)).thenReturn(summaryResponse);
+            
when(mockMetricsOperations.connectedClientStats(false)).thenReturn(statsResponse);
+            
when(delegate.metricsOperations()).thenReturn(mockMetricsOperations);
+            when(instanceMetadata.delegate()).thenReturn(delegate);
+
+            InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+            
when(mockInstancesConfig.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+            
when(mockInstancesConfig.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+            
when(mockInstancesConfig.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+            return mockInstancesConfig;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to