This is an automated email from the ASF dual-hosted git repository.
sarankk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4aa927d CASSANALYTICS-171: Avoid Spark 4 partitioning warnings during reads (#213)
a4aa927d is described below
commit a4aa927db7986ad4f8e6039a647fbdb0c7a1dec5
Author: Liu Cao <[email protected]>
AuthorDate: Tue Jun 2 12:59:12 2026 -0700
CASSANALYTICS-171: Avoid Spark 4 partitioning warnings during reads (#213)
Spark 4 ignores custom DataSource V2 Partitioning implementations and logs
a warning. Cassandra scan partitions are token ranges rather than keyed groups,
so report Spark's UnknownPartitioning directly while preserving the input
partition count.
Patch by Liu Cao; reviewed by Francisco Guerrero, Saranya Krishnakumar for
CASSANALYTICS-171
---
CHANGES.txt | 1 +
.../spark/sparksql/CassandraScanBuilder.java | 3 ++-
.../spark/sparksql/CassandraScanBuilderTest.java} | 30 ++++++++++++++--------
3 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 573b879f..ec3409ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.5.0
-----
+ * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
* Make BulkWriterConfig extensible (CASSANALYTICS-168)
diff --git a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java b/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
index 2f821398..57538d33 100644
--- a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
+++ b/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
+import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -121,7 +122,7 @@ class CassandraScanBuilder implements ScanBuilder, Scan, Batch, SupportsPushDown
@Override
public Partitioning outputPartitioning()
{
- return new CassandraPartitioning(dataLayer);
+ return new UnknownPartitioning(dataLayer.partitionCount());
}
private List<PartitionKeyFilter> buildPartitionKeyFilters()
diff --git a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java b/cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
similarity index 51%
rename from cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java
rename to cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
index e305f365..b9948d0b 100644
--- a/cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java
+++ b/cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java
@@ -19,21 +19,31 @@
package org.apache.cassandra.spark.sparksql;
+import org.junit.jupiter.api.Test;
+
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
+import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-class CassandraPartitioning implements Partitioning
-{
- final DataLayer dataLayer;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
- CassandraPartitioning(DataLayer dataLayer)
+class CassandraScanBuilderTest
+{
+ @Test
+ void outputPartitioningReportsUnknownPartitioningWithPartitionCount()
{
- this.dataLayer = dataLayer;
- }
+ DataLayer dataLayer = mock(DataLayer.class);
+ when(dataLayer.partitionCount()).thenReturn(7);
+ CassandraScanBuilder builder =
+ new CassandraScanBuilder(dataLayer, new StructType(), CaseInsensitiveStringMap.empty());
- @Override
- public int numPartitions()
- {
- return dataLayer.partitionCount();
+ Partitioning partitioning = builder.outputPartitioning();
+
+ assertThat(partitioning).isInstanceOf(UnknownPartitioning.class);
+ assertThat(partitioning.numPartitions()).isEqualTo(7);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]