This is an automated email from the ASF dual-hosted git repository.

yifan-c pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 652cc172 CASSANALYTICS-168: Make BulkWriterConfig extensible
652cc172 is described below

commit 652cc17217433254d07c29ee37e4b32b3e723410
Author: Shailaja Koppu <[email protected]>
AuthorDate: Thu May 7 17:19:10 2026 +0100

    CASSANALYTICS-168: Make BulkWriterConfig extensible
    
    Patch by Shailaja Koppu; Reviewed by Josh McKenzie, Yifan Cai for 
CASSANALYTICS-168
---
 CHANGES.txt                                        |   4 +
 .../bulkwriter/AbstractBulkWriterContext.java      |   4 +-
 .../spark/bulkwriter/BulkWriterConfig.java         |  20 ++-
 .../spark/bulkwriter/BulkWriterContext.java        |  26 +--
 .../bulkwriter/CassandraBulkSourceRelation.java    |  59 +------
 .../bulkwriter/CassandraBulkWriterContext.java     |  18 ++-
 .../spark/bulkwriter/CassandraClusterInfo.java     |  22 ++-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |   2 +-
 .../coordinated/CassandraClusterInfoGroup.java     |  10 +-
 .../CassandraCoordinatedBulkWriterContext.java     |  23 ++-
 .../BulkWriterConfigExtensibilityTest.java         | 175 +++++++++++++++++++++
 .../spark/bulkwriter/MockBulkWriterContext.java    |   7 +
 .../BulkReaderMultiDCConsistencyTest.java          |   6 +-
 13 files changed, 282 insertions(+), 94 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a3ec87e3..4e5f27ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+0.5.0
+-----
+ * Make BulkWriterConfig extensible (CASSANALYTICS-168)
+
 0.4.0
 -----
  * Fix com.apple.mg namespace in cdc_bytes.avsc and cdc_generic_record.avsc 
(CASSANALYTICS-166)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index b6eff3c8..fd2210d0 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -54,7 +54,7 @@ import org.jetbrains.annotations.NotNull;
  *   <li>Driver creates BulkWriterContext using constructor</li>
  *   <li>Driver extracts BulkWriterConfig in {@link 
CassandraBulkSourceRelation} constructor</li>
  *   <li>BulkWriterConfig gets broadcast to executors</li>
