JeetKunDoug commented on code in PR #79:
URL: https://github.com/apache/cassandra-analytics/pull/79#discussion_r1755318108


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -223,7 +227,8 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
         }
         this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
         this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
-
+        this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
+        this.coordinatedWriteConf = buildCoordinatedWriteConf(); // must only call the build method at this step, after the related fields have been resolved

Review Comment:
   If you instead passed the necessary parameters to
`buildCoordinatedWriteConf`, the signature itself would enforce the ordering,
rather than relying on a comment here. Consider moving the build method to
`CoordinatedWriteConf` as a static method and calling it where necessary.
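
A minimal sketch of that suggestion, under stated assumptions: `CoordinatedWriteConfSketch` is a hypothetical stand-in for `CoordinatedWriteConf`, and the two `build` parameters (the raw JSON and a consistency level name) are illustrative guesses at the "related fields", not taken from the PR. The point is only that a static factory cannot be called before its inputs exist.

```java
import java.util.Objects;

// Hypothetical stand-in for CoordinatedWriteConf; only the factory shape matters here.
final class CoordinatedWriteConfSketch
{
    private final String json;
    private final String consistencyLevel;

    private CoordinatedWriteConfSketch(String json, String consistencyLevel)
    {
        this.json = json;
        this.consistencyLevel = consistencyLevel;
    }

    // Every input is an explicit parameter, so the caller must resolve it first.
    // Returning null for a missing JSON option is an assumption based on the
    // @Nullable annotation on the build method in the diff.
    static CoordinatedWriteConfSketch build(String json, String consistencyLevel)
    {
        if (json == null)
        {
            return null; // coordinated write is not configured
        }
        Objects.requireNonNull(consistencyLevel, "consistencyLevel must be resolved before building");
        return new CoordinatedWriteConfSketch(json, consistencyLevel);
    }

    String json()
    {
        return json;
    }

    String consistencyLevel()
    {
        return consistencyLevel;
    }
}
```

With this shape, the constructor of `BulkSparkConf` would pass the already-resolved values into `build(...)`, and the ordering comment becomes unnecessary.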



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -282,6 +287,63 @@ Set<? extends SidecarInstance> sidecarContactPoints()
         return sidecarContactPoints;
     }
 
+    public boolean isCoordinatedWriteConfigured()
+    {
+        return coordinatedWriteConf != null;
+    }
+
+    public CoordinatedWriteConf coordinatedWriteConf()
+    {
+        if (coordinatedWriteConf == null)
+        {
+            coordinatedWriteConf = buildCoordinatedWriteConf();
+        }
+
+        return coordinatedWriteConf;
+    }
+
+    @Nullable
+    protected CoordinatedWriteConf buildCoordinatedWriteConf()

Review Comment:
   See above. I think this logic really belongs in something like
`CoordinatedWriteConf.build()`.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Data class containing the configurations required for coordinated write.
+ * The serialization format is JSON string. The class takes care of serialization and deserialization.
+ */
+public class CoordinatedWriteConf
+{
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    // The runtime type of ClusterConfProvider is erased; use clustersOf method to read the desired type back
+    private final Map<String, ClusterConfProvider> clusters;
+
+    /**
+     * Parse JSON string and create a CoordinatedWriteConf object with the specified ClusterConfProvider format
+     *
+     * @param json JSON string
+     * @param clusterConfType concrete type of ClusterConfProvider that can be used for JSON serialization and deserialization
+     * @return CoordinatedWriteConf object
+     * @param <T> subtype of ClusterConfProvider
+     */
+    public static <T extends CoordinatedWriteConf.ClusterConfProvider>
+    CoordinatedWriteConf fromJson(String json, Class<T> clusterConfType)
+    {
+        JavaType javaType = TypeFactory.defaultInstance().constructMapType(Map.class, String.class, clusterConfType);
+        try
+        {
+            return new CoordinatedWriteConf(OBJECT_MAPPER.readValue(json, javaType));
+        }
+        catch (Exception e)
+        {
+            throw new IllegalArgumentException("Unable to parse json string into CoordinatedWriteConf of " + clusterConfType.getSimpleName() +
+                                               " due to " + e.getMessage(), e);
+        }
+    }
+
+    public CoordinatedWriteConf(Map<String, ? extends ClusterConfProvider> clusters)
+    {
+        this.clusters = Collections.unmodifiableMap(clusters);
+    }
+
+    public Map<String, ClusterConfProvider> clusters()
+    {
+        return clusters;
+    }
+
+    @Nullable
+    public ClusterConfProvider cluster(String clusterId)
+    {
+        return clusters.get(clusterId);
+    }
+
+    public <T extends ClusterConfProvider> Map<String, T> clustersOf(Class<T> clusterConfType)
+    {
+        // verify that map type can cast; there are only limited number of values and check is cheap
+        clusters.values().forEach(v -> Preconditions.checkState(clusterConfType.isInstance(v),
+                                                                "ClusterConfProvider value is not instance of " + clusterConfType));
+        return (Map<String, T>) clusters;
+    }
+
+    public String toJson() throws JsonProcessingException
+    {
+        return OBJECT_MAPPER.writeValueAsString(clusters);
+    }
+
+    public interface ClusterConfProvider

Review Comment:
   The name `ClusterConfProvider` suggests a functional interface that
supplies a `ClusterConf` object via a `get` method (similar to `Supplier`).
Isn't this type really just `ClusterConf`?
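
To illustrate the distinction being drawn, here is a rough sketch of what a provider in the `Supplier` sense would look like. All names are hypothetical: `ClusterConfSketch`, its `localDc` field, and `ClusterConfProviderSketch` are invented for illustration and do not come from the PR.

```java
import java.util.function.Supplier;

// Hypothetical value type: plain data describing one cluster.
final class ClusterConfSketch
{
    private final String localDc; // illustrative field only

    ClusterConfSketch(String localDc)
    {
        this.localDc = localDc;
    }

    String localDc()
    {
        return localDc;
    }
}

// A "provider" in the Supplier sense: a single method that yields the value type.
@FunctionalInterface
interface ClusterConfProviderSketch extends Supplier<ClusterConfSketch>
{
    @Override
    ClusterConfSketch get();
}
```

If the interface in the PR instead exposes the configuration values directly, it is playing the role of `ClusterConfSketch` here, and naming it `ClusterConf` would describe it more accurately.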



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
