This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72e38ea6 CASSSIDECAR-204: Improve integration test stability (#188)
72e38ea6 is described below

commit 72e38ea6eb181e6f1e60855b82f38dde13a8a812
Author: Yifan Cai <[email protected]>
AuthorDate: Mon Feb 10 10:10:52 2025 -0800

    CASSSIDECAR-204: Improve integration test stability (#188)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-204
---
 .circleci/config.yml                               |   4 +
 adapters/base/build.gradle                         |   1 +
 adapters/cassandra41/build.gradle                  |   1 +
 client-common/build.gradle                         |   1 +
 .../common/response/data/StreamsProgressStats.java |  15 ++
 client/build.gradle                                |   1 +
 gradle/common/integrationTestTask.gradle           |   1 +
 .../testing/SharedClusterIntegrationTestBase.java  |  13 +-
 .../routes/SchemaHandlerIntegrationTest.java       | 102 ++++++++++
 server-common/build.gradle                         |   1 +
 server/build.gradle                                |   2 +
 .../driver/SidecarLoadBalancingPolicyTest.java     |   4 +-
 .../sidecar/common/CQLSessionProviderTest.java     |   4 +-
 .../sidecar/common/DelegateIntegrationTest.java    |  87 ++++++---
 .../ClusterLeaseClaimTaskIntegrationTest.java      |   4 +-
 .../cassandra/sidecar/db/SidecarSchemaIntTest.java |  65 -------
 ...ConnectedClientStatsHandlerIntegrationTest.java |   8 +-
 .../routes/SchemaHandlerIntegrationTest.java       | 130 -------------
 .../sidecar/routes/StreamStatsIntegrationTest.java | 209 +++++++++------------
 .../SSTableImportHandlerIntegrationTest.java       |   3 +-
 .../routes/tokenrange/BasicMultiDCRf3Test.java     |   2 +
 .../testing/CassandraSidecarTestContext.java       |  65 +++++--
 .../sidecar/testing/IntegrationTestBase.java       |  88 +++++----
 .../sidecar/testing/IntegrationTestModule.java     |  60 +++++-
 .../testing/AbstractCassandraTestContext.java      |  22 ++-
 .../cassandra/testing/CassandraTestContext.java    |   3 +-
 .../cassandra/testing/CassandraTestTemplate.java   |  20 +-
 .../testing/ConfigurableCassandraTestContext.java  |   4 +-
 server/src/test/resources/logback-in-jvm-dtest.xml |  11 +-
 vertx-auth-mtls/build.gradle                       |   1 +
 vertx-client-shaded/build.gradle                   |   1 +
 vertx-client/build.gradle                          |   1 +
 32 files changed, 486 insertions(+), 448 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 47e509a5..1421a345 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -89,6 +89,10 @@ jobs:
           path: build/reports
           destination: test-reports
 
+      - store_artifacts:
+          path: build/test-results/
+          destination: test-results
+
       - store_test_results:
           path: build/test-results/
 
diff --git a/adapters/base/build.gradle b/adapters/base/build.gradle
index ad379914..ce80e8e7 100644
--- a/adapters/base/build.gradle
+++ b/adapters/base/build.gradle
@@ -52,6 +52,7 @@ test {
         println("Destination directory for adapters-base tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/adapters/cassandra41/build.gradle 
b/adapters/cassandra41/build.gradle
index 190e09ea..bc8f48ce 100644
--- a/adapters/cassandra41/build.gradle
+++ b/adapters/cassandra41/build.gradle
@@ -47,6 +47,7 @@ test {
         println("Destination directory for adapters-cassandra41 tests: 
${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/client-common/build.gradle b/client-common/build.gradle
index b2bb3cdb..605c5d1a 100644
--- a/client-common/build.gradle
+++ b/client-common/build.gradle
@@ -48,6 +48,7 @@ test {
         println("Destination directory for client-common tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
index 88d74077..9b1d990d 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
@@ -104,4 +104,19 @@ public class StreamsProgressStats
     {
         return totalBytesSent;
     }
+
+    @Override
+    public String toString()
+    {
+        return "StreamsProgressStats{" +
+               "totalFilesToReceive=" + totalFilesToReceive +
+               ", totalFilesReceived=" + totalFilesReceived +
+               ", totalBytesToReceive=" + totalBytesToReceive +
+               ", totalBytesReceived=" + totalBytesReceived +
+               ", totalFilesToSend=" + totalFilesToSend +
+               ", totalFilesSent=" + totalFilesSent +
+               ", totalBytesToSend=" + totalBytesToSend +
+               ", totalBytesSent=" + totalBytesSent +
+               '}';
+    }
 }
diff --git a/client/build.gradle b/client/build.gradle
index 194c6b66..1f886038 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -55,6 +55,7 @@ test {
         println("Destination directory for client tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/gradle/common/integrationTestTask.gradle 
b/gradle/common/integrationTestTask.gradle
index 38547b61..c0ce425b 100644
--- a/gradle/common/integrationTestTask.gradle
+++ b/gradle/common/integrationTestTask.gradle
@@ -66,6 +66,7 @@ apply from: 
"${project.rootDir}/gradle/common/java11Options.gradle"
             println("Destination directory for integration tests: ${destDir}")
             junitXml.getOutputLocation().set(destDir)
             html.setRequired(true)
+            html.getOutputLocation().set(destDir)
         }
         testLogging {
             events "started", "passed", "skipped", "failed"
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index e14977db..09f00803 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -156,6 +156,7 @@ public abstract class SharedClusterIntegrationTestBase
     protected Server server;
     protected TestVersion testVersion;
     protected MtlsTestHelper mtlsTestHelper;
+    private final CountDownLatch sidecarSchemaReadyLatch = new 
CountDownLatch(1);
     private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
     private Injector sidecarServerInjector;
 
@@ -350,6 +351,9 @@ public abstract class SharedClusterIntegrationTestBase
         AbstractModule testModule = new IntegrationTestModule(instances, 
classLoaderWrapper, mtlsTestHelper,
                                                               dnsResolver, 
configurationOverrides());
         sidecarServerInjector = Guice.createInjector(Modules.override(new 
MainModule()).with(testModule));
+        Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
+        vertx.eventBus()
+             
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg 
-> sidecarSchemaReadyLatch.countDown());
         Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
         sidecarServer.start()
                      .onSuccess(s -> context.completeNow())
@@ -362,15 +366,10 @@ public abstract class SharedClusterIntegrationTestBase
     protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
     {
         assertThat(sidecarServerInjector)
-        .describedAs("Sidecar is started")
+        .describedAs("Sidecar should be started")
         .isNotNull();
 
-        CountDownLatch latch = new CountDownLatch(1);
-        Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
-        vertx.eventBus()
-             
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg 
-> latch.countDown());
-
-        assertThat(Uninterruptibles.awaitUninterruptibly(latch, timeout, 
timeUnit))
+        
assertThat(Uninterruptibles.awaitUninterruptibly(sidecarSchemaReadyLatch, 
timeout, timeUnit))
         .describedAs("Sidecar schema is not initialized after " + timeout + ' 
' + timeUnit)
         .isTrue();
     }
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
new file mode 100644
index 00000000..c1c1e044
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import org.apache.cassandra.sidecar.common.response.SchemaResponse;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SchemaHandlerIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace("testkeyspace", Map.of("replication_factor", 1));
+        createTestKeyspace("\"Cycling\"", Map.of("replication_factor", 1));
+        createTestKeyspace("\"keyspace\"", Map.of("replication_factor", 1));
+    }
+
+    @Test
+    void testListKeyspaces()
+    {
+        String testRoute = "/api/v1/schema/keyspaces";
+        SchemaResponse response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+                                                             
.expect(ResponsePredicate.SC_OK)
+                                                             .send())
+                                  .bodyAsJson(SchemaResponse.class);
+        assertThat(response).isNotNull();
+        assertThat(response.keyspace()).isNull();
+        assertThat(response.schema()).isNotNull();
+    }
+
+    @Test
+    void testSchemaHandlerKeyspaceDoesNotExist()
+    {
+        String testRoute = "/api/v1/schema/keyspaces/non_existent";
+        getBlocking(trustedClient().get(server.actualPort(), "localhost", 
testRoute)
+                                   .expect(ResponsePredicate.SC_NOT_FOUND)
+                                   .send());
+    }
+
+    @Test
+    void testSchemaHandlerWithKeyspace()
+    {
+        String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
+        SchemaResponse response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+                                                             
.expect(ResponsePredicate.SC_OK)
+                                                             .send())
+                                  .bodyAsJson(SchemaResponse.class);
+        assertThat(response).isNotNull();
+        assertThat(response.keyspace()).isEqualTo("testkeyspace");
+        assertThat(response.schema()).isNotNull();
+    }
+
+    @Test
+    void testSchemaHandlerWithCaseSensitiveKeyspace()
+    {
+        String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
+        SchemaResponse response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+                                                             
.expect(ResponsePredicate.SC_OK)
+                                                             .send())
+                                  .bodyAsJson(SchemaResponse.class);
+        assertThat(response).isNotNull();
+        assertThat(response.keyspace()).isEqualTo("Cycling");
+        assertThat(response.schema()).isNotNull();
+    }
+
+    @Test
+    void testSchemaHandlerWithReservedKeywordKeyspace()
+    {
+        String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
+        SchemaResponse response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
+                                                             
.expect(ResponsePredicate.SC_OK)
+                                                             .send())
+                                  .bodyAsJson(SchemaResponse.class);
+        assertThat(response).isNotNull();
+        assertThat(response.keyspace()).isEqualTo("keyspace");
+        assertThat(response.schema()).isNotNull();
+    }
+}
diff --git a/server-common/build.gradle b/server-common/build.gradle
index 755b3dd1..1d900abf 100644
--- a/server-common/build.gradle
+++ b/server-common/build.gradle
@@ -48,6 +48,7 @@ test {
         println("Destination directory for server-common tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/server/build.gradle b/server/build.gradle
index 54c4c7bb..2cff67d0 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -205,6 +205,7 @@ test {
         println("Destination directory for unit tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
     testLogging {
         events "passed", "skipped", "failed"
@@ -230,6 +231,7 @@ tasks.register("containerTest", Test) {
         println("Destination directory for testcontainer tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
     testLogging {
         events "passed", "skipped", "failed"
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
index 8652c48d..0d94c30d 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/cluster/driver/SidecarLoadBalancingPolicyTest.java
@@ -55,9 +55,9 @@ public class SidecarLoadBalancingPolicyTest extends 
IntegrationTestBase
     }
 
     @Override
-    protected int getNumInstancesToManage(int clusterSize)
+    protected int[] getInstancesToManage(int clusterSize)
     {
-        return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 
2 instances in the "cluster"
+        return new int[] {1, 2}; // we only want to manage the first 2 
instances in the "cluster"
     }
 
     @CassandraIntegrationTest(nodesPerDc = 6)
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
index cf361d69..be12ac60 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
@@ -50,9 +50,9 @@ public class CQLSessionProviderTest extends 
IntegrationTestBase
     public static final String KEYSPACE_FAILED_RESPONSE_START = 
"{\"status\":\"Service Unavailable\",";
 
     @Override
-    protected int getNumInstancesToManage(int clusterSize)
+    protected int[] getInstancesToManage(int clusterSize)
     {
-        return 2;
+        return new int[] {1, 2};
     }
 
     @CassandraIntegrationTest(nodesPerDc = 2, startCluster = false)
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
index 94426fec..3bada1c4 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.sidecar.common;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import io.vertx.core.AsyncResult;
@@ -58,6 +60,7 @@ import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSAND
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -66,6 +69,20 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(VertxExtension.class)
 class DelegateIntegrationTest extends IntegrationTestBase
 {
+    private final AtomicReference<Message<JsonObject>> 
allCassandraCqlReadyMessage = new AtomicReference<>();
+
+    @BeforeEach
+    void reset()
+    {
+        allCassandraCqlReadyMessage.set(null);
+    }
+
+    @Override
+    protected void beforeServerStart()
+    {
+        vertx.eventBus().localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), 
allCassandraCqlReadyMessage::set);
+    }
+
     @CassandraIntegrationTest()
     void testCorrectVersionIsEnabled()
     {
@@ -102,8 +119,10 @@ class DelegateIntegrationTest extends IntegrationTestBase
                                                                   
.instanceFromId(instanceId)
                                                                   .delegate();
 
-            assertThat(delegate).isNotNull();
-            assertThat(delegate.isNativeUp()).as("health check fails after 
binary has been disabled").isFalse();
+            context.verify(() -> {
+                assertThat(delegate).isNotNull();
+                assertThat(delegate.isNativeUp()).as("health check fails after 
binary has been disabled").isFalse();
+            });
             cqlDisconnected.flag();
             sidecarTestContext.cluster().get(1).nodetool("enablebinary");
         });
@@ -113,9 +132,12 @@ class DelegateIntegrationTest extends IntegrationTestBase
             CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesMetadata()
                                                                   
.instanceFromId(instanceId)
                                                                   .delegate();
-            assertThat(delegate).isNotNull();
-            assertThat(delegate.isNativeUp()).as("health check succeeds after 
binary has been enabled")
-                                             .isTrue();
+            context.verify(() -> {
+                assertThat(delegate).isNotNull();
+                assertThat(delegate.isNativeUp()).as("health check succeeds 
after binary has been enabled")
+                                                 .isTrue();
+            });
+
             cqlReady.flag();
         });
 
@@ -130,19 +152,16 @@ class DelegateIntegrationTest extends IntegrationTestBase
     }
 
     @CassandraIntegrationTest(nodesPerDc = 3)
-    void testAllInstancesHealthCheck(VertxTestContext context)
+    void testAllInstancesHealthCheck()
     {
-        EventBus eventBus = vertx.eventBus();
-        Checkpoint allCqlReady = context.checkpoint();
-
         Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
-        eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> message) -> {
+        loopAssert(30, 1000, () -> {
+            Message<JsonObject> message = allCassandraCqlReadyMessage.get();
+            assertThat(message).isNotNull();
             JsonArray cassandraInstanceIds = 
message.body().getJsonArray("cassandraInstanceIds");
             assertThat(cassandraInstanceIds).hasSize(3);
             assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
             .allMatch(expectedCassandraInstanceIds::contains);
-
-            allCqlReady.flag();
         });
     }
 
@@ -155,31 +174,37 @@ class DelegateIntegrationTest extends IntegrationTestBase
         Checkpoint jmxDisconnected = context.checkpoint();
 
         Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
-        eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> message) -> {
+        eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), 
(Message<JsonObject> message) -> {
+            context.verify(() -> {
+                Integer instanceId = 
message.body().getInteger("cassandraInstanceId");
+                assertThat(instanceId).isEqualTo(2);
+
+                buildNativeHealthRequest(client, 
instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
+            });
+        });
+
+        eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(), 
(Message<JsonObject> message) -> {
+            context.verify(() -> {
+                Integer instanceId = 
message.body().getInteger("cassandraInstanceId");
+                assertThat(instanceId).isEqualTo(2);
+
+                buildJmxHealthRequest(client, 
instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
+            });
+        });
+
+        loopAssert(30, 1000, () -> {
+            Message<JsonObject> message = allCassandraCqlReadyMessage.get();
+            assertThat(message).isNotNull();
             JsonArray cassandraInstanceIds = 
message.body().getJsonArray("cassandraInstanceIds");
             assertThat(cassandraInstanceIds).hasSize(3);
             assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
             .allMatch(expectedCassandraInstanceIds::contains);
 
             allCqlReady.flag();
-
             // Stop instance 2
             ClusterUtils.stopUnchecked(sidecarTestContext.cluster().get(2));
         });
 
-        eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), 
(Message<JsonObject> message) -> {
-            Integer instanceId = 
message.body().getInteger("cassandraInstanceId");
-            assertThat(instanceId).isEqualTo(2);
-
-            buildNativeHealthRequest(client, 
instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
-        });
-
-        eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(), 
(Message<JsonObject> message) -> {
-            Integer instanceId = 
message.body().getInteger("cassandraInstanceId");
-            assertThat(instanceId).isEqualTo(2);
-
-            buildJmxHealthRequest(client, 
instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
-        });
     }
 
     @Timeout(value = 2, timeUnit = TimeUnit.MINUTES)
@@ -187,7 +212,7 @@ class DelegateIntegrationTest extends IntegrationTestBase
     public void testChangingClusterSize(VertxTestContext context) throws 
InterruptedException
     {
         // assume the sidecar has 3 managed instances, even though the cluster 
only starts with 2 instances initially
-        sidecarTestContext.setNumInstancesToManage(3);
+        sidecarTestContext.setInstancesToManage(1, 2, 3);
 
         EventBus eventBus = vertx.eventBus();
 
@@ -246,7 +271,7 @@ class DelegateIntegrationTest extends IntegrationTestBase
         }
         else if (upInstanceCount == 3)
         {
-            assertThat(jmxConnectedInstances).containsExactly(1, 2, 3);
+            context.verify(() -> 
assertThat(jmxConnectedInstances).containsExactly(1, 2, 3));
         }
     }
 
@@ -258,14 +283,14 @@ class DelegateIntegrationTest extends IntegrationTestBase
         int upInstanceCount = nativeConnectedInstances.size();
         if (upInstanceCount == 2)
         {
-            assertThat(nativeConnectedInstances).containsExactly(1, 2);
+            context.verify(() -> 
assertThat(nativeConnectedInstances).containsExactly(1, 2));
             buildNativeHealthRequest(client, 
3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
             logger.info("DBG: First two instances connected via native, third 
is down");
             firstTwoConnected.countDown();
         }
         else if (upInstanceCount == 3)
         {
-            assertThat(nativeConnectedInstances).containsExactly(1, 2, 3);
+            context.verify(() -> 
assertThat(nativeConnectedInstances).containsExactly(1, 2, 3));
         }
     }
 
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
index edc7d15b..71fe6b28 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java
@@ -436,7 +436,7 @@ class ClusterLeaseClaimTaskIntegrationTest
         SimpleQueryResult rows = cluster.getFirstRunningInstance()
                                         .coordinator()
                                         .executeWithResult("SELECT * FROM 
sidecar_internal.sidecar_lease_v1 ALLOW FILTERING",
-                                                           
ConsistencyLevel.LOCAL_QUORUM);
+                                                           
ConsistencyLevel.SERIAL);
         return StreamSupport.stream(rows.spliterator(), false).count();
     }
 
@@ -446,7 +446,7 @@ class ClusterLeaseClaimTaskIntegrationTest
         cluster.getFirstRunningInstance()
                .coordinator()
                .execute("SELECT writetime(owner), owner FROM 
sidecar_internal.sidecar_lease_v1 WHERE name = 'cluster_lease_holder'",
-                        ConsistencyLevel.LOCAL_QUORUM);
+                        ConsistencyLevel.SERIAL);
         assertThat(result).isNotNull();
         assertThat(result).hasDimensions(1, 2);
         return result;
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
index d2e79f42..98e7fb99 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SidecarSchemaIntTest.java
@@ -20,22 +20,8 @@ package org.apache.cassandra.sidecar.db;
 
 import java.util.concurrent.TimeUnit;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import io.vertx.core.Vertx;
-import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
-import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
-import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
-import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
-import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
-import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
@@ -43,57 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class SidecarSchemaIntTest extends IntegrationTestBase
 {
-    @Override
-    protected void beforeSetup()
-    {
-        installTestSpecificModule(new AbstractModule()
-        {
-            @Provides
-            @Singleton
-            public ClusterLease clusterLease()
-            {
-                // start with INDETERMINATE to compete for a leaseholder 
first, then init schema
-                return new ClusterLease(ClusterLease.Ownership.INDETERMINATE);
-            }
-
-            @Provides
-            @Singleton
-            public CoordinationConfiguration 
clusterLeaseClaimTaskConfiguration()
-            {
-                // increase the claim frequency
-                PeriodicTaskConfiguration taskConfig = new 
PeriodicTaskConfigurationImpl(true,
-                                                                               
          MillisecondBoundConfiguration.parse("1s"),
-                                                                               
          MillisecondBoundConfiguration.parse("1s"));
-                return new CoordinationConfigurationImpl(taskConfig);
-            }
-
-            @Provides
-            @Singleton
-            public ClusterLeaseClaimTask clusterLeaseClaimTask(Vertx vertx,
-                                                               
ServiceConfiguration serviceConfiguration,
-                                                               
ElectorateMembership electorateMembership,
-                                                               
SidecarLeaseDatabaseAccessor accessor,
-                                                               ClusterLease 
clusterLease,
-                                                               SidecarMetrics 
metrics)
-            {
-                return new ClusterLeaseClaimTask(vertx,
-                                                 serviceConfiguration,
-                                                 electorateMembership,
-                                                 accessor,
-                                                 clusterLease,
-                                                 metrics)
-                {
-                    @Override
-                    public DurationSpec delay()
-                    {
-                        // ignore the minimum delay check that is coded in 
ClusterLeaseClaimTask
-                        return MillisecondBoundConfiguration.parse("1s");
-                    }
-                };
-            }
-        });
-    }
-
     @CassandraIntegrationTest
     void testSidecarSchemaInitializationFromBlank()
     {
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
index 0a592bf6..b2ba2b1a 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandlerIntegrationTest.java
@@ -88,6 +88,9 @@ public class ConnectedClientStatsHandlerIntegrationTest 
extends IntegrationTestB
         createTestKeyspace();
         Session session = maybeGetSession();
         session.execute("USE " + TEST_KEYSPACE);
+        // create an additional pair of connections
+        sidecarTestContext.buildNewCqlSessionProvider()
+                          .get();
 
         Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", false);
         String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
@@ -105,8 +108,9 @@ public class ConnectedClientStatsHandlerIntegrationTest 
extends IntegrationTestB
     void retrieveClientStatsMultipleConnections(VertxTestContext context)
     throws Exception
     {
-        // Creates an additional connection pair
-        createTestKeyspace();
+        // create an additional pair of connections
+        sidecarTestContext.buildNewCqlSessionProvider()
+                          .get();
         Map<String, Boolean> expectedParams = 
Collections.singletonMap("summary", false);
         String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
         testWithClient(context, client -> {
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
deleted file mode 100644
index e985383a..00000000
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.sidecar.routes;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import com.datastax.driver.core.Session;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.common.response.SchemaResponse;
-import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.testing.CassandraIntegrationTest;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Integration tests for the {@link SchemaHandler}
- */
-@ExtendWith(VertxExtension.class)
-class SchemaHandlerIntegrationTest extends IntegrationTestBase
-{
-    @CassandraIntegrationTest
-    void schemaHandlerNoKeyspace(VertxTestContext context) throws Exception
-    {
-        String testRoute = "/api/v1/schema/keyspaces";
-        testWithClient(context, client -> {
-            client.get(server.actualPort(), "127.0.0.1", testRoute)
-                  .expect(ResponsePredicate.SC_OK)
-                  .send(context.succeeding(response -> {
-                      SchemaResponse schemaResponse = 
response.bodyAsJson(SchemaResponse.class);
-                      assertThat(schemaResponse).isNotNull();
-                      assertThat(schemaResponse.keyspace()).isNull();
-                      assertThat(schemaResponse.schema()).isNotNull();
-                      context.completeNow();
-                  }));
-        });
-    }
-
-    @CassandraIntegrationTest
-    void schemaHandlerKeyspaceDoesNotExist(VertxTestContext context) throws 
Exception
-    {
-        String testRoute = "/api/v1/schema/keyspaces/non_existent";
-        testWithClient(context, client -> {
-            client.get(server.actualPort(), "127.0.0.1", testRoute)
-                  .expect(ResponsePredicate.SC_NOT_FOUND)
-                  .send(context.succeedingThenComplete());
-        });
-    }
-
-    @CassandraIntegrationTest
-    void schemaHandlerWithKeyspace(VertxTestContext context) throws Exception
-    {
-        createTestKeyspace();
-
-        String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
-        testWithClient(context, client -> {
-            client.get(server.actualPort(), "127.0.0.1", testRoute)
-                  .expect(ResponsePredicate.SC_OK)
-                  .send(context.succeeding(response -> {
-                      SchemaResponse schemaResponse = 
response.bodyAsJson(SchemaResponse.class);
-                      assertThat(schemaResponse).isNotNull();
-                      
assertThat(schemaResponse.keyspace()).isEqualTo("testkeyspace");
-                      assertThat(schemaResponse.schema()).isNotNull();
-                      context.completeNow();
-                  }));
-        });
-    }
-
-    @CassandraIntegrationTest
-    void schemaHandlerWithCaseSensitiveKeyspace(VertxTestContext context) 
throws Exception
-    {
-        try (Session session = maybeGetSession())
-        {
-            session.execute("CREATE KEYSPACE \"Cycling\"" +
-                            " WITH REPLICATION = { 'class' : 
'NetworkTopologyStrategy', 'replication_factor' : 1 };");
-        }
-
-        String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
-        testWithClient(context, client -> {
-            client.get(server.actualPort(), "127.0.0.1", testRoute)
-                  .expect(ResponsePredicate.SC_OK)
-                  .send(context.succeeding(response -> {
-                      SchemaResponse schemaResponse = 
response.bodyAsJson(SchemaResponse.class);
-                      assertThat(schemaResponse).isNotNull();
-                      
assertThat(schemaResponse.keyspace()).isEqualTo("Cycling");
-                      assertThat(schemaResponse.schema()).isNotNull();
-                      context.completeNow();
-                  }));
-        });
-    }
-
-    @CassandraIntegrationTest
-    void schemaHandlerWithReservedKeywordKeyspace(VertxTestContext context) 
throws Exception
-    {
-        try (Session session = maybeGetSession())
-        {
-            session.execute("CREATE KEYSPACE \"keyspace\"" +
-                            " WITH REPLICATION = { 'class' : 
'NetworkTopologyStrategy', 'replication_factor' : 1 };");
-        }
-
-        String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
-        testWithClient(context, client -> {
-            client.get(server.actualPort(), "127.0.0.1", testRoute)
-                  .expect(ResponsePredicate.SC_OK)
-                  .send(context.succeeding(response -> {
-                      SchemaResponse schemaResponse = 
response.bodyAsJson(SchemaResponse.class);
-                      assertThat(schemaResponse).isNotNull();
-                      
assertThat(schemaResponse.keyspace()).isEqualTo("keyspace");
-                      assertThat(schemaResponse.schema()).isNotNull();
-                      context.completeNow();
-                  }));
-        });
-    }
-}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
index 6356ab3f..87d95ed0 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
@@ -18,40 +18,26 @@
 
 package org.apache.cassandra.sidecar.routes;
 
-import java.util.concurrent.Callable;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import com.datastax.driver.core.Session;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.dynamic.ClassFileLocator;
-import net.bytebuddy.dynamic.TypeResolutionStrategy;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.implementation.MethodDelegation;
-import net.bytebuddy.implementation.bind.annotation.SuperCall;
-import net.bytebuddy.pool.TypePool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
 import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
-import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.testing.CassandraTestContext;
 
-import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
 import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -59,74 +45,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * Tests the stream stats endpoint with cassandra container.
  */
-@ExtendWith(VertxExtension.class)
 public class StreamStatsIntegrationTest extends IntegrationTestBase
 {
-    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2, 
network = true, buildCluster = false)
-    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws Exception
-    {
-        BBHelperDecommissioningNode.reset();
-        UpgradeableCluster cluster = 
cassandraTestContext.configureAndStartCluster(
-        builder -> 
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
-        IUpgradeableInstance node = cluster.get(2);
-
-        createTestKeyspace();
-        createTestTableAndPopulate();
-
-        startAsync("Decommission node" + node.config().num(),
-                   () -> node.nodetoolResult("decommission", 
"--force").asserts().success());
-        AtomicBoolean hasStats = new AtomicBoolean(false);
-        AtomicBoolean dataReceived = new AtomicBoolean(false);
-
-        // Wait until nodes have reached expected state
-        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, 
TimeUnit.MINUTES, "transientStateStart");
-
-        // optimal no. of attempts to poll for stats to capture streaming 
stats during node decommissioning
-        loopAssert(15, 200, () -> {
-            StreamsProgressStats progressStats = streamStats(hasStats, 
dataReceived);
-            assertThat(hasStats).isTrue();
-            assertThat(dataReceived)
-            .describedAs("Stream Progress Stats - totalFilesReceived:" + 
progressStats.totalFilesReceived() +
-                         " totalBytesReceived:" + 
progressStats.totalBytesReceived())
-            .isTrue();
-        });
-        ClusterUtils.awaitGossipStatus(node, node, "LEFT");
-        BBHelperDecommissioningNode.transientStateEnd.countDown();
-
-        context.completeNow();
-        context.awaitCompletion(2, TimeUnit.MINUTES);
-    }
-
-    private StreamsProgressStats streamStats(AtomicBoolean hasStats, 
AtomicBoolean dataReceived)
-    {
-        String testRoute = "/api/v1/cassandra/stats/streams";
-        HttpResponse<Buffer> resp;
-        resp = getBlocking(client.get(server.actualPort(), "127.0.0.1", 
testRoute)
-                                 .send());
-        return assertStreamStatsResponseOK(resp, hasStats, dataReceived);
-    }
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsIntegrationTest.class);
 
-    StreamsProgressStats assertStreamStatsResponseOK(HttpResponse<Buffer> 
response, AtomicBoolean hasStats, AtomicBoolean dataReceived)
+    @CassandraIntegrationTest(nodesPerDc = 2, network = true)
+    void streamStatsTest(CassandraTestContext cassandraTestContext)
     {
-        
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
-        StreamStatsResponse streamStatsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
-        assertThat(streamStatsResponse).isNotNull();
-        StreamsProgressStats streamProgress = 
streamStatsResponse.streamsProgressStats();
-        assertThat(streamProgress).isNotNull();
-        if (streamProgress.totalFilesToReceive() > 0)
-        {
-            hasStats.set(true);
-            if (streamProgress.totalFilesReceived() > 0)
-            {
-                dataReceived.set(true);
-                
assertThat(streamProgress.totalBytesReceived()).isGreaterThan(0);
-            }
-        }
-        return streamProgress;
-    }
+        UpgradeableCluster cluster = cassandraTestContext.cluster();
 
-    QualifiedTableName createTestTableAndPopulate()
-    {
+        createTestKeyspace(Map.of("datacenter1", 2));
         QualifiedTableName tableName = createTestTable(
         "CREATE TABLE %s ( \n" +
         "  race_year int, \n" +
@@ -135,54 +63,97 @@ public class StreamStatsIntegrationTest extends 
IntegrationTestBase
         "  rank int, \n" +
         "  PRIMARY KEY ((race_year, race_name), rank) \n" +
         ");");
-        Session session = maybeGetSession();
+        // craft inconsistency for repair
+        populateDataAtNode2Only(cluster, tableName);
+
+        // Poll stream stats while repair is running in the background.
+        CountDownLatch testStart = new CountDownLatch(1);
+        IUpgradeableInstance node = cluster.get(1);
+        AtomicReference<RuntimeException> nodetoolError = new 
AtomicReference<>();
+        startRepairAsync(node, testStart, tableName, nodetoolError);
+
+        TestState testState = new TestState();
+        testStart.countDown();
+        loopAssert(10, 500, () -> {
+            if (nodetoolError.get() != null)
+            {
+                throw nodetoolError.get();
+            }
+            streamStats(testState);
+            testState.assertCompletion();
+        });
+    }
 
-        session.execute("CREATE INDEX ryear ON " + tableName + " 
(race_year);");
+    private void startRepairAsync(IUpgradeableInstance node, CountDownLatch 
testStart, QualifiedTableName tableName, AtomicReference<RuntimeException> 
nodetoolError)
+    {
+        startAsync("Repairing node" + node.config().num(),
+                   () -> {
+                       Uninterruptibles.awaitUninterruptibly(testStart);
+                       try
+                       {
+                           node.nodetoolResult("repair", tableName.keyspace(), 
tableName.tableName(), "--full").asserts().success();
+                       }
+                       catch (Throwable cause)
+                       {
+                           nodetoolError.set(new RuntimeException("Nodetool 
failed", cause));
+                       }
+                   });
+    }
 
-        for (int i = 1; i <= 1000; i++)
-        {
-            session.execute("INSERT INTO " + tableName + " (race_year, 
race_name, rank, cyclist_name) " +
-                            "VALUES (2015, 'Tour of Japan - Stage 4 - Minami > 
Shinshu', " + i + ", 'Benjamin PRADES');");
-        }
-        return tableName;
+    private void streamStats(TestState testState)
+    {
+        String testRoute = "/api/v1/cassandra/stats/streams";
+        StreamStatsResponse streamStatsResponse = 
getBlocking(client.get(server.actualPort(), "127.0.0.1", testRoute)
+                                                                    
.expect(ResponsePredicate.SC_OK)
+                                                                    .send())
+                                                  
.bodyAsJson(StreamStatsResponse.class);
+        assertThat(streamStatsResponse).isNotNull();
+        StreamsProgressStats streamProgress = 
streamStatsResponse.streamsProgressStats();
+        assertThat(streamProgress).isNotNull();
+        LOGGER.info("Fetched {}", streamProgress);
+        testState.update(streamProgress);
     }
 
-    /**
-     * ByteBuddy Helper for decommissioning node
-     */
-    public static class BBHelperDecommissioningNode
+    static class TestState
     {
-        static CountDownLatch transientStateStart = new CountDownLatch(1);
-        static CountDownLatch transientStateEnd = new CountDownLatch(1);
+        StreamsProgressStats lastStats;
+        boolean streamStarted = false, streamCompleted = false;
 
-        public static void install(ClassLoader cl, Integer nodeNumber)
+        void update(StreamsProgressStats streamProgress)
         {
-            if (nodeNumber == 2)
+            lastStats = streamProgress;
+            if (streamProgress.totalFilesToReceive() > 0)
+            {
+                streamStarted = true;
+            }
+
+            if (streamStarted && streamProgress.totalFilesReceived() == 
streamProgress.totalFilesToReceive())
             {
-                TypePool typePool = TypePool.Default.of(cl);
-                TypeDescription description = 
typePool.describe("org.apache.cassandra.streaming.StreamCoordinator")
-                                                      .resolve();
-                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
-                               .method(named("connectAllStreamSessions"))
-                               
.intercept(MethodDelegation.to(BBHelperDecommissioningNode.class))
-                               // Defer class loading until all dependencies 
are loaded
-                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
-                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+                streamCompleted = true;
             }
         }
 
-        @SuppressWarnings("unused")
-        public static void connectAllStreamSessions(@SuperCall 
Callable<StreamOperation> orig) throws Exception
+        void assertCompletion()
         {
-            transientStateStart.countDown();
-            Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
-            orig.call();
+            assertThat(streamStarted)
+            .describedAs("Expecting to have non-empty stream stats. last 
stats: " + lastStats)
+            .isTrue();
+            assertThat(streamCompleted)
+            .describedAs("Expecting to complete. last stats: " + lastStats)
+            .isTrue();
         }
+    }
 
-        public static void reset()
+    void populateDataAtNode2Only(UpgradeableCluster cluster, 
QualifiedTableName tableName)
+    {
+        IInstance node = cluster.get(2);
+        // disable compaction for the table to have more file to stream
+        node.nodetoolResult("disableautocompaction", tableName.keyspace(), 
tableName.tableName()).asserts().success();
+        for (int i = 1; i <= 100; i++)
         {
-            transientStateStart = new CountDownLatch(1);
-            transientStateEnd = new CountDownLatch(1);
+            node.executeInternal("INSERT INTO " + tableName + " (race_year, 
race_name, rank, cyclist_name) " +
+                                 "VALUES (2015, 'Tour of Japan - Stage 4 - 
Minami > Shinshu', " + i + ", 'Benjamin PRADES');");
+            node.flush(TEST_KEYSPACE);
         }
     }
 }
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
index 21d46ce3..5b43c82f 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
@@ -75,9 +75,8 @@ public class SSTableImportHandlerIntegrationTest extends 
IntegrationTestBase
         // Truncate the table, insert more data.
         // Test the import SSTable endpoint by importing data that was 
originally truncated.
         // Verify by querying the table contains all the results before 
truncation and after truncation.
-
-        Session session = maybeGetSession();
         createTestKeyspace();
+        Session session = maybeGetSession();
         QualifiedTableName tableName = 
createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
 
         // create a snapshot called <tableName>-snapshot for tbl1
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
index ff0cbda3..0304ea5b 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicMultiDCRf3Test.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import io.vertx.junit5.VertxExtension;
@@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Note: Some related test classes are broken down to have a single test case 
to parallelize test execution and
  * therefore limit the instance size required to run the tests from CircleCI 
as the in-jvm-dtests tests are memory bound
  */
+@Tag("heavy")
 @ExtendWith(VertxExtension.class)
 class BasicMultiDCRf3Test extends BaseTokenRangeIntegrationTest
 {
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 7377cf26..ba2e7ecd 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -73,7 +74,8 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
     private final AbstractCassandraTestContext abstractCassandraTestContext;
     private final Vertx vertx;
     private final List<InstancesMetadataListener> instancesMetadataListeners;
-    private int numInstancesToManage;
+    // array of nodeNums that are 1-based
+    private int[] instancesToManage = null;
     public InstancesMetadata instancesMetadata;
     private List<JmxClient> jmxClients;
     private CQLSessionProvider sessionProvider;
@@ -86,11 +88,11 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
                                         SimpleCassandraVersion version,
                                         CassandraVersionProvider 
versionProvider,
                                         DnsResolver dnsResolver,
-                                        int numInstancesToManage,
+                                        int[] instancesToManage,
                                         SslConfiguration sslConfiguration)
     {
         this.vertx = vertx;
-        this.numInstancesToManage = numInstancesToManage;
+        this.instancesToManage = instancesToManage;
         this.instancesMetadataListeners = new ArrayList<>();
         this.abstractCassandraTestContext = abstractCassandraTestContext;
         this.version = version;
@@ -102,7 +104,7 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
     public static CassandraSidecarTestContext from(Vertx vertx,
                                                    
AbstractCassandraTestContext cassandraTestContext,
                                                    DnsResolver dnsResolver,
-                                                   int numInstancesToManage,
+                                                   int[] instancesToManage,
                                                    SslConfiguration 
sslConfiguration)
     {
         org.apache.cassandra.testing.SimpleCassandraVersion rootVersion = 
cassandraTestContext.version;
@@ -115,7 +117,7 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
                                                versionParsed,
                                                versionProvider,
                                                dnsResolver,
-                                               numInstancesToManage,
+                                               instancesToManage,
                                                sslConfiguration);
     }
 
@@ -165,9 +167,9 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
         return cluster;
     }
 
-    public void setNumInstancesToManage(int numInstancesToManage)
+    public void setInstancesToManage(int... instancesToManage)
     {
-        this.numInstancesToManage = numInstancesToManage;
+        this.instancesToManage = instancesToManage;
         refreshInstancesMetadata();
     }
 
@@ -178,16 +180,16 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
         refreshInstancesMetadata();
     }
 
-    public InstancesMetadata instancesMetadata()
+    public synchronized InstancesMetadata instancesMetadata()
     {
         if (instancesMetadata == null)
         {
-            refreshInstancesMetadata();
+            return refreshInstancesMetadata();
         }
         return this.instancesMetadata;
     }
 
-    public InstancesMetadata refreshInstancesMetadata()
+    public synchronized InstancesMetadata refreshInstancesMetadata()
     {
         // clean-up any open sessions or client resources
         close();
@@ -226,6 +228,8 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
         {
             instancesMetadata.instances().forEach(instance -> 
instance.delegate().close());
         }
+
+        closeSessionProvider();
     }
 
     private void setInstancesMetadata()
@@ -237,8 +241,18 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
         }
     }
 
-    private InstancesMetadata buildInstancesMetadata(CassandraVersionProvider 
versionProvider,
-                                                     DnsResolver dnsResolver)
+    public CQLSessionProviderImpl buildNewCqlSessionProvider()
+    {
+        UpgradeableCluster cluster = cluster();
+        List<IInstanceConfig> configs = buildInstanceConfigs(cluster);
+        List<InetSocketAddress> addresses = buildContactList(configs);
+        return new CQLSessionProviderImpl(addresses, addresses, 500, null,
+                                          0, username, password,
+                                          sslConfiguration, 
SharedExecutorNettyOptions.INSTANCE);
+    }
+
+    private synchronized InstancesMetadata 
buildInstancesMetadata(CassandraVersionProvider versionProvider,
+                                                                  DnsResolver 
dnsResolver)
     {
         UpgradeableCluster cluster = cluster();
         List<InstanceMetadata> metadata = new ArrayList<>();
@@ -317,25 +331,42 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
     @NotNull
     private List<IInstanceConfig> buildInstanceConfigs(UpgradeableCluster 
cluster)
     {
-        int nodes = numInstancesToManage == -1 ? cluster.size() : 
numInstancesToManage;
-        return IntStream.range(1, nodes + 1)
+        Set<Integer> testManagedInstances;
+        int maxNodeNum;
+        if (instancesToManage == null)
+        {
+            testManagedInstances = null;
+            maxNodeNum = cluster.size();
+        }
+        else
+        {
+            testManagedInstances = 
Arrays.stream(instancesToManage).boxed().collect(Collectors.toSet());
+            // throws if test sets an empty array, it is a test configuration 
error
+            maxNodeNum = Arrays.stream(instancesToManage).max().getAsInt();
+        }
+        return IntStream.range(1, maxNodeNum + 1)
                         .mapToObj(nodeNum -> {
                             // check whether the instances are managed by the 
test framework first. Because the nodeNum might be greater than the cluster size
                             if (manageInstanceByTestFramework() && 
cluster.get(nodeNum).isShutdown())
                             {
                                 return null;
                             }
-                            else
+
+                            // Test supplies instances to manage. However, the 
set does not contain this nodeNum
+                            if (testManagedInstances != null && 
!testManagedInstances.contains(nodeNum))
                             {
-                                return 
AbstractClusterUtils.createInstanceConfig(cluster, nodeNum);
+                                return null;
                             }
+
+                            // The node should be managed by sidecar
+                            return 
AbstractClusterUtils.createInstanceConfig(cluster, nodeNum);
                         })
                         .collect(Collectors.toList());
     }
 
     private boolean manageInstanceByTestFramework()
     {
-        return numInstancesToManage == -1;
+        return instancesToManage == null;
     }
 
     /**
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index 614cb445..55a5656d 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -113,6 +114,7 @@ public abstract class IntegrationTestBase
     protected Injector injector;
     private final List<Throwable> testExceptions = new ArrayList<>();
     private Module testSpecificModule;
+    private CountDownLatch schemaInitialized = new CountDownLatch(1);;
 
     @BeforeEach
     void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo 
testInfo) throws Exception
@@ -143,7 +145,9 @@ public abstract class IntegrationTestBase
         Module mergedModule = modules.stream().reduce((m1, m2) -> 
Modules.override(m1).with(m2)).get();
         injector = Guice.createInjector(mergedModule);
         vertx = injector.getInstance(Vertx.class);
-
+        // register the handler for ON_SIDECAR_SCHEMA_INITIALIZED the earliest
+        
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
+                                       msg -> schemaInitialized.countDown());
         SslConfiguration sslConfig = 
cassandraTestContext.annotation.authMode().equals(AuthMode.MUTUAL_TLS)
                                      ? sslConfigWithClientKeystoreTruststore() 
: null;
 
@@ -155,13 +159,14 @@ public abstract class IntegrationTestBase
             sslConfig = sslConfigWithTruststore();
         }
         sidecarTestContext = CassandraSidecarTestContext.from(vertx, 
cassandraTestContext, DnsResolver.DEFAULT,
-                                                              
getNumInstancesToManage(clusterSize), sslConfig);
+                                                              
getInstancesToManage(clusterSize), sslConfig);
         integrationTestModule.setCassandraTestContext(sidecarTestContext);
 
         server = injector.getInstance(Server.class);
         VertxTestContext context = new VertxTestContext();
 
-        if (sidecarTestContext.isClusterBuilt())
+        boolean isClusterBuilt = sidecarTestContext.isClusterBuilt();
+        if (isClusterBuilt)
         {
             MessageConsumer<JsonObject> cqlReadyConsumer = vertx.eventBus()
                                                                 
.localConsumer(ON_CASSANDRA_CQL_READY.address());
@@ -172,10 +177,11 @@ public abstract class IntegrationTestBase
         }
 
         client = mTLSClient();
+        beforeServerStart();
         server.start()
               .onSuccess(s -> {
                   
sidecarTestContext.registerInstanceConfigListener(this::healthCheck);
-                  if (!sidecarTestContext.isClusterBuilt())
+                  if (!isClusterBuilt)
                   {
                       // Give everything a moment to get started and connected
                       vertx.setTimer(TimeUnit.SECONDS.toMillis(1), id1 -> 
context.completeNow());
@@ -184,6 +190,12 @@ public abstract class IntegrationTestBase
               .onFailure(context::failNow);
 
         context.awaitCompletion(5, TimeUnit.SECONDS);
+
+        // add a listener to refresh instance metadata when cluster is not yet 
built when starting server
+        if (!isClusterBuilt)
+        {
+            cassandraTestContext.setClusterBuiltListener(cluster -> 
sidecarTestContext.refreshInstancesMetadata());
+        }
     }
 
     @AfterEach
@@ -203,6 +215,10 @@ public abstract class IntegrationTestBase
     {
     }
 
+    protected void beforeServerStart()
+    {
+    }
+
     protected void installTestSpecificModule(Module testSpecificModule)
     {
         this.testSpecificModule = testSpecificModule;
@@ -210,11 +226,7 @@ public abstract class IntegrationTestBase
 
     protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
     {
-        CountDownLatch latch = new CountDownLatch(1);
-        vertx.eventBus()
-             
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg 
-> latch.countDown());
-        awaitLatchOrTimeout(latch, timeout, timeUnit);
-        assertThat(latch.getCount()).describedAs("Sidecar schema not 
initialized").isZero();
+        awaitLatchOrTimeout(schemaInitialized, timeout, timeUnit, "Wait for 
schema initialization");
     }
 
     /**
@@ -224,11 +236,11 @@ public abstract class IntegrationTestBase
      * Defaults to the entire cluster.
      *
      * @param clusterSize the size of the cluster as defined by the 
integration test
-     * @return the number of instances to manage; or -1 to let test framework 
to determine the cluster size at the runtime
+     * @return the instances to manage; or null to let test framework to 
determine the cluster size at the runtime
      */
-    protected int getNumInstancesToManage(int clusterSize)
+    protected int[] getInstancesToManage(int clusterSize)
     {
-        return -1;
+        return null;
     }
 
     protected void testWithClient(Consumer<WebClient> tester)
@@ -274,36 +286,17 @@ public abstract class IntegrationTestBase
         }
     }
 
-    protected void testWithClientBlocking(boolean waitForCluster,
-                                     Consumer<WebClient> tester)
-    {
-        CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesMetadata()
-                                                              
.instanceFromId(1)
-                                                              .delegate();
-
-        assertThat(delegate).isNotNull();
-        if (delegate.isNativeUp() || !waitForCluster)
-        {
-            tester.accept(client);
-        }
-        else
-        {
-            vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> message) -> {
-                if (message.body().getInteger("cassandraInstanceId") == 1)
-                {
-                    tester.accept(client);
-                }
-            });
-        }
-
-    }
-
     protected void createTestKeyspace()
     {
         createTestKeyspace(ImmutableMap.of(DATA_CENTER_PREFIX + 1, 1));
     }
 
     protected void createTestKeyspace(Map<String, Integer> rf)
+    {
+        createKeyspace(TEST_KEYSPACE, rf);
+    }
+
+    protected void createKeyspace(String keyspaceName, Map<String, Integer> rf)
     {
         int attempts = 1;
         ArrayList<Throwable> thrown = new ArrayList<>(5);
@@ -311,12 +304,13 @@ public abstract class IntegrationTestBase
         {
             try
             {
-                sidecarTestContext.refreshInstancesMetadata();
-
                 Session session = maybeGetSession();
 
-                session.execute("CREATE KEYSPACE " + IF_NOT_EXISTS + " " + 
TEST_KEYSPACE
-                              + " WITH REPLICATION = { 'class' : 
'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+                ResultSet rs = session.execute("CREATE KEYSPACE " + 
IF_NOT_EXISTS + " " + keyspaceName
+                                               + " WITH REPLICATION = { 
'class' : 'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+                assertThat(rs.getExecutionInfo().isSchemaInAgreement())
+                .describedAs("Schema agreement is not reached")
+                .isTrue();
                 return;
             }
             catch (Throwable t)
@@ -396,7 +390,7 @@ public abstract class IntegrationTestBase
     }
 
     // similar to awaitLatchOrTimeout, it throws either test exceptions (due 
to startAsync failures) or timeout exception
-    protected void awaitLatchOrThrow(CountDownLatch latch, long duration, 
TimeUnit timeUnit, String latchName)
+    public void awaitLatchOrThrow(CountDownLatch latch, long duration, 
TimeUnit timeUnit, String latchName)
     {
         String hint = latchName == null ? "" : '(' + latchName + ')';
         boolean completed = Uninterruptibles.awaitUninterruptibly(latch, 
duration, timeUnit);
@@ -409,7 +403,7 @@ public abstract class IntegrationTestBase
         throw new AssertionError("Latch " + hint + " times out after " + 
duration + ' ' + timeUnit.name());
     }
 
-    protected static void awaitLatchOrTimeout(CountDownLatch latch, long 
duration, TimeUnit timeUnit, String latchName)
+    public static void awaitLatchOrTimeout(CountDownLatch latch, long 
duration, TimeUnit timeUnit, String latchName)
     {
         String hint = latchName == null ? "" : '(' + latchName + ')';
         assertThat(Uninterruptibles.awaitUninterruptibly(latch, duration, 
timeUnit))
@@ -417,7 +411,7 @@ public abstract class IntegrationTestBase
         .isTrue();
     }
 
-    protected static void awaitLatchOrTimeout(CountDownLatch latch, long 
duration, TimeUnit timeUnit)
+    public static void awaitLatchOrTimeout(CountDownLatch latch, long 
duration, TimeUnit timeUnit)
     {
         awaitLatchOrTimeout(latch, duration, timeUnit, null);
     }
@@ -468,7 +462,6 @@ public abstract class IntegrationTestBase
         context.completeNow();
     }
 
-
     private static QualifiedTableName uniqueTestTableFullName(String 
tablePrefix)
     {
         String uniqueTableName = tablePrefix + TEST_TABLE_ID.getAndIncrement();
@@ -520,7 +513,7 @@ public abstract class IntegrationTestBase
         CertificateBuilder builder = new CertificateBuilder()
                             .subject("CN=Apache Cassandra, OU=ssl_test, 
O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
                             .addSanDnsName("localhost")
-                            .addSanIpAddress("127.0.0.1")
+                            .addSanIpAddress(subjectAlternativeNameIpAddress())
                             .addSanUriName(identity);
         if (expired)
         {
@@ -548,6 +541,11 @@ public abstract class IntegrationTestBase
                                    .build();
     }
 
+    protected String subjectAlternativeNameIpAddress()
+    {
+        return "127.0.0.1";
+    }
+
     protected WebClient createClient(Path clientKeystorePath, Path 
truststorePath)
     {
         WebClientOptions options = new WebClientOptions();
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index df165815..ba5cf3b9 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -32,6 +32,7 @@ import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
@@ -47,12 +48,18 @@ import 
org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SSTableUploadConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
+import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
+import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
 import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
 import org.jetbrains.annotations.NotNull;
 
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -99,6 +106,7 @@ public class IntegrationTestModule extends AbstractModule
                                                                                
               .isEnabled(true)
                                                                                
               .build())
                                   
.coordinationConfiguration(clusterLeaseClaimTaskConfiguration)
+                                  .sstableUploadConfiguration(new 
SSTableUploadConfigurationImpl(0F))
                                   .build();
         PeriodicTaskConfiguration healthCheckConfiguration
         = new PeriodicTaskConfigurationImpl(true,
@@ -125,11 +133,54 @@ public class IntegrationTestModule extends AbstractModule
                                        .build();
     }
 
+    @Provides
+    @Singleton
+    public ClusterLeaseClaimTask clusterLeaseClaimTask(Vertx vertx,
+                                                       ServiceConfiguration 
serviceConfiguration,
+                                                       ElectorateMembership 
electorateMembership,
+                                                       
SidecarLeaseDatabaseAccessor accessor,
+                                                       ClusterLease 
clusterLease,
+                                                       SidecarMetrics metrics)
+    {
+        return new ClusterLeaseClaimTask(vertx,
+                                         serviceConfiguration,
+                                         electorateMembership,
+                                         accessor,
+                                         clusterLease,
+                                         metrics)
+        {
+            @Override
+            public DurationSpec delay()
+            {
+                return 
serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().executeInterval();
+            }
+
+            @Override
+            public DurationSpec initialDelay()
+            {
+                return 
serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay();
+            }
+
+            @Override
+            public ScheduleDecision scheduleDecision()
+            {
+                // stop further executions if cluster lease is already 
claimed; otherwise, run it, regardless of ElectorateMembership
+                if (!accessor.isAvailable() || 
clusterLease.isClaimedByLocalSidecar())
+                {
+                    return ScheduleDecision.SKIP;
+                }
+                return ScheduleDecision.EXECUTE;
+            }
+        };
+    }
+
     @Provides
     @Singleton
     public CoordinationConfiguration clusterLeaseClaimTaskConfiguration()
     {
-        return new CoordinationConfigurationImpl(new 
PeriodicTaskConfigurationImpl());
+        return new CoordinationConfigurationImpl(new 
PeriodicTaskConfigurationImpl(true,
+                                                                               
    MillisecondBoundConfiguration.parse("1s"),
+                                                                               
    MillisecondBoundConfiguration.parse("1s")));
     }
 
     @Provides
@@ -161,13 +212,6 @@ public class IntegrationTestModule extends AbstractModule
         return cqlSessionProvider;
     }
 
-    @Provides
-    @Singleton
-    public ClusterLease clusterLease()
-    {
-        return new ClusterLease(ClusterLease.Ownership.CLAIMED);
-    }
-
     private AccessControlConfiguration accessControlConfiguration()
     {
         Map<String, String> params = new HashMap<String, String>()
diff --git 
a/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
 
b/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
index 111205a8..2ff58a1b 100644
--- 
a/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
+++ 
b/server/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,8 @@ public abstract class AbstractCassandraTestContext implements 
AutoCloseable
 
     public final SimpleCassandraVersion version;
     private final Map<String, String> initialProperties;
-    protected UpgradeableCluster cluster;
+    private UpgradeableCluster cluster;
+    private Consumer<UpgradeableCluster> onClusterBuilt;
 
     // certificates created when cluster is started with auth
     public final CertificateBundle ca;
@@ -78,6 +80,24 @@ public abstract class AbstractCassandraTestContext 
implements AutoCloseable
         return cluster;
     }
 
+    public void setClusterBuiltListener(Consumer<UpgradeableCluster> listener)
+    {
+        this.onClusterBuilt = listener;
+        if (cluster != null)
+        {
+            onClusterBuilt.accept(cluster);
+        }
+    }
+
+    protected void setCluster(UpgradeableCluster cluster)
+    {
+        this.cluster = cluster;
+        if (onClusterBuilt != null)
+        {
+            onClusterBuilt.accept(cluster);
+        }
+    }
+
     @Override
     public void close()
     {
diff --git 
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
 
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
index 905daa45..5c7f47be 100644
--- 
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
+++ 
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
@@ -30,7 +30,6 @@ import 
org.apache.cassandra.testing.utils.tls.CertificateBundle;
  */
 public class CassandraTestContext extends AbstractCassandraTestContext
 {
-
     public CassandraTestContext(SimpleCassandraVersion version,
                                 UpgradeableCluster cluster,
                                 CertificateBundle ca,
@@ -46,7 +45,7 @@ public class CassandraTestContext extends 
AbstractCassandraTestContext
     {
         return "CassandraTestContext{"
                + "version=" + version
-               + ", cluster=" + cluster
+               + ", cluster=" + cluster()
                + '}';
     }
 }
diff --git 
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
 
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
index b494b8f2..ff50a761 100644
--- 
a/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
+++ 
b/server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
@@ -183,7 +183,7 @@ public class CassandraTestTemplate implements 
TestTemplateInvocationContextProvi
 
                 Path tempDirPath = Files.createTempDirectory("certs");
                 CertificateBundle ca = ca();
-                Path serverKeystorePath = serverKeystorePath(ca, tempDirPath);
+                Path serverKeystorePath = serverKeystorePath(ca, tempDirPath, 
finalNodeCount);
                 Path truststorePath = truststorePath(ca, tempDirPath);
 
                 switch (annotation.authMode())
@@ -382,13 +382,16 @@ public class CassandraTestTemplate implements 
TestTemplateInvocationContextProvi
         return ca.toTempKeyStorePath(path, truststorePassword.toCharArray(), 
truststorePassword.toCharArray());
     }
 
-    private Path serverKeystorePath(CertificateBundle ca, Path path) throws 
Exception
+    private Path serverKeystorePath(CertificateBundle ca, Path path, int 
totalNodes) throws Exception
     {
-        CertificateBundle keystore = new CertificateBuilder()
-                                     .subject("CN=Apache Cassandra, 
OU=ssl_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
-                                     .addSanDnsName("localhost")
-                                     .addSanIpAddress("127.0.0.1")
-                                     .buildIssuedBy(ca);
+        CertificateBuilder builder = new CertificateBuilder();
+        builder.subject("CN=Apache Cassandra, OU=ssl_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
+               .addSanDnsName("localhost");
+        for (int i = 1; i <= totalNodes; i++)
+        {
+            builder.addSanIpAddress("127.0.0." + i);
+        }
+        CertificateBundle keystore = builder.buildIssuedBy(ca);
         return keystore.toTempKeyStorePath(path, 
serverKeystorePassword.toCharArray(), serverKeystorePassword.toCharArray());
     }
 
@@ -427,8 +430,7 @@ public class CassandraTestTemplate implements 
TestTemplateInvocationContextProvi
 
     static
     {
-        // Settings to reduce the test setup delay incurred if gossip is 
enabled
-        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 
30s default
+        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 
30s default; this change has no effect if GOSSIP feature is enabled
         System.setProperty("cassandra.consistent.rangemovement", "false");
         System.setProperty("cassandra.consistent.simultaneousmoves.allow", 
"true");
         // End gossip delay settings
diff --git 
a/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
 
b/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
index d52d96de..7500a7e9 100644
--- 
a/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
+++ 
b/server/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
@@ -46,7 +46,8 @@ public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestConte
     public UpgradeableCluster 
configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator)
     {
         configurator.accept(builder);
-        cluster = CassandraTestTemplate.retriableStartCluster(builder, 3);
+        UpgradeableCluster cluster = 
CassandraTestTemplate.retriableStartCluster(builder, 3);
+        setCluster(cluster);
         return cluster;
     }
 
@@ -55,6 +56,7 @@ public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestConte
     {
         return "ConfigurableCassandraTestContext{"
                + ", version=" + version
+               + ", cluster=" + cluster()
                + ", builder=" + builder
                + '}';
     }
diff --git a/server/src/test/resources/logback-in-jvm-dtest.xml 
b/server/src/test/resources/logback-in-jvm-dtest.xml
index b6e574a2..9770bd2a 100644
--- a/server/src/test/resources/logback-in-jvm-dtest.xml
+++ b/server/src/test/resources/logback-in-jvm-dtest.xml
@@ -20,18 +20,23 @@
 
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
     <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>INFO</level>
+      <level>DEBUG</level>
     </filter>
     <encoder>
       <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
     </encoder>
   </appender>
 
-  <root level="INFO">
+  <root level="DEBUG">
     <appender-ref ref="STDOUT" />
   </root>
 
-
+  <logger name="org.apache.cassandra.sidecar" level="DEBUG" />
+  <logger name="org.apache.cassandra" level="INFO" />
+  <logger name="io.netty" level="WARN" />
+  <logger name="shaded.io.netty" level="WARN" />
+  <logger name="shaded.com.datastax.shaded.netty" level="WARN" />
+  <logger name="o.a.c.sidecar.client.shaded.io.netty" level="WARN" />
   <logger name="com.datastax.driver.core" level="ERROR" />
   <logger name="com.datastax.driver.core.ControlConnection" level="OFF" />
 </configuration>
diff --git a/vertx-auth-mtls/build.gradle b/vertx-auth-mtls/build.gradle
index ba2b248f..c4449efe 100644
--- a/vertx-auth-mtls/build.gradle
+++ b/vertx-auth-mtls/build.gradle
@@ -39,6 +39,7 @@ test {
         println("Destination directory for vertx-auth-mtls tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/vertx-client-shaded/build.gradle b/vertx-client-shaded/build.gradle
index 464df4fa..23bce206 100644
--- a/vertx-client-shaded/build.gradle
+++ b/vertx-client-shaded/build.gradle
@@ -61,6 +61,7 @@ tasks.named('test') {
         println("Destination directory for vertx-client-shaded tests: 
${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 
diff --git a/vertx-client/build.gradle b/vertx-client/build.gradle
index 2afd2260..b825cf08 100644
--- a/vertx-client/build.gradle
+++ b/vertx-client/build.gradle
@@ -44,6 +44,7 @@ test {
         println("Destination directory for vertx-client tests: ${destDir}")
         junitXml.getOutputLocation().set(destDir)
         html.setRequired(true)
+        html.getOutputLocation().set(destDir)
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to