- *   <li>Executors reconstruct BulkWriterContext via {@link 
BulkWriterContext#from(BulkWriterConfig)}</li>
+ *   <li>Executors reconstruct BulkWriterContext via {@link 
BulkWriterConfig#toBulkWriterContext()}</li>
  * </ol>
  *
  * <p>Broadcastable wrappers used in BulkWriterConfig:
@@ -112,7 +112,7 @@ public abstract class AbstractBulkWriterContext implements 
BulkWriterContext, Kr
     /**
      * Constructor for executor usage.
      * Reconstructs components from broadcast configuration on executors.
-     * This is used by the factory method {@link 
BulkWriterContext#from(BulkWriterConfig)}.
+     * This is used by {@link BulkWriterConfig#toBulkWriterContext()}.
      *
      * @param config immutable configuration for the bulk writer with 
pre-computed values
      */
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
index e2a3283a..0e126fc9 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.io.Serializable;
 
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -42,10 +43,10 @@ import org.jetbrains.annotations.NotNull;
  * and minimize Spark SizeEstimator overhead.
  * <p>
  * On executors, {@link BulkWriterContext} instances are reconstructed from 
this config using
- * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the 
broadcastable
- * wrappers and reconstructs the full implementations with fresh data from 
Cassandra Sidecar.
+ * {@link #toBulkWriterContext()}, which creates the appropriate context 
implementation and
+ * reconstructs the full implementations with fresh data from Cassandra 
Sidecar.
  */
-public final class BulkWriterConfig implements Serializable
+public class BulkWriterConfig implements Serializable
 {
     private static final long serialVersionUID = 1L;
 
@@ -111,4 +112,17 @@ public final class BulkWriterConfig implements Serializable
     {
         return lowestCassandraVersion;
     }
+
+    /**
+     * Factory method that reconstructs a {@link BulkWriterContext} on 
executors from this broadcast config.
+     * Subclasses may override this to return custom context implementations 
for specialized reconstruction.
+     *
+     * @return a new BulkWriterContext instance appropriate for the current 
configuration
+     */
+    public BulkWriterContext toBulkWriterContext()
+    {
+        return getConf().isCoordinatedWriteConfigured()
+               ? new CassandraCoordinatedBulkWriterContext(this)
+               : new CassandraBulkWriterContext(this);
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
index a76b7c60..8f240215 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
 import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
 import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * Context for bulk write operations, providing access to cluster, job, 
schema, and transport information.
@@ -29,7 +30,7 @@ import org.apache.cassandra.bridge.CassandraBridge;
  * Serialization Architecture:
  * This interface does NOT extend Serializable. BulkWriterContext instances 
are never broadcast to executors.
  * Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct 
BulkWriterContext instances
- * from the config using the factory method {@link #from(BulkWriterConfig)}.
+ * from the config using {@link BulkWriterConfig#toBulkWriterContext()}.
  * <p>
  * The implementations ({@link CassandraBulkWriterContext}, {@link 
CassandraCoordinatedBulkWriterContext})
  * do NOT have serialVersionUID fields as they are never serialized.
@@ -53,23 +54,12 @@ public interface BulkWriterContext
     TransportContext transportContext();
 
     /**
-     * Factory method to create a BulkWriterContext from a BulkWriterConfig on 
executors.
-     * This method reconstructs context instances on executors from the 
broadcast configuration.
-     * The driver creates contexts directly using constructors, not this 
method.
+     * Converts this context into an immutable {@link BulkWriterConfig} 
suitable for broadcasting to executors.
+     * Executors reconstruct a full {@link BulkWriterContext} from the config 
via
+     * {@link BulkWriterConfig#toBulkWriterContext()}.
      *
-     * @param config the immutable configuration object broadcast from driver
-     * @return a new BulkWriterContext instance
+     * @param sparkContext the Spark context (used to obtain default 
parallelism)
+     * @return an immutable config containing all broadcastable state
      */
-    static BulkWriterContext from(BulkWriterConfig config)
-    {
-        BulkSparkConf conf = config.getConf();
-        if (conf.isCoordinatedWriteConfigured())
-        {
-            return new CassandraCoordinatedBulkWriterContext(config);
-        }
-        else
-        {
-            return new CassandraBulkWriterContext(config);
-        }
-    }
+    BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext 
sparkContext);
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index f6323af3..44bd5e16 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -42,7 +42,6 @@ import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransf
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
-import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
@@ -90,69 +89,13 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
         this.sqlContext = sqlContext;
         this.sparkContext = 
JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
         // Extract immutable configuration from the context for broadcasting
-        BulkWriterConfig config = extractConfig(writerContext, 
sparkContext.defaultParallelism());
+        BulkWriterConfig config = 
writerContext.toBulkWriterConfigForBroadcasting(sparkContext);
         this.broadcastConfig = 
sparkContext.<BulkWriterConfig>broadcast(config);
         ReplicaAwareFailureHandler<RingInstance> failureHandler = new 
MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, 
failureHandler);
         this.simpleTaskScheduler = new SimpleTaskScheduler();
     }
 
-    /**
-     * Extracts immutable configuration from a BulkWriterContext for 
broadcasting.
-     * Creates BroadcastableCluster, BroadcastableJobInfo, and 
BroadcastableSchemaInfo
-     * to ensure zero transient fields and avoid Logger references in the 
broadcast object.
-     */
-    private static BulkWriterConfig extractConfig(BulkWriterContext context, 
int sparkDefaultParallelism)
-    {
-        if (context instanceof AbstractBulkWriterContext)
-        {
-            AbstractBulkWriterContext abstractContext = 
(AbstractBulkWriterContext) context;
-            ClusterInfo originalClusterInfo = abstractContext.cluster();
-
-            // Create BroadcastableCluster to avoid transient fields in 
broadcast
-            IBroadcastableClusterInfo broadcastableClusterInfo;
-            if (originalClusterInfo instanceof CassandraClusterInfoGroup)
-            {
-                // Coordinated write scenario
-                @SuppressWarnings("unchecked")
-                CassandraClusterInfoGroup multiCluster = 
(CassandraClusterInfoGroup) originalClusterInfo;
-                broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(
-                    multiCluster,
-                    abstractContext.bulkSparkConf()
-                );
-            }
-            else
-            {
-                // Single cluster scenario
-                broadcastableClusterInfo = BroadcastableClusterInfo.from(
-                    originalClusterInfo,
-                    abstractContext.bulkSparkConf()
-                );
-            }
-
-            // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner
-            BroadcastableJobInfo broadcastableJobInfo = 
BroadcastableJobInfo.from(
-                abstractContext.job(),
-                abstractContext.bulkSparkConf()
-            );
-
-            // Create BroadcastableSchemaInfo to avoid Logger in TableSchema
-            BroadcastableSchemaInfo broadcastableSchemaInfo = 
BroadcastableSchemaInfo.from(
-                abstractContext.schema()
-            );
-
-            return new BulkWriterConfig(
-                abstractContext.bulkSparkConf(),
-                sparkDefaultParallelism,
-                broadcastableJobInfo,
-                broadcastableClusterInfo,
-                broadcastableSchemaInfo,
-                abstractContext.lowestCassandraVersion()
-            );
-        }
-        throw new IllegalArgumentException("Cannot extract config from context 
type: " + context.getClass().getName());
-    }
-
     @Override
     @NotNull
     public SQLContext sqlContext()
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 5c516837..e658b3dc 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
@@ -45,7 +46,7 @@ public class CassandraBulkWriterContext extends 
AbstractBulkWriterContext
     }
 
     /**
-     * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)} 
factory method.
+     * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
      * This constructor is only used on executors to reconstruct context from 
broadcast config.
      *
      * @param config immutable configuration for the bulk writer
@@ -81,4 +82,19 @@ public class CassandraBulkWriterContext extends 
AbstractBulkWriterContext
     {
         return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
     }
+
+    @Override
+    public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext 
sparkContext)
+    {
+        IBroadcastableClusterInfo broadcastableClusterInfo = 
BroadcastableClusterInfo.from(cluster(), bulkSparkConf());
+        BroadcastableJobInfo broadcastableJobInfo = 
BroadcastableJobInfo.from(job(), bulkSparkConf());
+        BroadcastableSchemaInfo broadcastableSchemaInfo = 
BroadcastableSchemaInfo.from(schema());
+
+        return new BulkWriterConfig(bulkSparkConf(),
+                                    sparkContext.defaultParallelism(),
+                                    broadcastableJobInfo,
+                                    broadcastableClusterInfo,
+                                    broadcastableSchemaInfo,
+                                    lowestCassandraVersion());
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index ad143fb0..a9c12087 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -65,24 +65,32 @@ import static 
org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIden
 /**
  * Driver-only implementation of {@link ClusterInfo} for single cluster 
operations.
  * <p>
- * This class is NOT serialized and does NOT have a serialVersionUID.
- * When broadcasting to executors, the driver extracts information from this 
class
- * and creates a {@link BroadcastableClusterInfo} instance, which is then 
included
- * in the {@link BulkWriterConfig} that gets broadcast.
+ * This class is NOT serialized. When broadcasting to executors, the driver 
extracts
+ * broadcast-safe fields via {@link BroadcastableClusterInfo#from(ClusterInfo, 
BulkSparkConf)}
+ * and includes the result in the {@link BulkWriterConfig} that gets broadcast.
  * <p>
- * This class implements Serializable only because the {@link ClusterInfo} 
interface
- * requires it (for use as a field type in broadcast classes), but instances 
of this
- * class are never directly serialized.
+ * On executors, a new instance is reconstructed from {@link 
BroadcastableClusterInfo}
+ * using {@link #CassandraClusterInfo(BroadcastableClusterInfo)}, reusing 
broadcast-safe
+ * fields and fetching other data fresh from Sidecar.
+ *
+ * @see BroadcastableClusterInfo for the broadcast-safe subset of fields
  */
 public class CassandraClusterInfo implements ClusterInfo, Closeable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterInfo.class);
 
+    // -- Broadcast-safe fields --
+    // Extracted by BroadcastableClusterInfo.from() and sent to executors.
+    // Changes here must be reflected in BroadcastableClusterInfo.
     protected final BulkSparkConf conf;
     protected final String clusterId;
     protected String cassandraVersion;
     protected Partitioner partitioner;
 
+    // -- Driver-only fields (not broadcast) --
+    // NOT included in BroadcastableClusterInfo. Either expensive to serialize
+    // (token mappings, schema) or non-serializable (CassandraContext, 
Futures).
+    // Executors reconstruct these fresh from Sidecar via 
CassandraClusterInfo(BroadcastableClusterInfo).
     protected volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
     protected volatile String keyspaceSchema;
     protected volatile ReplicationFactor replicationFactor;
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 33d094e4..d707a651 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -93,7 +93,7 @@ public class RecordWriter
      */
     public RecordWriter(BulkWriterConfig config, String[] columnNames)
     {
-        this(BulkWriterContext.from(config), columnNames, TaskContext::get, 
SortedSSTableWriter::new);
+        this(config.toBulkWriterContext(), columnNames, TaskContext::get, 
SortedSSTableWriter::new);
     }
 
     @VisibleForTesting
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
index 9fd870cd..4a434a7a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -136,7 +137,14 @@ public class CassandraClusterInfoGroup implements 
ClusterInfo, MultiClusterSuppo
         return new CassandraClusterInfoGroup(clusterInfos);
     }
 
-    @VisibleForTesting // ONLY FOR TESTING
+    /**
+     * Creates a {@link CassandraClusterInfoGroup} from a pre-built list of 
{@link ClusterInfo} instances.
+     * This factory is intended for custom {@link IBroadcastableClusterInfo} 
implementations that reconstruct
+     * cluster infos individually and need to wrap them in a group.
+     *
+     * @param clusterInfos the list of already-reconstructed ClusterInfo 
instances
+     * @return a new CassandraClusterInfoGroup
+     */
     public static CassandraClusterInfoGroup createFrom(List<ClusterInfo> 
clusterInfos)
     {
         return new CassandraClusterInfoGroup(clusterInfos);
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index 5329fc21..6cd199bf 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -25,10 +25,15 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableJobInfo;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableSchemaInfo;
 import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
 import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
 import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.DataTransport;
+import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
@@ -50,7 +55,7 @@ public class CassandraCoordinatedBulkWriterContext extends 
AbstractBulkWriterCon
     }
 
     /**
-     * Constructor used by {@link 
org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)} 
factory method.
+     * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
      * This constructor is only used on executors to reconstruct context from 
broadcast config.
      *
      * @param config immutable configuration for the bulk writer
@@ -115,4 +120,20 @@ public class CassandraCoordinatedBulkWriterContext extends 
AbstractBulkWriterCon
     {
         return (CassandraClusterInfoGroup) cluster();
     }
+
+    @Override
+    public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext 
sparkContext)
+    {
+        CassandraClusterInfoGroup multiCluster = clusterInfoGroup();
+        IBroadcastableClusterInfo broadcastableClusterInfo = 
BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf());
+        BroadcastableJobInfo broadcastableJobInfo = 
BroadcastableJobInfo.from(job(), bulkSparkConf());
+        BroadcastableSchemaInfo broadcastableSchemaInfo = 
BroadcastableSchemaInfo.from(schema());
+
+        return new BulkWriterConfig(bulkSparkConf(),
+                                    sparkContext.defaultParallelism(),
+                                    broadcastableJobInfo,
+                                    broadcastableClusterInfo,
+                                    broadcastableSchemaInfo,
+                                    lowestCassandraVersion());
+    }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
new file mode 100644
index 00000000..380c4460
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.CassandraBridge;
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that verify the extensibility contract for the bulk writer 
broadcast/reconstruction chain.
+ * These tests prove that downstream implementations can:
+ * <ul>
+ *   <li>Subclass {@link BulkWriterConfig} and override {@link 
BulkWriterConfig#toBulkWriterContext()}</li>
+ *   <li>Implement {@link IBroadcastableClusterInfo} with custom {@code 
reconstruct()} logic</li>
+ *   <li>Subclass {@link AbstractBulkWriterContext} and override {@code 
reconstructJobInfoOnExecutor()}</li>
+ * </ul>
+ */
+class BulkWriterConfigExtensibilityTest
+{
+    @Test
+    void testToBulkWriterContextCanBeOverridden()
+    {
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+        IBroadcastableClusterInfo mockClusterInfo = 
mock(IBroadcastableClusterInfo.class);
+        BroadcastableSchemaInfo mockSchemaInfo = 
mock(BroadcastableSchemaInfo.class);
+
+        // A custom BulkWriterConfig subclass overriding toBulkWriterContext()
+        BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4, 
mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0")
+        {
+            @Override
+            public BulkWriterContext toBulkWriterContext()
+            {
+                return mock(BulkWriterContext.class);
+            }
+        };
+
+        BulkWriterContext context = customConfig.toBulkWriterContext();
+        assertThat(context).isNotNull();
+        // The base class would return CassandraBulkWriterContext or 
CassandraCoordinatedBulkWriterContext,
+        // but our subclass returns a mock — proving the override is 
dispatched.
+        assertThat(context).isNotInstanceOf(CassandraBulkWriterContext.class);
+    }
+
+    @Test
+    void testCustomIBroadcastableClusterInfoReconstructIsCalled()
+    {
+        ClusterInfo expectedCluster = mock(ClusterInfo.class);
+        IBroadcastableClusterInfo mockBroadcastable = 
mock(IBroadcastableClusterInfo.class);
+        when(mockBroadcastable.reconstruct()).thenReturn(expectedCluster);
+
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+        BroadcastableSchemaInfo mockSchemaInfo = 
mock(BroadcastableSchemaInfo.class);
+
+        BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, 
mockJobInfo, mockBroadcastable, mockSchemaInfo, "4.0.0");
+
+        TestBulkWriterContext context = new TestBulkWriterContext(config);
+
+        assertThat(context.cluster()).isSameAs(expectedCluster);
+    }
+
+    @Test
+    void testReconstructJobInfoOnExecutorCanBeOverridden()
+    {
+        JobInfo expectedJobInfo = mock(JobInfo.class);
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+        BroadcastableSchemaInfo mockSchemaInfo = 
mock(BroadcastableSchemaInfo.class);
+        BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, 
mockJobInfo, mock(IBroadcastableClusterInfo.class), mockSchemaInfo, "4.0.0");
+
+        // Subclass that overrides reconstructJobInfoOnExecutor to return 
custom JobInfo
+        TestBulkWriterContext context = new TestBulkWriterContext(config)
+        {
+            @Override
+            protected JobInfo 
reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo)
+            {
+                return expectedJobInfo;
+            }
+        };
+
+        assertThat(context.job()).isSameAs(expectedJobInfo);
+    }
+
+    /**
+     * Minimal AbstractBulkWriterContext subclass for testing executor-side 
reconstruction
+     * without requiring real Cassandra infrastructure.
+     */
+    private static class TestBulkWriterContext extends 
AbstractBulkWriterContext
+    {
+        TestBulkWriterContext(@NotNull BulkWriterConfig config)
+        {
+            super(config);
+        }
+
+        @Override
+        protected ClusterInfo buildClusterInfo()
+        {
+            throw new UnsupportedOperationException("Driver-only");
+        }
+
+        @Override
+        protected void validateKeyspaceReplication()
+        {
+        }
+
+        @Override
+        protected MultiClusterContainer<UUID> generateRestoreJobIds()
+        {
+            throw new UnsupportedOperationException("Driver-only");
+        }
+
+        @Override
+        protected CassandraBridge buildCassandraBridge()
+        {
+            return mock(CassandraBridge.class);
+        }
+
+        @Override
+        protected TransportContext buildTransportContext(boolean isOnDriver)
+        {
+            return mock(TransportContext.class);
+        }
+
+        @Override
+        protected JobStatsPublisher buildJobStatsPublisher()
+        {
+            return mock(JobStatsPublisher.class);
+        }
+
+        @Override
+        protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo 
jobInfo)
+        {
+            return mock(JobInfo.class);
+        }
+
+        @Override
+        protected SchemaInfo 
reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo)
+        {
+            return mock(SchemaInfo.class);
+        }
+
+        @Override
+        public BulkWriterConfig 
toBulkWriterConfigForBroadcasting(org.apache.spark.api.java.JavaSparkContext 
sparkContext)
+        {
+            throw new UnsupportedOperationException("Not needed for test");
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index d9cefac4..cb5564f0 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -60,6 +60,7 @@ import 
org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.exception.SidecarApiCallException;
 import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
 import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
@@ -551,4 +552,10 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
     {
         StartupValidator.instance().perform();
     }
+
+    @Override
+    public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext 
sparkContext)
+    {
+        throw new UnsupportedOperationException("Not implemented in mock");
+    }
 }
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
index fd87abd5..80867b2d 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -26,7 +26,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import com.datastax.driver.core.exceptions.ReadTimeoutException;
 import org.junit.jupiter.api.Test;
 
 import net.bytebuddy.ByteBuddy;
@@ -204,7 +203,10 @@ public class BulkReaderMultiDCConsistencyTest extends 
SharedClusterSparkIntegrat
                 }
                 catch (Exception e)
                 {
-                    if (attempt == 10 || !(e instanceof ReadTimeoutException))
+                    // ReadTimeoutException here is of type 
org.apache.cassandra.exceptions.ReadTimeoutException
+                    // which is not available on the integration-tests compile 
classpath
+                    // Hence checking for class name instead of using 
instanceof
+                    if (attempt == 10 || 
!e.getClass().getName().endsWith("ReadTimeoutException"))
                     {
                         throw e;
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to