skoppu22 commented on code in PR #169:
URL: https://github.com/apache/cassandra-analytics/pull/169#discussion_r2989688531


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java:
##########
@@ -395,4 +366,61 @@ Map<String, ClusterInfo> clusterInfoByIdUnsafe()
     {
         return clusterInfoById;
     }
+
+    /**
+     * Retrieves the lowest Cassandra version from all clusters.
+     *
+     * @param conf Bulk Spark configuration
+     * @return lowest Cassandra version string across all clusters
+     */
+    public static String getLowestCassandraVersion(BulkSparkConf conf)
+    {
+        CoordinatedWriteConf coordinatedWriteConf = conf.coordinatedWriteConf();
+        Preconditions.checkArgument(coordinatedWriteConf != null,
+                                    "CoordinatedWriteConf is required for multi-cluster operations");
+
+        Map<String, String> clusterVersions = new HashMap<>();
+        for (String clusterId : coordinatedWriteConf.clusters().keySet())
+        {
+            String version = CassandraClusterInfo.getLowestCassandraVersion(conf, clusterId);
+            clusterVersions.put(clusterId, version);
+        }
+
+        // Find the lowest version across all clusters
+        List<CassandraVersionFeatures> versions = clusterVersions.values()
+                                                                 .stream()
+                                                                 .map(CassandraVersionFeatures::cassandraVersionFeaturesFromCassandraVersion)
+                                                                 .sorted()
+                                                                 .collect(Collectors.toList());
+
+        CassandraVersionFeatures first = versions.get(0);
+        CassandraVersionFeatures last = versions.get(versions.size() - 1);
+        Preconditions.checkState(first.getMajorVersion() == last.getMajorVersion(),
+                                 "Cluster versions are not compatible. lowest=%s and highest=%s",
+                                 first.getRawVersionString(), last.getRawVersionString());
+
+        return first.getRawVersionString();
+    }
+
+    /**
+     * Retrieves aggregated SSTable versions from all clusters.
+     *
+     * @param conf Bulk Spark configuration
+     * @return set of SSTable versions present across all clusters
+     */
+    public static Set<String> getSSTableVersionsOnCluster(BulkSparkConf conf)
+    {
+        CoordinatedWriteConf coordinatedWriteConf = conf.coordinatedWriteConf();
+        Preconditions.checkArgument(coordinatedWriteConf != null,
+                                    "CoordinatedWriteConf is required for multi-cluster operations");
+
+        Set<String> aggregatedSSTableVersions = new HashSet<>();
+        for (String clusterId : coordinatedWriteConf.clusters().keySet())
+        {
+            Set<String> sstableVersions = CassandraClusterInfo.getSSTableVersionsOnCluster(conf, clusterId);
+            aggregatedSSTableVersions.addAll(sstableVersions);

Review Comment:
   We already do that above, in getLowestCassandraVersion: line 393 sorts the
   versions, line 396 picks the lowest, and line 397 picks the highest.
   This function, getSSTableVersionsOnCluster, only collects all SSTable
   versions present on the clusters. The bridge determination logic then sorts
   them, picks the highest version, and determines the bridge version
   accordingly.
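
To illustrate the split of responsibilities described above, here is a minimal sketch of the "sort, then pick the highest" step applied to an already aggregated set. It is not the project's bridge determination code: the class name `SSTableVersionSketch`, the method `highestVersion`, the sample version strings, and the use of plain string ordering are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch only; the real bridge determination logic lives elsewhere
// in the project and may order versions differently.
final class SSTableVersionSketch
{
    private SSTableVersionSketch()
    {
    }

    // Sorts the aggregated versions and returns the highest one.
    // Assumption: the version strings order correctly under natural String ordering.
    static String highestVersion(Set<String> aggregatedVersions)
    {
        if (aggregatedVersions == null || aggregatedVersions.isEmpty())
        {
            throw new IllegalArgumentException("No SSTable versions were collected");
        }
        // TreeSet keeps its elements sorted, so last() is the highest version
        return new TreeSet<>(aggregatedVersions).last();
    }

    public static void main(String[] args)
    {
        // Placeholder values standing in for the aggregated result of all clusters
        Set<String> versions = new HashSet<>(Arrays.asList("big-na", "big-oa", "big-nb"));
        System.out.println(highestVersion(versions)); // prints "big-oa"
    }
}
```

With this split, the aggregation method only has to return the union of versions; any ordering or compatibility decision stays with the caller that selects the bridge.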



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

