This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 529171b CASSANDRASC-87: Add JMX health checks during the periodic
health checks
529171b is described below
commit 529171b1f6dee277a9087eb9da7242ce17873643
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Dec 11 13:49:36 2023 -0800
CASSANDRASC-87: Add JMX health checks during the periodic health checks
In this commit, we add health checks based on the JMX connectivity to the
managed
Cassandra instances. Additionally, we construct the NodeSettings object
based on
JMX. This allows the Sidecar process to be able to determine an adapter for
the
node even if the node is in joining state, or its binary port has been
disabled.
Co-authored-by: Doug Rohrer <[email protected]>
Co-authored-by: Francisco Guerrero <[email protected]>
Patch by Doug Rohrer, Francisco Guerrero; Reviewed by Yifan Cai for
CASSANDRASC-87
---
CHANGES.txt | 1 +
.../sidecar/adapters/base/CassandraAdapter.java | 33 +--
.../cassandra/sidecar/client/RequestContext.java | 43 ++-
.../cassandra/sidecar/client/SidecarClient.java | 40 ++-
...Request.java => CassandraJmxHealthRequest.java} | 10 +-
...uest.java => CassandraNativeHealthRequest.java} | 24 +-
.../sidecar/client/SidecarClientTest.java | 72 ++++-
.../cassandra/sidecar/common/ApiEndpointsV1.java | 9 +
.../apache/cassandra/sidecar/common/JmxClient.java | 33 ++-
.../cassandra/sidecar/common/NodeSettings.java | 7 -
.../sidecar/cluster/CassandraAdapterDelegate.java | 316 +++++++++++++++++----
.../cluster/SidecarLoadBalancingPolicy.java | 8 +-
.../sidecar/cluster/instance/InstanceMetadata.java | 3 +-
.../cluster/instance/InstanceMetadataImpl.java | 4 +-
.../sidecar/routes/CassandraHealthHandler.java | 7 +-
.../cassandra/sidecar/routes/RingHandler.java | 6 +
.../routes/TokenRangeReplicaMapHandler.java | 5 +
.../routes/cassandra/NodeSettingsHandler.java | 5 +
.../sstableuploads/SSTableImportHandler.java | 5 +
.../cassandra/sidecar/server/MainModule.java | 8 +
.../sidecar/server/SidecarServerEvents.java | 12 +
.../sidecar/utils/InstanceMetadataFetcher.java | 1 +
.../cassandra/sidecar/utils/SSTableImporter.java | 6 +
.../sidecar/utils/SimpleCassandraVersion.java | 2 +-
.../distributed/impl/AbstractClusterUtils.java | 26 +-
.../sidecar/common/CQLSessionProviderTest.java | 4 +-
.../sidecar/common/DelegateIntegrationTest.java | 313 ++++++++++++++++++++
.../cassandra/sidecar/common/DelegateTest.java | 125 --------
.../cassandra/sidecar/common/JmxClientTest.java | 4 +-
.../routes/GossipInfoHandlerIntegrationTest.java | 2 +-
.../sidecar/routes/RingHandlerIntegrationTest.java | 2 +-
.../routes/tokenrange/BasicGossipDisabledTest.java | 2 +-
.../testing/CassandraSidecarTestContext.java | 33 ++-
.../sidecar/testing/IntegrationTestBase.java | 6 +-
.../testing/AbstractCassandraTestContext.java | 2 +-
.../testing/CassandraIntegrationTest.java | 2 +-
.../cassandra/testing/CassandraTestContext.java | 5 -
.../org/apache/cassandra/sidecar/TestModule.java | 2 +-
.../cassandra/sidecar/server/ServerSSLTest.java | 7 +-
.../cassandra/sidecar/snapshots/SnapshotUtils.java | 6 +-
40 files changed, 892 insertions(+), 309 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f94b7b9..25e76dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add JMX health checks during the periodic health checks (CASSANDRASC-87)
* Sidecar should be able to load metadata even if the local instance is
unavailable (CASSANDRASC-79)
* Expose additional SSL configuration options for the Sidecar Service
(CASSANDRASC-82)
* Expose additional node settings (CASSANDRASC-84)
diff --git
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index 0e12a0e..f02ea2f 100644
---
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -28,7 +28,6 @@ import com.datastax.driver.core.DriverUtils;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
@@ -41,13 +40,6 @@ import org.apache.cassandra.sidecar.common.TableOperations;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME;
-
/**
* A {@link ICassandraAdapter} implementation for Cassandra 4.0 and later
*/
@@ -107,30 +99,7 @@ public class CassandraAdapter implements ICassandraAdapter
@Nullable
public NodeSettings nodeSettings()
{
- ResultSet rs = executeLocal("SELECT "
- + RELEASE_VERSION_COLUMN_NAME +
", "
- + PARTITIONER_COLUMN_NAME + ", "
- + DATA_CENTER_COLUMN_NAME + ", "
- + RPC_ADDRESS_COLUMN_NAME + ", "
- + RPC_PORT_COLUMN_NAME + ", "
- + TOKENS_COLUMN_NAME
- + " FROM system.local");
- if (rs == null)
- {
- return null;
- }
-
- Row oneResult = rs.one();
-
- return NodeSettings.builder()
-
.releaseVersion(oneResult.getString(RELEASE_VERSION_COLUMN_NAME))
-
.partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME))
- .sidecarVersion(sidecarVersion)
-
.datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME))
- .tokens(oneResult.getSet(TOKENS_COLUMN_NAME,
String.class))
-
.rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME))
- .rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME))
- .build();
+ throw new UnsupportedOperationException("Node settings are not
provided by this adapter");
}
@Override
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index f595fb1..b7c7e82 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -22,7 +22,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.cassandra.sidecar.client.request.CassandraHealthRequest;
+import org.apache.cassandra.sidecar.client.request.CassandraJmxHealthRequest;
+import
org.apache.cassandra.sidecar.client.request.CassandraNativeHealthRequest;
import
org.apache.cassandra.sidecar.client.request.CleanSSTableUploadSessionRequest;
import org.apache.cassandra.sidecar.client.request.ClearSnapshotRequest;
import org.apache.cassandra.sidecar.client.request.CreateSnapshotRequest;
@@ -52,7 +53,16 @@ import org.apache.cassandra.sidecar.common.utils.HttpRange;
public class RequestContext
{
protected static final SidecarHealthRequest SIDECAR_HEALTH_REQUEST = new
SidecarHealthRequest();
- protected static final CassandraHealthRequest CASSANDRA_HEALTH_REQUEST =
new CassandraHealthRequest();
+
+ /**
+ * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_REQUEST}
+ */
+ @Deprecated
+ protected static final CassandraNativeHealthRequest
CASSANDRA_HEALTH_REQUEST =
+ new CassandraNativeHealthRequest(true /* useDeprecatedHealthEndpoint */);
+ protected static final CassandraNativeHealthRequest
CASSANDRA_NATIVE_HEALTH_REQUEST =
+ new CassandraNativeHealthRequest();
+ protected static final CassandraJmxHealthRequest
CASSANDRA_JMX_HEALTH_REQUEST = new CassandraJmxHealthRequest();
protected static final SchemaRequest FULL_SCHEMA_REQUEST = new
SchemaRequest();
protected static final TimeSkewRequest TIME_SKEW_REQUEST = new
TimeSkewRequest();
protected static final NodeSettingsRequest NODE_SETTINGS_REQUEST = new
NodeSettingsRequest();
@@ -190,16 +200,41 @@ public class RequestContext
}
/**
- * Sets the {@code request} to be a {@link CassandraHealthRequest}
- * and returns a reference to this Builder enabling method chaining
+ * Sets the {@code request} to be a {@link
CassandraNativeHealthRequest}
+ * with the {@code useDeprecatedHealthEndpoint} parameter set to
{@code true}
+ * and returns a reference to this Builder enabling method chaining.
*
* @return a reference to this Builder
+ * @deprecated in favor of {@link #cassandraNativeHealthRequest()}
*/
+ @Deprecated
public Builder cassandraHealthRequest()
{
return request(CASSANDRA_HEALTH_REQUEST);
}
+ /**
+ * Sets the {@code request} to be a {@link
CassandraNativeHealthRequest}
+ * and returns a reference to this Builder enabling method chaining
+ *
+ * @return a reference to this Builder
+ */
+ public Builder cassandraNativeHealthRequest()
+ {
+ return request(CASSANDRA_NATIVE_HEALTH_REQUEST);
+ }
+
+ /**
+ * Sets the {@code request} to be a {@link CassandraJmxHealthRequest}
+ * and returns a reference to this Builder enabling method chaining
+ *
+ * @return a reference to this Builder
+ */
+ public Builder cassandraJmxHealthRequest()
+ {
+ return request(CASSANDRA_JMX_HEALTH_REQUEST);
+ }
+
/**
* Sets the {@code request} to be a {@link SchemaRequest} for the full
schema and returns a reference to
* this Builder enabling method chaining.
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index d9d77f8..90fd47f 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -79,22 +79,50 @@ public class SidecarClient implements AutoCloseable
public CompletableFuture<HealthResponse> sidecarHealth()
{
return executor.executeRequestAsync(requestBuilder()
- .sidecarHealthRequest()
- .retryPolicy(new OncePerInstanceRetryPolicy())
- .build());
+ .sidecarHealthRequest()
+ .retryPolicy(new
OncePerInstanceRetryPolicy())
+ .build());
}
/**
* Executes the Cassandra health request using the configured selection
policy and with no retries
*
* @return a completable future of the Cassandra health response
+ * @deprecated use {@link #cassandraNativeHealth()} instead
*/
+ @Deprecated
public CompletableFuture<HealthResponse> cassandraHealth()
{
return executor.executeRequestAsync(requestBuilder()
- .cassandraHealthRequest()
- .retryPolicy(new OncePerInstanceRetryPolicy())
- .build());
+ .cassandraHealthRequest()
+ .retryPolicy(new
OncePerInstanceRetryPolicy())
+ .build());
+ }
+
+ /**
+ * Executes the Cassandra health request using the configured selection
policy and with no retries
+ *
+ * @return a completable future of the Cassandra health response
+ */
+ public CompletableFuture<HealthResponse> cassandraNativeHealth()
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .cassandraNativeHealthRequest()
+ .retryPolicy(new
OncePerInstanceRetryPolicy())
+ .build());
+ }
+
+ /**
+ * Executes the Cassandra health request using the configured selection
policy and with no retries
+ *
+ * @return a completable future of the Cassandra health response
+ */
+ public CompletableFuture<HealthResponse> cassandraJmxHealth()
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .cassandraJmxHealthRequest()
+ .retryPolicy(new
OncePerInstanceRetryPolicy())
+ .build());
}
/**
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
similarity index 77%
copy from
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
copy to
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
index 2061c89..1d5644a 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
@@ -23,16 +23,16 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
/**
- * Represents a request to retrieve the Cassandra health
+ * Represents a request to retrieve the connectivity health checks performed
against the Cassandra JMX protocol
*/
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class CassandraJmxHealthRequest extends DecodableRequest<HealthResponse>
{
/**
- * Constructs a request to retrieve the Cassandra health
+ * Constructs a request to retrieve the Cassandra JMX health
*/
- public CassandraHealthRequest()
+ public CassandraJmxHealthRequest()
{
- super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
+ super(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
}
/**
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
similarity index 57%
copy from
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
copy to
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
index 2061c89..35960c9 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
@@ -23,16 +23,30 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
/**
- * Represents a request to retrieve the Cassandra health
+ * Represents a request to retrieve the connectivity health checks performed
against the Cassandra native protocol
*/
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class CassandraNativeHealthRequest extends
DecodableRequest<HealthResponse>
{
/**
- * Constructs a request to retrieve the Cassandra health
+ * Constructs a request to retrieve the Cassandra native health
*/
- public CassandraHealthRequest()
+ public CassandraNativeHealthRequest()
{
- super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
+ this(false);
+ }
+
+ /**
+ * Constructs a request to retrieve the Cassandra native health
+ *
+ * @param useDeprecatedHealthEndpoint {@code true} if using the deprecated
endpoint, {@code false} to use
+ * the new endpoint
+ */
+ @SuppressWarnings("deprecation")
+ public CassandraNativeHealthRequest(boolean useDeprecatedHealthEndpoint)
+ {
+ super(useDeprecatedHealthEndpoint
+ ? ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE
+ : ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
}
/**
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 669f9a4..d47e460 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -144,8 +144,9 @@ abstract class SidecarClientTest
validateResponseServed(ApiEndpointsV1.HEALTH_ROUTE);
}
+ @SuppressWarnings("deprecation")
@Test
- void testCassandraHealthOk() throws Exception
+ void testCassandraDeprecatedHealthOk() throws Exception
{
MockResponse response = new MockResponse()
.setResponseCode(200)
@@ -161,8 +162,9 @@ abstract class SidecarClientTest
validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
}
+ @SuppressWarnings("deprecation")
@Test
- void testCassandraHealthNotOk() throws Exception
+ void testCassandraDeprecatedHealthNotOk() throws Exception
{
MockResponse response = new MockResponse()
.setResponseCode(503)
@@ -177,6 +179,72 @@ abstract class SidecarClientTest
validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
}
+ @Test
+ void testCassandraNativeHealthOk() throws Exception
+ {
+ MockResponse response = new MockResponse()
+ .setResponseCode(200)
+ .setHeader("content-type", "application/json")
+ .setBody("{\"status\":\"OK\"}");
+ enqueue(response);
+
+ HealthResponse result = client.cassandraNativeHealth().get(30,
TimeUnit.SECONDS);
+ assertThat(result).isNotNull();
+ assertThat(result.status()).isEqualToIgnoringCase("OK");
+ assertThat(result.isOk()).isTrue();
+
+ validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
+ }
+
+ @Test
+ void testCassandraNativeHealthNotOk() throws Exception
+ {
+ MockResponse response = new MockResponse()
+ .setResponseCode(503)
+ .setHeader("content-type", "application/json")
+ .setBody("{\"status\":\"NOT_OK\"}");
+ enqueue(response);
+
+ assertThatThrownBy(() -> client.cassandraNativeHealth().get(30,
TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(RetriesExhaustedException.class);
+
+ validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
+ }
+
+ @Test
+ void testCassandraJmxHealthOk() throws Exception
+ {
+ MockResponse response = new MockResponse()
+ .setResponseCode(200)
+ .setHeader("content-type", "application/json")
+ .setBody("{\"status\":\"OK\"}");
+ enqueue(response);
+
+ HealthResponse result = client.cassandraJmxHealth().get(1,
TimeUnit.SECONDS);
+ assertThat(result).isNotNull();
+ assertThat(result.status()).isEqualTo("OK");
+ assertThat(result.isOk()).isTrue();
+
+ validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
+ }
+
+ @Test
+ void testCassandraJmxHealthNotOk() throws Exception
+ {
+ MockResponse response = new MockResponse()
+ .setResponseCode(503)
+ .setHeader("content-type", "application/json")
+ .setBody("{\"status\":\"NOT_OK\"}");
+ enqueue(response);
+
+ assertThatThrownBy(() -> client.cassandraJmxHealth().get(1,
TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(RetriesExhaustedException.class);
+
+ validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
+ }
+
@Test
void testFullSchema() throws Exception
{
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 5fe2d5b..550a8c4 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -28,6 +28,8 @@ public final class ApiEndpointsV1
public static final String HEALTH = "/__health";
public static final String CASSANDRA = "/cassandra";
+ public static final String NATIVE = "/native";
+ public static final String JMX = "/jmx";
public static final String KEYSPACE_PATH_PARAM = ":keyspace";
public static final String TABLE_PATH_PARAM = ":table";
public static final String SNAPSHOT_PATH_PARAM = ":snapshot";
@@ -44,7 +46,14 @@ public final class ApiEndpointsV1
public static final String PER_UPLOAD = "/uploads/" + UPLOAD_ID_PATH_PARAM;
public static final String HEALTH_ROUTE = API_V1 + HEALTH;
+
+ /**
+ * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_ROUTE}
+ */
+ @Deprecated
public static final String CASSANDRA_HEALTH_ROUTE = API_V1 + CASSANDRA +
HEALTH;
+ public static final String CASSANDRA_NATIVE_HEALTH_ROUTE = API_V1 +
CASSANDRA + NATIVE + HEALTH;
+ public static final String CASSANDRA_JMX_HEALTH_ROUTE = API_V1 + CASSANDRA
+ JMX + HEALTH;
@Deprecated // NOTE: Uses singular forms of "keyspace" and "table"
public static final String DEPRECATED_SNAPSHOTS_ROUTE = API_V1 +
"/keyspace/" + KEYSPACE_PATH_PARAM +
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
index 2bb86d8..44ab18c 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
@@ -23,9 +23,12 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMISocketFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
@@ -66,7 +69,8 @@ public class JmxClient implements NotificationListener,
Closeable
private final BooleanSupplier enableSslSupplier;
private final int connectionMaxRetries;
private final long connectionRetryDelayMillis;
-
+ private final Set<NotificationListener> registeredNotificationListeners =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* Creates a new JMX client with {@link Builder} options.
@@ -119,6 +123,27 @@ public class JmxClient implements NotificationListener,
Closeable
}
}
+ /**
+ * Registers a {@link NotificationListener} to be notified whenever we
encounter a JMX event. This method
+ * guarantees that a listener will be registered at most once.
+ *
+ * @param notificationListener the listener to be notified
+ */
+ public void registerListener(NotificationListener notificationListener)
+ {
+ registeredNotificationListeners.add(notificationListener);
+ }
+
+ /**
+ * Removes an already registered {@link NotificationListener} from the
recipient list for JMX events.
+ *
+ * @param notificationListener the listener to be removed
+ */
+ public void unregisterListener(NotificationListener notificationListener)
+ {
+ registeredNotificationListeners.remove(notificationListener);
+ }
+
private RMIClientSocketFactory rmiClientSocketFactory(boolean enableSsl)
{
return enableSsl
@@ -206,10 +231,16 @@ public class JmxClient implements NotificationListener,
Closeable
{
this.connected = justConnected;
}
+ forwardNotification(notification, handback);
}
}
}
+ private void forwardNotification(Notification notification, Object
handback)
+ {
+ registeredNotificationListeners.forEach(listener ->
listener.handleNotification(notification, handback));
+ }
+
/**
* @return true if JMX is connected, false otherwise
*/
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
index 61dae74..f6c5244 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
@@ -33,13 +33,6 @@ public class NodeSettings
{
private static final String VERSION = "version";
- public static final String RELEASE_VERSION_COLUMN_NAME = "release_version";
- public static final String PARTITIONER_COLUMN_NAME = "partitioner";
- public static final String DATA_CENTER_COLUMN_NAME = "data_center";
- public static final String RPC_ADDRESS_COLUMN_NAME = "rpc_address";
- public static final String RPC_PORT_COLUMN_NAME = "rpc_port";
- public static final String TOKENS_COLUMN_NAME = "tokens";
-
@JsonProperty("releaseVersion")
private final String releaseVersion;
@JsonProperty("partitioner")
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 4523ed4..15cea06 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -20,9 +20,15 @@ package org.apache.cassandra.sidecar.cluster;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.remote.JMXConnectionNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,14 +58,12 @@ import
org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME;
-import static
org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
/**
@@ -82,13 +86,16 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
private final CassandraVersionProvider versionProvider;
private final CQLSessionProvider cqlSessionProvider;
private final JmxClient jmxClient;
+ private final JmxNotificationListener notificationListener;
private SimpleCassandraVersion currentVersion;
private ICassandraAdapter adapter;
- private volatile NodeSettings nodeSettings = null;
+ private volatile boolean isNativeUp = false;
+ private volatile NodeSettings nodeSettingsFromJmx = null;
private final AtomicBoolean registered = new AtomicBoolean(false);
private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
private final InetSocketAddress localNativeTransportAddress;
private volatile Host host;
+ private volatile boolean closed = false;
/**
* Constructs a new {@link CassandraAdapterDelegate} for the given {@code
cassandraInstance}
@@ -118,6 +125,8 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
this.versionProvider = versionProvider;
this.cqlSessionProvider = session;
this.jmxClient = jmxClient;
+ notificationListener = new JmxNotificationListener();
+ this.jmxClient.registerListener(notificationListener);
}
private void maybeRegisterHostListener(@NotNull Session session)
@@ -146,17 +155,24 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
/**
* Should be called on initial connect as well as when a server comes back
since it might be from an upgrade
- * synchronized so we don't flood the DB with version requests
+ * synchronized, so we don't flood the DB with version requests
*
* <p>If the healthcheck determines we've changed versions, it should load
the proper adapter</p>
*/
public void healthCheck()
{
+ if (closed)
+ {
+ LOGGER.debug("Skipping health check for cassandraInstanceId={}.
Delegate is closed", cassandraInstanceId);
+ return;
+ }
+
if (isHealthCheckActive.compareAndSet(false, true))
{
try
{
- healthCheckInternal();
+ jmxHealthCheck();
+ nativeProtocolHealthCheck();
}
finally
{
@@ -170,14 +186,49 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
}
}
- private void healthCheckInternal()
+ /**
+ * Performs health checks by utilizing the JMX protocol. It uses a small
subset of the exposed mBeans to
+ * collect information needed to populate the {@link NodeSettings} object.
+ */
+ protected void jmxHealthCheck()
+ {
+ try
+ {
+ NodeSettings newNodeSettings = newNodeSettingsFromJmx();
+ if (!newNodeSettings.equals(nodeSettingsFromJmx))
+ {
+ // Update the nodeSettings cache
+ SimpleCassandraVersion previousVersion = currentVersion;
+ currentVersion =
SimpleCassandraVersion.create(newNodeSettings.releaseVersion());
+ adapter =
versionProvider.cassandra(newNodeSettings.releaseVersion())
+ .create(cqlSessionProvider,
jmxClient, localNativeTransportAddress);
+ nodeSettingsFromJmx = newNodeSettings;
+ LOGGER.info("Cassandra version change detected (from={} to={})
for cassandraInstanceId={}. " +
+ "New adapter loaded={}", previousVersion,
currentVersion, cassandraInstanceId, adapter);
+
+ notifyJmxConnection();
+ }
+ LOGGER.debug("Cassandra version {}",
newNodeSettings.releaseVersion());
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.error("Unable to connect JMX to Cassandra instance {}",
cassandraInstanceId, e);
+ // The cassandra node JMX connectivity is unavailable.
+ markJmxDownAndMaybeNotifyDisconnection();
+ }
+ }
+
+ /**
+ * Performs health checks by utilizing the native protocol
+ */
+ protected void nativeProtocolHealthCheck()
{
Session activeSession = cqlSessionProvider.get();
if (activeSession == null)
{
LOGGER.info("No local CQL session is available for
cassandraInstanceId={}. " +
"Cassandra instance is down presumably.",
cassandraInstanceId);
- markAsDownAndMaybeNotify();
+ markNativeDownAndMaybeNotifyDisconnection();
return;
}
@@ -186,15 +237,7 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
try
{
// NOTE: We cannot use `executeLocal` here as there may be no
adapter yet.
- SimpleStatement healthCheckStatement =
- new SimpleStatement("SELECT "
- + RELEASE_VERSION_COLUMN_NAME + ", "
- + PARTITIONER_COLUMN_NAME + ", "
- + DATA_CENTER_COLUMN_NAME + ", "
- + RPC_ADDRESS_COLUMN_NAME + ", "
- + RPC_PORT_COLUMN_NAME + ", "
- + TOKENS_COLUMN_NAME
- + " FROM system.local");
+ SimpleStatement healthCheckStatement = new SimpleStatement("SELECT
release_version FROM system.local");
Metadata metadata = activeSession.getCluster().getMetadata();
host = getHost(metadata);
if (host == null)
@@ -205,46 +248,82 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
}
healthCheckStatement.setHost(host);
healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE);
- Row oneResult = activeSession.execute(healthCheckStatement).one();
-
- // Note that within the scope of this method, we should keep on
using the local releaseVersion
- String releaseVersion =
oneResult.getString(RELEASE_VERSION_COLUMN_NAME);
- NodeSettings newNodeSettings = NodeSettings.builder()
-
.releaseVersion(releaseVersion)
-
.partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME))
-
.sidecarVersion(sidecarVersion)
-
.datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME))
-
.tokens(oneResult.getSet(TOKENS_COLUMN_NAME, String.class))
-
.rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME))
-
.rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME))
- .build();
-
- if (!newNodeSettings.equals(nodeSettings))
- {
- // Update the nodeSettings cache
- SimpleCassandraVersion previousVersion = currentVersion;
- currentVersion = SimpleCassandraVersion.create(releaseVersion);
- adapter = versionProvider.cassandra(releaseVersion)
- .create(cqlSessionProvider,
jmxClient, localNativeTransportAddress);
- nodeSettings = newNodeSettings;
- LOGGER.info("Cassandra version change detected (from={} to={})
for cassandraInstanceId={}. " +
- "New adapter loaded={}", previousVersion,
currentVersion, cassandraInstanceId, adapter);
+ Row row = activeSession.execute(healthCheckStatement).one();
- notifyCqlConnection();
+ if (row != null)
+ {
+ if (!isNativeUp)
+ {
+ isNativeUp = true;
+ notifyNativeConnection();
+ }
+ }
+ else
+ {
+ // This should never happen but added for completeness
+ LOGGER.error("Expected to query the release_version from
system.local but encountered null {}",
+ cassandraInstanceId);
+ // The cassandra native protocol connection to the node is
down.
+ markNativeDownAndMaybeNotifyDisconnection();
+ // Unregister the host listener.
+ maybeUnregisterHostListener(activeSession);
}
- LOGGER.debug("Cassandra version {}", releaseVersion);
}
catch (IllegalArgumentException | NoHostAvailableException e)
{
- LOGGER.error("Unexpected error connecting to Cassandra instance
{}", cassandraInstanceId, e);
- // The cassandra node is down.
+ LOGGER.error("Unexpected error querying Cassandra instance {}",
cassandraInstanceId, e);
+ // The cassandra native protocol connection to the node is down.
+ markNativeDownAndMaybeNotifyDisconnection();
// Unregister the host listener.
- markAsDownAndMaybeNotify();
maybeUnregisterHostListener(activeSession);
}
}
- private Host getHost(Metadata metadata)
+ protected NodeSettings newNodeSettingsFromJmx()
+ {
+ LimitedStorageOperations storageOperations =
+ jmxClient.proxy(LimitedStorageOperations.class,
STORAGE_SERVICE_OBJ_NAME);
+ LimitedEndpointSnitchOperations endpointSnitchOperations =
+ jmxClient.proxy(LimitedEndpointSnitchOperations.class,
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+ String releaseVersion = storageOperations.getReleaseVersion();
+ String partitionerName = storageOperations.getPartitionerName();
+ List<String> tokens = maybeGetTokens(storageOperations);
+ String dataCenter = endpointSnitchOperations.getDatacenter();
+
+ return NodeSettings.builder()
+ .releaseVersion(releaseVersion)
+ .partitioner(partitionerName)
+ .sidecarVersion(sidecarVersion)
+ .datacenter(dataCenter)
+ .tokens(new LinkedHashSet<>(tokens))
+
.rpcAddress(localNativeTransportAddress.getAddress())
+ .rpcPort(localNativeTransportAddress.getPort())
+ .build();
+ }
+
+ /**
+ * Attempts to return the tokens assigned to the Cassandra instance.
+ *
+ * @param storageOperations the interface to perform the operations
+ * @return the list of tokens assigned to the Cassandra instance
+ */
+ protected List<String> maybeGetTokens(LimitedStorageOperations
storageOperations)
+ {
+ try
+ {
+ return storageOperations.getTokens();
+ }
+ catch (AssertionError aex)
+ {
+ // On a joining node, the JMX call will fail with an
AssertionError; we catch this scenario to prevent
+ // failure and just return an empty list of tokens. This is
technically correct, because the node, while
+ // joining, doesn't actually own any tokens until it has
successfully completed joining.
+ return Collections.emptyList();
+ }
+ }
+
+ protected Host getHost(Metadata metadata)
{
if (host == null)
{
@@ -271,13 +350,17 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
}
/**
- * @return a cached {@link NodeSettings}. The returned value could be null
when no CQL connection is established
+ * Returns the cached node settings value obtained during scheduled health
checks. This method does not delegate
+ * to the internal adapter, as the information is retrieved on the
configured health check interval.
+ *
+ * @return a cached {@link NodeSettings}. The returned value will be
{@code null} when no JMX connection is
+ * established
*/
@Nullable
@Override
public NodeSettings nodeSettings()
{
- return nodeSettings;
+ return nodeSettingsFromJmx;
}
public ResultSet executeLocal(Statement statement)
@@ -326,7 +409,7 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
@Override
public void onDown(Host host)
{
- runIfThisHost(host, this::markAsDownAndMaybeNotify);
+ runIfThisHost(host, this::markNativeDownAndMaybeNotifyDisconnection);
}
@Override
@@ -345,14 +428,27 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
{
}
- public boolean isUp()
+ /**
+ * @return {@code true} if the native protocol is enabled on the Cassandra
instance, {@code false} otherwise
+ */
+ public boolean isNativeUp()
+ {
+ return isNativeUp;
+ }
+
+ /**
+ * @return {@code true} if JMX connectivity has been established to the
Cassandra instance, {@code false} otherwise
+ */
+ public boolean isJmxUp()
{
- return nodeSettings != null;
+ return nodeSettingsFromJmx != null;
}
public void close()
{
- markAsDownAndMaybeNotify();
+ closed = true;
+ markNativeDownAndMaybeNotifyDisconnection();
+ markJmxDownAndMaybeNotifyDisconnection();
Session activeSession = cqlSessionProvider.getIfConnected();
if (activeSession != null)
{
@@ -360,6 +456,7 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
}
if (jmxClient != null)
{
+ jmxClient.unregisterListener(notificationListener);
try
{
jmxClient.close();
@@ -377,7 +474,15 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
return currentVersion;
}
- protected void notifyCqlConnection()
+ protected void notifyJmxConnection()
+ {
+ JsonObject connectMessage = new JsonObject()
+ .put("cassandraInstanceId",
cassandraInstanceId);
+ vertx.eventBus().publish(ON_CASSANDRA_JMX_READY.address(),
connectMessage);
+ LOGGER.info("JMX connected to cassandraInstanceId={}",
cassandraInstanceId);
+ }
+
+ protected void notifyNativeConnection()
{
JsonObject connectMessage = new JsonObject()
.put("cassandraInstanceId",
cassandraInstanceId);
@@ -385,11 +490,11 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
LOGGER.info("CQL connected to cassandraInstanceId={}",
cassandraInstanceId);
}
- protected void markAsDownAndMaybeNotify()
+ protected void markNativeDownAndMaybeNotifyDisconnection()
{
- NodeSettings currentNodeSettings = nodeSettings;
- nodeSettings = null;
- if (currentNodeSettings != null)
+ boolean wasCqlConnected = isNativeUp;
+ isNativeUp = false;
+ if (wasCqlConnected)
{
JsonObject disconnectMessage = new JsonObject()
.put("cassandraInstanceId",
cassandraInstanceId);
@@ -398,6 +503,21 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
}
}
+ protected void markJmxDownAndMaybeNotifyDisconnection()
+ {
+ NodeSettings currentNodeSettings = nodeSettingsFromJmx;
+ nodeSettingsFromJmx = null;
+ currentVersion = null;
+ adapter = null;
+ if (currentNodeSettings != null)
+ {
+ JsonObject disconnectMessage = new JsonObject()
+ .put("cassandraInstanceId",
cassandraInstanceId);
+ vertx.eventBus().publish(ON_CASSANDRA_JMX_DISCONNECTED.address(),
disconnectMessage);
+ LOGGER.info("JMX disconnection from cassandraInstanceId={}",
cassandraInstanceId);
+ }
+ }
+
@Nullable
private <T> T fromAdapter(Function<ICassandraAdapter, T> getter)
{
@@ -412,4 +532,78 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
runnable.run();
}
}
+
+ /**
+ * A {@link NotificationListener} implementation that reacts to {@link
JMXConnectionNotification} notifications
+ * and updates the state of the JMX connection internally.
+ */
+ protected class JmxNotificationListener implements NotificationListener
+ {
+ @Override
+ public void handleNotification(Notification notification, Object
handback)
+ {
+ if (notification instanceof JMXConnectionNotification)
+ {
+ JMXConnectionNotification connectNotice =
(JMXConnectionNotification) notification;
+ String type = connectNotice.getType();
+ switch (type)
+ {
+ case JMXConnectionNotification.OPENED:
+ // Do not notify here as we may not have set up our
own delegate yet
+ // Instead, run the JMX Health Check, which will
notify once we have
+ // created or updated the adapter.
+ jmxHealthCheck();
+ break;
+
+ case JMXConnectionNotification.CLOSED:
+ case JMXConnectionNotification.FAILED:
+ case JMXConnectionNotification.NOTIFS_LOST:
+ markJmxDownAndMaybeNotifyDisconnection();
+ break;
+
+ default:
+ LOGGER.warn("Encountered unexpected JMX notification
type={}", type);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Limited StorageOperations to obtain information required for node
settings. Interface visibility is public
+ * because JMX proxy works on public interfaces only.
+ */
+ public interface LimitedStorageOperations
+ {
+ /**
+ * Fetch a string representation of the Cassandra version.
+ *
+ * @return A string representation of the Cassandra version.
+ */
+ String getReleaseVersion();
+
+ /**
+ * @return the cluster partitioner
+ */
+ String getPartitionerName();
+
+ /**
+ * Fetch string representations of the tokens for this node.
+ *
+ * @return a collection of tokens formatted as strings
+ */
+ List<String> getTokens();
+ }
+
+ /**
+ * Limited standard Snitch info to obtain information required for node
settings. Interface visibility is public
+ * because JMX proxy works on public interfaces only.
+ */
+ public interface LimitedEndpointSnitchOperations
+ {
+ /**
+ * @return the Datacenter name depending on the respective snitch used
for this node
+ */
+ String getDatacenter();
+ }
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
index 74e1b0e..f149f15 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
@@ -87,11 +87,11 @@ class SidecarLoadBalancingPolicy implements
LoadBalancingPolicy
@Override
public HostDistance distance(Host host)
{
- if (!selectedHosts.contains(host))
+ if (selectedHosts.contains(host) || isLocalHost(host))
{
- return HostDistance.IGNORED;
+ return childPolicy.distance(host);
}
- return childPolicy.distance(host);
+ return HostDistance.IGNORED;
}
@Override
@@ -197,7 +197,7 @@ class SidecarLoadBalancingPolicy implements
LoadBalancingPolicy
List<Host> nonLocalHosts = partitionedHosts.get(false);
if (nonLocalHosts == null || nonLocalHosts.isEmpty())
{
- LOGGER.warn("Did not find any non-local hosts in allHosts");
+ LOGGER.debug("Did not find any non-local hosts in allHosts");
return;
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
index c00dc37..26da25e 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.cluster.instance;
import java.util.List;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.jetbrains.annotations.Nullable;
/**
@@ -56,5 +57,5 @@ public interface InstanceMetadata
/**
* @return a {@link CassandraAdapterDelegate} specific for the instance
*/
- CassandraAdapterDelegate delegate();
+ @Nullable CassandraAdapterDelegate delegate();
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index c1a36a7..ca39b6c 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import org.jetbrains.annotations.Nullable;
/**
* Local implementation of InstanceMetadata.
@@ -35,6 +36,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
private final int port;
private final List<String> dataDirs;
private final String stagingDir;
+ @Nullable
private final CassandraAdapterDelegate delegate;
protected InstanceMetadataImpl(Builder builder)
@@ -78,7 +80,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
}
@Override
- public CassandraAdapterDelegate delegate()
+ public @Nullable CassandraAdapterDelegate delegate()
{
return delegate;
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
index 4713535..509aecb 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JMX;
import static org.apache.cassandra.sidecar.server.MainModule.NOT_OK_STATUS;
import static org.apache.cassandra.sidecar.server.MainModule.OK_STATUS;
@@ -72,7 +73,11 @@ public class CassandraHealthHandler extends
AbstractHandler<Void>
Void request)
{
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
- if (delegate != null && delegate.isUp())
+
+ boolean isServiceUp = context.request().path().contains(JMX)
+ ? delegate != null && delegate.isJmxUp()
+ : delegate != null && delegate.isNativeUp();
+ if (isServiceUp)
{
context.json(OK_STATUS);
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
index 8ed3900..468d90a 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
@@ -63,6 +63,12 @@ public class RingHandler extends AbstractHandler<RingRequest>
RingRequest request)
{
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+ if (delegate == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
+
StorageOperations storageOperations = delegate.storageOperations();
if (storageOperations == null)
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
index aea5ffc..fa00e84 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
@@ -69,6 +69,11 @@ public class TokenRangeReplicaMapHandler extends
AbstractHandler<TokenRangeRepli
TokenRangeReplicasRequest request)
{
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+ if (delegate == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
StorageOperations operations = delegate.storageOperations();
Metadata metadata = delegate.metadata();
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
index bc72cc5..264db54 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
@@ -60,6 +60,11 @@ public class NodeSettingsHandler extends
AbstractHandler<Void>
Void request)
{
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+ if (delegate == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
NodeSettings nodeSettings = delegate.nodeSettings();
if (nodeSettings == null)
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
index 97b8290..8f60820 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
@@ -165,6 +165,11 @@ public class SSTableImportHandler extends
AbstractHandler<SSTableImportRequest>
private Future<Void> importSSTablesAsync(SSTableImporter.ImportOptions
importOptions)
{
CassandraAdapterDelegate cassandra =
metadataFetcher.delegate(importOptions.host());
+ if (cassandra == null)
+ {
+ return Future.failedFuture(cassandraServiceUnavailable());
+ }
+
TableOperations tableOperations = cassandra.tableOperations();
if (tableOperations == null)
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index fec3f4c..9e11cb8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -162,9 +162,17 @@ public class MainModule extends AbstractModule
router.get(ApiEndpointsV1.HEALTH_ROUTE)
.handler(context -> context.json(OK_STATUS));
+ // Backwards compatibility for the Cassandra health endpoint
+ //noinspection deprecation
router.get(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE)
.handler(cassandraHealthHandler);
+ router.get(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE)
+ .handler(cassandraHealthHandler);
+
+ router.get(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE)
+ .handler(cassandraHealthHandler);
+
//noinspection deprecation
router.get(ApiEndpointsV1.DEPRECATED_COMPONENTS_ROUTE)
.handler(streamSSTableComponentHandler)
diff --git
a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
index d15423e..51c94c1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
@@ -69,6 +69,18 @@ public enum SidecarServerEvents
* for the Sidecar-managed Cassandra instances are available.
*/
ON_ALL_CASSANDRA_CQL_READY,
+
+ /**
+ * The {@link io.vertx.core.eventbus.EventBus} address where events will
be published when a JMX connection for
+ * a given instance has been established. The instance identifier will be
passed as part of the message.
+ */
+ ON_CASSANDRA_JMX_READY,
+
+ /**
+ * The {@link io.vertx.core.eventbus.EventBus} address where events will
be published when a JMX connection for
+ * a given instance has been disconnected. The instance identifier will be
passed as part of the message.
+ */
+ ON_CASSANDRA_JMX_DISCONNECTED,
;
public String address()
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
index 0c51bf0..518af51 100644
---
a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
@@ -75,6 +75,7 @@ public class InstanceMetadataFetcher
* @return the {@link CassandraAdapterDelegate} for the given {@code
host}, or the first instance when {@code host}
* is {@code null}
*/
+ @Nullable
public CassandraAdapterDelegate delegate(String host)
{
return instance(host).delegate();
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index eb3b29a..02ba0a2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -207,6 +207,12 @@ public class SSTableImporter
ImportOptions options = pair.getValue();
CassandraAdapterDelegate cassandra =
metadataFetcher.delegate(options.host);
+ if (cassandra == null)
+ {
+ promise.fail(HttpExceptions.cassandraServiceUnavailable());
+ continue;
+ }
+
TableOperations tableOperations = cassandra.tableOperations();
if (tableOperations == null)
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
index 2a3d1b4..54d54a5 100644
---
a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
@@ -62,7 +62,7 @@ public class SimpleCassandraVersion implements
Comparable<SimpleCassandraVersion
* @throws IllegalArgumentException if the provided string does not
* represent a version
*/
- public static SimpleCassandraVersion create(String version)
+ public static SimpleCassandraVersion create(String version) throws
IllegalArgumentException
{
String stripped = version.toUpperCase().replace(SNAPSHOT, "");
Matcher matcher = PATTERN.matcher(stripped);
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
similarity index 56%
rename from
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
rename to
src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
index 2061c89..4733817 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++
b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
@@ -16,31 +16,15 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.client.request;
-
-import io.netty.handler.codec.http.HttpMethod;
-import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
-import org.apache.cassandra.sidecar.common.data.HealthResponse;
+package org.apache.cassandra.distributed.impl;
/**
- * Represents a request to retrieve the Cassandra health
+ * Utility class to interact with protected methods in AbstractCluster
*/
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class AbstractClusterUtils
{
- /**
- * Constructs a request to retrieve the Cassandra health
- */
- public CassandraHealthRequest()
- {
- super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public HttpMethod method()
+ public static InstanceConfig createInstanceConfig(AbstractCluster cluster,
int nodeNumber)
{
- return HttpMethod.GET;
+ return cluster.createInstanceConfig(nodeNumber);
}
}
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
index 04bddec..d77f5f8 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
@@ -53,7 +53,7 @@ public class CQLSessionProviderTest extends
IntegrationTestBase
void testCqlSessionProviderWorksAsExpected(VertxTestContext context,
CassandraTestContext cassandraTestContext)
throws Exception
{
- UpgradeableCluster cluster = cassandraTestContext.getCluster();
+ UpgradeableCluster cluster = cassandraTestContext.cluster();
testWithClient(context, false, webClient -> {
// To start, both instances are stopped, so we
should get 503s for both
buildInstanceHealthRequest(webClient, "1")
@@ -129,7 +129,7 @@ public class CQLSessionProviderTest extends
IntegrationTestBase
{
return webClient.get(server.actualPort(),
"localhost",
- "/api/v1/cassandra/__health?instanceId=" +
instanceId)
+ "/api/v1/cassandra/native/__health?instanceId=" +
instanceId)
.as(BodyCodec.string());
}
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
new file mode 100644
index 0000000..9bc1c82
--- /dev/null
+++
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.impl.ConcurrentHashSet;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.Timeout;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Ensures the Delegate works correctly
+ */
+@ExtendWith(VertxExtension.class)
+class DelegateIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest()
+ void testCorrectVersionIsEnabled()
+ {
+ CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
+
.instanceFromId(1)
+ .delegate();
+ assertThat(delegate).isNotNull();
+ SimpleCassandraVersion version = delegate.version();
+ assertThat(version).isNotNull();
+ assertThat(version.major).isEqualTo(sidecarTestContext.version.major);
+ assertThat(version.minor).isEqualTo(sidecarTestContext.version.minor);
+ assertThat(version).isGreaterThanOrEqualTo(sidecarTestContext.version);
+ }
+
+ @CassandraIntegrationTest()
+ void testHealthCheck(VertxTestContext context)
+ {
+ EventBus eventBus = vertx.eventBus();
+ Checkpoint cqlReady = context.checkpoint();
+ Checkpoint cqlDisconnected = context.checkpoint();
+
+ CassandraAdapterDelegate adapterDelegate =
sidecarTestContext.instancesConfig()
+
.instanceFromId(1)
+
.delegate();
+ assertThat(adapterDelegate).isNotNull();
+ assertThat(adapterDelegate.isJmxUp()).as("jmx health check
succeeds").isTrue();
+ assertThat(adapterDelegate.isNativeUp()).as("native health check
succeeds").isTrue();
+
+ // Set up test listeners before disabling/enabling binary to avoid
race conditions
+ // where the event happens before the consumer is registered.
+ eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
+ int instanceId = message.body().getInteger("cassandraInstanceId");
+ CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
+
.instanceFromId(instanceId)
+ .delegate();
+
+ assertThat(delegate).isNotNull();
+ assertThat(delegate.isNativeUp()).as("health check fails after
binary has been disabled").isFalse();
+ cqlDisconnected.flag();
+ sidecarTestContext.cluster().get(1).nodetool("enablebinary");
+ });
+
+ eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> reconnectMessage) -> {
+ int instanceId =
reconnectMessage.body().getInteger("cassandraInstanceId");
+ CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
+
.instanceFromId(instanceId)
+ .delegate();
+ assertThat(delegate).isNotNull();
+ assertThat(delegate.isNativeUp()).as("health check succeeds after
binary has been enabled")
+ .isTrue();
+ cqlReady.flag();
+ });
+
+ // Disable binary
+ NodeToolResult nodetoolResult =
sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
+ assertThat(nodetoolResult.getRc())
+ .withFailMessage("Failed to disable binary:\nstdout:" +
nodetoolResult.getStdout()
+ + "\nstderr: " + nodetoolResult.getStderr())
+ .isEqualTo(0);
+ // NOTE: enable binary happens inside the disable binary handler
above, which then will trigger the
+ // cqlReady flag.
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 3)
+ void testAllInstancesHealthCheck(VertxTestContext context)
+ {
+ EventBus eventBus = vertx.eventBus();
+ Checkpoint allCqlReady = context.checkpoint();
+
+ Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
+ eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
+ JsonArray cassandraInstanceIds =
message.body().getJsonArray("cassandraInstanceIds");
+ assertThat(cassandraInstanceIds).hasSize(3);
+ assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
+ .allMatch(expectedCassandraInstanceIds::contains);
+
+ allCqlReady.flag();
+ });
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 3)
+ void testStoppingAnInstance(VertxTestContext context)
+ {
+ EventBus eventBus = vertx.eventBus();
+ Checkpoint allCqlReady = context.checkpoint();
+ Checkpoint cqlDisconnected = context.checkpoint();
+ Checkpoint jmxDisconnected = context.checkpoint();
+
+ Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
+ eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
+ JsonArray cassandraInstanceIds =
message.body().getJsonArray("cassandraInstanceIds");
+ assertThat(cassandraInstanceIds).hasSize(3);
+ assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
+ .allMatch(expectedCassandraInstanceIds::contains);
+
+ allCqlReady.flag();
+
+ // Stop instance 2
+ ClusterUtils.stopUnchecked(sidecarTestContext.cluster().get(2));
+ });
+
+ eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ assertThat(instanceId).isEqualTo(2);
+
+ buildNativeHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
+ });
+
+ eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ assertThat(instanceId).isEqualTo(2);
+
+ buildJmxHealthRequest(client,
instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
+ });
+ }
+
+ @Timeout(value = 2, timeUnit = TimeUnit.MINUTES)
+ @CassandraIntegrationTest(nodesPerDc = 2, newNodesPerDc = 1, startCluster
= false)
+ public void testChangingClusterSize(VertxTestContext context) throws
InterruptedException
+ {
+ EventBus eventBus = vertx.eventBus();
+
+ Checkpoint jmxConnected = context.checkpoint(3);
+ Checkpoint nativeConnected = context.checkpoint(3);
+ Checkpoint jmxNotConnected = context.checkpoint();
+ Checkpoint nativeNotConnected = context.checkpoint();
+
+ CountDownLatch firstTwoConnected = new CountDownLatch(2);
+
+ Set<Integer> nativeConnectedInstances = new ConcurrentHashSet<>();
+ Set<Integer> jmxConnectedInstances = new ConcurrentHashSet<>();
+
+ eventBus.localConsumer(ON_CASSANDRA_JMX_READY.address(),
(Message<JsonObject> message) -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ logger.info("DBG: Received JMX connection notification for {}",
instanceId);
+ // make sure the instance wasn't already in the set before
validating
+ if (jmxConnectedInstances.add(instanceId))
+ {
+ jmxConnected.flag();
+ validateJmxConnections(context, jmxConnectedInstances,
jmxNotConnected, firstTwoConnected);
+ }
+ });
+
+ eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ logger.info("DBG: Received native connection notification for {}",
instanceId);
+ buildNativeHealthRequest(client,
instanceId).send(assertHealthCheckOk(context, nativeConnected));
+ // make sure the instance wasn't already in the set before
validating/flagging
+ if (nativeConnectedInstances.add(instanceId))
+ {
+ nativeConnected.flag();
+ validateNativeConnections(context, nativeNotConnected,
firstTwoConnected, nativeConnectedInstances);
+ }
+ });
+
+ // Now that the event listeners are set up, start the cluster
+ sidecarTestContext.cluster().startup();
+
+ // Wait for the first two instances to get connected
+ assertThat(firstTwoConnected.await(2, TimeUnit.MINUTES)).isTrue();
+
+ // now start the 3rd instance - the test will complete when it's
connected
+ addNewInstance();
+ }
+
+ private void validateJmxConnections(VertxTestContext context, Set<Integer>
jmxConnectedInstances,
+ Checkpoint notOkCheckpoint,
CountDownLatch firstTwoConnected)
+ {
+ int upInstanceCount = jmxConnectedInstances.size();
+ if (upInstanceCount == 2)
+ {
+ buildJmxHealthRequest(client,
3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
+ logger.info("DBG: First two instances connected via JMX, third is
down");
+ firstTwoConnected.countDown();
+ }
+ else if (upInstanceCount == 3)
+ {
+ assertThat(jmxConnectedInstances).containsExactly(1, 2, 3);
+ }
+ }
+
+ private void validateNativeConnections(VertxTestContext context,
+ Checkpoint notOkCheckpoint,
+ CountDownLatch firstTwoConnected,
+ Set<Integer>
nativeConnectedInstances)
+ {
+ int upInstanceCount = nativeConnectedInstances.size();
+ if (upInstanceCount == 2)
+ {
+ assertThat(nativeConnectedInstances).containsExactly(1, 2);
+ buildNativeHealthRequest(client,
3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
+ logger.info("DBG: First two instances connected via native, third
is down");
+ firstTwoConnected.countDown();
+ }
+ else if (upInstanceCount == 3)
+ {
+ assertThat(nativeConnectedInstances).containsExactly(1, 2, 3);
+ }
+ }
+
+ private static Handler<AsyncResult<HttpResponse<Buffer>>>
assertHealthCheckOk(VertxTestContext context,
+
Checkpoint checkpoint)
+ {
+ return context.succeeding(response -> context.verify(() -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
+ }));
+ }
+
+ private Handler<AsyncResult<HttpResponse<Buffer>>>
assertHealthCheckNotOk(VertxTestContext context,
+
Checkpoint checkpoint)
+ {
+ return context.succeeding(response -> context.verify(() -> {
+
assertThat(response.statusCode()).isEqualTo(SERVICE_UNAVAILABLE.code());
+
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("NOT_OK");
+ checkpoint.flag();
+ }));
+ }
+
+ private HttpRequest<Buffer> buildNativeHealthRequest(WebClient webClient,
int instanceId)
+ {
+ return webClient.get(server.actualPort(),
+ "localhost",
+ "/api/v1/cassandra/native/__health?instanceId=" +
instanceId);
+ }
+
+ private HttpRequest<Buffer> buildJmxHealthRequest(WebClient webClient, int
instanceId)
+ {
+ return webClient.get(server.actualPort(),
+ "localhost",
+ "/api/v1/cassandra/jmx/__health?instanceId=" +
instanceId);
+ }
+
+ private void addNewInstance()
+ {
+ UpgradeableCluster cluster = sidecarTestContext.cluster();
+ IUpgradeableInstance newInstance = ClusterUtils.addInstance(cluster,
cluster.get(1).config(), config -> {
+ config.set("auto_bootstrap", true);
+ config.with(Feature.GOSSIP,
+ Feature.JMX,
+ Feature.NATIVE_PROTOCOL);
+ });
+ newInstance.startup(cluster);
+ }
+}
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
deleted file mode 100644
index 7586a22..0000000
--- a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.common;
-
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import com.google.common.collect.ImmutableSet;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.Message;
-import io.vertx.core.json.JsonArray;
-import io.vertx.core.json.JsonObject;
-import io.vertx.junit5.Checkpoint;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.distributed.api.NodeToolResult;
-import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
-import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
-import org.apache.cassandra.testing.CassandraIntegrationTest;
-
-import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
-import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
-import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Ensures the Delegate works correctly
- */
-@ExtendWith(VertxExtension.class)
-class DelegateTest extends IntegrationTestBase
-{
- @CassandraIntegrationTest(jmx = false)
- void testCorrectVersionIsEnabled()
- {
- CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
-
.instanceFromId(1)
- .delegate();
- SimpleCassandraVersion version = delegate.version();
- assertThat(version).isNotNull();
- assertThat(version.major).isEqualTo(sidecarTestContext.version.major);
- assertThat(version.minor).isEqualTo(sidecarTestContext.version.minor);
- assertThat(version).isGreaterThanOrEqualTo(sidecarTestContext.version);
- }
-
- @CassandraIntegrationTest(jmx = false)
- void testHealthCheck(VertxTestContext context)
- {
- EventBus eventBus = vertx.eventBus();
- Checkpoint cqlReady = context.checkpoint();
- Checkpoint cqlDisconnected = context.checkpoint();
-
- CassandraAdapterDelegate adapterDelegate =
sidecarTestContext.instancesConfig()
-
.instanceFromId(1)
-
.delegate();
- assertThat(adapterDelegate.isUp()).as("health check
succeeds").isTrue();
-
- // Set up test listeners before disabling/enabling binary to avoid
race conditions
- // where the event happens before the consumer is registered.
- eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(),
(Message<JsonObject> message) -> {
- int instanceId = message.body().getInteger("cassandraInstanceId");
- CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
-
.instanceFromId(instanceId)
- .delegate();
-
- assertThat(delegate.isUp()).as("health check fails after binary
has been disabled").isFalse();
- cqlDisconnected.flag();
- sidecarTestContext.cluster().get(1).nodetool("enablebinary");
- });
-
- eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> reconnectMessage) -> {
- int instanceId =
reconnectMessage.body().getInteger("cassandraInstanceId");
- CassandraAdapterDelegate delegate =
sidecarTestContext.instancesConfig()
-
.instanceFromId(instanceId)
- .delegate();
- assertThat(delegate.isUp()).as("health check succeeds after binary
has been enabled")
- .isTrue();
- cqlReady.flag();
- });
-
- // Disable binary
- NodeToolResult nodetoolResult =
sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
- assertThat(nodetoolResult.getRc())
- .withFailMessage("Failed to disable binary:\nstdout:" +
nodetoolResult.getStdout()
- + "\nstderr: " + nodetoolResult.getStderr())
- .isEqualTo(0);
- // NOTE: enable binary happens inside the disable binary handler
above, which then will trigger the
- // cqlReady flag.
- }
-
- @CassandraIntegrationTest(jmx = false, nodesPerDc = 3)
- void testAllInstancesHealthCheck(VertxTestContext context)
- {
- EventBus eventBus = vertx.eventBus();
- Checkpoint allCqlReady = context.checkpoint();
-
- Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
- eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
(Message<JsonObject> message) -> {
- JsonArray cassandraInstanceIds =
message.body().getJsonArray("cassandraInstanceIds");
- assertThat(cassandraInstanceIds).hasSize(3);
- assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
- .allMatch(expectedCassandraInstanceIds::contains);
-
- allCqlReady.flag();
- });
- }
-}
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
index e643918..e152850 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
@@ -46,7 +46,7 @@ public class JmxClientTest
assertThat(opMode).isNotNull();
assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL",
"DECOMMISSIONED", "CLIENT");
- IUpgradeableInstance instance =
context.getCluster().getFirstRunningInstance();
+ IUpgradeableInstance instance =
context.cluster().getFirstRunningInstance();
IInstanceConfig config = instance.config();
assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
@@ -101,7 +101,7 @@ public class JmxClientTest
private static JmxClient createJmxClient(CassandraTestContext context)
{
- IUpgradeableInstance instance =
context.getCluster().getFirstRunningInstance();
+ IUpgradeableInstance instance =
context.cluster().getFirstRunningInstance();
IInstanceConfig config = instance.config();
return JmxClient.builder()
.host(config.broadcastAddress().getAddress().getHostAddress())
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
index 8a91324..4b0ab8c 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
@@ -52,7 +52,7 @@ class GossipInfoHandlerIntegrationTest extends
IntegrationTestBase
assertThat(gossipInfo.generation()).isNotNull();
assertThat(gossipInfo.heartbeat()).isNotNull();
assertThat(gossipInfo.hostId()).isNotNull();
- String releaseVersion =
cassandraTestContext.getCluster().getFirstRunningInstance()
+ String releaseVersion =
cassandraTestContext.cluster().getFirstRunningInstance()
.getReleaseVersionString();
assertThat(gossipInfo.releaseVersion()).startsWith(releaseVersion);
context.completeNow();
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
index 51f6828..80d86e2 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
@@ -78,7 +78,7 @@ class RingHandlerIntegrationTest extends IntegrationTestBase
@CassandraIntegrationTest(gossip = true)
void ringFailsWhenGossipIsDisabled(CassandraTestContext context,
VertxTestContext testContext) throws Exception
{
- int disableGossip =
context.getCluster().getFirstRunningInstance().nodetool("disablegossip");
+ int disableGossip =
context.cluster().getFirstRunningInstance().nodetool("disablegossip");
assertThat(disableGossip).isEqualTo(0);
String testRoute = "/api/v1/cassandra/ring";
testWithClient(testContext, client -> {
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
index 6331bd2..576ad2f 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
@@ -39,7 +39,7 @@ public class BasicGossipDisabledTest extends
BaseTokenRangeIntegrationTest
void tokenRangeEndpointFailsWhenGossipIsDisabled(CassandraTestContext
context, VertxTestContext testContext)
throws Exception
{
- int disableGossip =
context.getCluster().getFirstRunningInstance().nodetool("disablegossip");
+ int disableGossip =
context.cluster().getFirstRunningInstance().nodetool("disablegossip");
assertThat(disableGossip).isEqualTo(0);
retrieveMappingWithKeyspace(testContext, TEST_KEYSPACE, response -> {
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE.code());
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 0faa420..35a64bb 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -25,12 +25,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import com.datastax.driver.core.Session;
import io.vertx.core.Vertx;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.impl.AbstractClusterUtils;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
@@ -46,6 +48,7 @@ import
org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
import org.apache.cassandra.testing.AbstractCassandraTestContext;
+import org.jetbrains.annotations.NotNull;
import static org.assertj.core.api.Assertions.assertThat;
@@ -208,13 +211,13 @@ public class CassandraSidecarTestContext implements
AutoCloseable
UpgradeableCluster cluster = cluster();
List<InstanceMetadata> metadata = new ArrayList<>();
jmxClients = new ArrayList<>();
- List<InetSocketAddress> addresses = buildContactList(cluster);
+ List<InstanceConfig> configs = buildInstanceConfigs(cluster);
+ List<InetSocketAddress> addresses = buildContactList(configs);
sessionProvider = new CQLSessionProviderImpl(addresses, addresses,
500, null,
0,
SharedExecutorNettyOptions.INSTANCE);
- for (int i = 0; i < numInstancesToManage; i++)
+ for (int i = 0; i < configs.size(); i++)
{
- IUpgradeableInstance instance = cluster.get(i + 1); // 1-based
indexing to match node names;
- IInstanceConfig config = instance.config();
+ IInstanceConfig config = configs.get(i);
String hostName = JMXUtil.getJmxHost(config);
int nativeTransportPort = tryGetIntConfig(config,
"native_transport_port", 9042);
// The in-jvm dtest framework sometimes returns a cluster before
all the jmx infrastructure is initialized.
@@ -255,15 +258,25 @@ public class CassandraSidecarTestContext implements
AutoCloseable
return new InstancesConfigImpl(metadata, dnsResolver);
}
- private List<InetSocketAddress> buildContactList(UpgradeableCluster
cluster)
+ private List<InetSocketAddress> buildContactList(List<InstanceConfig>
configs)
{
- return cluster.stream()
- .map(i -> new
InetSocketAddress(i.config().broadcastAddress().getAddress(),
-
tryGetIntConfig(i.config(), "native_transport_port", 9042)))
- .limit(numInstancesToManage)
+ // Always return the complete list of addresses even if the cluster
isn't yet that large
+ // this way, we populate the entire local instance list
+ return configs.stream()
+ .map(config -> new
InetSocketAddress(config.broadcastAddress().getAddress(),
+
tryGetIntConfig(config, "native_transport_port", 9042)))
.collect(Collectors.toList());
}
+ @NotNull
+ private List<InstanceConfig> buildInstanceConfigs(UpgradeableCluster
cluster)
+ {
+ return IntStream.range(1, numInstancesToManage + 1)
+ .mapToObj(nodeNum ->
+
AbstractClusterUtils.createInstanceConfig(cluster, nodeNum))
+ .collect(Collectors.toList());
+ }
+
/**
* A listener for {@link InstancesConfig} state changes
*/
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index c68eda2..64464dc 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -103,7 +103,8 @@ public abstract class IntegrationTestBase
if (sidecarTestContext.isClusterBuilt())
{
- MessageConsumer<Object> cqlReadyConsumer =
vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address());
+ MessageConsumer<JsonObject> cqlReadyConsumer = vertx.eventBus()
+
.localConsumer(ON_CASSANDRA_CQL_READY.address());
cqlReadyConsumer.handler(message -> {
cqlReadyConsumer.unregister();
context.completeNow();
@@ -166,7 +167,8 @@ public abstract class IntegrationTestBase
.instanceFromId(1)
.delegate();
- if (delegate.isUp() || !waitForCluster)
+ assertThat(delegate).isNotNull();
+ if (delegate.isNativeUp() || !waitForCluster)
{
tester.accept(client);
}
diff --git
a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
index 4aaa055..c63809f 100644
---
a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
+++
b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
@@ -87,6 +87,6 @@ public abstract class AbstractCassandraTestContext implements
AutoCloseable
public int clusterSize()
{
- return annotation.numDcs() * annotation.nodesPerDc();
+ return annotation.numDcs() * (annotation.nodesPerDc() +
annotation.newNodesPerDc());
}
}
diff --git
a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
index c52471d..29fa433 100644
---
a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
+++
b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
@@ -112,7 +112,7 @@ public @interface CassandraIntegrationTest
* or {@link
ConfigurableCassandraTestContext#configureAndStartCluster(Consumer)} to get the
cluster.
* NOTE: This cluster object must be closed by the test as the
framework doesn't have access to it.
* If true (the default), the test should take an instance of {@link
CassandraTestContext}
- * {@link CassandraTestContext#getCluster()} will contain the
built cluster.
+ * {@link CassandraTestContext#cluster()} will contain the built
cluster.
* @return true if the cluster should be built by the test framework,
false otherwise
*/
boolean buildCluster() default true;
diff --git
a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
index 8498776..09afe24 100644
---
a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
+++
b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
@@ -43,9 +43,4 @@ public class CassandraTestContext extends
AbstractCassandraTestContext
+ ", cluster=" + cluster
+ '}';
}
-
- public UpgradeableCluster getCluster()
- {
- return cluster;
- }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index d639944..0acd069 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -132,7 +132,7 @@ public class TestModule extends AbstractModule
.tokens(Collections.singleton("testToken"))
.build());
}
- when(delegate.isUp()).thenReturn(isUp);
+ when(delegate.isNativeUp()).thenReturn(isUp);
when(instanceMeta.delegate()).thenReturn(delegate);
return instanceMeta;
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
index f5001af..24536cd 100644
--- a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
@@ -371,9 +371,10 @@ class ServerSSLTest
assertThat(throwable).isNotNull()
.isInstanceOf(SSLHandshakeException.class)
.hasMessageContaining("Failed to create
SSL connection")
-
.hasCauseInstanceOf(SSLHandshakeException.class)
- .hasRootCauseMessage("No appropriate
protocol (protocol is disabled " +
- "or cipher suites
are inappropriate)");
+
.hasCauseInstanceOf(SSLHandshakeException.class);
+ assertThat(throwable.getCause().getMessage())
+ .containsAnyOf("No appropriate protocol (protocol is
disabled or cipher suites are inappropriate)",
+ "Received fatal alert: protocol_version");
context.completeNow();
}));
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
index 3ea3d84..28a44b0 100644
--- a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
@@ -36,6 +36,7 @@ import
org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.MockCassandraFactory;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
@@ -110,8 +111,9 @@ public class SnapshotUtils
if (delegate == null)
{
- delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider,
cqlSessionProvider1, null, null,
- "localhost1", 9042);
+ JmxClient mockJmxClient = mock(JmxClient.class);
+ delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider,
cqlSessionProvider1, mockJmxClient,
+ null, "localhost1", 9042);
}
InstanceMetadataImpl localhost = InstanceMetadataImpl.builder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]