This is an automated email from the ASF dual-hosted git repository.

jmckenzie-dev pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee180770 CASSANALYTICS-146: Clean up cluster re-use resource control 
(#203)
ee180770 is described below

commit ee180770dcb8f0f779ce5c866cdf96a9781645dd
Author: Josh McKenzie <[email protected]>
AuthorDate: Wed May 6 15:37:10 2026 -0400

    CASSANALYTICS-146: Clean up cluster re-use resource control (#203)
    
    Patch by Josh McKenzie; reviewed by Jyothsna Konisa for CASSANALYTICS-146
---
 .circleci/config.yml                               |  42 +++-
 build.gradle                                       |   8 +
 .../BulkReaderMultiDCConsistencyTest.java          | 272 +++++++++++++--------
 .../cassandra/analytics/ResiliencyTestBase.java    |  11 +-
 4 files changed, 220 insertions(+), 113 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 079e81a9..40a17230 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -119,11 +119,47 @@ commands:
             # Format the arguments to "./gradlew test"
             # GRADLE_ARGS=$(echo $CLASSNAMES | awk '{for (i=1; i<=NF; i++) 
print "--tests",$i}')
             echo "CircleCI assigned the following classes for testing: 
$CLASSNAMES"
+            # Each test class runs in its own ./gradlew invocation. Gradle 
treats
+            # build/test-reports/integration/ as a task output and wipes stale
+            # files on each run, so without aggregation only the last class's 
XML
+            # would survive, hiding most results from the CircleCI dashboard.
+            # Drain reports into a per-class subdirectory after each iteration.
+            SRC_REPORT_DIR="$(pwd)/build/test-reports/integration"
+            AGG_ROOT="$(pwd)/build/aggregated-test-reports/integration"
+            mkdir -p "$AGG_ROOT"
             # collect up exit statuses for all of the test classes and exit 
with that result at the end.
             # If no gradle processes exit with non-zero status, it will still 
be 0
             EXIT_STATUS=0
             for TEST_NAME in $CLASSNAMES; do
                 ./gradlew --stacktrace 
cassandra-analytics-integration-tests:test --tests $TEST_NAME --no-daemon || 
EXIT_STATUS=$?;
+                DEST="$AGG_ROOT/$TEST_NAME"
+                mkdir -p "$DEST"
+                MOVED=0
+                if [ -d "$SRC_REPORT_DIR" ]; then
+                    for f in "$SRC_REPORT_DIR"/TEST-*.xml; do
+                        [ -e "$f" ] || continue
+                        mv "$f" "$DEST"/
+                        MOVED=1
+                    done
+                    for f in "$SRC_REPORT_DIR"/*.html; do
+                        [ -e "$f" ] || continue
+                        mv "$f" "$DEST"/ 2>/dev/null || true
+                    done
+                fi
+                # If Gradle produced no XML (e.g. class-level crash before any
+                # @Test ran), synthesize a minimal JUnit record so CircleCI's
+                # dashboard still surfaces that the shard attempted the class.
+                if [ "$MOVED" = "0" ]; then
+                    MSG="Gradle produced no JUnit XML for $TEST_NAME; likely a 
class-level crash before tests ran. Exit status: $EXIT_STATUS. See job 
artifacts for full logs."
+                    {
+                      printf '%s\n' '<?xml version="1.0" encoding="UTF-8"?>'
+                      printf '<testsuite name="%s" tests="1" failures="1" 
errors="0" skipped="0">\n' "$TEST_NAME"
+                      printf '  <testcase classname="%s" 
name="noJUnitReportProduced">\n' "$TEST_NAME"
+                      printf '    <failure message="%s">%s</failure>\n' "$MSG" 
"$MSG"
+                      printf '  </testcase>\n'
+                      printf '</testsuite>\n'
+                    } > "$DEST/TEST-${TEST_NAME}.xml"
+                fi
             done;
             exit $EXIT_STATUS
           no_output_timeout: 30m
@@ -244,7 +280,7 @@ jobs:
 
       - store_test_results:
           when: always
-          path: build/test-reports
+          path: build/aggregated-test-reports
 
   int-c41-spark3-2_12-jdk11:
     parallelism: 8
@@ -281,7 +317,7 @@ jobs:
 
       - store_test_results:
           when: always
-          path: build/test-reports
+          path: build/aggregated-test-reports
 
   spark3-2_13-jdk11-bti-c50:
     docker:
@@ -344,7 +380,7 @@ jobs:
 
       - store_test_results:
           when: always
-          path: build/test-reports
+          path: build/aggregated-test-reports
 
 workflows:
   version: 2
diff --git a/build.gradle b/build.gradle
index 17146696..065e903f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -248,7 +248,15 @@ subprojects {
     }
   }
 
+  // We want to get all the exception information we can on test failure in a 
multi-node in-jvm env
   tasks.withType(Test).configureEach {
+    testLogging {
+      showExceptions true
+      exceptionFormat "full"
+      showCauses true
+      showStackTraces true
+    }
+
     def heapDumpPath = 
"${project.rootProject.rootDir}/build/${project.name}/heapDumps"
     Files.createDirectories(Paths.get(heapDumpPath))
     if (JavaVersion.current().isJava11Compatible()) {
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
index b4bf2065..fd87abd5 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -19,12 +19,14 @@
 
 package org.apache.cassandra.analytics;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
 import org.junit.jupiter.api.Test;
 
 import net.bytebuddy.ByteBuddy;
@@ -134,93 +136,140 @@ public class BulkReaderMultiDCConsistencyTest extends 
SharedClusterSparkIntegrat
      * @throws NoSuchMethodException
      */
     @Test
-    void eachQuorumIsNotQuorum() throws NoSuchMethodException
+    void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException
     {
-        List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
-        updatedDataSet.set(1, TEST_VAL);
-
-        // Internally update value for TEST_KEY for node5 and node6. This 
update doesn't propagate to other nodes.
-        updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
-        updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
-
-        // Bytecode injection to simulate a scenario where node5 and node6 are 
at the end of the replica list for bulk reader.
-        // This simulation mimics a real world scenario.
-        // With this arrangement PartitionedDataLayer.splitReplicas method for 
QUORUM will split the replicas like below:
-        // primaryReplicas: [Node1, Node2, Node3, Node4]
-        // secondaryReplicas: [Node5, Node6]
-        // Number of nodes required for QUORUM read id 6/1 + 1 = 4. Bulk 
reader will read from [Node1, Node2, Node3, Node4] only.
+        // The agent must be installed before 
ClassReloadingStrategy.fromInstalledAgent() — otherwise
+        // it throws IllegalStateException when the JVM hosting this test 
class was started without a
+        // prior ByteBuddy install (e.g. when the CI harness runs this class 
in its own gradle invocation).
         ByteBuddyAgent.install();
-        new ByteBuddy()
-        .redefine(CassandraDataLayer.class)
-        .method(ElementMatchers.named("getAvailability"))
-        .intercept(
-        
MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability",
 CassandraInstance.class))
