This is an automated email from the ASF dual-hosted git repository.
jeetkundoug pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41ea72ba CASSANDRASC-144 Introduces sidecar endpoint to vend client
connection stats (#135)
41ea72ba is described below
commit 41ea72bafb37f971ecf2649587595da74de51556
Author: Arjun Ashok <[email protected]>
AuthorDate: Fri Oct 25 12:37:25 2024 -0700
CASSANDRASC-144 Introduces sidecar endpoint to vend client connection stats
(#135)
---
adapters/base/build.gradle | 1 +
.../sidecar/adapters/base/CassandraAdapter.java | 7 +
.../adapters/base/CassandraMetricsOperations.java | 104 +++++++++++
.../adapters/base/db/ConnectedClientStats.java | 87 ++++++++++
.../db/ConnectedClientStatsDatabaseAccessor.java | 64 +++++++
.../base/db/ConnectedClientStatsSummary.java | 56 ++++++
.../base/db/schema/ConnectedClientsSchema.java | 91 ++++++++++
.../adapters/base/db/ConnectedClientStatsTest.java | 130 ++++++++++++++
.../cassandra/sidecar/common/ApiEndpointsV1.java | 2 +
.../request/ConnectedClientStatsRequest.java | 31 ++--
.../response/ConnectedClientStatsResponse.java | 75 ++++++++
.../response/data/ClientConnectionEntry.java | 166 ++++++++++++++++++
.../cassandra/sidecar/client/RequestContext.java | 12 ++
.../cassandra/sidecar/client/SidecarClient.java | 13 ++
.../sidecar/client/SidecarClientTest.java | 42 +++++
.../sidecar/common/server/ICassandraAdapter.java | 6 +
.../sidecar/common/server/MetricsOperations.java | 19 +-
.../sidecar/db/DataObjectMappingException.java | 0
.../cassandra/sidecar/db/DatabaseAccessor.java | 6 +-
.../sidecar/db/schema/AbstractSchema.java | 81 +++++++++
.../cassandra/sidecar/db/schema/TableSchema.java | 0
.../SidecarSchemaModificationException.java | 0
.../sidecar/cluster/CassandraAdapterDelegate.java | 8 +
.../sidecar/db/RestoreJobDatabaseAccessor.java | 5 +-
.../sidecar/db/RestoreRangeDatabaseAccessor.java | 5 +-
.../sidecar/db/RestoreSliceDatabaseAccessor.java | 5 +-
.../cassandra/sidecar/db/schema/SidecarSchema.java | 2 +-
.../cassandra/sidecar/metrics/SchemaMetrics.java | 3 +-
.../cassandra/sidecar/metrics/ServerMetrics.java | 4 +-
.../cassandra/sidecar/routes/AbstractHandler.java | 23 +++
.../routes/ConnectedClientStatsHandler.java | 72 ++++++++
.../cassandra/sidecar/server/MainModule.java | 5 +
...ConnectedClientStatsHandlerIntegrationTest.java | 193 +++++++++++++++++++++
.../routes/ConnectedClientStatsHandlerTest.java | 169 ++++++++++++++++++
34 files changed, 1457 insertions(+), 30 deletions(-)
diff --git a/adapters/base/build.gradle b/adapters/base/build.gradle
index c34ae261..006b3f28 100644
--- a/adapters/base/build.gradle
+++ b/adapters/base/build.gradle
@@ -62,6 +62,7 @@ dependencies {
compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
+ testImplementation('com.datastax.cassandra:cassandra-driver-core:3.11.3')
testImplementation
"org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
testImplementation
"org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
testImplementation "org.assertj:assertj-core:3.24.2"
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index fb61054b..55a7b99b 100644
---
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -34,6 +34,7 @@ import
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.server.ClusterMembershipOperations;
import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
@@ -169,6 +170,12 @@ public class CassandraAdapter implements ICassandraAdapter
return new CassandraStorageOperations(jmxClient, dnsResolver);
}
+ @Override
+ public MetricsOperations metricsOperations()
+ {
+ return new CassandraMetricsOperations(cqlSessionProvider);
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
new file mode 100644
index 00000000..cc3ff9fc
--- /dev/null
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStats;
+import
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsDatabaseAccessor;
+import
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsSummary;
+import
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Default implementation that pulls methods from the Cassandra Metrics Proxy
+ */
+public class CassandraMetricsOperations implements MetricsOperations
+{
+ private final ConnectedClientStatsDatabaseAccessor dbAccessor;
+
+ /**
+ * Creates a new instance with the provided {@link CQLSessionProvider}
+ */
+ public CassandraMetricsOperations(CQLSessionProvider session)
+ {
+ this.dbAccessor = new ConnectedClientStatsDatabaseAccessor(session,
new ConnectedClientsSchema());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ConnectedClientStatsResponse connectedClientStats(boolean
summaryOnly)
+ {
+ if (summaryOnly)
+ {
+ return connectedClientSummary();
+ }
+ return connectedClientDetails();
+ }
+
+ public ConnectedClientStatsResponse connectedClientDetails()
+ {
+ List<ClientConnectionEntry> entries =
statsToEntries(dbAccessor.stats());
+ Map<String, Long> connectionsByUser =
entries.stream().collect(Collectors.groupingBy(ClientConnectionEntry::username,
+
Collectors.counting()));
+ long totalConnectedClients = entries.size();
+ return new ConnectedClientStatsResponse(entries,
totalConnectedClients, connectionsByUser);
+ }
+
+ private ConnectedClientStatsResponse connectedClientSummary()
+ {
+ ConnectedClientStatsSummary summary = dbAccessor.summary();
+ return new ConnectedClientStatsResponse(null,
summary.totalConnectedClients, summary.connectionsByUser);
+ }
+
+ private List<ClientConnectionEntry>
statsToEntries(Stream<ConnectedClientStats> stats)
+ {
+ return stats.map(CassandraMetricsOperations::statToEntry)
+ .collect(Collectors.toList());
+ }
+
+ private static @NotNull ClientConnectionEntry
statToEntry(ConnectedClientStats stat)
+ {
+ // Note: We explicitly use constructor params based object creation
instead of builder in order to optimize the
+ // number of potential objects created for each row of the table
queried, specifically since we know this can be large
+ return new ClientConnectionEntry(stat.address,
+ stat.port,
+ stat.sslEnabled,
+ stat.sslCipherSuite,
+ stat.sslProtocol,
+ stat.protocolVersion,
+ stat.username,
+ stat.requestCount,
+ stat.driverName,
+ stat.driverVersion,
+ stat.keyspaceName,
+ stat.clientOptions,
+ stat.authenticationMode,
+ stat.authenticationMetadata);
+ }
+}
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
new file mode 100644
index 00000000..6142ad62
--- /dev/null
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.sidecar.db.DataObjectMappingException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Representation of the connected clients metadata
+ */
+public class ConnectedClientStats
+{
+ public final String address;
+ public final int port;
+ public final String hostname;
+ public final String username;
+ public final String connectionStage;
+ public final String protocolVersion;
+ public final String driverName;
+ public final String driverVersion;
+ public final boolean sslEnabled;
+ public final String sslProtocol;
+ public final String sslCipherSuite;
+ public final long requestCount;
+ public final Map<String, String> clientOptions;
+ public final String keyspaceName;
+ public final String authenticationMode;
+ public final Map<String, String> authenticationMetadata;
+
+ public static ConnectedClientStats from(@NotNull Row row) throws
DataObjectMappingException
+ {
+ return new ConnectedClientStats(row);
+ }
+
+ public ConnectedClientStats(@NotNull Row row)
+ {
+ this.address = row.getInet("address").getHostAddress();
+ this.port = row.getInt("port");
+ this.hostname = row.getString("hostname");
+ this.username = row.getString("username");
+ this.connectionStage = row.getString("connection_stage");
+ this.protocolVersion =
Integer.toString(row.getInt("protocol_version"));
+ this.driverName = row.getString("driver_name");
+ this.driverVersion = row.getString("driver_version");
+ this.sslEnabled = row.getBool("ssl_enabled");
+ this.sslProtocol = row.getString("ssl_protocol");
+ this.sslCipherSuite = row.getString("ssl_cipher_suite");
+ this.requestCount = row.getLong("request_count");
+ this.keyspaceName = getStringFieldIfExists(row, "keyspace_name");
+ this.authenticationMode = getStringFieldIfExists(row,
"authentication_mode");
+ this.authenticationMetadata = getMapFieldIfExists(row,
"authentication_metadata");
+ this.clientOptions = getMapFieldIfExists(row, "client_options");
+ }
+
+ public String getStringFieldIfExists(@NotNull Row row, String fieldName)
+ {
+ return (row.getColumnDefinitions().contains(fieldName)) ?
row.getString(fieldName) : null;
+ }
+
+ public Map getMapFieldIfExists(@NotNull Row row, String fieldName)
+ {
+ if ((row.getColumnDefinitions().contains(fieldName)))
+ {
+ return row.getMap(fieldName, String.class, String.class);
+ }
+ return null;
+ }
+}
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
new file mode 100644
index 00000000..38cf1354
--- /dev/null
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsDatabaseAccessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.DatabaseAccessor;
+
+/**
+ * DataAccessor implementation to read client connection stats from the table
represented in {@link ConnectedClientsSchema}
+ */
+public class ConnectedClientStatsDatabaseAccessor extends
DatabaseAccessor<ConnectedClientsSchema>
+{
+ public ConnectedClientStatsDatabaseAccessor(CQLSessionProvider
sessionProvider, ConnectedClientsSchema tableSchema)
+ {
+ super(tableSchema, sessionProvider);
+ }
+
+ /**
+ * Query for a summary of the client connection stats
+ * @return {@link ConnectedClientStatsSummary} with total connections and
counts grouped by user
+ */
+ public ConnectedClientStatsSummary summary()
+ {
+ tableSchema.prepareStatements(session());
+ BoundStatement statement = tableSchema.connectionsByUser().bind();
+ ResultSet resultSet = execute(statement);
+ return ConnectedClientStatsSummary.from(resultSet);
+ }
+
+ /**
+ * Query for all the client connection stats with an entry per connection
+ * @return {@link ConnectedClientStats} for each connection
+ */
+ public Stream<ConnectedClientStats> stats()
+ {
+ tableSchema.prepareStatements(session());
+ BoundStatement statement = tableSchema.stats().bind();
+ ResultSet resultSet = execute(statement);
+ return StreamSupport.stream(resultSet.spliterator(), false)
+ .map(ConnectedClientStats::from);
+ }
+}
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
new file mode 100644
index 00000000..c59da807
--- /dev/null
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsSummary.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.sidecar.db.DataObjectMappingException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Representation of a summary of client connections stats
+ */
+public class ConnectedClientStatsSummary
+{
+ public final int totalConnectedClients;
+ public final Map<String, Long> connectionsByUser;
+
+ public static ConnectedClientStatsSummary from(@NotNull ResultSet
resultSet) throws DataObjectMappingException
+ {
+
+ Map<String, Long> resultMap =
StreamSupport.stream(resultSet.spliterator(), false)
+
.collect(Collectors.toMap(r -> r.getString("username"),
+ r ->
r.getLong("connection_count")));
+ int totalConnections =
resultMap.values().stream().mapToInt(Math::toIntExact).sum();
+
+ return new ConnectedClientStatsSummary(resultMap, totalConnections);
+ }
+
+ public ConnectedClientStatsSummary(Map<String, Long> connectionsByUser,
int totalConnections)
+ {
+ this.totalConnectedClients = totalConnections;
+ this.connectionsByUser = connectionsByUser;
+ }
+
+
+}
+
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
new file mode 100644
index 00000000..63701438
--- /dev/null
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/db/schema/ConnectedClientsSchema.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.sidecar.db.schema.TableSchema;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Holds the prepared statements for operations related to client connection
stats retrieved from
+ * the "clients" virtual table
+ */
+public class ConnectedClientsSchema extends TableSchema
+{
+ private static final String TABLE_NAME = "clients";
+ private static final String KEYSPACE_NAME = "system_views";
+
+ private PreparedStatement statsStatement;
+ private PreparedStatement connectionsByUserStatement;
+
+ @Override
+ protected String keyspaceName()
+ {
+ return KEYSPACE_NAME;
+ }
+
+ public void prepareStatements(@NotNull Session session)
+ {
+ statsStatement = prepare(statsStatement, session, statsStatement());
+ connectionsByUserStatement = prepare(connectionsByUserStatement,
session, selectConnectionsByUserStatement());
+ }
+
+ @Override
+ protected boolean initializeInternal(@NotNull Session session)
+ {
+ prepareStatements(session);
+ return true;
+ }
+
+ @Override
+ protected String createSchemaStatement()
+ {
+ return null;
+ }
+
+ @Override
+ protected String tableName()
+ {
+ return TABLE_NAME;
+ }
+
+ public PreparedStatement stats()
+ {
+ return statsStatement;
+ }
+
+ public PreparedStatement connectionsByUser()
+ {
+ return connectionsByUserStatement;
+ }
+
+ private String statsStatement()
+ {
+ return String.format("SELECT * FROM %s.%s;", KEYSPACE_NAME,
TABLE_NAME);
+ }
+
+ static String selectConnectionsByUserStatement()
+ {
+ return String.format("SELECT username, COUNT(*) AS connection_count " +
+ "FROM %s.%s;",
+ KEYSPACE_NAME,
+ TABLE_NAME);
+ }
+}
diff --git
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
new file mode 100644
index 00000000..fdb6b86b
--- /dev/null
+++
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/db/ConnectedClientStatsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.db;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.Row;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for ConnectedClientStats transformed from the cassandra driver results
+ */
+public class ConnectedClientStatsTest
+{
+ private static final String TEST_SPIFFE_IDENTITY =
"spiffe://test.cassandra.apache.org/unitTest/mtls";
+ Row mockRow = mock(Row.class);
+
+ @Test
+ public void connectedClientStatsTest()
+ {
+ setupMockData(mockRow, false);
+ ConnectedClientStats stats = new ConnectedClientStats(mockRow);
+ assertThat(stats).isNotNull();
+ assertThat(stats.authenticationMetadata).isNotNull();
+ assertThat(stats.authenticationMetadata.keySet()).contains("identity");
+
assertThat(stats.authenticationMetadata.get("identity")).isEqualTo(TEST_SPIFFE_IDENTITY);
+ assertThat(stats.clientOptions).isNotNull();
+ assertThat(stats.clientOptions.get("CQL_VERSION")).isEqualTo("3.4.6");
+ }
+
+ @Test
+ public void connectedClientStatsMissingFieldsTest()
+ {
+ setupMockData(mockRow, true);
+ ConnectedClientStats stats = new ConnectedClientStats(mockRow);
+ assertThat(stats).isNotNull();
+ assertThat(stats.keyspaceName).isNull();
+ assertThat(stats.authenticationMode).isNull();
+ assertThat(stats.authenticationMetadata).isNull();
+ assertThat(stats.clientOptions).isNull();
+ }
+
+ private void setupMockData(Row mockRow, boolean isMissingFields)
+ {
+ ColumnDefinitions mockColumnDefinitions =
mock(ColumnDefinitions.class);
+ when(mockRow.getColumnDefinitions()).thenReturn(mockColumnDefinitions);
+
+ String ks;
+ String authMode;
+ Map<String, String> authMetadata = null;
+ Map<String, String> clientOptions = null;
+
+ // Case where the fields introduced in newer C* versions are not
present
+ if (isMissingFields)
+ {
+ ks = null;
+ authMode = null;
+
when(mockRow.getColumnDefinitions().contains(anyString())).thenAnswer(i -> {
+ String input = i.getArgument(0, String.class);
+ return !("keyspace_name".equals(input)
+ || "authentication_mode".equals(input)
+ || "authentication_metadata".equals(input)
+ || "client_options".equals(input));
+ });
+ }
+ else
+ {
+ ks = "test";
+ authMode = "password";
+ authMetadata = new HashMap<String, String>()
+ {
+ {
+ put("identity", TEST_SPIFFE_IDENTITY);
+ }
+ };
+ clientOptions = new HashMap<String, String>()
+ {
+ {
+ put("CQL_VERSION", "3.4.6");
+ put("DRIVER_NAME", "DataStax Python Driver");
+ put("DRIVER_VERSION", "3.25.0");
+ }
+ };
+
when(mockRow.getColumnDefinitions().contains(anyString())).thenReturn(true);
+ }
+
+
when(mockRow.getInet("address")).thenReturn(InetAddress.getLoopbackAddress());
+ when(mockRow.getInt("port")).thenReturn(0);
+ when(mockRow.getString("hostname")).thenReturn("localhost");
+ when(mockRow.getString("username")).thenReturn("u1");
+ when(mockRow.getString("connection_stage")).thenReturn("test");
+ when(mockRow.getInt("protocol_version")).thenReturn(5);
+ when(mockRow.getString("driver_name")).thenReturn("TestDriver");
+ when(mockRow.getString("driver_version")).thenReturn("TestVersion");
+ when(mockRow.getBool("ssl_enabled")).thenReturn(false);
+ when(mockRow.getString("ssl_protocol")).thenReturn("");
+ when(mockRow.getString("ssl_cipher_suite")).thenReturn("");
+ when(mockRow.getLong("request_count")).thenReturn(10L);
+ when(mockRow.getString("keyspace_name")).thenReturn(ks);
+ when(mockRow.getMap("authentication_metadata", String.class,
String.class)).thenReturn(authMetadata);
+ when(mockRow.getString("authentication_mode")).thenReturn(authMode);
+ when(mockRow.getMap("client_options", String.class,
String.class)).thenReturn(clientOptions);
+
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 12fd1f79..ec4c7b7e 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -109,6 +109,8 @@ public final class ApiEndpointsV1
public static final String ABORT_RESTORE_JOB_ROUTE = RESTORE_JOB_ROUTE +
ABORT;
public static final String RESTORE_JOB_PROGRESS_ROUTE = RESTORE_JOB_ROUTE
+ PROGRESS;
+ public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 +
CASSANDRA + "/stats/connected-clients";
+
private ApiEndpointsV1()
{
throw new IllegalStateException(getClass() + " is a constants
container and shall not be instantiated");
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
similarity index 53%
copy from
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
copy to
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
index 8d884831..7d8d7b98 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ConnectedClientStatsRequest.java
@@ -16,25 +16,32 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.db.schema;
+package org.apache.cassandra.sidecar.common.request;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.Metadata;
-import org.jetbrains.annotations.NotNull;
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
/**
- * Abstract base schema class for table schema
+ * Represents a request get the stats related to the clients connected to the
node
*/
-public abstract class TableSchema extends AbstractSchema
+public class ConnectedClientStatsRequest extends
JsonRequest<ConnectedClientStatsResponse>
{
- protected abstract String tableName();
+ /**
+ * Constructs a request to retrieve the Cassandra node stats connected
clients information
+ */
+ public ConnectedClientStatsRequest()
+ {
+ super(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE);
+ }
+ /**
+ * {@inheritDoc}
+ */
@Override
- protected boolean exists(@NotNull Metadata metadata)
+ public HttpMethod method()
{
- KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceName());
- if (ksMetadata == null)
- return false;
- return ksMetadata.getTable(tableName()) != null;
+ return HttpMethod.GET;
}
+
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
new file mode 100644
index 00000000..fb243187
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ConnectedClientStatsResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+
+/**
+ * Class response for the ConnectedClientStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ConnectedClientStatsResponse
+{
+
+ @JsonProperty("clientConnections")
+ public List<ClientConnectionEntry> clientConnections()
+ {
+ return clientConnections;
+ }
+
+ @JsonProperty("totalConnectedClients")
+ public long totalConnectedClients()
+ {
+ return totalConnectedClients;
+ }
+
+ @JsonProperty("connectionsByUser")
+ public Map<String, Long> connectionsByUser()
+ {
+ return connectionsByUser;
+ }
+
+ private final List<ClientConnectionEntry> clientConnections;
+ private final long totalConnectedClients;
+ private final Map<String, Long> connectionsByUser;
+
+
+ /**
+ * Constructs a new {@link ConnectedClientStatsResponse}.
+ *
+ * @param clientConnections list of client connection stats
+ * @param totalConnectedClients total count of the connected clients
+ * @param connectionsByUser mapping of user to the no. connections
+ */
+ @JsonCreator
+ public ConnectedClientStatsResponse(@JsonProperty("clientConnections")
List<ClientConnectionEntry> clientConnections,
+ @JsonProperty("totalConnectedClients")
long totalConnectedClients,
+ @JsonProperty("connectionsByUser")
Map<String, Long> connectionsByUser)
+ {
+ this.clientConnections = clientConnections;
+ this.totalConnectedClients = totalConnectedClients;
+ this.connectionsByUser = connectionsByUser;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
new file mode 100644
index 00000000..afb4c824
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/ClientConnectionEntry.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Class representing an entry of the Client Connection Stats
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ClientConnectionEntry
+{
+ @JsonProperty("address")
+ public String address()
+ {
+ return address;
+ }
+
+ @JsonProperty("port")
+ public int port()
+ {
+ return port;
+ }
+
+ @JsonProperty("sslEnabled")
+ public Boolean sslEnabled()
+ {
+ return sslEnabled;
+ }
+
+ @JsonProperty("sslCipherSuite")
+ public String sslCipherSuite()
+ {
+ return sslCipherSuite;
+ }
+
+ @JsonProperty("sslProtocol")
+ public String sslProtocol()
+ {
+ return sslProtocol;
+ }
+
+ @JsonProperty("protocolVersion")
+ public String protocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ @JsonProperty("username")
+ public String username()
+ {
+ return username;
+ }
+
+ @JsonProperty("requestCount")
+ public long requestCount()
+ {
+ return requestCount;
+ }
+
+ @JsonProperty("driverName")
+ public String driverName()
+ {
+ return driverName;
+ }
+
+ @JsonProperty("driverVersion")
+ public String driverVersion()
+ {
+ return driverVersion;
+ }
+ @JsonProperty("keyspaceName")
+ public String keyspaceName()
+ {
+ return keyspaceName;
+ }
+
+ @JsonProperty("clientOptions")
+ public Map<String, String> clientOptions()
+ {
+ return clientOptions;
+ }
+
+ @JsonProperty("authenticationMode")
+ public String authenticationMode()
+ {
+ return authenticationMode;
+ }
+
+ @JsonProperty("authenticationMetadata")
+ public Map<String, String> authenticationMetadata()
+ {
+ return authenticationMetadata;
+ }
+
+
+ private final String address;
+ private final int port;
+ private final Boolean sslEnabled;
+ private final String sslCipherSuite;
+ private final String sslProtocol;
+ private final String protocolVersion;
+ private final String username;
+ private final long requestCount;
+ private final String driverName;
+ private final String driverVersion;
+
+ private final Map<String, String> clientOptions;
+ private final String keyspaceName;
+ private final String authenticationMode;
+ private final Map<String, String> authenticationMetadata;
+
+
+ @JsonCreator
+ public ClientConnectionEntry(@NotNull @JsonProperty("address") String
address,
+ @NotNull @JsonProperty("port") int port,
+ @NotNull @JsonProperty("sslEnabled") boolean
sslEnabled,
+ @JsonProperty("sslCipherSuite") String
sslCipherSuite,
+ @JsonProperty("sslProtocol") String
sslProtocol,
+ @NotNull @JsonProperty("protocolVersion")
String protocolVersion,
+ @NotNull @JsonProperty("username") String
username,
+ @NotNull @JsonProperty("requestCount") long
requestCount,
+ @NotNull @JsonProperty("driverName") String
driverName,
+ @NotNull @JsonProperty("driverVersion")
String driverVersion,
+ @JsonProperty("keyspaceName") String
keyspaceName,
+ @JsonProperty("clientOptions") Map<String,
String> clientOptions,
+ @JsonProperty("authenticationMode") String
authMode,
+ @JsonProperty("authenticationMetadata")
Map<String, String> authMetadata)
+ {
+ this.address = address;
+ this.port = port;
+ this.sslEnabled = sslEnabled;
+ this.sslCipherSuite = sslCipherSuite;
+ this.sslProtocol = sslProtocol;
+ this.protocolVersion = protocolVersion;
+ this.username = username;
+ this.requestCount = requestCount;
+ this.driverName = driverName;
+ this.driverVersion = driverVersion;
+ this.keyspaceName = keyspaceName;
+ this.clientOptions = clientOptions;
+ this.authenticationMode = authMode;
+ this.authenticationMetadata = authMetadata;
+ }
+}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 61120d5f..96082a61 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -31,6 +31,7 @@ import
org.apache.cassandra.sidecar.common.request.CassandraJmxHealthRequest;
import
org.apache.cassandra.sidecar.common.request.CassandraNativeHealthRequest;
import
org.apache.cassandra.sidecar.common.request.CleanSSTableUploadSessionRequest;
import org.apache.cassandra.sidecar.common.request.ClearSnapshotRequest;
+import org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest;
import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest;
import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
@@ -474,6 +475,17 @@ public class RequestContext
return request(new UploadSSTableRequest(keyspace, tableName,
uploadId, component, digest, filename));
}
+ /**
+ * Sets the {@code request} to be a {@link
ConnectedClientStatsRequest} and returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @return a reference to this Builder
+ */
+ public Builder connectedClientStatsRequest()
+ {
+ return request(new ConnectedClientStatsRequest());
+ }
+
/**
* Sets the {@code retryPolicy} to be an
* {@link
org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy}
configured with
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index f9f12643..1925e0ad 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -48,6 +48,7 @@ import
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayloa
import org.apache.cassandra.sidecar.common.request.data.Digest;
import
org.apache.cassandra.sidecar.common.request.data.RestoreJobProgressRequestParams;
import
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -580,6 +581,18 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Executes the connected client stats request using the default retry
policy and configured selection policy
+ *
+ * @return a completable future of the connected client stats
+ */
+ public CompletableFuture<ConnectedClientStatsResponse>
connectedClientStats()
+ {
+ return executeRequestAsync(requestBuilder()
+ .connectedClientStatsRequest()
+ .build());
+ }
+
/**
* Returns a copy of the request builder with the default parameters
configured for the client.
*
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index d576f713..8ac0bb3a 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.sidecar.common.request.Request;
import
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -75,6 +76,7 @@ import
org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
import org.apache.cassandra.sidecar.common.response.SchemaResponse;
import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
import
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.response.data.RingEntry;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
@@ -1398,6 +1400,46 @@ abstract class SidecarClientTest
});
}
+ @Test
+ public void testConnectedClientStats() throws Exception
+ {
+ String connectedClientStatsResponseAsString =
"{\"clientConnections\":[{\"address\":\"127.0.0.1\",\"port\":54628" +
+
",\"sslEnabled\":false,\"sslCipherSuite\":\"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256\""
+
+
",\"sslProtocol\":\"TLSv1.2\",\"protocolVersion\":\"5\",\"username\":\"anonymous\""
+
+
",\"requestCount\":39,\"driverName\":\"DataStax Java Driver\"" +
+
",\"driverVersion\":\"3.11.3\",\"keyspaceName\":\"test\"" +
+
",\"authenticationMode\":\"MutualTls\"" +
+
",\"authenticationMetadata\":{\"identity\":\"spiffe://test.cassandra.apache.org/unitTest/mtls\"}"
+
+
",\"clientOptions\":{\"CQL_VERSION\":\"3.4.6\",\"DRIVER_NAME\":\"DataStax
Python Driver\"" +
+
",\"DRIVER_VERSION\":\"3.25.0\"}}],\"totalConnectedClients\":1" +
+
",\"connectionsByUser\":{\"anonymous\":1}}";
+
+ MockResponse response = new
MockResponse().setResponseCode(OK.code()).setBody(connectedClientStatsResponseAsString);
+ enqueue(response);
+ ConnectedClientStatsResponse result =
client.connectedClientStats().get();
+
+ assertThat(result).isNotNull();
+ assertThat(result.clientConnections()).isNotNull().hasSize(1);
+ assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1);
+
assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous");
+ ClientConnectionEntry entry =
result.clientConnections().iterator().next();
+ assertThat(entry.address()).isEqualTo("127.0.0.1");
+ assertThat(entry.port()).isEqualTo(54628);
+ assertThat(entry.sslEnabled()).isEqualTo(false);
+
assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");
+ assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2");
+ assertThat(entry.protocolVersion()).isEqualTo("5");
+ assertThat(entry.username()).isEqualTo("anonymous");
+ assertThat(entry.requestCount()).isEqualTo(39);
+ assertThat(entry.driverName()).isEqualTo("DataStax Java Driver");
+ assertThat(entry.driverVersion()).isEqualTo("3.11.3");
+ assertThat(entry.keyspaceName()).isEqualTo("test");
+ assertThat(entry.authenticationMode()).isEqualTo("MutualTls");
+ assertThat(entry.authenticationMetadata()).containsKey("identity");
+ assertThat(entry.clientOptions()).containsKeys("CQL_VERSION",
"DRIVER_NAME", "DRIVER_VERSION");
+ validateResponseServed(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE);
+ }
+
private void enqueue(MockResponse response)
{
for (MockWebServer server : servers)
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
index 8bb87a72..4db86f49 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java
@@ -82,6 +82,12 @@ public interface ICassandraAdapter
*/
StorageOperations storageOperations();
+ /**
+ * @return the {@link MetricsOperations} implementation for the Cassandra
cluster
+ */
+ MetricsOperations metricsOperations();
+
+
/**
* @return the {@link ClusterMembershipOperations} implementation for
handling cluster membership operations
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
similarity index 59%
copy from
server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
copy to
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
index 8b532890..ef7a5f95 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
@@ -16,15 +16,20 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.db;
+package org.apache.cassandra.sidecar.common.server;
+
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
/**
- * Exception thrown during mapping a data object into/from database record
+ * An interface that defines interactions with the metrics system in Cassandra.
*/
-public class DataObjectMappingException extends RuntimeException
+public interface MetricsOperations
{
- public DataObjectMappingException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ /**
+ * Retrieve the connected client stats metrics from the cluster
+ * @param summaryOnly boolean parameter to list connection summary only
+ * @return the requested client stats, in full or summary
+ */
+ ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
+
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
similarity index 100%
rename from
server/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
rename to
server-common/src/main/java/org/apache/cassandra/sidecar/db/DataObjectMappingException.java
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
similarity index 89%
rename from
server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
rename to
server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
index d435bde7..37c1a1f3 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java
@@ -25,7 +25,6 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
-import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.db.schema.TableSchema;
import org.jetbrains.annotations.NotNull;
@@ -36,17 +35,14 @@ import org.jetbrains.annotations.NotNull;
*/
public abstract class DatabaseAccessor<T extends TableSchema>
{
- public final SidecarSchema sidecarSchema;
public final CQLSessionProvider cqlSessionProvider;
protected final T tableSchema;
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- protected DatabaseAccessor(SidecarSchema sidecarSchema,
- T tableSchema,
+ protected DatabaseAccessor(T tableSchema,
CQLSessionProvider sessionProvider)
{
- this.sidecarSchema = sidecarSchema;
this.tableSchema = tableSchema;
this.cqlSessionProvider = sessionProvider;
}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
new file mode 100644
index 00000000..7301e7b4
--- /dev/null
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import
org.apache.cassandra.sidecar.exceptions.SidecarSchemaModificationException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Abstract schema
+ */
+public abstract class AbstractSchema
+{
+ protected Logger logger = LoggerFactory.getLogger(this.getClass());
+ private boolean initialized = false;
+
+ public synchronized boolean initialize(@NotNull Session session)
+ {
+ initialized = initialized || initializeInternal(session);
+ return initialized;
+ }
+
+ protected PreparedStatement prepare(PreparedStatement cached, Session
session, String cqlLiteral)
+ {
+ return cached == null ? session.prepare(cqlLiteral) : cached;
+ }
+
+ protected boolean initializeInternal(@NotNull Session session)
+ {
+ if (!exists(session.getCluster().getMetadata()))
+ {
+ try
+ {
+ ResultSet res = session.execute(createSchemaStatement());
+ if (!res.getExecutionInfo().isSchemaInAgreement())
+ {
+ logger.warn("Schema is not yet in agreement.");
+ return false;
+ }
+ }
+ catch (Exception exception)
+ {
+ String schemaName = this.getClass().getSimpleName();
+ throw new SidecarSchemaModificationException("Failed to modify
schema for " + schemaName, exception);
+ }
+ }
+
+ prepareStatements(session);
+ return true;
+ }
+
+ protected abstract String keyspaceName();
+
+ protected abstract void prepareStatements(@NotNull Session session);
+
+ protected abstract boolean exists(@NotNull Metadata metadata);
+
+ protected abstract String createSchemaStatement();
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
similarity index 100%
rename from
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
rename to
server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/TableSchema.java
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
similarity index 100%
rename from
server/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
rename to
server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/SidecarSchemaModificationException.java
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index cc19b1bf..c1de10cf 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -50,6 +50,7 @@ import
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.server.ClusterMembershipOperations;
import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
@@ -402,6 +403,13 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
return fromAdapter(ICassandraAdapter::storageOperations);
}
+ @Nullable
+ @Override
+ public MetricsOperations metricsOperations()
+ {
+ return fromAdapter(ICassandraAdapter::metricsOperations);
+ }
+
@Nullable
@Override
public ClusterMembershipOperations clusterMembershipOperations()
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index fd942c67..b75fb132 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -55,13 +55,16 @@ import org.jetbrains.annotations.Nullable;
public class RestoreJobDatabaseAccessor extends
DatabaseAccessor<RestoreJobsSchema>
{
private static final ObjectMapper MAPPER = new ObjectMapper();
+ public final SidecarSchema sidecarSchema;
@Inject
public RestoreJobDatabaseAccessor(SidecarSchema sidecarSchema,
RestoreJobsSchema restoreJobsSchema,
CQLSessionProvider cqlSessionProvider)
{
- super(sidecarSchema, restoreJobsSchema, cqlSessionProvider);
+ super(restoreJobsSchema, cqlSessionProvider);
+ this.sidecarSchema = sidecarSchema;
+
}
public RestoreJob create(CreateRestoreJobRequestPayload payload,
QualifiedTableName qualifiedTableName)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
index 31c5a2da..516611a4 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java
@@ -38,12 +38,15 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
@Singleton
public class RestoreRangeDatabaseAccessor extends
DatabaseAccessor<RestoreRangesSchema>
{
+ private final SidecarSchema sidecarSchema;
+
@Inject
protected RestoreRangeDatabaseAccessor(SidecarSchema sidecarSchema,
RestoreRangesSchema tableSchema,
CQLSessionProvider sessionProvider)
{
- super(sidecarSchema, tableSchema, sessionProvider);
+ super(tableSchema, sessionProvider);
+ this.sidecarSchema = sidecarSchema;
}
public RestoreRange create(RestoreRange range)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index 21c97a3f..640bd427 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -40,12 +40,15 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
@Singleton
public class RestoreSliceDatabaseAccessor extends
DatabaseAccessor<RestoreSlicesSchema>
{
+ private final SidecarSchema sidecarSchema;
+
@Inject
protected RestoreSliceDatabaseAccessor(SidecarSchema sidecarSchema,
RestoreSlicesSchema
restoreSlicesSchema,
CQLSessionProvider
cqlSessionProvider)
{
- super(sidecarSchema, restoreSlicesSchema, cqlSessionProvider);
+ super(restoreSlicesSchema, cqlSessionProvider);
+ this.sidecarSchema = sidecarSchema;
}
public RestoreSlice create(RestoreSlice slice)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
index 0a7825fc..a5f22e4c 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java
@@ -39,7 +39,7 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
/**
- * Encapsulates all {@link AbstractSchema} related operations for features
provided by Sidecar
+ * Encapsulates all related operations for features provided by Sidecar
*/
public class SidecarSchema
{
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
index 16b107f8..127b099d 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SchemaMetrics.java
@@ -19,11 +19,12 @@
package org.apache.cassandra.sidecar.metrics;
import com.codahale.metrics.MetricRegistry;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import static org.apache.cassandra.sidecar.metrics.ServerMetrics.SERVER_PREFIX;
/**
- * Tracks metrics for {@link
org.apache.cassandra.sidecar.db.schema.SidecarSchema} and other schema related
handling
+ * Tracks metrics for {@link SidecarSchema} and other schema related handling
* done by Sidecar
*/
public class SchemaMetrics
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
index 2e491b7f..6b986abf 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.sidecar.metrics;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+
import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX;
/**
@@ -43,7 +45,7 @@ public interface ServerMetrics
RestoreMetrics restore();
/**
- * @return metrics related to {@link
org.apache.cassandra.sidecar.db.schema.SidecarSchema} that are tracked.
+ * @return metrics related to {@link SidecarSchema} that are tracked.
*/
SchemaMetrics schema();
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
index 976256a0..0c3aa763 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.routes;
import java.util.NoSuchElementException;
+import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,9 @@ import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
import
org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import
org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException;
@@ -40,6 +43,7 @@ import
org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
@@ -98,6 +102,25 @@ public abstract class AbstractHandler<T> implements
Handler<RoutingContext>
}
}
+ protected void ifMetricsOpsAvailable(RoutingContext context,
+ String host,
+ Consumer<MetricsOperations>
ifAvailable)
+ {
+ CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+ if (delegate == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
+ MetricsOperations operations = delegate.metricsOperations();
+ if (operations == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
+ ifAvailable.accept(operations);
+ }
+
/**
* Extracts the request object from the {@code context}.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
new file mode 100644
index 00000000..dc59fba7
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Handler for retrieving stats for connected clients
+ */
+public class ConnectedClientStatsHandler extends AbstractHandler<Void>
+{
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the metadata fetcher
+ * @param executorPools executor pools for blocking executions
+ */
+ @Inject
+ protected ConnectedClientStatsHandler(InstanceMetadataFetcher
metadataFetcher, ExecutorPools executorPools)
+ {
+ super(metadataFetcher, executorPools, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ String host,
+ SocketAddress remoteAddress,
+ Void request)
+ {
+
+ ifMetricsOpsAvailable(context, host, operations -> {
+ boolean summaryOnly = parseBooleanQueryParam(httpRequest,
"summary", true);
+
+ executorPools.service()
+ .executeBlocking(() ->
operations.connectedClientStats(summaryOnly))
+ .onSuccess(context::json)
+ .onFailure(cause -> processFailure(cause, context,
host, remoteAddress, request));
+ });
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 095e910a..a2f6fc47 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -86,6 +86,7 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
import org.apache.cassandra.sidecar.routes.CassandraHealthHandler;
+import org.apache.cassandra.sidecar.routes.ConnectedClientStatsHandler;
import org.apache.cassandra.sidecar.routes.DiskSpaceProtectionHandler;
import org.apache.cassandra.sidecar.routes.FileStreamHandler;
import org.apache.cassandra.sidecar.routes.GossipInfoHandler;
@@ -210,6 +211,7 @@ public class MainModule extends AbstractModule
AbortRestoreJobHandler abortRestoreJobHandler,
CreateRestoreSliceHandler
createRestoreSliceHandler,
RestoreJobProgressHandler
restoreJobProgressHandler,
+ ConnectedClientStatsHandler
connectedClientStatsHandler,
ErrorHandler errorHandler)
{
Router router = Router.router(vertx);
@@ -294,6 +296,9 @@ public class MainModule extends AbstractModule
router.get(ApiEndpointsV1.RING_ROUTE)
.handler(ringHandler);
+ router.get(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE)
+ .handler(connectedClientStatsHandler);
+
router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE)
.handler(ringHandler);
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
new file mode 100644
index 00000000..0a592bf6
--- /dev/null
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test the client-stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class ConnectedClientStatsHandlerIntegrationTest extends
IntegrationTestBase
+{
+ private static final int DEFAULT_CONNECTION_COUNT = 2;
+
+ @CassandraIntegrationTest
+ void retrieveClientStatsDefault(VertxTestContext context)
+ throws Exception
+ {
+ Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", true);
+ String testRoute = "/api/v1/cassandra/stats/connected-clients";
+ testWithClient(context, client -> {
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertClientStatsResponse(response, expectedParams);
+ context.completeNow();
+ }));
+ });
+ }
+
+ @CassandraIntegrationTest
+ void retrieveClientStatsListConnections(VertxTestContext context)
+ throws Exception
+ {
+
+ Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", false);
+ String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
+ testWithClient(context, client -> {
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertClientStatsResponse(response, expectedParams);
+ context.completeNow();
+ }));
+ });
+ }
+
+ @CassandraIntegrationTest
+ void retrieveClientStatsListConnectionsWithKeyspace(VertxTestContext
context)
+ throws Exception
+ {
+ createTestKeyspace();
+ Session session = maybeGetSession();
+ session.execute("USE " + TEST_KEYSPACE);
+
+ Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", false);
+ String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
+ testWithClient(context, client -> {
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertClientStatsResponse(response, expectedParams, 4,
true);
+ context.completeNow();
+ }));
+ });
+ }
+
+ @CassandraIntegrationTest
+ void retrieveClientStatsMultipleConnections(VertxTestContext context)
+ throws Exception
+ {
+ // Creates an additional connection pair
+ createTestKeyspace();
+ Map<String, Boolean> expectedParams =
Collections.singletonMap("summary", false);
+ String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
+ testWithClient(context, client -> {
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertClientStatsResponse(response, expectedParams, 4);
+ context.completeNow();
+ }));
+ });
+ }
+
+ /**
+ * Expects unrecognized params to be ignored and invalid value for the
expected parameter to be defaulted to true
+ * to prevent heavyweight query in the bad request case.
+ */
+ @CassandraIntegrationTest
+ void retrieveClientStatsInvalidParamaterValue(VertxTestContext context)
+ throws Exception
+ {
+ Map<String, Boolean> expectedParams = new HashMap<>();
+ expectedParams.put("summary", true);
+ String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=123&bad-arg=xyz";
+ testWithClient(context, client -> {
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertClientStatsResponse(response, expectedParams);
+ context.completeNow();
+ }));
+ });
+ }
+
+ void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String,
Boolean> params)
+ {
+ assertClientStatsResponse(response, params, DEFAULT_CONNECTION_COUNT);
+ }
+
+ void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String,
Boolean> params, int expectedConnections)
+ {
+ assertClientStatsResponse(response, params, expectedConnections,
false);
+ }
+ void assertClientStatsResponse(HttpResponse<Buffer> response, Map<String,
Boolean> params, int expectedConnections, boolean usingKeyspace)
+ {
+ boolean isSummary = params.get("summary");
+
+ logger.info("Response:" + response.bodyAsString());
+ ConnectedClientStatsResponse clientStats =
response.bodyAsJson(ConnectedClientStatsResponse.class);
+ assertThat(clientStats).isNotNull();
+ assertThat(clientStats.connectionsByUser()).isNotEmpty();
+ assertThat(clientStats.connectionsByUser()).containsKey("anonymous");
+
assertThat(clientStats.totalConnectedClients()).isEqualTo(expectedConnections);
+
+ List<ClientConnectionEntry> stats = clientStats.clientConnections();
+ if (isSummary)
+ {
+ assertThat(stats).isNull();
+ }
+ else
+ {
+ assertThat(stats.size()).isEqualTo(expectedConnections);
+ for (ClientConnectionEntry stat : stats)
+ {
+ assertThat(stat.address()).contains("127.0.0.1");
+ assertThat(stat.sslEnabled()).isEqualTo(false);
+ assertThat(stat.driverName()).isEqualTo("DataStax Java
Driver");
+ assertThat(stat.driverVersion()).isNotNull();
+ assertThat(stat.username()).isEqualTo("anonymous");
+ if
(sidecarTestContext.version.isGreaterThan(SimpleCassandraVersion.create("4.0.0")))
+ {
+ assertThat(stat.clientOptions()).isNotNull();
+
assertThat(stat.clientOptions().containsKey("CQL_VERSION")).isTrue();
+ }
+ }
+
+ // TODO: Add validations for fields in trunk once dtest jars can
advance beyond TCM commit
+ if (usingKeyspace
+ &&
sidecarTestContext.version.compareTo(SimpleCassandraVersion.create("5.0.0")) >=
0)
+ {
+
assertThat(stats.stream().map(ClientConnectionEntry::keyspaceName).collect(Collectors.toSet())).contains(TEST_KEYSPACE);
+ }
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
new file mode 100644
index 00000000..395e0359
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ConnectedClientStatsHandler} class
+ */
+@ExtendWith(VertxExtension.class)
+public class ConnectedClientStatsHandlerTest
+{
+
+ private static final int EXPECTED_TOTAL_CLIENTS = 10;
+ private static final Map<String, Long> EXPECTED_CONNECTIONS_BY_USER = new
HashMap<String, Long>()
+ {
+ {
+ put("u1", 5L);
+ put("u2", 5L);
+ }
+ };
+ private static final List<ClientConnectionEntry> EXPECTED_STATS =
Arrays.asList(
+ new ClientConnectionEntry("1", 0, false, null, null, "5", "u1", 5,
+ "name", "version", "keyspace1",
Collections.emptyMap(), null, Collections.emptyMap()),
+ new ClientConnectionEntry("1", 0, false, null, null, "5", "u1", 5,
+ "name", "version", "keyspace1",
Collections.emptyMap(), null, Collections.emptyMap()));
+ static final Logger LOGGER =
LoggerFactory.getLogger(ConnectedClientStatsHandlerTest.class);
+ Vertx vertx;
+ Server server;
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Module testOverride = Modules.override(new TestModule())
+ .with(new
ConnectedClientStatsTestModule());
+ Injector injector = Guice.createInjector(Modules.override(new
MainModule())
+ .with(testOverride));
+
+ server = injector.getInstance(Server.class);
+ vertx = injector.getInstance(Vertx.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testHandlerStats(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/stats/connected-clients";
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ ConnectedClientStatsResponse statsResponse =
response.bodyAsJson(ConnectedClientStatsResponse.class);
+ assertThat(statsResponse).isNotNull();
+
assertThat(statsResponse.totalConnectedClients()).isEqualTo(EXPECTED_TOTAL_CLIENTS);
+
assertThat(statsResponse.connectionsByUser()).hasSize(EXPECTED_CONNECTIONS_BY_USER.size());
+ context.completeNow();
+ }));
+ }
+
+ static class ConnectedClientStatsTestModule extends AbstractModule
+ {
+ @Provides
+ @Singleton
+ public InstancesConfig instanceConfig()
+ {
+ ConnectedClientStatsResponse summaryResponse = new
ConnectedClientStatsResponse(Collections.emptyList(),
+
EXPECTED_TOTAL_CLIENTS,
+
EXPECTED_CONNECTIONS_BY_USER);
+ ConnectedClientStatsResponse statsResponse = new
ConnectedClientStatsResponse(EXPECTED_STATS,
+
EXPECTED_TOTAL_CLIENTS,
+
EXPECTED_CONNECTIONS_BY_USER);
+
+ final int instanceId = 100;
+ final String host = "127.0.0.1";
+ final InstanceMetadata instanceMetadata =
mock(InstanceMetadata.class);
+ when(instanceMetadata.host()).thenReturn(host);
+ when(instanceMetadata.port()).thenReturn(9042);
+ when(instanceMetadata.id()).thenReturn(instanceId);
+ when(instanceMetadata.stagingDir()).thenReturn("");
+
+ CassandraAdapterDelegate delegate =
mock(CassandraAdapterDelegate.class);
+ MetricsOperations mockMetricsOperations =
mock(MetricsOperations.class);
+
when(mockMetricsOperations.connectedClientStats(true)).thenReturn(summaryResponse);
+
when(mockMetricsOperations.connectedClientStats(false)).thenReturn(statsResponse);
+
when(delegate.metricsOperations()).thenReturn(mockMetricsOperations);
+ when(instanceMetadata.delegate()).thenReturn(delegate);
+
+ InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+
when(mockInstancesConfig.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+
when(mockInstancesConfig.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+
when(mockInstancesConfig.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+ return mockInstancesConfig;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]