This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 090dd4fd CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)
090dd4fd is described below

commit 090dd4fdc86ea0ca9410140ce72e840e37497df7
Author: Yifan Cai <y...@apache.org>
AuthorDate: Sat Jun 21 20:45:26 2025 -0700

    CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)
    
    Patch by Serban Teodorescu, Yifan Cai; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-20
    
    ---------
    
    Co-authored-by: Serban Teodorescu <teodo...@adobe.com>
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/data/CassandraDataLayer.java   | 31 +++++++++++++----
 .../apache/cassandra/analytics/BulkReaderTest.java | 27 +++++++++++++++
 .../SharedClusterSparkIntegrationTestBase.java     | 21 ++++++++++--
 .../apache/cassandra/analytics/SparkTestUtils.java | 39 +++++++++++++---------
 5 files changed, 94 insertions(+), 25 deletions(-)
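
At its core, the patch inverts where the Sidecar instance map comes from: rather than collecting only the configured contact points, CassandraDataLayer now walks the full token ring (optionally filtered to the job's datacenter) and builds one Sidecar instance per distinct node name. Below is a minimal, self-contained sketch of that pipeline; the Node and Sidecar records are hypothetical stand-ins for the project's CassandraInstance and SidecarInstanceImpl types, and 9043 is an assumed Sidecar port.

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public final class RingToSidecarSketch
    {
        // Hypothetical stand-ins for CassandraInstance and SidecarInstanceImpl
        record Node(String nodeName, String dataCenter) { }
        record Sidecar(String hostname, int port) { }

        // Mirrors the stream pipeline added to CassandraDataLayer#initInstanceMap:
        // walk the full ring, keep only the target DC (when one is set), deduplicate
        // node names, and map each to a Sidecar instance on the configured port.
        static Map<String, Sidecar> sidecarInstanceMap(List<Node> ring, String datacenter, int port)
        {
            return ring.stream()
                       .filter(node -> datacenter == null || datacenter.equals(node.dataCenter()))
                       .map(Node::nodeName)
                       .distinct()
                       .map(name -> new Sidecar(name, port))
                       .collect(Collectors.toMap(Sidecar::hostname, Function.identity()));
        }

        public static void main(String[] args)
        {
            List<Node> ring = List.of(new Node("node1", "datacenter1"),
                                      new Node("node2", "datacenter1"),
                                      new Node("node3", "datacenter2"));
            // Prints only the two datacenter1 nodes, however few contact points
            // the job was configured with.
            System.out.println(sidecarInstanceMap(ring, "datacenter1", 9043));
        }
    }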

diff --git a/CHANGES.txt b/CHANGES.txt
index 1f074a1f..8fd4e7af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Use full ring instead of only IPs from configuration (CASSANALYTICS-20)
  * Bulk Reader should dynamically size the Spark job based on estimated table size (CASSANALYTICS-36)
  * Allow getting cassandra role in Spark options for use in Sidecar requests for RBAC (CASSANALYTICS-61)
  * Fix NPE in the deserialized CassandraClusterInfoGroup (CASSANALYTICS-59)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5e7910ae..bf877f61 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -146,7 +146,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
     private SslConfig sslConfig;
 
     @VisibleForTesting
-    transient Map<String, SidecarInstance> instanceMap;
+    transient Map<String, SidecarInstance> sidecarInstanceMap;
 
     public CassandraDataLayer(@NotNull ClientConfig options,
                               @NotNull Sidecar.ClientConfig sidecarClientConfig,
@@ -220,6 +220,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         this.rfMap = rfMap;
         this.timeProvider = timeProvider;
         this.maybeQuoteKeyspaceAndTable();
+        this.initSidecarClient();
         this.initInstanceMap();
         this.startupValidate();
     }
@@ -234,7 +235,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
 
         // Load cluster config from options
         clusterConfig = initializeClusterConfig(options);
-        initInstanceMap();
+        initSidecarClient();
 
         // Get cluster info from Cassandra Sidecar
         int effectiveNumberOfCores;
@@ -251,6 +252,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         {
             throw new RuntimeException(ThrowableUtils.rootCause(exception));
         }
+        initInstanceMap();
         LOGGER.info("Initialized Cassandra Bulk Reader with 
effectiveNumberOfCores={}", effectiveNumberOfCores);
     }
 
@@ -422,7 +424,21 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
 
     protected void initInstanceMap()
     {
-        instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        Preconditions.checkState(tokenPartitioner != null, "tokenPartitioner cannot be absent");
+        sidecarInstanceMap = tokenPartitioner
+                             .ring()
+                             .instances()
+                             .stream()
+                             .filter(instance -> datacenter == null || datacenter.equals(instance.dataCenter()))
+                             .map(CassandraInstance::nodeName)
+                             .distinct()
+                             .map(nodeName -> new SidecarInstanceImpl(nodeName, sidecarClientConfig.effectivePort()))
+                             .collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        LOGGER.info("Initialized CassandraDataLayer sidecarInstanceMap numInstances={}", sidecarInstanceMap.size());
+    }
+
+    protected void initSidecarClient()
+    {
         try
         {
             SslConfigSecretsProvider secretsProvider = sslConfig != null
@@ -436,7 +452,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         {
             throw new RuntimeException("Unable to build sidecar client", ioException);
         }
-        LOGGER.info("Initialized CassandraDataLayer instanceMap 
numInstances={}", instanceMap.size());
+        LOGGER.info("Initialized sidecar client");
     }
 
     @Override
@@ -516,7 +532,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                            @NotNull Range<BigInteger> range,
                                                            @NotNull CassandraInstance instance)
     {
-        SidecarInstance sidecarInstance = instanceMap.get(instance.nodeName());
+        SidecarInstance sidecarInstance = sidecarInstanceMap.get(instance.nodeName());
         if (sidecarInstance == null)
         {
             throw new IllegalStateException("Could not find matching cassandra instance: " + instance.nodeName());
@@ -733,6 +749,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
         this.timeProvider = new ReaderTimeProvider(in.readInt());
         this.maybeQuoteKeyspaceAndTable();
+        this.initSidecarClient();
         this.initInstanceMap();
         this.startupValidate();
     }
@@ -944,10 +961,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
 
         LOGGER.info("Clearing snapshot at end of Spark job snapshotName={} 
keyspace={} table={} dc={}",
                     snapshotName, maybeQuotedKeyspace, maybeQuotedTable, 
datacenter);
-        CountDownLatch latch = new CountDownLatch(clusterConfig.size());
+        CountDownLatch latch = new CountDownLatch(sidecarInstanceMap.size());
         try
         {
-            for (SidecarInstance instance : clusterConfig)
+            for (SidecarInstance instance : sidecarInstanceMap.values())
             {
                 sidecar.clearSnapshot(instance, maybeQuotedKeyspace, maybeQuotedTable, snapshotName).whenComplete((resp, throwable) -> {
                     try
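
Note the initialization ordering the patch establishes in the options-based startup path: the sidecar client is built first, cluster info (including tokenPartitioner) is fetched through it, and only then is the instance map derived from the ring; in the constructor and deserialization paths the partitioner arrives already populated, so initInstanceMap can follow initSidecarClient directly. A rough, runnable sketch of that dependency chain, with placeholder fields standing in for the real client and partitioner:

    public final class StartupOrderSketch
    {
        private Object sidecarClient;      // stand-in for the Sidecar HTTP client
        private Object tokenPartitioner;   // stand-in for the ring-backed partitioner

        void initSidecarClient()
        {
            // Built from options and SSL config; must exist before any Sidecar call
            sidecarClient = new Object();
        }

        void fetchClusterInfo()
        {
            // Stand-in for the Sidecar round trip that populates tokenPartitioner
            if (sidecarClient == null)
                throw new IllegalStateException("sidecar client not initialized");
            tokenPartitioner = new Object();
        }

        void initInstanceMap()
        {
            // Mirrors the new precondition in CassandraDataLayer#initInstanceMap
            if (tokenPartitioner == null)
                throw new IllegalStateException("tokenPartitioner cannot be absent");
            // ... derive sidecarInstanceMap from the full ring here ...
        }

        public static void main(String[] args)
        {
            StartupOrderSketch startup = new StartupOrderSketch();
            startup.initSidecarClient();
            startup.fetchClusterInfo();
            startup.initInstanceMap();
        }
    }
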
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 21637e85..7c5ebaff 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.analytics;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -48,6 +50,13 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
     QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
     QualifiedName tableForNullStaticColumn = uniqueTestTableFullName(TEST_KEYSPACE);
 
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(2);
+    }
+
     @Test
     void testDynamicSizingOption()
     {
@@ -119,6 +128,24 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
         }
     }
 
+    @Test
+    void testUsingSingleSidecarContactPoint()
+    {
+        String singleSidecar = SparkTestUtils.sidecarInstancesOptionStream(cluster, dnsResolver)
+                                             .limit(1)
+                                             .collect(Collectors.joining());
+        assertThat(cluster.size()).isEqualTo(2);
+        assertThat(singleSidecar.contains(","))
+        .describedAs("should not contain the separator ',' as it should have one single contact point")
+        .isFalse();
+        Dataset<Row> data = bulkReaderDataFrame(table1, 
Collections.singletonMap("sidecar_contact_points", singleSidecar)).load();
+
+        List<Row> rows = data.collectAsList().stream()
+                             .sorted(Comparator.comparing(row -> row.getInt(0)))
+                             .collect(Collectors.toList());
+        assertThat(rows.size()).isEqualTo(DATASET.size());
+    }
+
     @Override
     protected void initializeSchemaForTest()
     {
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
index 9a7d5889..c6a98ca7 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
@@ -91,8 +91,25 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
      */
     protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName)
     {
-        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(), getOrCreateSparkSession(),
-                                                         tableName);
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName, Collections.emptyMap());
+    }
+
+    /**
+     * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden
+     * with additional options for every specific test.
+     *
+     * @param tableName the qualified name for the Cassandra table
+     * @param additionalOptions additional options for the data frame
+     * @return a {@link DataFrameReader} for Cassandra bulk reads
+     */
+    protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName, Map<String, String> additionalOptions)
+    {
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName,
+                                                         additionalOptions);
     }
 
     /**
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 0e3a1366..5eda84a9 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.analytics;
 
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -112,7 +113,8 @@ public class SparkTestUtils
      */
     public DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf,
                                                       SparkSession spark,
-                                                      QualifiedName tableName)
+                                                      QualifiedName tableName,
+                                                      Map<String, String> 
additionalOptions)
     {
         SQLContext sql = spark.sqlContext();
         SparkContext sc = spark.sparkContext();
@@ -121,22 +123,27 @@ public class SparkTestUtils
         int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", sparkConf.getInt("spark.executor.instances", 1));
         int numCores = coresPerExecutor * numExecutors;
 
+        Map<String, String> options = new HashMap<>();
+        options.put("sidecar_contact_points", sidecarInstancesOption(cluster, 
dnsResolver));
+        options.put("keyspace", tableName.keyspace());
+        options.put("table", tableName.table());
+        options.put("DC", "datacenter1");
+        options.put("snapshotName", UUID.randomUUID().toString());
+        options.put("createSnapshot", "true");
+        // Shutdown hooks are called after the job ends, and in the case of integration tests
+        // the sidecar is already shut down before this. Since the cluster will be torn
+        // down anyway, the integration job skips clearing snapshots.
+        options.put("clearSnapshotStrategy", "noop");
+        options.put("defaultParallelism", 
String.valueOf(sc.defaultParallelism()));
+        options.put("numCores", String.valueOf(numCores));
+        options.put("sizing", "default");
+        options.put("sidecar_port", String.valueOf(sidecarPort));
+        // merge in additionalOptions; note that for options with the same name, the entries in additionalOptions are kept
+        options.putAll(additionalOptions);
+
         return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                  .option("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver))
-                  .option("keyspace", tableName.keyspace()) // unquoted
-                  .option("table", tableName.table()) // unquoted
-                  .option("DC", "datacenter1")
-                  .option("snapshotName", UUID.randomUUID().toString())
-                  .option("createSnapshot", "true")
-                  // Shutdown hooks are called after the job ends, and in the case of integration tests
-                  // the sidecar is already shut down before this. Since the cluster will be torn
-                  // down anyway, the integration job skips clearing snapshots.
-                  .option("clearSnapshotStrategy", "noop")
-                  .option("defaultParallelism", sc.defaultParallelism())
-                  .option("numCores", numCores)
-                  .option("sizing", "default")
-                  .options(mtlsTestHelper.mtlOptionMap())
-                  .option("sidecar_port", sidecarPort);
+                  .options(options)
+                  .options(mtlsTestHelper.mtlOptionMap());
     }
 
     /**
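
The new testUsingSingleSidecarContactPoint test above exercises the user-visible effect of the change: a bulk read can be pointed at a single Sidecar contact point and still cover the whole DC-local ring. An illustrative sketch of such a job follows; the host, keyspace, and table names are placeholders, 9043 is an assumed port, and the option keys are the ones used in SparkTestUtils above.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class SingleContactPointRead
    {
        public static void main(String[] args)
        {
            SparkSession spark = SparkSession.builder()
                                             .appName("single-contact-point-read")
                                             .master("local[*]")
                                             .getOrCreate();

            Dataset<Row> df = spark.read()
                                   .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                                   // One contact point suffices; the data layer now
                                   // discovers the remaining instances from the ring.
                                   .option("sidecar_contact_points", "node1.example.com")
                                   .option("sidecar_port", "9043")
                                   .option("keyspace", "test_keyspace")
                                   .option("table", "test_table")
                                   .option("DC", "datacenter1")
                                   .load();

            df.show();
        }
    }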

