This is an automated email from the ASF dual-hosted git repository. jyothsnakonisa pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
commit 27ea91cdb5990dbf2df19943a1eeacb731f041b5 Author: jkonisa <[email protected]> AuthorDate: Fri Apr 3 08:24:40 2026 -0700 CASSSIDECAR-458: Add SidecarReplicationFactorSupplier for live replication factor lookups Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-458 --- CHANGES.txt | 1 + .../apache/cassandra/sidecar/cdc/CdcManager.java | 4 + .../cdc/SidecarReplicationFactorSupplier.java | 73 ++++++++++++++ .../cdc/SidecarReplicationFactorSupplierTest.java | 107 +++++++++++++++++++++ 4 files changed, 185 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index bcba44a6..d1a25028 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Add SidecarReplicationFactorSupplier for live replication factor lookups (CASSSIDECAR-458) * Fix CDC message loss during Sidecar restarts (CASSSIDECAR-457) * Fix CDC resource leaks: thread leaks, Kafka resource cleanup, singleton SidecarCdcClient (CASSSIDECAR-456) * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index ef7802ac..76889ca4 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -33,6 +33,7 @@ import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; +import org.apache.cassandra.cdc.sidecar.ReplicationFactorSupplier; import org.apache.cassandra.cdc.sidecar.SidecarCdc; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.sidecar.SidecarCdcStats; @@ -83,6 +84,7 @@ public class CdcManager private final SidecarCdcClient sidecarCdcClient; private final ICdcStats cdcStats; private List<CdcConsumerEntry> entries = new ArrayList<>(); + private final ReplicationFactorSupplier rfSupplier; private final CdcOptions cdcOptions; private final AsyncExecutor asyncExecutor; private final StateSidecarCdcCassandraClient cassandraClient; @@ -111,6 +113,7 @@ public class CdcManager this.cdcOptions = cdcOptions; this.asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool); this.cassandraClient = new StateSidecarCdcCassandraClient(cdcDatabaseAccessor); + this.rfSupplier = new SidecarReplicationFactorSupplier(cdcOptions, schemaSupplier); } List<CdcConsumerEntry> buildCdcConsumers() @@ -206,6 +209,7 @@ public class CdcManager sidecarCdcClient, cdcStats) .withExecutor(asyncExecutor) + .withReplicationFactorSupplier(rfSupplier) .withSidecarStatePersister(persister) .build(); return new CdcConsumerEntry(consumer, persister); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java new file mode 100644 index 00000000..a2b72232 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import java.util.Comparator; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cdc.api.CdcOptions; +import org.apache.cassandra.cdc.api.SchemaSupplier; +import org.apache.cassandra.cdc.sidecar.ReplicationFactorSupplier; +import org.apache.cassandra.cdc.sidecar.SidecarCommitLogProvider; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.utils.FutureUtils; + +/** + * {@link ReplicationFactorSupplier} implementation that reads the actual replication factor + * from Cassandra cluster metadata via {@link CdcOptions}, rather than using the default RF=1 + * SimpleStrategy fallback. Used by {@link SidecarCommitLogProvider} to build a correctly + * replicated {@link org.apache.cassandra.spark.data.partitioner.CassandraRing}. + */ +public class SidecarReplicationFactorSupplier implements ReplicationFactorSupplier +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SidecarReplicationFactorSupplier.class); + private final CdcOptions cdcOptions; + private final SchemaSupplier schemaSupplier; + + public SidecarReplicationFactorSupplier(CdcOptions cdcOptions, SchemaSupplier schemaSupplier) + { + this.cdcOptions = cdcOptions; + this.schemaSupplier = schemaSupplier; + } + + @Override + public ReplicationFactor getReplicationFactor(String keyspace) + { + return cdcOptions.replicationFactor(keyspace); + } + + @Override + public ReplicationFactor getMaximalReplicationFactor() + { + String dc = cdcOptions.dc(); + Set<CqlTable> tables = FutureUtils.get(schemaSupplier.getCdcEnabledTables()); + return tables.stream() + .map(CqlTable::replicationFactor) + .filter(rf -> rf.getOptions().containsKey(dc)) + .max(Comparator.comparingInt(rf -> rf.getOptions().get(dc))) + .orElseGet(() -> { + LOGGER.warn("No CDC-enabled tables found for DC '{}'; falling back to RF=3 SimpleStrategy", dc); + return ReplicationFactor.simpleStrategy(3); + }); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java new file mode 100644 index 00000000..1872c670 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplierTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.cdc.api.CdcOptions; +import org.apache.cassandra.cdc.api.SchemaSupplier; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit tests for {@link SidecarReplicationFactorSupplier}. */ +public class SidecarReplicationFactorSupplierTest +{ + private static final String DC = "dc1"; + + private CdcOptions cdcOptions; + private SchemaSupplier schemaSupplier; + private SidecarReplicationFactorSupplier supplier; + + @BeforeEach + void setup() + { + cdcOptions = mock(CdcOptions.class); + schemaSupplier = mock(SchemaSupplier.class); + when(cdcOptions.dc()).thenReturn(DC); + supplier = new SidecarReplicationFactorSupplier(cdcOptions, schemaSupplier); + } + + @Test + void getReplicationFactorDelegatesToCdcOptions() + { + ReplicationFactor rf = ReplicationFactor.simpleStrategy(3); + when(cdcOptions.replicationFactor("ks1")).thenReturn(rf); + + assertThat(supplier.getReplicationFactor("ks1")).isSameAs(rf); + verify(cdcOptions).replicationFactor("ks1"); + } + + @Test + void getMaximalReplicationFactorReturnsHighestRfForDc() + { + CqlTable t1 = tableWithNtsRf(Map.of(DC, 2)); + CqlTable t2 = tableWithNtsRf(Map.of(DC, 5)); + CqlTable t3 = tableWithNtsRf(Map.of(DC, 3)); + when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of(t1, t2, t3))); + + ReplicationFactor result = supplier.getMaximalReplicationFactor(); + + assertThat(result.getOptions().get(DC)).isEqualTo(5); + } + + @Test + void getMaximalReplicationFactorFallsBackToSimpleStrategy1WhenNoDcMatch() + { + CqlTable table = tableWithNtsRf(Map.of("other_dc", 5)); + when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of(table))); + + ReplicationFactor result = supplier.getMaximalReplicationFactor(); + + assertThat(result).isEqualTo(ReplicationFactor.simpleStrategy(3)); + } + + @Test + void getMaximalReplicationFactorFallsBackToSimpleStrategy1WhenNoTablesPresent() + { + when(schemaSupplier.getCdcEnabledTables()).thenReturn(CompletableFuture.completedFuture(Set.of())); + + ReplicationFactor result = supplier.getMaximalReplicationFactor(); + + assertThat(result).isEqualTo(ReplicationFactor.simpleStrategy(3)); + } + + private static CqlTable tableWithNtsRf(Map<String, Integer> dcOptions) + { + CqlTable table = mock(CqlTable.class); + when(table.replicationFactor()).thenReturn( + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, dcOptions)); + return table; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
