This is an automated email from the ASF dual-hosted git repository.
jmckenzie-dev pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee180770 CASSANALYTICS-146: Clean up cluster re-use resource control
(#203)
ee180770 is described below
commit ee180770dcb8f0f779ce5c866cdf96a9781645dd
Author: Josh McKenzie <[email protected]>
AuthorDate: Wed May 6 15:37:10 2026 -0400
CASSANALYTICS-146: Clean up cluster re-use resource control (#203)
Patch by Josh McKenzie; reviewed by Jyothsna Konisa for CASSANALYTICS-146
---
.circleci/config.yml | 42 +++-
build.gradle | 8 +
.../BulkReaderMultiDCConsistencyTest.java | 272 +++++++++++++--------
.../cassandra/analytics/ResiliencyTestBase.java | 11 +-
4 files changed, 220 insertions(+), 113 deletions(-)
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 079e81a9..40a17230 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -119,11 +119,47 @@ commands:
# Format the arguments to "./gradlew test"
# GRADLE_ARGS=$(echo $CLASSNAMES | awk '{for (i=1; i<=NF; i++)
print "--tests",$i}')
echo "CircleCI assigned the following classes for testing:
$CLASSNAMES"
+ # Each test class runs in its own ./gradlew invocation. Gradle
treats
+ # build/test-reports/integration/ as a task output and wipes stale
+ # files on each run, so without aggregation only the last class's
XML
+ # would survive, hiding most results from the CircleCI dashboard.
+ # Drain reports into a per-class subdirectory after each iteration.
+ SRC_REPORT_DIR="$(pwd)/build/test-reports/integration"
+ AGG_ROOT="$(pwd)/build/aggregated-test-reports/integration"
+ mkdir -p "$AGG_ROOT"
# collect up exit statuses for all of the test classes and exit
with that result at the end.
# If no gradle processes exit with non-zero status, it will still
be 0
EXIT_STATUS=0
for TEST_NAME in $CLASSNAMES; do
./gradlew --stacktrace
cassandra-analytics-integration-tests:test --tests $TEST_NAME --no-daemon ||
EXIT_STATUS=$?;
+ DEST="$AGG_ROOT/$TEST_NAME"
+ mkdir -p "$DEST"
+ MOVED=0
+ if [ -d "$SRC_REPORT_DIR" ]; then
+ for f in "$SRC_REPORT_DIR"/TEST-*.xml; do
+ [ -e "$f" ] || continue
+ mv "$f" "$DEST"/
+ MOVED=1
+ done
+ for f in "$SRC_REPORT_DIR"/*.html; do
+ [ -e "$f" ] || continue
+ mv "$f" "$DEST"/ 2>/dev/null || true
+ done
+ fi
+ # If Gradle produced no XML (e.g. class-level crash before any
+ # @Test ran), synthesize a minimal JUnit record so CircleCI's
+ # dashboard still surfaces that the shard attempted the class.
+ if [ "$MOVED" = "0" ]; then
+ MSG="Gradle produced no JUnit XML for $TEST_NAME; likely a
class-level crash before tests ran. Exit status: $EXIT_STATUS. See job
artifacts for full logs."
+ {
+ printf '%s\n' '<?xml version="1.0" encoding="UTF-8"?>'
+ printf '<testsuite name="%s" tests="1" failures="1"
errors="0" skipped="0">\n' "$TEST_NAME"
+ printf ' <testcase classname="%s"
name="noJUnitReportProduced">\n' "$TEST_NAME"
+ printf ' <failure message="%s">%s</failure>\n' "$MSG"
"$MSG"
+ printf ' </testcase>\n'
+ printf '</testsuite>\n'
+ } > "$DEST/TEST-${TEST_NAME}.xml"
+ fi
done;
exit $EXIT_STATUS
no_output_timeout: 30m
@@ -244,7 +280,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
int-c41-spark3-2_12-jdk11:
parallelism: 8
@@ -281,7 +317,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
spark3-2_13-jdk11-bti-c50:
docker:
@@ -344,7 +380,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
workflows:
version: 2
diff --git a/build.gradle b/build.gradle
index 17146696..065e903f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -248,7 +248,15 @@ subprojects {
}
}
+ // We want to get all the exception information we can on test failure in a
multi-node in-jvm env
tasks.withType(Test).configureEach {
+ testLogging {
+ showExceptions true
+ exceptionFormat "full"
+ showCauses true
+ showStackTraces true
+ }
+
def heapDumpPath =
"${project.rootProject.rootDir}/build/${project.name}/heapDumps"
Files.createDirectories(Paths.get(heapDumpPath))
if (JavaVersion.current().isJava11Compatible()) {
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
index b4bf2065..fd87abd5 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -19,12 +19,14 @@
package org.apache.cassandra.analytics;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
import org.junit.jupiter.api.Test;
import net.bytebuddy.ByteBuddy;
@@ -134,93 +136,140 @@ public class BulkReaderMultiDCConsistencyTest extends
SharedClusterSparkIntegrat
* @throws NoSuchMethodException
*/
@Test
- void eachQuorumIsNotQuorum() throws NoSuchMethodException
+ void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException
{
- List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
- updatedDataSet.set(1, TEST_VAL);
-
- // Internally update value for TEST_KEY for node5 and node6. This
update doesn't propagate to other nodes.
- updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
- updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
-
- // Bytecode injection to simulate a scenario where node5 and node6 are
at the end of the replica list for bulk reader.
- // This simulation mimics a real world scenario.
- // With this arrangement PartitionedDataLayer.splitReplicas method for
QUORUM will split the replicas like below:
- // primaryReplicas: [Node1, Node2, Node3, Node4]
- // secondaryReplicas: [Node5, Node6]
- // Number of nodes required for QUORUM read id 6/1 + 1 = 4. Bulk
reader will read from [Node1, Node2, Node3, Node4] only.
+ // The agent must be installed before
ClassReloadingStrategy.fromInstalledAgent() — otherwise
+ // it throws IllegalStateException when the JVM hosting this test
class was started without a
+ // prior ByteBuddy install (e.g. when the CI harness runs this class
in its own gradle invocation).
ByteBuddyAgent.install();
- new ByteBuddy()
- .redefine(CassandraDataLayer.class)
- .method(ElementMatchers.named("getAvailability"))
- .intercept(
-
MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability",
CassandraInstance.class))
- .withAllArguments()
- )
- .make()
- .load(
- CassandraDataLayer.class.getClassLoader(),
- ClassReloadingStrategy.fromInstalledAgent()
- );
+ // Hold on to the original strategy to reset ByteBuddy after this test
+ ClassReloadingStrategy crStrategy =
ClassReloadingStrategy.fromInstalledAgent();
- // Bulk read with QUORUM consistency
- List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
- // Validate that the result doesn't have the updated data.
- validateBulkReadRows(rowList, OG_DATASET);
-
- // Message filter to mimic message drops from Node5 and Node6 to Node1.
- // We are setting this up to simulate a scenario where reading values
with QUORUM consistency with driver
- // and using Node1 as the coordinator doesn't get the values from
Node5 and Node6.
- cluster.filters().allVerbs().from(5).to(1).drop();
- cluster.filters().allVerbs().from(6).to(1).drop();
-
- // Read value for TEST_KEY with driver using Node1 as coordinator
- String quorumVal = readValueForKey(cluster.get(1).coordinator(),
TEST_KEY, ConsistencyLevel.QUORUM);
- // Validate that the updated value is not read
- assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
-
- // Cleanup message filter
- cluster.filters().reset();
-
- // Bulk read with EACH_QUORUM consistency
- rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- // Validate that bulk reader was able to read the updated value
- validateBulkReadRows(rowList, updatedDataSet);
- // Read value using driver with EACH_QUORUM
- String eachQuorumVal = readValueForKey(TEST_KEY,
ConsistencyLevel.EACH_QUORUM);
- // Validate that EACH_QUORUM read using driver and the bulk reader are
the same
-
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Revert the value update for all nodes
- setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ // We need the try/finally structure since this test mutates global
state through the ByteBuddy changes; if we
+ // time out during the test run that change persists and, depending on test
ordering, will cascade and take the rest
+ // with it.
+ try
+ {
+ List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
+ updatedDataSet.set(1, TEST_VAL);
+
+ // Internally update value for TEST_KEY for node5 and node6. This
update doesn't propagate to other nodes.
+ updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
+ updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
+
+ // Bytecode injection to simulate a scenario where node5 and node6
are at the end of the replica list for bulk reader.
+ // This simulation mimics a real world scenario.
+ // With this arrangement PartitionedDataLayer.splitReplicas method
for QUORUM will split the replicas like below:
+ // primaryReplicas: [Node1, Node2, Node3, Node4]
+ // secondaryReplicas: [Node5, Node6]
+ // Number of nodes required for QUORUM read is 6/2 + 1 = 4. Bulk
reader will read from [Node1, Node2, Node3, Node4] only.
+ new ByteBuddy()
+ .redefine(CassandraDataLayer.class)
+ .method(ElementMatchers.named("getAvailability"))
+ .intercept(
+
MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability",
CassandraInstance.class))
+ .withAllArguments()
+ )
+ .make()
+ .load(CassandraDataLayer.class.getClassLoader(),
crStrategy);
+
+ // Bulk read with QUORUM consistency
+ List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ // Validate that the result doesn't have the updated data.
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Message filter to mimic message drops from Node5 and Node6 to
Node1.
+ // We are setting this up to simulate a scenario where reading
values with QUORUM consistency with driver
+ // and using Node1 as the coordinator doesn't get the values from
Node5 and Node6.
+ cluster.filters().allVerbs().from(5).to(1).drop();
+ cluster.filters().allVerbs().from(6).to(1).drop();
+
+ // Read value for TEST_KEY with driver using Node1 as coordinator.
+ // The message filters above drop responses from Node5/Node6 to
Node1, but the coordinator's
+ // snitch-based replica selection may still pick Node5 or Node6 as
one of its 4 QUORUM replicas.
+ // When that happens the dropped response causes a
ReadTimeoutException. This is infrastructure
+ // noise from the message filter and not a real test failure,
which would manifest as an
+ // AssertionError (wrong value returned), not a timeout. Retry to
tolerate the non-deterministic
+ // replica selection. This test was _very_ intermittently flaky
before, so if we take something that
+ // flaked out 1% of the time for instance and then repeat 10x, we
_should_ be in a better place.
+ // Another option here would be to just keep spinning on
ReadTimeoutExceptions until the junit
+ // timeout timer but that just seems excessive.
+ String quorumVal = null;
+ for (int attempt = 1; attempt <= 10; attempt++)
+ {
+ try
+ {
+ quorumVal = readValueForKey(cluster.get(1).coordinator(),
TEST_KEY, ConsistencyLevel.QUORUM);
+ break;
+ }
+ catch (Exception e)
+ {
+ if (attempt == 10 || !(e instanceof ReadTimeoutException))
+ {
+ throw e;
+ }
+ }
+ }
+ assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
+
+ // Bulk read with EACH_QUORUM consistency
+ rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ // Validate that bulk reader was able to read the updated value
+ validateBulkReadRows(rowList, updatedDataSet);
+
+ // Read value using driver with EACH_QUORUM. Must use a DC2
coordinator (node4) because the active message
+ // filters drop responses from nodes 5 and 6 back to node1 (DC1),
making EACH_QUORUM unsatisfiable from
+ // node1's perspective. We used to reset the message filters
mid-test but timeouts on the test would
+ // then cascade and cause all other tests to fail. Now we need to
have this structured workaround
+ // to respect the message filters we have in place but still
confirm the data is where we expect it
+ // and that the client version matches bulk's view of the world.
+ String eachQuorumVal =
readValueForKey(cluster.get(4).coordinator(), TEST_KEY,
ConsistencyLevel.EACH_QUORUM);
+ // Validate that EACH_QUORUM read using driver and the bulk reader
are the same
+
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+ }
+ finally
+ {
+ // Reset message filters, data and the redefined CassandraDataLayer class unconditionally so a mid-test
failure doesn't leave dirty
+ // state that corrupts subsequent tests (e.g.
eachQuorumFailureWithTwoNodesDownOneDC).
+ cluster.filters().reset();
+ setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ crStrategy.reset(CassandraDataLayer.class);
+ }
}
/**
* Tests that EACH_QUORUM read succeeds with one node down in each DC.
* Tests that value read using driver is the same as the value read using
bulk reader.
*
+ * As this is a topology destructive test, we tear down and recreate the
cluster.
+ *
* @throws Exception
*/
@Test
void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
{
- // Stop Node1(DC1)
- cluster.stopUnchecked(cluster.get(1));
- // Stop Node4(DC2)
- cluster.stopUnchecked(cluster.get(4));
-
- // Bulk read with EACH_QUORUM consistency
- List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- validateBulkReadRows(rowList, OG_DATASET);
-
- // Read TEST_KEY using driver
- String eachQuorumVal = readValueForKey(TEST_KEY,
ConsistencyLevel.EACH_QUORUM);
- // Validate that data from driver and bulk reader are the same
-
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Tear down and re-create the cluster
- tearDown();
- setup();
+ try
+ {
+ // Stop Node1(DC1)
+ cluster.stopUnchecked(cluster.get(1));
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+
+ // Bulk read with EACH_QUORUM consistency
+ List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Read TEST_KEY using driver
+ String eachQuorumVal = readValueForKey(TEST_KEY,
ConsistencyLevel.EACH_QUORUM);
+ // Validate that data from driver and bulk reader are the same
+
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+ }
+ finally
+ {
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
+ }
}
/**
@@ -230,51 +279,58 @@ public class BulkReaderMultiDCConsistencyTest extends
SharedClusterSparkIntegrat
* EACH_QUORUM read with bulk reader fails with cause as
NotEnoughReplicasException.
* EACH_QUORUM read with driver fails.
*
+ * As this is a topology destructive test, we tear down and recreate the
cluster.
+ *
* @throws Exception
*/
@Test
void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception
{
- // Stop Node4(DC2)
- cluster.stopUnchecked(cluster.get(4));
- // Stop Node5(DC2)
- cluster.stopUnchecked(cluster.get(5));
-
- // Bulk read with QUORUM
- List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
- validateBulkReadRows(rowList, OG_DATASET);
- // Driver read with QUORUM
- String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
- // Bulk read and driver read values are the same
- assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Try bulk reading with EACH_QUORUM consistency. Assert that it fails
with the correct cause.
- try
- {
- bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- }
- catch (Exception ex)
- {
- assertThat(ex).isNotNull();
- assertThat(ex).isInstanceOf(SparkException.class);
-
assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
- assertThat(ex.getCause().getMessage()).isEqualTo("Required 2
replicas but only 1 responded");
- }
-
- // Try driver reading with EACH_QUORUM consistency. Assert that it
fails with the correct error.
try
{
- readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+ // Stop Node5(DC2)
+ cluster.stopUnchecked(cluster.get(5));
+
+ // Bulk read with QUORUM
+ List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+ // Driver read with QUORUM
+ String quorumVal = readValueForKey(TEST_KEY,
ConsistencyLevel.QUORUM);
+ // Bulk read and driver read values are the same
+
assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+ // Try bulk reading with EACH_QUORUM consistency. Assert that it
fails with the correct cause.
+ try
+ {
+ bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex).isInstanceOf(SparkException.class);
+
assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
+ assertThat(ex.getCause().getMessage()).isEqualTo("Required 2
replicas but only 1 responded");
+ }
+
+ // Try driver reading with EACH_QUORUM consistency. Assert that it
fails with the correct error.
+ try
+ {
+ readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex.getMessage()).isEqualTo("Cannot achieve
consistency level EACH_QUORUM in DC datacenter2");
+ }
}
- catch (Exception ex)
+ finally
{
- assertThat(ex).isNotNull();
- assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency
level EACH_QUORUM in DC datacenter2");
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
}
-
- // Tear down and re-create the cluster
- tearDown();
- setup();
}
/**
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
index 86de5200..136adfbb 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
@@ -173,13 +173,20 @@ public abstract class ResiliencyTestBase extends
SharedClusterSparkIntegrationTe
rows.add(id + ":" + course + ":" + marks);
}
+ Set<String> expected = expectedInstanceData.get(instance);
+ String instanceLabel = String.format("instance=%s (broadcast=%s)
table=%s actualSize=%d expectedSize=%d",
+ instance.config().num(),
+ instance.broadcastAddress(),
+ table,
+ rows.size(),
+ expected.size());
if (hasNewNodes)
{
-
assertThat(rows).containsExactlyInAnyOrderElementsOf(expectedInstanceData.get(instance));
+
assertThat(rows).as(instanceLabel).containsExactlyInAnyOrderElementsOf(expected);
}
else
{
-
assertThat(rows).containsAll(expectedInstanceData.get(instance));
+ assertThat(rows).as(instanceLabel).containsAll(expected);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]