This is an automated email from the ASF dual-hosted git repository.

jyothsnakonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git

commit 27ea91cdb5990dbf2df19943a1eeacb731f041b5
Author: jkonisa <[email protected]>
AuthorDate: Fri Apr 3 08:24:40 2026 -0700

    CASSSIDECAR-458: Add SidecarReplicationFactorSupplier for live replication 
factor lookups
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-458
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/sidecar/cdc/CdcManager.java   |   4 +
 .../cdc/SidecarReplicationFactorSupplier.java      |  73 ++++++++++++++
 .../cdc/SidecarReplicationFactorSupplierTest.java  | 107 +++++++++++++++++++++
 4 files changed, 185 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index bcba44a6..d1a25028 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Add SidecarReplicationFactorSupplier for live replication factor lookups 
(CASSSIDECAR-458)
  * Fix CDC message loss during Sidecar restarts (CASSSIDECAR-457)
  * Fix CDC resource leaks: thread leaks, Kafka resource cleanup, singleton 
SidecarCdcClient (CASSSIDECAR-456)
  * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
index ef7802ac..76889ca4 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
 import org.apache.cassandra.cdc.api.TokenRangeSupplier;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.ReplicationFactorSupplier;
 import org.apache.cassandra.cdc.sidecar.SidecarCdc;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcStats;
@@ -83,6 +84,7 @@ public class CdcManager
     private final SidecarCdcClient sidecarCdcClient;
     private final ICdcStats cdcStats;
     private List<CdcConsumerEntry> entries = new ArrayList<>();
+    private final ReplicationFactorSupplier rfSupplier;
     private final CdcOptions cdcOptions;
     private final AsyncExecutor asyncExecutor;
     private final StateSidecarCdcCassandraClient cassandraClient;
@@ -111,6 +113,7 @@ public class CdcManager
         this.cdcOptions = cdcOptions;
         this.asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool);
         this.cassandraClient = new 
StateSidecarCdcCassandraClient(cdcDatabaseAccessor);
+        this.rfSupplier = new SidecarReplicationFactorSupplier(cdcOptions, 
schemaSupplier);
     }
 
     List<CdcConsumerEntry> buildCdcConsumers()
@@ -206,6 +209,7 @@ public class CdcManager
                                                   sidecarCdcClient,
                                                   cdcStats)
                                          .withExecutor(asyncExecutor)
+                                         
.withReplicationFactorSupplier(rfSupplier)
                                          .withSidecarStatePersister(persister)
                                          .build();
         return new CdcConsumerEntry(consumer, persister);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java
new file mode 100644
index 00000000..a2b72232
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Comparator;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.sidecar.ReplicationFactorSupplier;
+import org.apache.cassandra.cdc.sidecar.SidecarCommitLogProvider;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.utils.FutureUtils;
+
+/**
+ * {@link ReplicationFactorSupplier} implementation that reads the actual 
replication factor
+ * from Cassandra cluster metadata via {@link CdcOptions}, rather than using 
the default RF=1
+ * SimpleStrategy fallback. Used by {@link SidecarCommitLogProvider} to build 
a correctly
+ * replicated {@link 
org.apache.cassandra.spark.data.partitioner.CassandraRing}.
+ */
+public class SidecarReplicationFactorSupplier implements 
ReplicationFactorSupplier
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarReplicationFactorSupplier.class);
+    private final CdcOptions cdcOptions;
+    private final SchemaSupplier schemaSupplier;
+
+    public SidecarReplicationFactorSupplier(CdcOptions cdcOptions, 
SchemaSupplier schemaSupplier)
+    {
+        this.cdcOptions = cdcOptions;
+        this.schemaSupplier = schemaSupplier;
+    }
+
+    @Override
+    public ReplicationFactor getReplicationFactor(String keyspace)
+    {
+        return cdcOptions.replicationFactor(keyspace);
+    }
+
+    @Override
+    public ReplicationFactor getMaximalReplicationFactor()
+    {
+        String dc = cdcOptions.dc();
+        Set<CqlTable> tables = 
FutureUtils.get(schemaSupplier.getCdcEnabledTables());
+        return tables.stream()
+                     .map(CqlTable::replicationFactor)
+                     .filter(rf -> rf.getOptions().containsKey(dc))
+                     .max(Comparator.comparingInt(rf -> 
rf.getOptions().get(dc)))
+                     .orElseGet(() -> {
+                         LOGGER.warn("No CDC-enabled tables found for DC '{}'; 
falling back to RF=3 SimpleStrategy", dc);
+                         return ReplicationFactor.simpleStrategy(3);
+                     });
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java
new file mode 100644
index 00000000..1872c670
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link SidecarReplicationFactorSupplier}. */
+public class SidecarReplicationFactorSupplierTest
+{
+    private static final String DC = "dc1";
+
+    private CdcOptions cdcOptions;
+    private SchemaSupplier schemaSupplier;
+    private SidecarReplicationFactorSupplier supplier;
+
+    @BeforeEach
+    void setup()
+    {
+        cdcOptions = mock(CdcOptions.class);
+        schemaSupplier = mock(SchemaSupplier.class);
+        when(cdcOptions.dc()).thenReturn(DC);
+        supplier = new SidecarReplicationFactorSupplier(cdcOptions, 
schemaSupplier);
+    }
+
+    @Test
+    void getReplicationFactorDelegatesToCdcOptions()
+    {
+        ReplicationFactor rf = ReplicationFactor.simpleStrategy(3);
+        when(cdcOptions.replicationFactor("ks1")).thenReturn(rf);
+
+        assertThat(supplier.getReplicationFactor("ks1")).isSameAs(rf);
+        verify(cdcOptions).replicationFactor("ks1");
+    }
+
+    @Test
+    void getMaximalReplicationFactorReturnsHighestRfForDc()
+    {
+        CqlTable t1 = tableWithNtsRf(Map.of(DC, 2));
+        CqlTable t2 = tableWithNtsRf(Map.of(DC, 5));
+        CqlTable t3 = tableWithNtsRf(Map.of(DC, 3));
+        
when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of(t1,
 t2, t3)));
+
+        ReplicationFactor result = supplier.getMaximalReplicationFactor();
+
+        assertThat(result.getOptions().get(DC)).isEqualTo(5);
+    }
+
+    @Test
+    void getMaximalReplicationFactorFallsBackToSimpleStrategy1WhenNoDcMatch()
+    {
+        CqlTable table = tableWithNtsRf(Map.of("other_dc", 5));
+        
when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of(table)));
+
+        ReplicationFactor result = supplier.getMaximalReplicationFactor();
+
+        assertThat(result).isEqualTo(ReplicationFactor.simpleStrategy(3));
+    }
+
+    @Test
+    void 
getMaximalReplicationFactorFallsBackToSimpleStrategy1WhenNoTablesPresent()
+    {
+        
when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of()));
+
+        ReplicationFactor result = supplier.getMaximalReplicationFactor();
+
+        assertThat(result).isEqualTo(ReplicationFactor.simpleStrategy(3));
+    }
+
+    private static CqlTable tableWithNtsRf(Map<String, Integer> dcOptions)
+    {
+        CqlTable table = mock(CqlTable.class);
+        when(table.replicationFactor()).thenReturn(
+            new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
 dcOptions));
+        return table;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to