frankgh commented on code in PR #146:
URL:
https://github.com/apache/cassandra-analytics/pull/146#discussion_r2471171915
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -84,6 +84,18 @@ public class RecordWriter
private final CqlTable cqlTable;
private StreamSession<?> streamSession = null;
+ /**
+ * Constructor that accepts a BulkWriterConfig and constructs the context on the executor.
+ * This is used when the config is broadcast to executors.
+ *
+ * @param config immutable configuration broadcast from driver
+ * @param columnNames column names array
+ */
+ public RecordWriter(BulkWriterConfig config, String[] columnNames)
+ {
+ this(BulkWriterContext.from(config), columnNames);
Review Comment:
We only create a record writer on the executor, so the other constructor is
not necessary.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java:
##########
@@ -39,13 +38,12 @@ interface SerializableFunction<T, R> extends Function<T, R>, Serializable
private final TokenUtils tokenUtils;
private final SerializableFunction<Object[], Object[]> keyColumnProvider;
- public Tokenizer(BulkWriterContext writerContext)
+ public Tokenizer(BroadcastableTableSchema broadcastableTableSchema, boolean isMurmur3Partitioner)
{
- TableSchema tableSchema = writerContext.schema().getTableSchema();
- this.tokenUtils = new TokenUtils(tableSchema.partitionKeyColumns,
- tableSchema.partitionKeyColumnTypes,
- writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner);
- this.keyColumnProvider = tableSchema::getKeyColumns;
+ this.tokenUtils = new TokenUtils(broadcastableTableSchema.getPartitionKeyColumns(),
+ broadcastableTableSchema.getPartitionKeyColumnTypes(),
+ isMurmur3Partitioner);
Review Comment:
It seems a little silly that we pass a boolean flag here to indicate
whether the partitioner is Murmur3. Can't we just pass the partitioner
directly to TokenUtils instead?
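A minimal sketch of what the suggestion above could look like. The `Partitioner` enum and `TokenUtils` class here are simplified, hypothetical stand-ins (the real `TokenUtils` also takes the partition key column types, and the real enum may have other members); the point is only that the constructor receives the partitioner itself and derives the Murmur3 check internally.

```java
import java.util.Collections;
import java.util.List;

public class TokenUtilsSketch
{
    // Hypothetical stand-in for the project's Partitioner enum
    public enum Partitioner
    {
        Murmur3Partitioner,
        RandomPartitioner
    }

    // Hypothetical, simplified TokenUtils: it keeps the partitioner
    // instead of a pre-computed boolean flag
    public static final class TokenUtils
    {
        private final List<String> partitionKeyColumns;
        private final Partitioner partitioner;

        public TokenUtils(List<String> partitionKeyColumns, Partitioner partitioner)
        {
            this.partitionKeyColumns = partitionKeyColumns;
            this.partitioner = partitioner;
        }

        // The Murmur3 decision now lives in one place
        public boolean isMurmur3()
        {
            return partitioner == Partitioner.Murmur3Partitioner;
        }

        public List<String> partitionKeyColumns()
        {
            return partitionKeyColumns;
        }
    }

    public static void main(String[] args)
    {
        TokenUtils tokenUtils = new TokenUtils(Collections.singletonList("id"),
                                               Partitioner.Murmur3Partitioner);
        System.out.println(tokenUtils.isMurmur3());
    }
}
```

With this shape, callers such as `Tokenizer` would forward the partitioner they already have rather than comparing it against `Murmur3Partitioner` themselves.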
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java:
##########
@@ -59,17 +61,29 @@
/**
* A group of ClusterInfo. One per cluster.
* The class does the aggregation over all clusters for applicable operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this class
+ * and creates a {@link org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup} instance,
+ * which is then included in the {@link org.apache.cassandra.spark.bulkwriter.BulkWriterConfig}
+ * that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link org.apache.cassandra.spark.bulkwriter.ClusterInfo}
+ * interface requires it (for use as a field type in broadcast classes), but instances of this
+ * class are never directly serialized.
*/
public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterSupport<ClusterInfo>
{
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
- private static final long serialVersionUID = 5337884321245616172L;
-
// immutable
private final List<ClusterInfo> clusterInfos;
- private transient volatile Map<String, ClusterInfo> clusterInfoById;
- private transient volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+ private final String clusterId;
+ private volatile Map<String, ClusterInfo> clusterInfoById;
+ private volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+ // Pre-computed values from BroadcastableClusterInfoGroup (only set when reconstructed on executors)
+ private volatile Partitioner cachedPartitioner;
+ private volatile String cachedLowestCassandraVersion;
Review Comment:
I don't think `cachedPartitioner` or `cachedLowestCassandraVersion` need to
be volatile.
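A rough sketch of the alternative, assuming both values are assigned only once, in the constructor, when the object is reconstructed on an executor (that assumption is not confirmed by the diff). Under the Java memory model, `final` fields set in a constructor are safely visible to other threads once the object is published, so `volatile` adds nothing in that case. The class below is a hypothetical holder, and `String` stands in for the real `Partitioner` type.

```java
public class CachedClusterValuesSketch
{
    // Hypothetical fields mirroring the ones under review; both may be null
    // when the object was not reconstructed from broadcast data
    private final String cachedPartitioner;
    private final String cachedLowestCassandraVersion;

    public CachedClusterValuesSketch(String cachedPartitioner, String cachedLowestCassandraVersion)
    {
        // Assigned exactly once, so the fields can be final rather than volatile
        this.cachedPartitioner = cachedPartitioner;
        this.cachedLowestCassandraVersion = cachedLowestCassandraVersion;
    }

    public String cachedPartitioner()
    {
        return cachedPartitioner;
    }

    public String cachedLowestCassandraVersion()
    {
        return cachedLowestCassandraVersion;
    }

    public static void main(String[] args) throws InterruptedException
    {
        CachedClusterValuesSketch values =
            new CachedClusterValuesSketch("Murmur3Partitioner", "4.0.0");

        // A second thread reads the final fields without extra synchronization
        Thread reader = new Thread(() ->
            System.out.println(values.cachedPartitioner() + " / " + values.cachedLowestCassandraVersion()));
        reader.start();
        reader.join();
    }
}
```

If the values are instead computed lazily after construction and read from several threads, `volatile` (or another form of synchronization) would still be needed.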