sarankk commented on code in PR #169:
URL: https://github.com/apache/cassandra-analytics/pull/169#discussion_r2825858928


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java:
##########
@@ -35,22 +37,49 @@
  */
 public enum CassandraVersion
 {
-    THREEZERO(30, "3.0", "three-zero", "big"),
-    FOURZERO(40, "4.0", "four-zero", "big"),
-    FOURONE(41, "4.1", "four-zero", "big"),
-    FIVEZERO(50, "5.0", "five-zero", "big", "bti");
+    THREEZERO(30, "3.0", "three-zero", new String[]{"big"},
+              new String[]{
+                  // Cassandra 3.x native sstable versions
+                  // order is important, used to determine the latest version

Review Comment:
   Array-based ordering will be hard to debug when issues come up, and hard to maintain as well.
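Here is a minimal sketch of one alternative. It ranks SSTable versions by the release number of the `CassandraVersion` that owns them (30, 40, 41 and 50 in this diff) rather than by their position in the array. `Release`, `ownerOf` and `highestRelease` are hypothetical stand-ins for the enum and `findHighestCassandraVersion`, not code from the PR. The version lists are copied from the diff.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for CassandraVersion: ordering comes from the explicit
// release number, never from where a string sits inside an array
enum Release
{
    THREEZERO(30, "big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf"),
    FOURZERO(40, "big-na", "big-nb"),
    FOURONE(41), // no native SSTable versions of its own
    FIVEZERO(50, "big-oa", "bti-da");

    final int number;
    final List<String> nativeVersions;

    Release(int number, String... nativeVersions)
    {
        this.number = number;
        this.nativeVersions = List.of(nativeVersions);
    }

    // Returns the release that introduced the given SSTable version, if any
    static Optional<Release> ownerOf(String sstableVersion)
    {
        if (sstableVersion == null)
        {
            return Optional.empty(); // lists from List.of may throw on contains(null)
        }
        return Arrays.stream(values())
                     .filter(release -> release.nativeVersions.contains(sstableVersion))
                     .findFirst();
    }

    // Highest owning release across the versions seen on the cluster; unknown versions are skipped
    static Optional<Release> highestRelease(Set<String> versionsOnCluster)
    {
        return versionsOnCluster.stream()
                                .map(Release::ownerOf)
                                .flatMap(Optional::stream)
                                .max(Comparator.comparingInt((Release release) -> release.number));
    }
}
```

With this shape, all versions belonging to the same release are interchangeable. Reordering entries inside one release's list therefore cannot change which bridge is picked.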



##########
cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/CassandraVersionTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for CassandraVersion SSTable version methods
+ */
+public class CassandraVersionTest
+{
+    @Test
+    void testFromSSTableVersionFourZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-na");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FOURZERO);
+
+        result = CassandraVersion.fromSSTableVersion("big-nb");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FOURZERO);
+    }
+
+    @Test
+    void testFromSSTableVersionFiveZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-oa");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FIVEZERO);
+
+        result = CassandraVersion.fromSSTableVersion("bti-da");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FIVEZERO);
+    }
+
+    @Test
+    void testFromSSTableVersionThreeZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-ma");

Review Comment:
   Could we make this a parameterized test and also cover the other 3.0 SSTable versions?
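Here is a sketch of the table-driven shape, written in plain Java so it compiles on its own. In the real test class this would most likely be a JUnit 5 `@ParameterizedTest` with `@ValueSource(strings = {...})` that calls `CassandraVersion.fromSSTableVersion`. In this sketch, `fromSSTableVersion` is a hypothetical stand-in that assumes every `big-m*` version belongs to 3.0. The version list is copied from the `THREEZERO` entry in this diff.

```java
import java.util.List;
import java.util.Optional;

final class ThreeZeroVersionCases
{
    // Every native 3.0 SSTable version from the THREEZERO entry in this PR
    static final List<String> THREE_ZERO_VERSIONS =
        List.of("big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf");

    private ThreeZeroVersionCases()
    {
    }

    // Hypothetical stand-in for CassandraVersion.fromSSTableVersion: treats any "big-m*" version as 3.0
    static Optional<String> fromSSTableVersion(String sstableVersion)
    {
        if (sstableVersion != null && sstableVersion.startsWith("big-m"))
        {
            return Optional.of("3.0");
        }
        return Optional.empty();
    }

    // One check per version, so a failure names the exact version that broke
    static void checkAll()
    {
        for (String version : THREE_ZERO_VERSIONS)
        {
            Optional<String> result = fromSSTableVersion(version);
            if (!result.equals(Optional.of("3.0")))
            {
                throw new AssertionError("Expected 3.0 for " + version + " but got " + result);
            }
        }
    }
}
```

A JUnit parameterized test has one advantage over this loop: each version is reported as its own test case, instead of the run stopping at the first failure.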



##########
cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/CassandraVersionTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for CassandraVersion SSTable version methods
+ */
+public class CassandraVersionTest
+{
+    @Test
+    void testFromSSTableVersionFourZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-na");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FOURZERO);
+
+        result = CassandraVersion.fromSSTableVersion("big-nb");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FOURZERO);
+    }
+
+    @Test
+    void testFromSSTableVersionFiveZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-oa");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FIVEZERO);
+
+        result = CassandraVersion.fromSSTableVersion("bti-da");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.FIVEZERO);
+    }
+
+    @Test
+    void testFromSSTableVersionThreeZeroNativeVersions()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("big-ma");
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(CassandraVersion.THREEZERO);
+    }
+
+    @Test
+    void testFromSSTableVersionUnknownVersion()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion("unknown-xx");
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testFromSSTableVersionNullVersion()
+    {
+        Optional<CassandraVersion> result = CassandraVersion.fromSSTableVersion(null);
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testGetSupportedSStableVersionsForReadFourZero()
+    {
+        Set<String> supported = CassandraVersion.FOURZERO.getSupportedSStableVersionsForRead();
+
+        // C* 4.0 can read its own versions
+        assertThat(supported).contains("big-na", "big-nb");
+
+        // C* 4.0 can read C* 3.0 versions (previous major version family)
+        assertThat(supported).contains("big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf");
+
+        // C* 4.0 cannot read C* 5.0 versions
+        assertThat(supported).doesNotContain("big-oa", "bti-da");
+    }
+
+    @Test
+    void testGetSupportedSStableVersionsForReadFiveZero()
+    {
+        Set<String> supported = CassandraVersion.FIVEZERO.getSupportedSStableVersionsForRead();
+
+        // C* 5.0 can read its own versions
+        assertThat(supported).contains("big-oa", "bti-da");
+
+        // C* 5.0 can read C* 4.0 and 4.1 versions (previous major version family)
+        assertThat(supported).contains("big-na", "big-nb");
+
+        // C* 5.0 cannot read C* 3.0 versions (not in previous major version family)
+        assertThat(supported).doesNotContain("big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf");
+    }
+
+    @Test
+    void testGetSupportedSStableVersionsForReadThreeZero()
+    {
+        Set<String> supported = CassandraVersion.THREEZERO.getSupportedSStableVersionsForRead();
+
+        // C* 3.0 can read its own versions
+        assertThat(supported).contains("big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf");
+
+        // C* 3.0 cannot read C* 4.0+ versions
+        assertThat(supported).doesNotContain("big-na", "big-nb", "big-oa", "bti-da");
+
+        // C* 3.0's previous major version (2.x) is not defined, so only its own versions are readable
+        assertThat(supported).hasSize(6);  // Only the 6 native 3.0 versions
+    }
+
+    @Test
+    void testGetSupportedSStableVersionsForReadFourOne()
+    {
+        Set<String> supported = CassandraVersion.FOURONE.getSupportedSStableVersionsForRead();
+
+        // C* 4.1 has no native SSTable versions of its own, but can read C* 4.0 and C* 3.0 versions
+        // C* 4.1 can read C* 4.0 versions (same major version family)
+        assertThat(supported).contains("big-na", "big-nb");
+
+        // C* 4.1 can read C* 3.0 versions (previous major version family)
+        assertThat(supported).contains("big-ma", "big-mb", "big-mc", "big-md", "big-me", "big-mf");
+
+        // C* 4.1 cannot read C* 5.0 versions
+        assertThat(supported).doesNotContain("big-oa", "bti-da");
+    }
+
+    @Test
+    void testGetNativeSStableVersionsFourZero()
+    {
+        List<String> nativeVersions = CassandraVersion.FOURZERO.getNativeSStableVersions();
+        assertThat(nativeVersions).containsExactlyInAnyOrder("big-na", "big-nb");
+    }
+
+    @Test
+    void testGetNativeSStableVersionsFiveZero()
+    {
+        List<String> nativeVersions = CassandraVersion.FIVEZERO.getNativeSStableVersions();
+        assertThat(nativeVersions).containsExactlyInAnyOrder("big-oa", "bti-da");
+    }
+
+    @Test
+    void testGetNativeSStableVersionsFourOneEmpty()
+    {
+        // C* 4.1 did not introduce new native SSTable versions
+        List<String> nativeVersions = CassandraVersion.FOURONE.getNativeSStableVersions();
+        assertThat(nativeVersions).isEmpty();
+    }
+
+    @Test
+    void testGetSSTableVersionIndexValidVersion()
+    {
+        int index = CassandraVersion.FOURZERO.getSSTableVersionIndex("big-na");
+        assertThat(index).isEqualTo(0);

Review Comment:
   We could avoid the index checks if we treated all SSTable versions within a Cassandra version as being on the same level.



##########
cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/SSTableVersionAnalyzerTest.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for SSTableVersionAnalyzer
+ */
+public class SSTableVersionAnalyzerTest
+{
+    @Test
+    void testDetermineBridgeVersionForWriteFallbackDisabledSingleVersion()

Review Comment:
   Nit: Could we make these tests parameterized, passing in the expected result as well?
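Here is one possible shape, assuming rows of (SSTable versions on the cluster, requested format, expected bridge), where `null` means the call should throw `IllegalArgumentException`. With JUnit 5 the rows would typically come from a `@MethodSource` that returns `Stream<Arguments>`. The plain-Java runner below only shows the row/expectation structure. `WriteCase`, `WriteCases` and `standInForWrite` are hypothetical. The stand-in only approximates `determineBridgeVersionForWrite` for these particular rows.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;

// One row of the table: cluster SSTable versions, requested format and the expected bridge.
// A null expectedBridge means the call is expected to throw IllegalArgumentException.
final class WriteCase
{
    final Set<String> versionsOnCluster;
    final String requestedFormat;
    final String expectedBridge;

    WriteCase(Set<String> versionsOnCluster, String requestedFormat, String expectedBridge)
    {
        this.versionsOnCluster = versionsOnCluster;
        this.requestedFormat = requestedFormat;
        this.expectedBridge = expectedBridge;
    }

    @Override
    public String toString()
    {
        return versionsOnCluster + " / " + requestedFormat + " -> " + expectedBridge;
    }
}

final class WriteCases
{
    // Illustrative rows only
    static final List<WriteCase> CASES = List.of(
        new WriteCase(Set.of("big-na"), "big", "4.0"),
        new WriteCase(Set.of("big-na", "big-oa"), "big", "5.0"),
        new WriteCase(Set.of("big-oa"), "bti", "5.0"),
        new WriteCase(Set.of("big-nb"), "bti", null));

    private WriteCases()
    {
    }

    // Hypothetical stand-in for SSTableVersionAnalyzer.determineBridgeVersionForWrite:
    // picks "5.0" if a 5.0 SSTable version is present, else "4.0"; only 5.0 accepts "bti"
    static String standInForWrite(Set<String> versions, String format)
    {
        boolean hasFiveZero = versions.contains("big-oa") || versions.contains("bti-da");
        if ("bti".equals(format) && !hasFiveZero)
        {
            throw new IllegalArgumentException("Cluster does not support requested SSTable format '" + format + "'");
        }
        return hasFiveZero ? "5.0" : "4.0";
    }

    // Runs every row against the function under test and fails on the first mismatch
    static void runAll(List<WriteCase> cases, BiFunction<Set<String>, String, String> underTest)
    {
        for (WriteCase c : cases)
        {
            String actual;
            try
            {
                actual = underTest.apply(c.versionsOnCluster, c.requestedFormat);
            }
            catch (IllegalArgumentException e)
            {
                actual = null; // expected-failure rows are encoded as null
            }
            if (!Objects.equals(actual, c.expectedBridge))
            {
                throw new AssertionError(c + " but got " + actual);
            }
        }
    }
}
```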



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/SSTableVersionAnalyzer.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes SSTable versions on a cluster to determine the appropriate
+ * Cassandra bridge to load for bulk write operations.
+ *
+ * <p>This class provides logic to select Cassandra bridge based on the highest SSTable
+ * version detected on the cluster and the user's requested format preference.</p>
+ */
+public final class SSTableVersionAnalyzer
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(SSTableVersionAnalyzer.class);
+
+    private SSTableVersionAnalyzer()
+    {
+        // Utility class
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load based on:
+     * - Highest SSTable version detected on cluster
+     * - User's format preference
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster nodes
+     * @param requestedFormat User's requested format, example: "big" or "bti"
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if format is invalid, or cluster doesn't support requested format,
+     *                                  or SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForWrite(Set<String> sstableVersionsOnCluster,
+                                                                  String requestedFormat,
+                                                                  String cassandraVersion,
+                                                                  boolean isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion highestCassandraVersion = findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        // Check if highestCassandraVersion supports the requested format
+        boolean supportsRequestedFormat = highestCassandraVersion.getNativeSStableVersions()
+            .stream()
+            .anyMatch(v -> v.startsWith(requestedFormat + "-"));
+
+        if (supportsRequestedFormat)
+        {
+            return highestCassandraVersion;
+        }
+        else
+        {
+            throw new IllegalArgumentException(
+                String.format("Cluster does not support requested SSTable format '%s'. " +
+                              "Bridge version determined is %s, which only supports formats: %s",
+                    requestedFormat, highestCassandraVersion.versionName(),
+                    highestCassandraVersion.sstableFormats()));
+        }
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load for read operations based on:
+     * - Highest SSTable version detected on cluster
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster nodes
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForRead(Set<String> sstableVersionsOnCluster,
+                                                                 String cassandraVersion,
+                                                                 boolean isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion bridgeVersion = findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        LOGGER.debug("Determined bridge version {} for read based on SSTable versions on cluster: {}",
+                     bridgeVersion.versionName(), sstableVersionsOnCluster);
+
+        return bridgeVersion;
+    }
+
+    /**
+     * Ensures that SSTable versions from cluster are not null or empty.
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions to validate
+     * @throws IllegalStateException if versions are null or empty
+     */
+    private static void ensureSSTableVersionsNotEmpty(Set<String> sstableVersionsOnCluster)
+    {
+        if (sstableVersionsOnCluster == null || sstableVersionsOnCluster.isEmpty())
+        {
+            throw new IllegalStateException(String.format(
+                "Unable to retrieve SSTable versions from cluster. " +

Review Comment:
   Why not support the fallback here and return the Cassandra-version-based bridge?
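For illustration, here is a sketch of what that fallback could look like. It assumes the caller still has the configured `cassandra.version`. `BridgeSelection`, `selectBridge` and `fromCassandraVersion` are hypothetical names. `fromCassandraVersion` only approximates `CassandraVersion.fromVersion` by matching the major.minor prefix. Bridges are represented as strings to keep the sketch self-contained.

```java
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

final class BridgeSelection
{
    private BridgeSelection()
    {
    }

    // Picks the bridge from SSTable versions when the cluster returned some, and otherwise
    // falls back to the configured cassandra.version instead of throwing
    static String selectBridge(Set<String> versionsOnCluster,
                               String cassandraVersion,
                               Function<Set<String>, String> fromSSTableVersions)
    {
        if (versionsOnCluster == null || versionsOnCluster.isEmpty())
        {
            return fromCassandraVersion(cassandraVersion)
                   .orElseThrow(() -> new UnsupportedOperationException(
                       String.format("Unsupported Cassandra version: %s", cassandraVersion)));
        }
        return fromSSTableVersions.apply(versionsOnCluster);
    }

    // Hypothetical stand-in for CassandraVersion.fromVersion, matching on the major.minor prefix
    static Optional<String> fromCassandraVersion(String version)
    {
        if (version == null)
        {
            return Optional.empty();
        }
        for (String known : new String[]{"3.0", "4.0", "4.1", "5.0"})
        {
            if (version.equals(known) || version.startsWith(known + "."))
            {
                return Optional.of(known);
            }
        }
        return Optional.empty();
    }
}
```

One trade-off to weigh: a silent fallback can hide a Sidecar or gossip problem. The real code would probably want to log a warning whenever this path is taken.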



##########
cassandra-analytics-sidecar-client/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##########
@@ -149,6 +153,79 @@ public static List<CompletableFuture<NodeSettings>> allNodeSettings(SidecarClien
                         .collect(Collectors.toList());
     }
 
+    /**
+     * Retrieve gossip info from all nodes on the cluster
+     * @param client Sidecar client
+     * @param instances all Sidecar instances
+     * @return completable futures with GossipInfoResponse
+     */
+    public static List<CompletableFuture<GossipInfoResponse>> gossipInfoFromAllNodes(SidecarClient client,
+                                                                                     Set<SidecarInstance> instances)
+    {
+        return instances.stream()
+                .map(instance -> client

Review Comment:
   Nit: alignment



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/SSTableVersionAnalyzer.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes SSTable versions on a cluster to determine the appropriate
+ * Cassandra bridge to load for bulk write operations.

Review Comment:
   We are using the version analyzer for both read and write.
   ```suggestion
    * Cassandra bridge to load for bulk read/write operations.
   ```



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java:
##########
@@ -35,22 +37,49 @@
  */
 public enum CassandraVersion
 {
-    THREEZERO(30, "3.0", "three-zero", "big"),
-    FOURZERO(40, "4.0", "four-zero", "big"),
-    FOURONE(41, "4.1", "four-zero", "big"),
-    FIVEZERO(50, "5.0", "five-zero", "big", "bti");
+    THREEZERO(30, "3.0", "three-zero", new String[]{"big"},
+              new String[]{
+                  // Cassandra 3.x native sstable versions
+                  // order is important, used to determine the latest version
+                  "big-ma",
+                  "big-mb",
+                  "big-mc",
+                  "big-md",
+                  "big-me",
+                  "big-mf"
+              }),
+    FOURZERO(40, "4.0", "four-zero", new String[]{"big"},
+             new String[]{
+                 // Cassandra 4.0 native sstable versions
+                 // order is important, used to determine the latest version
+                 "big-na",
+                 "big-nb",
+             }),
+    FOURONE(41, "4.1", "four-zero", new String[]{"big"},
+            new String[]{
+                // Cassandra 4.1 did not introduce new native SSTable versions
+            }),
+    FIVEZERO(50, "5.0", "five-zero", new String[]{"big", "bti"},
+             new String[]{
+                 // Cassandra 5.0 native sstable versions
+                 "big-oa",
+                 "bti-da",
+             });
 
     private final int number;
     private final String name;
     private final String jarBaseName;  // Must match shadowJar.archiveFileName from Gradle configuration (without extension)
     private final Set<String> sstableFormats;
+    private final List<String> nativeSStableVersions;  // Preserves array order for version comparison
 
-    CassandraVersion(int number, String name, String jarBaseName, String... sstableFormats)
+
+    CassandraVersion(int number, String name, String jarBaseName, String[] sstableFormats, String[] nativeSStableVersions)
     {
         this.number = number;
         this.name = name;
         this.jarBaseName = jarBaseName;
         this.sstableFormats = new HashSet<>(Arrays.asList(sstableFormats));
+        this.nativeSStableVersions = Collections.unmodifiableList(Arrays.asList(nativeSStableVersions));

Review Comment:
   Nit
   ```suggestion
           this.nativeSStableVersions = List.of(nativeSStableVersions);
   ```
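There is one behavioural difference worth checking before applying this. `List.of(nativeSStableVersions)` accepts the array directly; the `List.of(E...)` javadoc documents that a single array argument becomes the list's elements. The result is also unmodifiable, like the current form. However, `List.of` rejects `null` elements, and on current JDKs calling `contains(null)` on a non-empty result throws `NullPointerException` instead of returning `false`. That only matters if a lookup such as `fromSSTableVersion(null)` probes the list before null-checking. Below is a small JDK-only comparison; `NativeVersionLists` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class NativeVersionLists
{
    private NativeVersionLists()
    {
    }

    // Current form in the PR
    static List<String> viaUnmodifiableList(String[] versions)
    {
        return Collections.unmodifiableList(Arrays.asList(versions));
    }

    // Suggested form; a single String[] argument binds to List.of(E...) and yields a List<String>
    static List<String> viaListOf(String[] versions)
    {
        return List.of(versions);
    }

    // True if list.contains(null) throws instead of returning false
    static boolean containsNullThrows(List<String> list)
    {
        try
        {
            list.contains(null);
            return false;
        }
        catch (NullPointerException e)
        {
            return true;
        }
    }
}
```

`List.of` also copies the array, whereas `Arrays.asList` is a view over it. With the fresh array literals in the enum constants, that makes no practical difference.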



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/SSTableVersionAnalyzer.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes SSTable versions on a cluster to determine the appropriate
+ * Cassandra bridge to load for bulk write operations.
+ *
+ * <p>This class provides logic to select Cassandra bridge based on the highest SSTable
+ * version detected on the cluster and the user's requested format preference.</p>
+ */
+public final class SSTableVersionAnalyzer
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(SSTableVersionAnalyzer.class);
+
+    private SSTableVersionAnalyzer()
+    {
+        // Utility class
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load based on:
+     * - Highest SSTable version detected on cluster
+     * - User's format preference
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster nodes
+     * @param requestedFormat User's requested format, example: "big" or "bti"
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if format is invalid, or cluster doesn't support requested format,
+     *                                  or SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForWrite(Set<String> sstableVersionsOnCluster,
+                                                                  String requestedFormat,
+                                                                  String cassandraVersion,
+                                                                  boolean isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)

Review Comment:
   Nit: this config check is duplicated in both the read and write paths; could we extract it?
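Here is a minimal sketch of the extraction. It assumes a shared helper that returns the `cassandra.version`-based bridge when the flag is set and `Optional.empty()` otherwise, so both paths can fall through to SSTable-based selection. `BridgeFallback`, `fallbackIfDisabled` and the `fromVersion` stand-in are hypothetical. Bridges are plain strings here instead of `CassandraVersion`.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class BridgeFallback
{
    private BridgeFallback()
    {
    }

    // Shared by the read and write paths: when SSTable-version-based selection is disabled,
    // returns the cassandra.version-based bridge (the existing LOGGER.info call would live here);
    // otherwise returns empty so the caller continues with SSTable-based selection
    static Optional<String> fallbackIfDisabled(boolean isSSTableVersionBasedBridgeDisabled,
                                               String cassandraVersion)
    {
        if (!isSSTableVersionBasedBridgeDisabled)
        {
            return Optional.empty();
        }
        String bridge = fromVersion(cassandraVersion)
                        .orElseThrow(() -> new UnsupportedOperationException(
                            String.format("Unsupported Cassandra version: %s", cassandraVersion)));
        return Optional.of(bridge);
    }

    // Example caller; the SSTable-based analysis is only evaluated when the flag is off
    static String determineBridgeVersionForRead(Supplier<String> sstableBasedSelection,
                                                String cassandraVersion,
                                                boolean isSSTableVersionBasedBridgeDisabled)
    {
        return fallbackIfDisabled(isSSTableVersionBasedBridgeDisabled, cassandraVersion)
               .orElseGet(sstableBasedSelection);
    }

    // Hypothetical stand-in for CassandraVersion.fromVersion, matching on the major.minor prefix
    static Optional<String> fromVersion(String version)
    {
        if (version == null)
        {
            return Optional.empty();
        }
        for (String known : new String[]{"3.0", "4.0", "4.1", "5.0"})
        {
            if (version.equals(known) || version.startsWith(known + "."))
            {
                return Optional.of(known);
            }
        }
        return Optional.empty();
    }
}
```

Passing a `Supplier` means the SSTable-based analysis runs only when the flag is off. The write path would call the same helper before its format check.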



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/SSTableVersionAnalyzer.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes SSTable versions on a cluster to determine the appropriate
+ * Cassandra bridge to load for bulk write operations.
+ *
+ * <p>This class provides logic to select Cassandra bridge based on the highest SSTable
+ * version detected on the cluster and the user's requested format preference.</p>
+ */
+public final class SSTableVersionAnalyzer
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(SSTableVersionAnalyzer.class);
+
+    private SSTableVersionAnalyzer()
+    {
+        // Utility class
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load based on:
+     * - Highest SSTable version detected on cluster
+     * - User's format preference
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster nodes
+     * @param requestedFormat User's requested format, example: "big" or "bti"
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if format is invalid, or cluster doesn't support requested format,
+     *                                  or SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForWrite(Set<String> sstableVersionsOnCluster,
+                                                                  String requestedFormat,
+                                                                  String cassandraVersion,
+                                                                  boolean isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion highestCassandraVersion = findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        // Check if highestCassandraVersion supports the requested format
+        boolean supportsRequestedFormat = highestCassandraVersion.getNativeSStableVersions()
+            .stream()
+            .anyMatch(v -> v.startsWith(requestedFormat + "-"));
+
+        if (supportsRequestedFormat)
+        {
+            return highestCassandraVersion;
+        }
+        else
+        {
+            throw new IllegalArgumentException(
+                String.format("Cluster does not support requested SSTable format '%s'. " +
+                              "Bridge version determined is %s, which only supports formats: %s",
+                    requestedFormat, highestCassandraVersion.versionName(),
+                    highestCassandraVersion.sstableFormats()));
+        }
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load for read operations based on:
+     * - Highest SSTable version detected on cluster
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster nodes
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForRead(Set<String> sstableVersionsOnCluster,
+                                                                 String cassandraVersion,
+                                                                 boolean isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion bridgeVersion = findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        LOGGER.debug("Determined bridge version {} for read based on SSTable versions on cluster: {}",
+                     bridgeVersion.versionName(), sstableVersionsOnCluster);
+
+        return bridgeVersion;
+    }
+
+    /**
+     * Ensures that SSTable versions from cluster are not null or empty.
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions to validate
+     * @throws IllegalStateException if versions are null or empty
+     */
+    private static void ensureSSTableVersionsNotEmpty(Set<String> sstableVersionsOnCluster)
+    {
+        if (sstableVersionsOnCluster == null || sstableVersionsOnCluster.isEmpty())
+        {
+            throw new IllegalStateException(String.format(
+                "Unable to retrieve SSTable versions from cluster. " +
+                "This is required for SSTable version-based bridge selection. " +
+                "If you want to bypass this check and use cassandra.version for bridge selection, " +
+                "set %s=true", "spark.cassandra_analytics.bridge.disable_sstable_version_based"));
+        }
+    }
+
+    /**
+     * Finds the highest Cassandra version based on SSTable versions found on cluster.
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on cluster
+     * @return CassandraVersion corresponding to the highest SSTable version
+     * @throws IllegalArgumentException if highest version is unknown
+     */
+    private static CassandraVersion findHighestCassandraVersion(Set<String> sstableVersionsOnCluster)
+    {
+        String highestSSTableVersion = findHighestSSTableVersion(sstableVersionsOnCluster);
+        return CassandraVersion.fromSSTableVersion(highestSSTableVersion)
+                               .orElseThrow(() -> new IllegalArgumentException(
+                               String.format("Unknown SSTable version: %s. Cannot determine bridge version. " +
+                                             "SSTable versions on cluster: %s",
+                                             highestSSTableVersion, sstableVersionsOnCluster)));
+    }
+
+    /**
+     * Finds the highest SSTable version from the set using CassandraVersion mappings.
+     * Ordering is based on:
+     * 1. CassandraVersion number
+     * 2. Within same CassandraVersion, the order defined in nativeSStableVersions array
+     *
+     * @param versions Set of SSTable version strings
+     * @return Highest SSTable version string
+     * @throws IllegalArgumentException if versions is empty, contains null values, or contains unknown versions
+     */
+    public static String findHighestSSTableVersion(Set<String> versions)
+    {
+        if (versions == null || versions.isEmpty())
+        {
+            throw new IllegalArgumentException("SSTable versions set cannot be empty");
+        }
+
+        Comparator<String> sstableVersionComparator = (v1, v2) -> {
+            // Find which CassandraVersion each SSTable version belongs to
+            Optional<CassandraVersion> v1Opt = CassandraVersion.fromSSTableVersion(v1);
+            Optional<CassandraVersion> v2Opt = CassandraVersion.fromSSTableVersion(v2);
+
+            if (!v1Opt.isPresent())
+            {
+                throw new IllegalArgumentException(
+                    String.format("Unknown SSTable version: %s. Cannot determine Cassandra version.", v1));
+            }
+
+            if (!v2Opt.isPresent())
+            {
+                throw new IllegalArgumentException(
+                    String.format("Unknown SSTable version: %s. Cannot determine Cassandra version.", v2));

Review Comment:
   Instead of throwing as soon as even one SSTable version is unknown, we could log the unknown versions as errors and throw only at the end if no highest version can be found.
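A minimal sketch of the lenient approach suggested above, assuming the PR's ordering (Cassandra version number first, then position in the native version list) is kept. `LenientHighestVersionSketch` and its nested `Version` enum are hypothetical, simplified stand-ins for `CassandraVersion` that carry only the SSTable version strings quoted in the diff's Javadoc, and `java.util.logging` stands in for the project's logger.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch class; not part of cassandra-analytics
class LenientHighestVersionSketch
{
    private static final Logger LOGGER = Logger.getLogger(LenientHighestVersionSketch.class.getName());

    // Simplified stand-in for CassandraVersion with only the fields this sketch needs
    enum Version
    {
        FOURZERO(40, "big-na", "big-nb"),
        FIVEZERO(50, "big-oa", "bti-da");

        final int number;
        final List<String> nativeVersions; // ordered oldest to newest, as in the PR

        Version(int number, String... nativeVersions)
        {
            this.number = number;
            this.nativeVersions = Arrays.asList(nativeVersions);
        }

        static Optional<Version> fromSSTableVersion(String sstableVersion)
        {
            return Arrays.stream(values())
                         .filter(v -> v.nativeVersions.contains(sstableVersion))
                         .findFirst();
        }
    }

    static String findHighestSSTableVersion(Set<String> versions)
    {
        if (versions == null || versions.isEmpty())
        {
            throw new IllegalArgumentException("SSTable versions set cannot be empty");
        }

        // Same ordering as the PR: Cassandra version number, then position in the native list.
        // get() is safe because unknown versions are filtered out before max() runs.
        Comparator<String> byVersion =
            Comparator.<String>comparingInt(v -> Version.fromSSTableVersion(v).get().number)
                      .thenComparingInt(v -> Version.fromSSTableVersion(v).get().nativeVersions.indexOf(v));

        return versions.stream()
                       .filter(v -> {
                           boolean known = Version.fromSSTableVersion(v).isPresent();
                           if (!known)
                           {
                               // Log and skip rather than failing on the first unknown version
                               LOGGER.severe("Unknown SSTable version: " + v + ", ignoring it");
                           }
                           return known;
                       })
                       .max(byVersion)
                       // Fail only when no recognizable version remains
                       .orElseThrow(() -> new IllegalArgumentException(
                           "No known SSTable version found among: " + versions));
    }

    public static void main(String[] args)
    {
        Set<String> onCluster = new HashSet<>(Arrays.asList("big-nb", "big-oa", "xx-zz"));
        System.out.println(findHighestSSTableVersion(onCluster)); // big-oa
    }
}
```

Here the unknown `xx-zz` is logged and skipped, so `big-oa` is still returned; only a set with no recognizable version at all results in an exception.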



##########
cassandra-analytics-sidecar-client/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##########
@@ -149,6 +153,79 @@ public static List<CompletableFuture<NodeSettings>> allNodeSettings(SidecarClien
                         .collect(Collectors.toList());
     }
 
+    /**
+     * Retrieve gossip info from all nodes on the cluster
+     * @param client Sidecar client
+     * @param instances all Sidecar instances
+     * @return completable futures with GossipInfoResponse
+     */
+    public static List<CompletableFuture<GossipInfoResponse>> gossipInfoFromAllNodes(SidecarClient client,

Review Comment:
   Not something to address in this PR, but it would be good to have a cluster-wide gossip call available in Sidecar.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java:
##########
@@ -68,36 +97,126 @@ public String jarBaseName()
         return jarBaseName;
     }
 
-    private static final String sstableFormat;
+    /**
+     * Get the set of SSTable formats supported by this Cassandra version.
+     *
+     * @return Set of supported SSTable format strings
+     */
+    public Set<String> sstableFormats()
+    {
+        return sstableFormats;
+    }
+
+    /**
+     * Get the ordered list of native SSTable version strings for this Cassandra version.
+     * The order matches the definition in the enum and represents version progression.
+     * For example, in FOURZERO: ["big-na", "big-nb"], big-nb is considered newer/higher.
+     *
+     * @return Ordered list of native SSTable version strings
+     */
+    public List<String> getNativeSStableVersions()
+    {
+        return nativeSStableVersions;
+    }
+
+    /**
+     * Get the index/position of an SSTable version within this Cassandra version's native versions.
+     * This can be used for version comparison while sorting sstable versions from oldest to latest order.
+     *
+     * @param sstableVersion The SSTable version string to find
+     * @return Index of the version (0-based), or -1 if not found
+     */
+    public int getSSTableVersionIndex(String sstableVersion)
+    {
+        return nativeSStableVersions.indexOf(sstableVersion);
+    }
+
+    /**
+     * Get the set of SSTable version strings that this Cassandra version can read.
+     * This includes:
+     * - Native versions for this Cassandra version
+     * - All SSTable versions from the previous major version (including all minor versions)
+     * For example, Cassandra 5.0 can read:
+     * - 5.0 native versions (big-oa, bti-da)
+     * - 4.0 versions (big-na, big-nb)
+     * - 4.1 versions (if any)
+     * But NOT 3.0 versions
+     *
+     * @return Set of full SSTable version strings that can be read
+     */
+    public Set<String> getSupportedSStableVersionsForRead()
+    {
+        Set<String> readableVersions = new HashSet<>(this.nativeSStableVersions);
+
+        int previousMajor = getPreviousMajorVersion();
+
+        // Add all SSTable versions from the previous major version and its minors
+        // E.g., C* 5.0 (version 50) can read C* 4.0 (40) and C* 4.1 (41) SSTables, but not C* 3.x (30)
+        for (CassandraVersion version : CassandraVersion.values())
+        {
+            // Include versions from the previous major version family (e.g., 40-49 for C* 5.0)
+            if (version.versionNumber() >= previousMajor && version.versionNumber() < this.number)
+            {
+                readableVersions.addAll(version.nativeSStableVersions);
+            }
+        }
+
+        return Collections.unmodifiableSet(readableVersions);
+    }
+
+    /**
+     * Get the previous major version number for this Cassandra version.
+     * Calculates dynamically using: (majorVersion - 1) * 10
+     * For example:
+     * - C5.0 (50) returns 40 (C4.x)
+     * - C4.1 (41) returns 30 (C3.x)
+     * - C4.0 (40) returns 30 (C3.x)
+     * - C3.0 (30) returns 20 (C2.x - which doesn't exist)
+     * - C10.0 (100) returns 90 (C9.x)
+     *
+     * @return previous major version number
+     */
+    @VisibleForTesting
+    int getPreviousMajorVersion()
+    {
+        // Get major version: 50 -> 5, 41 -> 4, 40 -> 4, 30 -> 3
+        int majorVersion = this.number / 10;
+
+        // Calculate previous major version: (majorVersion - 1) * 10
+        // E.g., 5 -> 40, 4 -> 30, 3 -> 20
+        return (majorVersion - 1) * 10;

Review Comment:
   Nit: How about we keep an ordered list of `CassandraVersion` enums and return the previous one, instead of computing the version number arithmetically?
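One way to apply this nit, sketched under the assumption that the enum constants are declared oldest to newest, so that declaration order itself serves as the ordered list. `OrderedVersion`, `previousMajorRelease()` and `supportedSSTableVersionsForRead()` are hypothetical names; the 4.0 and 5.0 SSTable versions come from the diff's Javadoc, while the 3.x strings are illustrative placeholders.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for CassandraVersion. Constants must be declared
// oldest to newest: the declaration order (ordinal) is the "ordered list" of versions.
enum OrderedVersion
{
    THREEZERO(30, "big-mc", "big-md"), // illustrative placeholders for 3.x versions
    FOURZERO(40, "big-na", "big-nb"),
    FOURONE(41),                        // no native versions of its own in this sketch
    FIVEZERO(50, "big-oa", "bti-da");

    private final int number;
    private final List<String> nativeVersions;

    OrderedVersion(int number, String... nativeVersions)
    {
        this.number = number;
        this.nativeVersions = Arrays.asList(nativeVersions);
    }

    int major()
    {
        return number / 10;
    }

    /** Newest constant declared before this one with a lower major version, if any. */
    Optional<OrderedVersion> previousMajorRelease()
    {
        OrderedVersion[] all = values();
        for (int i = ordinal() - 1; i >= 0; i--)
        {
            if (all[i].major() < major())
            {
                return Optional.of(all[i]);
            }
        }
        return Optional.empty();
    }

    /** Own native versions plus those of earlier constants from the previous major family onward. */
    Set<String> supportedSSTableVersionsForRead()
    {
        Set<String> readable = new LinkedHashSet<>(nativeVersions);
        int lowestReadableMajor = previousMajorRelease().map(OrderedVersion::major).orElse(major());
        for (OrderedVersion version : values())
        {
            if (version.ordinal() < ordinal() && version.major() >= lowestReadableMajor)
            {
                readable.addAll(version.nativeVersions);
            }
        }
        return Collections.unmodifiableSet(readable);
    }

    public static void main(String[] args)
    {
        System.out.println(FIVEZERO.previousMajorRelease().get());      // FOURONE
        System.out.println(FIVEZERO.supportedSSTableVersionsForRead()); // [big-oa, bti-da, big-na, big-nb]
    }
}
```

For `FIVEZERO` the previous major release resolves to `FOURONE`, so 4.0 and 4.1 SSTables are readable but 3.x ones are not, which matches the `(majorVersion - 1) * 10` behaviour without assuming that the previous major is exactly one less than the current one.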



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java:
##########
@@ -35,22 +37,49 @@
  */
 public enum CassandraVersion
 {
-    THREEZERO(30, "3.0", "three-zero", "big"),
-    FOURZERO(40, "4.0", "four-zero", "big"),
-    FOURONE(41, "4.1", "four-zero", "big"),
-    FIVEZERO(50, "5.0", "five-zero", "big", "bti");
+    THREEZERO(30, "3.0", "three-zero", new String[]{"big"},
+              new String[]{
+                  // Cassandra 3.x native sstable versions
+                  // order is important, used to determine the latest version

Review Comment:
   After reading `SSTableVersionAnalyzer`, we could do away with the internal ordering between the SSTable versions, since what we are still interested in is the associated Cassandra version. We could treat all SSTable versions within a Cassandra version as being on the same level. Wdyt?
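A minimal sketch of this flat approach, assuming bridge selection only needs the owning Cassandra version: native SSTable versions are kept in an unordered `Set`, and the highest Cassandra version is the maximum by version number over the recognized SSTable versions. `FlatVersion` and its `findHighestCassandraVersion` are hypothetical, simplified stand-ins rather than the PR's code; unknown versions are skipped, and an empty `Optional` signals that none were recognized.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for CassandraVersion. Native SSTable versions are an
// unordered Set: all versions of one Cassandra release are treated as the same level.
enum FlatVersion
{
    FOURZERO(40, "big-na", "big-nb"),
    FIVEZERO(50, "big-oa", "bti-da");

    final int number;
    final Set<String> nativeVersions;

    FlatVersion(int number, String... nativeVersions)
    {
        this.number = number;
        this.nativeVersions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(nativeVersions)));
    }

    static Optional<FlatVersion> fromSSTableVersion(String sstableVersion)
    {
        return Arrays.stream(values())
                     .filter(v -> v.nativeVersions.contains(sstableVersion))
                     .findFirst();
    }

    /** Highest Cassandra version owning any of the given SSTable versions; unknown ones are skipped. */
    static Optional<FlatVersion> findHighestCassandraVersion(Set<String> sstableVersionsOnCluster)
    {
        return sstableVersionsOnCluster.stream()
                                       .map(FlatVersion::fromSSTableVersion)
                                       .filter(Optional::isPresent)
                                       .map(Optional::get)
                                       .max(Comparator.comparingInt((FlatVersion v) -> v.number));
    }

    public static void main(String[] args)
    {
        Set<String> onCluster = new HashSet<>(Arrays.asList("big-nb", "bti-da", "xx-zz"));
        System.out.println(findHighestCassandraVersion(onCluster)); // Optional[FIVEZERO]
    }
}
```

Because the constants here are declared in ascending order, comparing by ordinal would give the same result; comparing the explicit version number avoids depending on declaration order.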



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

