This is an automated email from the ASF dual-hosted git repository.

sarankk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4aa927d CASSANALYTICS-171: Avoid Spark 4 partitioning warnings during reads (#213)
a4aa927d is described below

commit a4aa927db7986ad4f8e6039a647fbdb0c7a1dec5
Author: Liu Cao <[email protected]>
AuthorDate: Tue Jun 2 12:59:12 2026 -0700

    CASSANALYTICS-171: Avoid Spark 4 partitioning warnings during reads (#213)
    
    Spark 4 ignores custom DataSource V2 Partitioning implementations and logs
    a warning. Cassandra scan partitions are token ranges rather than keyed
    groups, so report Spark's UnknownPartitioning directly while preserving
    the input partition count.
    
    Patch by Liu Cao; reviewed by Francisco Guerrero, Saranya Krishnakumar for CASSANALYTICS-171
---
 CHANGES.txt                                        |  1 +
 .../spark/sparksql/CassandraScanBuilder.java       |  3 ++-
 .../spark/sparksql/CassandraScanBuilderTest.java}  | 30 ++++++++++++++--------
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 573b879f..ec3409ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.5.0
 -----
+ * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
  * Spark 4.0 Support (CASSANALYTICS-34)
  * Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
  * Make BulkWriterConfig extensible (CASSANALYTICS-168)
diff --git a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java b/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
index 2f821398..57538d33 100644
--- a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
+++ b/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
 import org.apache.spark.sql.connector.read.partitioning.Partitioning;
+import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -121,7 +122,7 @@ class CassandraScanBuilder implements ScanBuilder, Scan, Batch, SupportsPushDown
     @Override
     public Partitioning outputPartitioning()
     {
-        return new CassandraPartitioning(dataLayer);
+        return new UnknownPartitioning(dataLayer.partitionCount());
     }
 
     private List<PartitionKeyFilter> buildPartitionKeyFilters()
diff --git a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java b/cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
similarity index 51%
rename from cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java
rename to cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
index e305f365..b9948d0b 100644
--- a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java
+++ b/cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
@@ -19,21 +19,31 @@
 
 package org.apache.cassandra.spark.sparksql;
 
+import org.junit.jupiter.api.Test;
+
 import org.apache.cassandra.spark.data.DataLayer;
 import org.apache.spark.sql.connector.read.partitioning.Partitioning;
+import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class CassandraPartitioning implements Partitioning
-{
-    final DataLayer dataLayer;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-    CassandraPartitioning(DataLayer dataLayer)
+class CassandraScanBuilderTest
+{
+    @Test
+    void outputPartitioningReportsUnknownPartitioningWithPartitionCount()
     {
-        this.dataLayer = dataLayer;
-    }
+        DataLayer dataLayer = mock(DataLayer.class);
+        when(dataLayer.partitionCount()).thenReturn(7);
+        CassandraScanBuilder builder =
+            new CassandraScanBuilder(dataLayer, new StructType(), CaseInsensitiveStringMap.empty());
 
-    @Override
-    public int numPartitions()
-    {
-        return dataLayer.partitionCount();
+        Partitioning partitioning = builder.outputPartitioning();
+
+        assertThat(partitioning).isInstanceOf(UnknownPartitioning.class);
+        assertThat(partitioning.numPartitions()).isEqualTo(7);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to