This is an automated email from the ASF dual-hosted git repository.
yifan-c pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 652cc172 CASSANALYTICS-168: Make BulkWriterConfig extensible
652cc172 is described below
commit 652cc17217433254d07c29ee37e4b32b3e723410
Author: Shailaja Koppu <[email protected]>
AuthorDate: Thu May 7 17:19:10 2026 +0100
CASSANALYTICS-168: Make BulkWriterConfig extensible
Patch by Shailaja Koppu; Reviewed by Josh McKenzie, Yifan Cai for
CASSANALYTICS-168
---
CHANGES.txt | 4 +
.../bulkwriter/AbstractBulkWriterContext.java | 4 +-
.../spark/bulkwriter/BulkWriterConfig.java | 20 ++-
.../spark/bulkwriter/BulkWriterContext.java | 26 +--
.../bulkwriter/CassandraBulkSourceRelation.java | 59 +------
.../bulkwriter/CassandraBulkWriterContext.java | 18 ++-
.../spark/bulkwriter/CassandraClusterInfo.java | 22 ++-
.../cassandra/spark/bulkwriter/RecordWriter.java | 2 +-
.../coordinated/CassandraClusterInfoGroup.java | 10 +-
.../CassandraCoordinatedBulkWriterContext.java | 23 ++-
.../BulkWriterConfigExtensibilityTest.java | 175 +++++++++++++++++++++
.../spark/bulkwriter/MockBulkWriterContext.java | 7 +
.../BulkReaderMultiDCConsistencyTest.java | 6 +-
13 files changed, 282 insertions(+), 94 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a3ec87e3..4e5f27ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+0.5.0
+-----
+ * Make BulkWriterConfig extensible (CASSANALYTICS-168)
+
0.4.0
-----
* Fix com.apple.mg namespace in cdc_bytes.avsc and cdc_generic_record.avsc
(CASSANALYTICS-166)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index b6eff3c8..fd2210d0 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -54,7 +54,7 @@ import org.jetbrains.annotations.NotNull;
* <li>Driver creates BulkWriterContext using constructor</li>
* <li>Driver extracts BulkWriterConfig in {@link
CassandraBulkSourceRelation} constructor</li>
* <li>BulkWriterConfig gets broadcast to executors</li>
- * <li>Executors reconstruct BulkWriterContext via {@link
BulkWriterContext#from(BulkWriterConfig)}</li>
+ * <li>Executors reconstruct BulkWriterContext via {@link
BulkWriterConfig#toBulkWriterContext()}</li>
* </ol>
*
* <p>Broadcastable wrappers used in BulkWriterConfig:
@@ -112,7 +112,7 @@ public abstract class AbstractBulkWriterContext implements
BulkWriterContext, Kr
/**
* Constructor for executor usage.
* Reconstructs components from broadcast configuration on executors.
- * This is used by the factory method {@link
BulkWriterContext#from(BulkWriterConfig)}.
+ * This is used by {@link BulkWriterConfig#toBulkWriterContext()}.
*
* @param config immutable configuration for the bulk writer with
pre-computed values
*/
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
index e2a3283a..0e126fc9 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.bulkwriter;
import java.io.Serializable;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import org.jetbrains.annotations.NotNull;
/**
@@ -42,10 +43,10 @@ import org.jetbrains.annotations.NotNull;
* and minimize Spark SizeEstimator overhead.
* <p>
* On executors, {@link BulkWriterContext} instances are reconstructed from
this config using
- * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the
broadcastable
- * wrappers and reconstructs the full implementations with fresh data from
Cassandra Sidecar.
+ * {@link #toBulkWriterContext()}, which creates the appropriate context
implementation and
+ * reconstructs the full implementations with fresh data from Cassandra
Sidecar.
*/
-public final class BulkWriterConfig implements Serializable
+public class BulkWriterConfig implements Serializable
{
private static final long serialVersionUID = 1L;
@@ -111,4 +112,17 @@ public final class BulkWriterConfig implements Serializable
{
return lowestCassandraVersion;
}
+
+ /**
+ * Factory method that reconstructs a {@link BulkWriterContext} on
executors from this broadcast config.
+ * Subclasses may override this to return custom context implementations
for specialized reconstruction.
+ *
+ * @return a new BulkWriterContext instance appropriate for the current
configuration
+ */
+ public BulkWriterContext toBulkWriterContext()
+ {
+ return getConf().isCoordinatedWriteConfigured()
+ ? new CassandraCoordinatedBulkWriterContext(this)
+ : new CassandraBulkWriterContext(this);
+ }
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
index a76b7c60..8f240215 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.spark.api.java.JavaSparkContext;
/**
* Context for bulk write operations, providing access to cluster, job,
schema, and transport information.
@@ -29,7 +30,7 @@ import org.apache.cassandra.bridge.CassandraBridge;
* Serialization Architecture:
* This interface does NOT extend Serializable. BulkWriterContext instances
are never broadcast to executors.
* Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct
BulkWriterContext instances
- * from the config using the factory method {@link #from(BulkWriterConfig)}.
+ * from the config using {@link BulkWriterConfig#toBulkWriterContext()}.
* <p>
* The implementations ({@link CassandraBulkWriterContext}, {@link
CassandraCoordinatedBulkWriterContext})
* do NOT have serialVersionUID fields as they are never serialized.
@@ -53,23 +54,12 @@ public interface BulkWriterContext
TransportContext transportContext();
/**
- * Factory method to create a BulkWriterContext from a BulkWriterConfig on
executors.
- * This method reconstructs context instances on executors from the
broadcast configuration.
- * The driver creates contexts directly using constructors, not this
method.
+ * Converts this context into an immutable {@link BulkWriterConfig}
suitable for broadcasting to executors.
+ * Executors reconstruct a full {@link BulkWriterContext} from the config
via
+ * {@link BulkWriterConfig#toBulkWriterContext()}.
*
- * @param config the immutable configuration object broadcast from driver
- * @return a new BulkWriterContext instance
+ * @param sparkContext the Spark context (used to obtain default
parallelism)
+ * @return an immutable config containing all broadcastable state
*/
- static BulkWriterContext from(BulkWriterConfig config)
- {
- BulkSparkConf conf = config.getConf();
- if (conf.isCoordinatedWriteConfigured())
- {
- return new CassandraCoordinatedBulkWriterContext(config);
- }
- else
- {
- return new CassandraBulkWriterContext(config);
- }
- }
+ BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext
sparkContext);
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index f6323af3..44bd5e16 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -42,7 +42,6 @@ import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransf
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
-import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
@@ -90,69 +89,13 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
this.sqlContext = sqlContext;
this.sparkContext =
JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
// Extract immutable configuration from the context for broadcasting
- BulkWriterConfig config = extractConfig(writerContext,
sparkContext.defaultParallelism());
+ BulkWriterConfig config =
writerContext.toBulkWriterConfigForBroadcasting(sparkContext);
this.broadcastConfig =
sparkContext.<BulkWriterConfig>broadcast(config);
ReplicaAwareFailureHandler<RingInstance> failureHandler = new
MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
this.writeValidator = new BulkWriteValidator(writerContext,
failureHandler);
this.simpleTaskScheduler = new SimpleTaskScheduler();
}
- /**
- * Extracts immutable configuration from a BulkWriterContext for
broadcasting.
- * Creates BroadcastableCluster, BroadcastableJobInfo, and
BroadcastableSchemaInfo
- * to ensure zero transient fields and avoid Logger references in the
broadcast object.
- */
- private static BulkWriterConfig extractConfig(BulkWriterContext context,
int sparkDefaultParallelism)
- {
- if (context instanceof AbstractBulkWriterContext)
- {
- AbstractBulkWriterContext abstractContext =
(AbstractBulkWriterContext) context;
- ClusterInfo originalClusterInfo = abstractContext.cluster();
-
- // Create BroadcastableCluster to avoid transient fields in
broadcast
- IBroadcastableClusterInfo broadcastableClusterInfo;
- if (originalClusterInfo instanceof CassandraClusterInfoGroup)
- {
- // Coordinated write scenario
- @SuppressWarnings("unchecked")
- CassandraClusterInfoGroup multiCluster =
(CassandraClusterInfoGroup) originalClusterInfo;
- broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(
- multiCluster,
- abstractContext.bulkSparkConf()
- );
- }
- else
- {
- // Single cluster scenario
- broadcastableClusterInfo = BroadcastableClusterInfo.from(
- originalClusterInfo,
- abstractContext.bulkSparkConf()
- );
- }
-
- // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner
- BroadcastableJobInfo broadcastableJobInfo =
BroadcastableJobInfo.from(
- abstractContext.job(),
- abstractContext.bulkSparkConf()
- );
-
- // Create BroadcastableSchemaInfo to avoid Logger in TableSchema
- BroadcastableSchemaInfo broadcastableSchemaInfo =
BroadcastableSchemaInfo.from(
- abstractContext.schema()
- );
-
- return new BulkWriterConfig(
- abstractContext.bulkSparkConf(),
- sparkDefaultParallelism,
- broadcastableJobInfo,
- broadcastableClusterInfo,
- broadcastableSchemaInfo,
- abstractContext.lowestCassandraVersion()
- );
- }
- throw new IllegalArgumentException("Cannot extract config from context
type: " + context.getClass().getName());
- }
-
@Override
@NotNull
public SQLContext sqlContext()
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 5c516837..e658b3dc 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -45,7 +46,7 @@ public class CassandraBulkWriterContext extends
AbstractBulkWriterContext
}
/**
- * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)}
factory method.
+ * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
* This constructor is only used on executors to reconstruct context from
broadcast config.
*
* @param config immutable configuration for the bulk writer
@@ -81,4 +82,19 @@ public class CassandraBulkWriterContext extends
AbstractBulkWriterContext
{
return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
}
+
+ @Override
+ public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext
sparkContext)
+ {
+ IBroadcastableClusterInfo broadcastableClusterInfo =
BroadcastableClusterInfo.from(cluster(), bulkSparkConf());
+ BroadcastableJobInfo broadcastableJobInfo =
BroadcastableJobInfo.from(job(), bulkSparkConf());
+ BroadcastableSchemaInfo broadcastableSchemaInfo =
BroadcastableSchemaInfo.from(schema());
+
+ return new BulkWriterConfig(bulkSparkConf(),
+ sparkContext.defaultParallelism(),
+ broadcastableJobInfo,
+ broadcastableClusterInfo,
+ broadcastableSchemaInfo,
+ lowestCassandraVersion());
+ }
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index ad143fb0..a9c12087 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -65,24 +65,32 @@ import static
org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIden
/**
* Driver-only implementation of {@link ClusterInfo} for single cluster
operations.
* <p>
- * This class is NOT serialized and does NOT have a serialVersionUID.
- * When broadcasting to executors, the driver extracts information from this
class
- * and creates a {@link BroadcastableClusterInfo} instance, which is then
included
- * in the {@link BulkWriterConfig} that gets broadcast.
+ * This class is NOT serialized. When broadcasting to executors, the driver
extracts
+ * broadcast-safe fields via {@link BroadcastableClusterInfo#from(ClusterInfo,
BulkSparkConf)}
+ * and includes the result in the {@link BulkWriterConfig} that gets broadcast.
* <p>
- * This class implements Serializable only because the {@link ClusterInfo}
interface
- * requires it (for use as a field type in broadcast classes), but instances
of this
- * class are never directly serialized.
+ * On executors, a new instance is reconstructed from {@link
BroadcastableClusterInfo}
+ * using {@link #CassandraClusterInfo(BroadcastableClusterInfo)}, reusing
broadcast-safe
+ * fields and fetching other data fresh from Sidecar.
+ *
+ * @see BroadcastableClusterInfo for the broadcast-safe subset of fields
*/
public class CassandraClusterInfo implements ClusterInfo, Closeable
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraClusterInfo.class);
+ // -- Broadcast-safe fields --
+ // Extracted by BroadcastableClusterInfo.from() and sent to executors.
+ // Changes here must be reflected in BroadcastableClusterInfo.
protected final BulkSparkConf conf;
protected final String clusterId;
protected String cassandraVersion;
protected Partitioner partitioner;
+ // -- Driver-only fields (not broadcast) --
+ // NOT included in BroadcastableClusterInfo. Either expensive to serialize
+ // (token mappings, schema) or non-serializable (CassandraContext,
Futures).
+ // Executors reconstruct these fresh from Sidecar via
CassandraClusterInfo(BroadcastableClusterInfo).
protected volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
protected volatile String keyspaceSchema;
protected volatile ReplicationFactor replicationFactor;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 33d094e4..d707a651 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -93,7 +93,7 @@ public class RecordWriter
*/
public RecordWriter(BulkWriterConfig config, String[] columnNames)
{
- this(BulkWriterContext.from(config), columnNames, TaskContext::get,
SortedSSTableWriter::new);
+ this(config.toBulkWriterContext(), columnNames, TaskContext::get,
SortedSSTableWriter::new);
}
@VisibleForTesting
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
index 9fd870cd..4a434a7a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.RingInstance;
import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo;
import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -136,7 +137,14 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
return new CassandraClusterInfoGroup(clusterInfos);
}
- @VisibleForTesting // ONLY FOR TESTING
+ /**
+ * Creates a {@link CassandraClusterInfoGroup} from a pre-built list of
{@link ClusterInfo} instances.
+ * This factory is intended for custom {@link IBroadcastableClusterInfo}
implementations that reconstruct
+ * cluster infos individually and need to wrap them in a group.
+ *
+ * @param clusterInfos the list of already-reconstructed ClusterInfo
instances
+ * @return a new CassandraClusterInfoGroup
+ */
public static CassandraClusterInfoGroup createFrom(List<ClusterInfo>
clusterInfos)
{
return new CassandraClusterInfoGroup(clusterInfos);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index 5329fc21..6cd199bf 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -25,10 +25,15 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableJobInfo;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableSchemaInfo;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.DataTransport;
+import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -50,7 +55,7 @@ public class CassandraCoordinatedBulkWriterContext extends
AbstractBulkWriterCon
}
/**
- * Constructor used by {@link
org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)}
factory method.
+ * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
* This constructor is only used on executors to reconstruct context from
broadcast config.
*
* @param config immutable configuration for the bulk writer
@@ -115,4 +120,20 @@ public class CassandraCoordinatedBulkWriterContext extends
AbstractBulkWriterCon
{
return (CassandraClusterInfoGroup) cluster();
}
+
+ @Override
+ public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext
sparkContext)
+ {
+ CassandraClusterInfoGroup multiCluster = clusterInfoGroup();
+ IBroadcastableClusterInfo broadcastableClusterInfo =
BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf());
+ BroadcastableJobInfo broadcastableJobInfo =
BroadcastableJobInfo.from(job(), bulkSparkConf());
+ BroadcastableSchemaInfo broadcastableSchemaInfo =
BroadcastableSchemaInfo.from(schema());
+
+ return new BulkWriterConfig(bulkSparkConf(),
+ sparkContext.defaultParallelism(),
+ broadcastableJobInfo,
+ broadcastableClusterInfo,
+ broadcastableSchemaInfo,
+ lowestCassandraVersion());
+ }
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
new file mode 100644
index 00000000..380c4460
--- /dev/null
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.CassandraBridge;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that verify the extensibility contract for the bulk writer
broadcast/reconstruction chain.
+ * These tests prove that downstream implementations can:
+ * <ul>
+ * <li>Subclass {@link BulkWriterConfig} and override {@link
BulkWriterConfig#toBulkWriterContext()}</li>
+ * <li>Implement {@link IBroadcastableClusterInfo} with custom {@code
reconstruct()} logic</li>
+ * <li>Subclass {@link AbstractBulkWriterContext} and override {@code
reconstructJobInfoOnExecutor()}</li>
+ * </ul>
+ */
+class BulkWriterConfigExtensibilityTest
+{
+ @Test
+ void testToBulkWriterContextCanBeOverridden()
+ {
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+ IBroadcastableClusterInfo mockClusterInfo =
mock(IBroadcastableClusterInfo.class);
+ BroadcastableSchemaInfo mockSchemaInfo =
mock(BroadcastableSchemaInfo.class);
+
+ // A custom BulkWriterConfig subclass overriding toBulkWriterContext()
+ BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4,
mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0")
+ {
+ @Override
+ public BulkWriterContext toBulkWriterContext()
+ {
+ return mock(BulkWriterContext.class);
+ }
+ };
+
+ BulkWriterContext context = customConfig.toBulkWriterContext();
+ assertThat(context).isNotNull();
+ // The base class would return CassandraBulkWriterContext or
CassandraCoordinatedBulkWriterContext,
+ // but our subclass returns a mock — proving the override is
dispatched.
+ assertThat(context).isNotInstanceOf(CassandraBulkWriterContext.class);
+ }
+
+ @Test
+ void testCustomIBroadcastableClusterInfoReconstructIsCalled()
+ {
+ ClusterInfo expectedCluster = mock(ClusterInfo.class);
+ IBroadcastableClusterInfo mockBroadcastable =
mock(IBroadcastableClusterInfo.class);
+ when(mockBroadcastable.reconstruct()).thenReturn(expectedCluster);
+
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+ BroadcastableSchemaInfo mockSchemaInfo =
mock(BroadcastableSchemaInfo.class);
+
+ BulkWriterConfig config = new BulkWriterConfig(mockConf, 4,
mockJobInfo, mockBroadcastable, mockSchemaInfo, "4.0.0");
+
+ TestBulkWriterContext context = new TestBulkWriterContext(config);
+
+ assertThat(context.cluster()).isSameAs(expectedCluster);
+ }
+
+ @Test
+ void testReconstructJobInfoOnExecutorCanBeOverridden()
+ {
+ JobInfo expectedJobInfo = mock(JobInfo.class);
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+ BroadcastableSchemaInfo mockSchemaInfo =
mock(BroadcastableSchemaInfo.class);
+ BulkWriterConfig config = new BulkWriterConfig(mockConf, 4,
mockJobInfo, mock(IBroadcastableClusterInfo.class), mockSchemaInfo, "4.0.0");
+
+ // Subclass that overrides reconstructJobInfoOnExecutor to return
custom JobInfo
+ TestBulkWriterContext context = new TestBulkWriterContext(config)
+ {
+ @Override
+ protected JobInfo
reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo)
+ {
+ return expectedJobInfo;
+ }
+ };
+
+ assertThat(context.job()).isSameAs(expectedJobInfo);
+ }
+
+ /**
+ * Minimal AbstractBulkWriterContext subclass for testing executor-side
reconstruction
+ * without requiring real Cassandra infrastructure.
+ */
+ private static class TestBulkWriterContext extends
AbstractBulkWriterContext
+ {
+ TestBulkWriterContext(@NotNull BulkWriterConfig config)
+ {
+ super(config);
+ }
+
+ @Override
+ protected ClusterInfo buildClusterInfo()
+ {
+ throw new UnsupportedOperationException("Driver-only");
+ }
+
+ @Override
+ protected void validateKeyspaceReplication()
+ {
+ }
+
+ @Override
+ protected MultiClusterContainer<UUID> generateRestoreJobIds()
+ {
+ throw new UnsupportedOperationException("Driver-only");
+ }
+
+ @Override
+ protected CassandraBridge buildCassandraBridge()
+ {
+ return mock(CassandraBridge.class);
+ }
+
+ @Override
+ protected TransportContext buildTransportContext(boolean isOnDriver)
+ {
+ return mock(TransportContext.class);
+ }
+
+ @Override
+ protected JobStatsPublisher buildJobStatsPublisher()
+ {
+ return mock(JobStatsPublisher.class);
+ }
+
+ @Override
+ protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo
jobInfo)
+ {
+ return mock(JobInfo.class);
+ }
+
+ @Override
+ protected SchemaInfo
reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo)
+ {
+ return mock(SchemaInfo.class);
+ }
+
+ @Override
+ public BulkWriterConfig
toBulkWriterConfigForBroadcasting(org.apache.spark.api.java.JavaSparkContext
sparkContext)
+ {
+ throw new UnsupportedOperationException("Not needed for test");
+ }
+ }
+}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index d9cefac4..cb5564f0 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -60,6 +60,7 @@ import
org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.exception.SidecarApiCallException;
import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -551,4 +552,10 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
{
StartupValidator.instance().perform();
}
+
+ @Override
+ public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext
sparkContext)
+ {
+ throw new UnsupportedOperationException("Not implemented in mock");
+ }
}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
index fd87abd5..80867b2d 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -26,7 +26,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
-import com.datastax.driver.core.exceptions.ReadTimeoutException;
import org.junit.jupiter.api.Test;
import net.bytebuddy.ByteBuddy;
@@ -204,7 +203,10 @@ public class BulkReaderMultiDCConsistencyTest extends
SharedClusterSparkIntegrat
}
catch (Exception e)
{
- if (attempt == 10 || !(e instanceof ReadTimeoutException))
+ // ReadTimeoutException here is of type
org.apache.cassandra.exceptions.ReadTimeoutException,
+ // which is not available on the integration-tests compile
classpath.
+ // Hence we check the class name instead of using
instanceof.
+ if (attempt == 10 ||
!e.getClass().getName().endsWith("ReadTimeoutException"))
{
throw e;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]