-                  .withAllArguments()
-        )
-        .make()
-        .load(
-        CassandraDataLayer.class.getClassLoader(),
-        ClassReloadingStrategy.fromInstalledAgent()
-        );
+        // Hold on to the original strategy to reset ByteBuddy after this test
+        ClassReloadingStrategy crStrategy = 
ClassReloadingStrategy.fromInstalledAgent();
 
-        // Bulk read with QUORUM consistency
-        List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
-        // Validate that the result doesn't have the updated data.
-        validateBulkReadRows(rowList, OG_DATASET);
-
-        // Message filter to mimic message drops from Node5 and Node6 to Node1.
-        // We are setting this up to simulate a scenario where reading values 
with QUORUM consistency with driver
-        // and using Node1 as the coordinator doesn't get the values from 
Node5 and Node6.
-        cluster.filters().allVerbs().from(5).to(1).drop();
-        cluster.filters().allVerbs().from(6).to(1).drop();
-
-        // Read value for TEST_KEY with driver using Node1 as coordinator
-        String quorumVal = readValueForKey(cluster.get(1).coordinator(), 
TEST_KEY, ConsistencyLevel.QUORUM);
-        // Validate that the updated value is not read
-        assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
-
-        // Cleanup message filter
-        cluster.filters().reset();
-
-        // Bulk read with EACH_QUORUM consistency
-        rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
-        // Validate that bulk reader was able to read the updated value
-        validateBulkReadRows(rowList, updatedDataSet);
-        // Read value using driver with EACH_QUORUM
-        String eachQuorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
-        // Validate that EACH_QUORUM read using driver and the bulk reader are 
the same
-        
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
-        // Revert the value update for all nodes
-        setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+        // We need the try/finally structure since this test mutates global 
state through the ByteBuddy changes; if we
+        // time out during the test run that change persists and, depending on test 
ordering, will cascade and take the rest
+        // with it.
+        try
+        {
+            List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
+            updatedDataSet.set(1, TEST_VAL);
+
+            // Internally update value for TEST_KEY for node5 and node6. This 
update doesn't propagate to other nodes.
+            updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
+            updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
+
+            // Bytecode injection to simulate a scenario where node5 and node6 
are at the end of the replica list for bulk reader.
+            // This simulation mimics a real world scenario.
+            // With this arrangement PartitionedDataLayer.splitReplicas method 
for QUORUM will split the replicas like below:
+            // primaryReplicas: [Node1, Node2, Node3, Node4]
+            // secondaryReplicas: [Node5, Node6]
+            // Number of nodes required for a QUORUM read is 6/2 + 1 = 4. Bulk 
reader will read from [Node1, Node2, Node3, Node4] only.
+            new ByteBuddy()
+                    .redefine(CassandraDataLayer.class)
+                    .method(ElementMatchers.named("getAvailability"))
+                    .intercept(
+                            
MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability",
 CassandraInstance.class))
