jmckenzie-dev commented on code in PR #205:
URL: https://github.com/apache/cassandra-analytics/pull/205#discussion_r3209854713
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java:
##########
@@ -115,4 +120,31 @@ protected CassandraClusterInfoGroup clusterInfoGroup()
{
return (CassandraClusterInfoGroup) cluster();
}
+
+ @Override
+ public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
+ {
+ ClusterInfo originalClusterInfo = cluster();
+
+ // Extract only broadcast-safe cluster metadata
+
+ // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable
Review Comment:
Currently the distinction between what is transient and what is not is implicit, derived only from the wording in this method. How can we instead make it clear within `ClusterInfo` which fields are serializable and which are not?
Some brainstorming:
- Javadoc comments
- Using the `Serializable` marker interface and the `@Serial` annotation (somewhat overloading `@Serial` beyond its formal purpose, but it would annotate the intent)
- Adding our own `@Immutable`-style annotation, or otherwise marking the fields `final` (or making them final where appropriate)
Having the serializability state of these fields recorded only in comments here is brittle and runs a real risk of drift; future changes to `ClusterInfo` could easily break these contracts without another maintainer realizing it.
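A minimal sketch of the annotation idea, assuming a hypothetical `@BroadcastSafe` field annotation (not an existing JDK or Cassandra Analytics type) and a stand-in `ClusterInfoSketch` whose field names are illustrative only. Broadcast-safe fields are marked at their declaration, driver-only fields are declared `transient`, and a reflective guard lists any instance field that is neither, so a unit test asserting that list is empty would catch drift when someone later adds a field.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class BroadcastSafeSketch
{
    // Hypothetical marker: the annotated field may be shipped to executors in a broadcast.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface BroadcastSafe
    {
    }

    // Hypothetical stand-in for ClusterInfo; field names are illustrative only.
    static class ClusterInfoSketch
    {
        @BroadcastSafe
        final String lowestCassandraVersion;
        @BroadcastSafe
        final String partitionerName;

        // Driver-only state is declared transient at the source, not described elsewhere.
        transient Object cassandraContext;
        transient Object tokenRangeMapping;

        ClusterInfoSketch(String lowestCassandraVersion, String partitionerName)
        {
            this.lowestCassandraVersion = lowestCassandraVersion;
            this.partitionerName = partitionerName;
        }
    }

    // Simulates drift: a field added later without being classified.
    static class DriftedClusterInfoSketch
    {
        @BroadcastSafe
        final String lowestCassandraVersion = "4.0.0";
        Object addedLater;
    }

    // Lists declared instance fields that are neither transient nor @BroadcastSafe.
    static List<String> unclassifiedFields(Class<?> type)
    {
        List<String> unclassified = new ArrayList<>();
        for (Field field : type.getDeclaredFields())
        {
            int modifiers = field.getModifiers();
            if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers) || field.isSynthetic())
            {
                continue;
            }
            if (!field.isAnnotationPresent(BroadcastSafe.class))
            {
                unclassified.add(field.getName());
            }
        }
        return unclassified;
    }

    public static void main(String[] args)
    {
        System.out.println(unclassifiedFields(ClusterInfoSketch.class));
        System.out.println(unclassifiedFields(DriftedClusterInfoSketch.class));
    }
}
```

Running `main` prints `[]` for the classified sketch and `[addedLater]` for the drifted one. A guard like this only checks that every field has been classified; it cannot prove that an annotated field's type is actually serializable, which a serialization round-trip test would cover.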
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java:
##########
@@ -111,4 +112,23 @@ public String getLowestCassandraVersion()
{
return lowestCassandraVersion;
}
+
+ /**
+ * Factory method that reconstructs a {@link BulkWriterContext} on executors from this broadcast config.
+ * Subclasses may override this to return custom context implementations for specialized reconstruction.
+ *
+ * @return a new BulkWriterContext instance appropriate for the current configuration
+ */
+ public BulkWriterContext toBulkWriterContext()
+ {
+ BulkSparkConf conf = getConf();
+ if (conf.isCoordinatedWriteConfigured())
Review Comment:
stylistic nit: you could rewrite this as:
```
return conf.isCoordinatedWriteConfigured() ?
new CassandraCoordinatedBulkWriterContext(this) :
new CassandraBulkWriterContext(this);
```
Whether or not you think that's clearer is another story entirely. :)
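A self-contained sketch of the overridable factory shape described in the javadoc, using the ternary form suggested here. `WriterConfig`, `WriterContext`, `CoordinatedContext` and `SingleClusterContext` are hypothetical stand-ins for `BulkWriterConfig`, `BulkWriterContext`, `CassandraCoordinatedBulkWriterContext` and `CassandraBulkWriterContext`; the real constructors take the config (`this`), which is omitted here to keep the sketch small.

```java
public class FactoryOverrideSketch
{
    // Hypothetical stand-in for BulkWriterContext.
    interface WriterContext
    {
        String kind();
    }

    static final class SingleClusterContext implements WriterContext
    {
        public String kind()
        {
            return "single-cluster";
        }
    }

    static final class CoordinatedContext implements WriterContext
    {
        public String kind()
        {
            return "coordinated";
        }
    }

    // Hypothetical stand-in for BulkWriterConfig.
    static class WriterConfig
    {
        private final boolean coordinatedWriteConfigured;

        WriterConfig(boolean coordinatedWriteConfigured)
        {
            this.coordinatedWriteConfigured = coordinatedWriteConfigured;
        }

        // Default factory in the ternary form; subclasses may still override it.
        WriterContext toWriterContext()
        {
            return coordinatedWriteConfigured
                   ? new CoordinatedContext()
                   : new SingleClusterContext();
        }
    }

    public static void main(String[] args)
    {
        // A downstream subclass overriding the factory, as the extensibility test does.
        WriterConfig custom = new WriterConfig(false)
        {
            @Override
            WriterContext toWriterContext()
            {
                return () -> "custom";
            }
        };
        System.out.println(new WriterConfig(true).toWriterContext().kind());
        System.out.println(new WriterConfig(false).toWriterContext().kind());
        System.out.println(custom.toWriterContext().kind());
    }
}
```

This prints `coordinated`, `single-cluster` and `custom`. Either form keeps the method overridable; the ternary just puts both default outcomes in one expression.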
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java:
##########
@@ -81,4 +82,30 @@ protected MultiClusterContainer<UUID> generateRestoreJobIds()
{
return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
}
+
+ @Override
+ public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
+ {
+ ClusterInfo originalClusterInfo = cluster();
+
+ // Extract only broadcast-safe cluster metadata
+
+ // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable
Review Comment:
Same as above - how can we make this more explicit near the source of the
data and its serializability (and reflect the downstream expectation of that
serializability) instead of having that information and expectation only
reflected here?
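One way to state the downstream expectation at the source, sketched under assumptions: a hypothetical driver-side `DriverClusterInfo` (standing in for `ClusterInfo`) owns a `toBroadcastSnapshot()` method that copies only broadcast-safe values into a hypothetical immutable `ClusterSnapshot` implementing `Serializable`. Non-serializable state such as a context object has no place in the snapshot type, and a plain Java serialization round trip (Spark's default `spark.serializer`, though a job may configure Kryo instead) can back the contract with a test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BroadcastSnapshotSketch
{
    // Hypothetical immutable snapshot: only final, serializable fields, so the type states the contract.
    static final class ClusterSnapshot implements Serializable
    {
        // On Java 14+ this field could also carry @java.io.Serial.
        private static final long serialVersionUID = 1L;

        private final String lowestCassandraVersion;
        private final String partitionerName;

        ClusterSnapshot(String lowestCassandraVersion, String partitionerName)
        {
            this.lowestCassandraVersion = lowestCassandraVersion;
            this.partitionerName = partitionerName;
        }

        String lowestCassandraVersion()
        {
            return lowestCassandraVersion;
        }

        String partitionerName()
        {
            return partitionerName;
        }
    }

    // Hypothetical stand-in for ClusterInfo; it defines the broadcast-safe subset next to its data.
    static class DriverClusterInfo
    {
        private final String lowestCassandraVersion;
        private final String partitionerName;
        private final Object cassandraContext; // not serializable; never leaves the driver

        DriverClusterInfo(String lowestCassandraVersion, String partitionerName, Object cassandraContext)
        {
            this.lowestCassandraVersion = lowestCassandraVersion;
            this.partitionerName = partitionerName;
            this.cassandraContext = cassandraContext;
        }

        ClusterSnapshot toBroadcastSnapshot()
        {
            return new ClusterSnapshot(lowestCassandraVersion, partitionerName);
        }
    }

    // Java serialization round trip, usable in a test to check the snapshot survives broadcasting.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (T) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException("snapshot is not serializable", e);
        }
    }

    public static void main(String[] args)
    {
        DriverClusterInfo info = new DriverClusterInfo("4.0.0", "Murmur3Partitioner", new Object());
        ClusterSnapshot copy = roundTrip(info.toBroadcastSnapshot());
        System.out.println(copy.lowestCassandraVersion() + " " + copy.partitionerName());
    }
}
```

Whether a separate snapshot type is preferable to splitting `ClusterInfo` itself is a design question for this PR; the sketch only shows where the contract could live.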
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that verify the extensibility contract for the bulk writer broadcast/reconstruction chain.
+ * These tests prove that downstream implementations can:
+ * <ul>
+ * <li>Subclass {@link BulkWriterConfig} and override {@link BulkWriterConfig#toBulkWriterContext()}</li>
+ * <li>Implement {@link IBroadcastableClusterInfo} with custom {@code reconstruct()} logic</li>
+ * <li>Subclass {@link AbstractBulkWriterContext} and override {@code reconstructJobInfoOnExecutor()}</li>
+ * </ul>
+ */
+class BulkWriterConfigExtensibilityTest
+{
+ @Test
+ void testToBulkWriterContextCanBeOverridden()
+ {
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class);
+ IBroadcastableClusterInfo mockClusterInfo = mock(IBroadcastableClusterInfo.class);
+ BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class);
+
+ // A custom BulkWriterConfig subclass overriding toBulkWriterContext()
+ BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0")
+ {
+ @Override
+ public BulkWriterContext toBulkWriterContext()
+ {
+ return mock(BulkWriterContext.class);
+ }
+ };
+
+ BulkWriterContext context = customConfig.toBulkWriterContext();
+ assertThat(context).isNotNull();
+ // The OSS default would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext,
Review Comment:
`The OSS default` <- this is the OSS project. Is this comment carried over from another context, and does it need refining here?