jmckenzie-dev commented on code in PR #205:
URL: https://github.com/apache/cassandra-analytics/pull/205#discussion_r3209854713


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java:
##########
@@ -115,4 +120,31 @@ protected CassandraClusterInfoGroup clusterInfoGroup()
     {
         return (CassandraClusterInfoGroup) cluster();
     }
+
+    @Override
+    public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
+    {
+        ClusterInfo originalClusterInfo = cluster();
+
+        // Extract only broadcast-safe cluster metadata
+
+        // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable

Review Comment:
   Currently the distinction between transient and serializable fields is 
implicit, derived only from the comments in this method. How can we instead 
make it clear within ClusterInfo itself which fields are serializable and 
which are not? Brainstorming:
   - javadoc comments
   - using the `Serializable` interface and the `@Serial` annotation as markers 
(this overloads them somewhat beyond their formal usage, but it would annotate 
the intent)
   - adding our own `@Immutable`-style annotation, or otherwise marking the 
fields final (or pushing them toward being final where appropriate)
   
   Documenting the serializability of these fields only in comments here is 
brittle and runs a real risk of drift; changes in ClusterInfo could easily 
break these contracts in the future without another maintainer realizing it.
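
   To make the annotation idea concrete, here is a minimal sketch of what a 
field-level marker plus a drift check could look like. Everything in it is 
hypothetical: `@BroadcastSafe`, `@DriverOnly`, `ExampleClusterInfo`, and its 
field names are stand-ins invented for illustration, not existing types in 
this project, and the real ClusterInfo may well need a different shape.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class BroadcastSafetySketch
{
    /** Hypothetical marker: the field is safe to ship to executors. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface BroadcastSafe
    {
    }

    /** Hypothetical marker: the field lives only on the driver and must be rebuilt on executors. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DriverOnly
    {
    }

    /** Simplified stand-in for ClusterInfo; the field names are illustrative only. */
    static class ExampleClusterInfo
    {
        @BroadcastSafe
        private final String lowestCassandraVersion = "4.0.0";

        @DriverOnly
        private transient Object cassandraContext;

        // Deliberately left unannotated to show what the check reports
        private Object tokenRangeMapping;
    }

    /** Returns the names of instance fields that declare neither marker. */
    static List<String> undeclaredFields(Class<?> type)
    {
        List<String> missing = new ArrayList<>();
        for (Field field : type.getDeclaredFields())
        {
            // Skip static and compiler-generated fields; only instance state matters here
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic())
            {
                continue;
            }
            boolean declared = field.isAnnotationPresent(BroadcastSafe.class)
                               || field.isAnnotationPresent(DriverOnly.class);
            if (!declared)
            {
                missing.add(field.getName());
            }
        }
        return missing;
    }

    public static void main(String[] args)
    {
        // Reports the fields whose broadcast intent has not been declared
        System.out.println(undeclaredFields(ExampleClusterInfo.class));
    }
}
```

   A unit test asserting that `undeclaredFields(...)` is empty would fail as 
soon as someone adds a field without stating its intent, which is the drift 
this comment is worried about. In the sketch it reports `tokenRangeMapping`.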



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java:
##########
@@ -111,4 +112,23 @@ public String getLowestCassandraVersion()
     {
         return lowestCassandraVersion;
     }
+
+    /**
+     * Factory method that reconstructs a {@link BulkWriterContext} on executors from this broadcast config.
+     * Subclasses may override this to return custom context implementations for specialized reconstruction.
+     *
+     * @return a new BulkWriterContext instance appropriate for the current configuration
+     */
+    public BulkWriterContext toBulkWriterContext()
+    {
+        BulkSparkConf conf = getConf();
+        if (conf.isCoordinatedWriteConfigured())

Review Comment:
   stylistic nit: you could rewrite this as:
   ```
           return conf.isCoordinatedWriteConfigured() ?
               new CassandraCoordinatedBulkWriterContext(this) :
               new CassandraBulkWriterContext(this);
   ```
   Whether or not you think that's more clear is another story entirely. :)



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java:
##########
@@ -81,4 +82,30 @@ protected MultiClusterContainer<UUID> generateRestoreJobIds()
     {
         return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
     }
+
+    @Override
+    public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
+    {
+        ClusterInfo originalClusterInfo = cluster();
+
+        // Extract only broadcast-safe cluster metadata
+
+        // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable

Review Comment:
   Same as above - how can we make this more explicit near the source of the 
data and its serializability (and reflect the downstream expectation of that 
serializability) instead of having that information and expectation only 
reflected here?
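
   One way the downstream expectation could be pinned down next to the data 
is a serialization round-trip check. The sketch below assumes plain Java 
serialization; `ExampleBroadcastableInfo` and its fields are hypothetical 
stand-ins, not types from this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BroadcastRoundTripSketch
{
    /** Simplified stand-in for a broadcastable view of cluster metadata; names are illustrative. */
    static class ExampleBroadcastableInfo implements Serializable
    {
        private static final long serialVersionUID = 1L;

        final String lowestCassandraVersion;
        // Driver-only state: skipped by Java serialization, so it is null after the round trip
        transient Object cassandraContext;

        ExampleBroadcastableInfo(String lowestCassandraVersion, Object cassandraContext)
        {
            this.lowestCassandraVersion = lowestCassandraVersion;
            this.cassandraContext = cassandraContext;
        }
    }

    /** Serializes the value to bytes and reads it back, as a broadcast to an executor would. */
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes))
        {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
        {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception
    {
        ExampleBroadcastableInfo original = new ExampleBroadcastableInfo("4.0.0", new Object());
        ExampleBroadcastableInfo copy = roundTrip(original);
        System.out.println("version survives: " + "4.0.0".equals(copy.lowestCassandraVersion));
        System.out.println("context dropped: " + (copy.cassandraContext == null));
    }
}
```

   A test along these lines fails with `NotSerializableException` if a 
non-serializable field is added without being marked transient, so the 
contract is enforced by the build rather than by a comment in the caller.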



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that verify the extensibility contract for the bulk writer broadcast/reconstruction chain.
+ * These tests prove that downstream implementations can:
+ * <ul>
+ *   <li>Subclass {@link BulkWriterConfig} and override {@link BulkWriterConfig#toBulkWriterContext()}</li>
+ *   <li>Implement {@link IBroadcastableClusterInfo} with custom {@code reconstruct()} logic</li>
+ *   <li>Subclass {@link AbstractBulkWriterContext} and override {@code reconstructJobInfoOnExecutor()}</li>
+ * </ul>
+ */
+class BulkWriterConfigExtensibilityTest
+{
+    @Test
+    void testToBulkWriterContextCanBeOverridden()
+    {
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+        IBroadcastableClusterInfo mockClusterInfo = mock(IBroadcastableClusterInfo.class);
+        BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class);
+
+        // A custom BulkWriterConfig subclass overriding toBulkWriterContext()
+        BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0")
+        {
+            @Override
+            public BulkWriterContext toBulkWriterContext()
+            {
+                return mock(BulkWriterContext.class);
+            }
+        };
+
+        BulkWriterContext context = customConfig.toBulkWriterContext();
+        assertThat(context).isNotNull();
+        // The OSS default would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext,

Review Comment:
   `The OSS default` <- this is the OSS project. Was this comment carried over 
from another context, and does it need refining here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