+                                    .withAllArguments()
+                    )
+                    .make()
+                    .load(CassandraDataLayer.class.getClassLoader(), 
crStrategy);
+
+            // Bulk read with QUORUM consistency
+            List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+            // Validate that the result doesn't have the updated data.
+            validateBulkReadRows(rowList, OG_DATASET);
+
+            // Message filter to mimic message drops from Node5 and Node6 to 
Node1.
+            // We are setting this up to simulate a scenario where reading 
values with QUORUM consistency with driver
+            // and using Node1 as the coordinator doesn't get the values from 
Node5 and Node6.
+            cluster.filters().allVerbs().from(5).to(1).drop();
+            cluster.filters().allVerbs().from(6).to(1).drop();
+
+            // Read value for TEST_KEY with driver using Node1 as coordinator.
+            // The message filters above drop responses from Node5/Node6 to 
Node1, but the coordinator's
+            // snitch-based replica selection may still pick Node5 or Node6 as 
one of its 4 QUORUM replicas.
+            // When that happens the dropped response causes a 
ReadTimeoutException. This is infrastructure
+            // noise from the message filter and not a real test failure, 
which would manifest as an
+            // AssertionError (wrong value returned), not a timeout. Retry to 
tolerate the non-deterministic
+            // replica selection. This test was _very_ intermittently flaky 
before, so if we take something that
+            // flaked out 1% of the time for instance and then repeat 10x, we 
_should_ be in a better place.
+            // Another option here would be to just keep spinning on 
ReadTimeoutExceptions until the JUnit
+            // timeout fires, but that just seems excessive.
+            String quorumVal = null;
+            for (int attempt = 1; attempt <= 10; attempt++)
+            {
+                try
+                {
+                    quorumVal = readValueForKey(cluster.get(1).coordinator(), 
TEST_KEY, ConsistencyLevel.QUORUM);
+                    break;
+                }
+                catch (Exception e)
+                {
+                    if (attempt == 10 || !(e instanceof ReadTimeoutException))
+                    {
+                        throw e;
+                    }
+                }
+            }
+            assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
+
+            // Bulk read with EACH_QUORUM consistency
+            rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+            // Validate that bulk reader was able to read the updated value
+            validateBulkReadRows(rowList, updatedDataSet);
+
+            // Read value using driver with EACH_QUORUM. Must use a DC2 
coordinator (node4) because the active message
+            // filters drop responses from nodes 5 and 6 back to node1 (DC1), 
making EACH_QUORUM unsatisfiable from
+            // node1's perspective. We used to reset the message filters 
mid-test but timeouts on the test would
+            // then cascade and cause all other tests to fail. Now we need to 
have this structured workaround
+            // to respect the message filters we have in place but still 
confirm the data is where we expect it
+            // and that the client version matches bulk's view of the world.
+            String eachQuorumVal = 
readValueForKey(cluster.get(4).coordinator(), TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
+            // Validate that EACH_QUORUM read using driver and the bulk reader 
are the same
+            
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+        }
+        finally
+        {
+            // Reset message filters and data unconditionally so a mid-test 
failure doesn't leave dirty
+            // state that corrupts subsequent tests (e.g. 
eachQuorumFailureWithTwoNodesDownOneDC).
+            cluster.filters().reset();
+            setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+            crStrategy.reset(CassandraDataLayer.class);
+        }
     }
 
     /**
      * Tests that EACH_QUORUM read succeeds with one node down in each DC.
      * Tests that value read using driver is the same as the value read using 
bulk reader.
      *
+     * As this is a topology destructive test, we tear down and recreate the 
cluster.
+     *
      * @throws Exception
      */
     @Test
     void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
     {
-        // Stop Node1(DC1)
-        cluster.stopUnchecked(cluster.get(1));
-        // Stop Node4(DC2)
-        cluster.stopUnchecked(cluster.get(4));
-
-        // Bulk read with EACH_QUORUM consistency
-        List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
-        validateBulkReadRows(rowList, OG_DATASET);
-
-        // Read TEST_KEY using driver
-        String eachQuorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
-        // Validate that data from driver and bulk reader are the same
-        
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
-        // Tear down and re-create the cluster
-        tearDown();
-        setup();
+        try
+        {
+            // Stop Node1(DC1)
+            cluster.stopUnchecked(cluster.get(1));
+            // Stop Node4(DC2)
+            cluster.stopUnchecked(cluster.get(4));
+
+            // Bulk read with EACH_QUORUM consistency
+            List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+            validateBulkReadRows(rowList, OG_DATASET);
+
+            // Read TEST_KEY using driver
+            String eachQuorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
+            // Validate that data from driver and bulk reader are the same
+            
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+        }
+        finally
+        {
+            // Tear down and re-create the cluster
+            tearDown();
+            setup();
+        }
     }
 
     /**
@@ -230,51 +279,58 @@ public class BulkReaderMultiDCConsistencyTest extends 
SharedClusterSparkIntegrat
      * EACH_QUORUM read with bulk reader fails with cause as 
NotEnoughReplicasException.
      * EACH_QUORUM read with driver fails.
      *
+     * As this is a topology destructive test, we tear down and recreate the 
cluster.
+     *
      * @throws Exception
      */
     @Test
     void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception
     {
-        // Stop Node4(DC2)
-        cluster.stopUnchecked(cluster.get(4));
-        // Stop Node5(DC2)
-        cluster.stopUnchecked(cluster.get(5));
-
-        // Bulk read with QUORUM
-        List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
-        validateBulkReadRows(rowList, OG_DATASET);
-        // Driver read with QUORUM
-        String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
-        // Bulk read and driver read values are the same
-        assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
-        // Try bulk reading with EACH_QUORUM consistency. Assert that it fails 
with the correct cause.
-        try
-        {
-            bulkRead(ConsistencyLevel.EACH_QUORUM.name());
-        }
-        catch (Exception ex)
-        {
-            assertThat(ex).isNotNull();
-            assertThat(ex).isInstanceOf(SparkException.class);
-            
assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
-            assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 
replicas but only 1 responded");
-        }
-
-        // Try driver reading with EACH_QUORUM consistency. Assert that it 
fails with the correct error.
         try
         {
-            readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+            // Stop Node4(DC2)
+            cluster.stopUnchecked(cluster.get(4));
+            // Stop Node5(DC2)
+            cluster.stopUnchecked(cluster.get(5));
+
+            // Bulk read with QUORUM
+            List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+            validateBulkReadRows(rowList, OG_DATASET);
+            // Driver read with QUORUM
+            String quorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.QUORUM);
+            // Bulk read and driver read values are the same
+            
assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+            // Try bulk reading with EACH_QUORUM consistency. Assert that it 
fails with the correct cause.
+            try
+            {
+                bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+            }
+            catch (Exception ex)
+            {
+                assertThat(ex).isNotNull();
+                assertThat(ex).isInstanceOf(SparkException.class);
+                
assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
+                assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 
replicas but only 1 responded");
+            }
+
+            // Try driver reading with EACH_QUORUM consistency. Assert that it 
fails with the correct error.
+            try
+            {
+                readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+            }
+            catch (Exception ex)
+            {
+                assertThat(ex).isNotNull();
+                assertThat(ex.getMessage()).isEqualTo("Cannot achieve 
consistency level EACH_QUORUM in DC datacenter2");
+            }
         }
-        catch (Exception ex)
+        finally
         {
-            assertThat(ex).isNotNull();
-            assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency 
level EACH_QUORUM in DC datacenter2");
+            // Tear down and re-create the cluster
+            tearDown();
+            setup();
         }
-
-        // Tear down and re-create the cluster
-        tearDown();
-        setup();
     }
 
     /**
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
index 86de5200..136adfbb 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
@@ -173,13 +173,20 @@ public abstract class ResiliencyTestBase extends 
SharedClusterSparkIntegrationTe
                 rows.add(id + ":" + course + ":" + marks);
             }
 
+            Set<String> expected = expectedInstanceData.get(instance);
+            String instanceLabel = String.format("instance=%s (broadcast=%s) 
table=%s actualSize=%d expectedSize=%d",
+                                                 instance.config().num(),
+                                                 instance.broadcastAddress(),
+                                                 table,
+                                                 rows.size(),
+                                                 expected.size());
             if (hasNewNodes)
             {
-                
assertThat(rows).containsExactlyInAnyOrderElementsOf(expectedInstanceData.get(instance));
+                
assertThat(rows).as(instanceLabel).containsExactlyInAnyOrderElementsOf(expected);
             }
             else
             {
-                
assertThat(rows).containsAll(expectedInstanceData.get(instance));
+                assertThat(rows).as(instanceLabel).containsAll(expected);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to