This is an automated email from the ASF dual-hosted git repository. jyothsnakonisa pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
commit c2f352f2eac1b9b99adc0cf569370d6a8d2f35c8 Author: jkonisa <[email protected]> AuthorDate: Tue Mar 31 13:36:16 2026 -0700 CASSSIDECAR-456: Fix CDC resource leaks: thread leaks, Kafka resource cleanup, singleton SidecarCdcClient Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-456 --- CHANGES.txt | 1 + .../cassandra/sidecar/cdc/CdcConsumerEntry.java | 61 +++++ .../apache/cassandra/sidecar/cdc/CdcManager.java | 176 ++++++-------- .../sidecar/cdc/SidecarClientSecretsProvider.java | 65 ++++++ .../sidecar/cdc/CdcConsumerEntryTest.java | 73 ++++++ .../cassandra/sidecar/cdc/CdcManagerTest.java | 76 +++---- .../cdc/SidecarClientSecretsProviderTests.java | 252 +++++++++++++++++++++ 7 files changed, 556 insertions(+), 148 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 78c4e547..c2e668b0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Fix CDC resource leaks: thread leaks, Kafka resource cleanup, singleton SidecarCdcClient (CASSSIDECAR-456) * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455) * Support IAM instance profile credentials for S3 restore jobs (CASSSIDECAR-415) * Adding endpoint for verifying files post data copy during live migration (CASSSIDECAR-226) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java new file mode 100644 index 00000000..2e33982f --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import org.apache.cassandra.cdc.sidecar.SidecarCdc; +import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; + +/** + * Pairs a {@link SidecarCdc} consumer with its {@link SidecarStatePersister} so they are + * always stopped together in the correct order: consumer first (blocking), then persister flush. + */ +class CdcConsumerEntry +{ + private final SidecarCdc consumer; + private final SidecarStatePersister persister; + + CdcConsumerEntry(SidecarCdc consumer, SidecarStatePersister persister) + { + this.consumer = consumer; + this.persister = persister; + } + + SidecarCdc consumer() + { + return consumer; + } + + SidecarStatePersister persister() + { + return persister; + } + + void start() + { + persister.start(); + consumer.initSchema(); + consumer.start(); + } + + void stop() + { + consumer.stop(); // blocking — waits for any active run() to complete + persister.stop(true); // flush buffered state to Cassandra, then cancel timer + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index d5bb24e4..ef7802ac 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -18,7 +18,6 @@ package org.apache.cassandra.sidecar.cdc; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,14 +32,12 @@ import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdc; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.sidecar.SidecarCdcStats; import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.secrets.SecretsProvider; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; @@ -83,13 +80,12 @@ public class CdcManager private final EventConsumer eventConsumer; private final SchemaSupplier schemaSupplier; private final ClusterConfigProvider clusterConfigProvider; - private final CdcSidecarInstancesProvider sidecarInstancesProvider; - private final SecretsProvider secretsProvider; - private final SidecarCdcClient.ClientConfig clientConfig; + private final SidecarCdcClient sidecarCdcClient; private final ICdcStats cdcStats; - private List<SidecarCdc> consumers = new ArrayList<>(); - private final TaskExecutorPool taskExecutorPool; - private final CdcDatabaseAccessor cdcDatabaseAccessor; + private List<CdcConsumerEntry> entries = new ArrayList<>(); + private final CdcOptions cdcOptions; + private final AsyncExecutor asyncExecutor; + private final StateSidecarCdcCassandraClient cassandraClient; public CdcManager(EventConsumer eventConsumer, @@ -98,12 +94,11 @@ public class CdcManager RangeManager rangeManager, InstanceMetadataFetcher instanceFetcher, ClusterConfigProvider clusterConfigProvider, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SecretsProvider secretsProvider, - SidecarCdcClient.ClientConfig clientConfig, + SidecarCdcClient sidecarCdcClient, ICdcStats cdcStats, TaskExecutorPool taskExecutorPool, - CdcDatabaseAccessor cdcDatabaseAccessor) + CdcDatabaseAccessor cdcDatabaseAccessor, + CdcOptions cdcOptions) { this.eventConsumer = eventConsumer; this.schemaSupplier = schemaSupplier; @@ -111,15 +106,14 @@ public class CdcManager this.rangeManager = rangeManager; this.instanceFetcher = instanceFetcher; this.clusterConfigProvider = clusterConfigProvider; - this.sidecarInstancesProvider = sidecarInstancesProvider; - this.secretsProvider = secretsProvider; - this.clientConfig = clientConfig; + this.sidecarCdcClient = sidecarCdcClient; this.cdcStats = cdcStats; - this.taskExecutorPool = taskExecutorPool; - this.cdcDatabaseAccessor = cdcDatabaseAccessor; + this.cdcOptions = cdcOptions; + this.asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool); + this.cassandraClient = new StateSidecarCdcCassandraClient(cdcDatabaseAccessor); } - List<SidecarCdc> buildCdcConsumers() + List<CdcConsumerEntry> buildCdcConsumers() { Map<String, Set<TokenRange>> ownedRanges = rangeManager.ownedTokenRanges(); if (ownedRanges == null || ownedRanges.isEmpty()) @@ -127,10 +121,12 @@ public class CdcManager throw new IllegalStateException("No owned token ranges right now, cql session may still be initializing."); } - // NEW: Deduplicate by (instanceId, tokenRange) to prevent duplicate consumers - Map<String, SidecarCdc> uniqueConsumers = new HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum()); + // Deduplicate by (instanceId, tokenRange) to prevent duplicate consumers + Map<String, CdcConsumerEntry> uniqueCdcConsumers = new HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum()); - ownedRanges.entrySet().stream() + try + { + ownedRanges.entrySet().stream() .flatMap(entry -> entry.getValue().stream().map(range -> { Integer instanceId = getInstanceId(entry.getKey()); @@ -141,69 +137,38 @@ public class CdcManager range.startAsBigInt(), range.endAsBigInt()); - // Only create consumer if not already created for this (instance, range) - return uniqueConsumers.computeIfAbsent(uniqueKey, k -> { - try - { - return loadOrBuildCdcConsumer(instanceId, - clusterConfigProvider, - eventConsumer, - schemaSupplier, - () -> org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(), range.endAsBigInt()), - sidecarInstancesProvider, - secretsProvider, - clientConfig, - conf, - cdcStats, - taskExecutorPool); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); + return uniqueCdcConsumers.computeIfAbsent(uniqueKey, k -> + buildConsumer(conf.jobId(), + instanceId, + clusterConfigProvider, + eventConsumer, + schemaSupplier, + () -> org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(), range.endAsBigInt()), + sidecarCdcClient, + cdcStats)); })) .collect(Collectors.toList()); - consumers = new ArrayList<>(uniqueConsumers.values()); - return consumers; - } - - SidecarCdc loadOrBuildCdcConsumer(Integer instanceId, - ClusterConfigProvider clusterConfigProvider, - EventConsumer eventConsumer, - SchemaSupplier schemaSupplier, - TokenRangeSupplier tokenRangeSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SecretsProvider secretsProvider, - SidecarCdcClient.ClientConfig clientConfig, - CdcConfig conf, - ICdcStats cdcStats, - TaskExecutorPool taskExecutorPool) throws IOException - { - return buildConsumer(conf.jobId(), - instanceId, - new SidecarCdcOptions(instanceFetcher), - clusterConfigProvider, - eventConsumer, - schemaSupplier, - tokenRangeSupplier, - sidecarInstancesProvider, - clientConfig, - secretsProvider, - cdcStats, - taskExecutorPool); + entries = new ArrayList<>(uniqueCdcConsumers.values()); + return entries; + } + catch (RuntimeException e) + { + // Stop any already-built consumers/persisters so timers and threads don't leak + uniqueCdcConsumers.values().forEach(CdcConsumerEntry::stop); + throw e; + } } public void startConsumers() { - consumers.forEach(SidecarCdc::initSchema); - consumers.forEach(SidecarCdc::start); + entries.forEach(CdcConsumerEntry::start); } public void stopConsumers() { - consumers.forEach(SidecarCdc::stop); + entries.forEach(CdcConsumerEntry::stop); + entries.clear(); } @VisibleForTesting @@ -221,44 +186,37 @@ public class CdcManager } - public SidecarCdc buildConsumer(@NotNull String jobId, - int partitionId, - CdcOptions cdcOptions, - ClusterConfigProvider clusterConfigProvider, - EventConsumer eventConsumer, - SchemaSupplier schemaSupplier, - TokenRangeSupplier tokenRangeSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - SecretsProvider secretsProvider, - ICdcStats cdcStats, - TaskExecutorPool taskExecutorPool) throws IOException + CdcConsumerEntry buildConsumer(@NotNull String jobId, + int partitionId, + ClusterConfigProvider clusterConfigProvider, + EventConsumer eventConsumer, + SchemaSupplier schemaSupplier, + TokenRangeSupplier tokenRangeSupplier, + SidecarCdcClient sidecarCdcClient, + ICdcStats cdcStats) { - - AsyncExecutor asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool); - - final SidecarStatePersister sidecarStatePersister = getSidecarStatePersister(cdcOptions, asyncExecutor); - return (SidecarCdc) SidecarCdc.builder(jobId, - partitionId, - cdcOptions, - clusterConfigProvider, - eventConsumer, - schemaSupplier, - tokenRangeSupplier, - sidecarInstancesProvider, - clientConfig, - secretsProvider, - cdcStats).withExecutor(asyncExecutor).withStatePersister(sidecarStatePersister).build(); + SidecarStatePersister persister = getSidecarStatePersister(); + SidecarCdc consumer = SidecarCdc.builder(jobId, + partitionId, + cdcOptions, + clusterConfigProvider, + eventConsumer, + schemaSupplier, + tokenRangeSupplier, + sidecarCdcClient, + cdcStats) + .withExecutor(asyncExecutor) + .withSidecarStatePersister(persister) + .build(); + return new CdcConsumerEntry(consumer, persister); } - private @NotNull SidecarStatePersister getSidecarStatePersister(CdcOptions cdcOptions, AsyncExecutor asyncExecutor) + private @NotNull SidecarStatePersister getSidecarStatePersister() { - SidecarStatePersister sidecarStatePersister = new SidecarStatePersister(org.apache.cassandra.cdc.sidecar.SidecarCdcOptions.DEFAULT, - cdcOptions, - SidecarCdcStats.STUB, - new StateSidecarCdcCassandraClient(cdcDatabaseAccessor), - asyncExecutor); - sidecarStatePersister.start(); - return sidecarStatePersister; + return new SidecarStatePersister(org.apache.cassandra.cdc.sidecar.SidecarCdcOptions.DEFAULT, + cdcOptions, + SidecarCdcStats.STUB, + cassandraClient, + asyncExecutor); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java new file mode 100644 index 00000000..051ed3e9 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import java.util.TreeMap; + +import org.apache.cassandra.secrets.SslConfig; +import org.apache.cassandra.secrets.SslConfigSecretsProvider; +import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SslConfiguration; + +/** + * A {@link SslConfigSecretsProvider} that reads SSL configuration from {@link SidecarConfiguration} + * to provide keystore and truststore secrets for the sidecar client. + */ +public class SidecarClientSecretsProvider extends SslConfigSecretsProvider +{ + public SidecarClientSecretsProvider(SidecarConfiguration sidecarConfiguration) + { + super(SslConfig.create(buildSslConfigMap(sidecarConfiguration))); + } + + private static Map<String, String> buildSslConfigMap(SidecarConfiguration sidecarConfiguration) + { + SslConfiguration sslConfiguration = sidecarConfiguration.sidecarClientConfiguration().sslConfiguration(); + + Map<String, String> sslConfigMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + + if (sslConfiguration.isKeystoreConfigured()) + { + KeyStoreConfiguration keystore = sslConfiguration.keystore(); + sslConfigMap.put(SslConfig.KEYSTORE_PATH, keystore.path()); + sslConfigMap.put(SslConfig.KEYSTORE_PASSWORD, keystore.password()); + sslConfigMap.put(SslConfig.KEYSTORE_TYPE, keystore.type()); + } + + if (sslConfiguration.isTrustStoreConfigured()) + { + KeyStoreConfiguration truststore = sslConfiguration.truststore(); + sslConfigMap.put(SslConfig.TRUSTSTORE_PATH, truststore.path()); + sslConfigMap.put(SslConfig.TRUSTSTORE_PASSWORD, truststore.password()); + sslConfigMap.put(SslConfig.TRUSTSTORE_TYPE, truststore.type()); + } + + return sslConfigMap; + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java new file mode 100644 index 00000000..9891905f --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.cdc.sidecar.SidecarCdc; +import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; +import org.mockito.InOrder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +/** Unit tests for {@link CdcConsumerEntry}. */ +public class CdcConsumerEntryTest +{ + @Test + void startCallsPersisterBeforeConsumer() + { + SidecarCdc consumer = mock(SidecarCdc.class); + SidecarStatePersister persister = mock(SidecarStatePersister.class); + CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister); + + entry.start(); + + InOrder order = inOrder(persister, consumer); + order.verify(persister).start(); + order.verify(consumer).initSchema(); + order.verify(consumer).start(); + } + + @Test + void stopCallsConsumerBeforePersister() + { + SidecarCdc consumer = mock(SidecarCdc.class); + SidecarStatePersister persister = mock(SidecarStatePersister.class); + CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister); + + entry.stop(); + + InOrder order = inOrder(consumer, persister); + order.verify(consumer).stop(); + order.verify(persister).stop(true); + } + + @Test + void accessorsReturnConstructorArguments() + { + SidecarCdc consumer = mock(SidecarCdc.class); + SidecarStatePersister persister = mock(SidecarStatePersister.class); + CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister); + + assertThat(entry.consumer()).isSameAs(consumer); + assertThat(entry.persister()).isSameAs(persister); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java index 48fe70b5..55d7c169 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java @@ -32,14 +32,14 @@ import java.util.Set; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdc; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; +import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.secrets.SecretsProvider; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; @@ -80,17 +80,15 @@ public class CdcManagerTest @Mock private ClusterConfigProvider clusterConfigProvider; @Mock - private CdcSidecarInstancesProvider sidecarInstancesProvider; - @Mock - private SecretsProvider secretsProvider; - @Mock - private SidecarCdcClient.ClientConfig clientConfig; + private SidecarCdcClient sidecarCdcClient; @Mock private ICdcStats cdcStats; @Mock private TaskExecutorPool taskExecutorPool; @Mock private CdcDatabaseAccessor cdcDatabaseAccessor; + @Mock + private CdcOptions cdcOptions; private CdcManager cdcManager; @@ -106,12 +104,11 @@ public class CdcManagerTest rangeManager, instanceFetcher, clusterConfigProvider, - sidecarInstancesProvider, - secretsProvider, - clientConfig, + sidecarCdcClient, cdcStats, taskExecutorPool, - cdcDatabaseAccessor + cdcDatabaseAccessor, + cdcOptions ); } @@ -152,12 +149,12 @@ public class CdcManagerTest when(cdcConfig.jobId()).thenReturn("test-job"); CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer = mock(SidecarCdc.class); - doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(1); } @@ -183,13 +180,13 @@ public class CdcManagerTest when(cdcConfig.jobId()).thenReturn("test-job"); CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer1 = mock(SidecarCdc.class); - SidecarCdc mockConsumer2 = mock(SidecarCdc.class); - doReturn(mockConsumer1, mockConsumer2).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry1 = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + CdcConsumerEntry mockEntry2 = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry1, mockEntry2).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(2); } @@ -218,13 +215,13 @@ public class CdcManagerTest when(cdcConfig.jobId()).thenReturn("test-job"); CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer1 = mock(SidecarCdc.class); - SidecarCdc mockConsumer2 = mock(SidecarCdc.class); - doReturn(mockConsumer1, mockConsumer2).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry1 = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + CdcConsumerEntry mockEntry2 = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry1, mockEntry2).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(2); } @@ -251,12 +248,12 @@ public class CdcManagerTest when(cdcConfig.jobId()).thenReturn("test-job"); CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer = mock(SidecarCdc.class); - doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(1); } @@ -273,13 +270,14 @@ public class CdcManagerTest when(instanceFetcher.instance(unknownIp)).thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp)); when(cdcConfig.jobId()).thenReturn("test-job"); + // Spy to mock buildConsumer - will be called with instanceId = -1 CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer = mock(SidecarCdc.class); - doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(1); } @@ -320,16 +318,16 @@ public class CdcManagerTest when(cdcConfig.jobId()).thenReturn("test-job"); CdcManager spyManager = spy(cdcManager); - SidecarCdc mockConsumer = mock(SidecarCdc.class); - doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( - anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + CdcConsumerEntry mockEntry = new CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class)); + doReturn(mockEntry).when(spyManager).buildConsumer( + any(), anyInt(), any(), any(), any(), any(), any(), any() ); - List<SidecarCdc> consumers = spyManager.buildCdcConsumers(); + List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers(); assertThat(consumers).hasSize(1); - verify(spyManager).loadOrBuildCdcConsumer( - eq(instanceId), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + verify(spyManager).buildConsumer( + any(), eq(instanceId), any(), any(), any(), any(), any(), any() ); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java new file mode 100644 index 00000000..2e671c02 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import java.util.Arrays; +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.secrets.SecretsProvider; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SslConfiguration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link SidecarClientSecretsProvider} + */ +public class SidecarClientSecretsProviderTests +{ + @Test + void testSecretsProviderWithSslEnabledNoKeystoreNoTruststore() + { + SslConfiguration sslConfig = mockSslConfiguration( + true, + true, + "REQUIRED", + Arrays.asList("TLS_RSA_128"), + Arrays.asList("TLSv1.2"), + "10s", + false, + false + ); + + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(sslConfig); + + SecretsProvider result = new SidecarClientSecretsProvider(sidecarConfiguration); + + assertThat(result).isNotNull(); + } + + @Test + void testSecretsProviderWithKeystoreOnly() + { + KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( + "/path/to/keystore.jks", + "keystorePassword", + "JKS" + ); + + SslConfiguration sslConfig = mockSslConfiguration( + true, + false, + "OPTIONAL", + Arrays.asList("TLS_RSA_256"), + Arrays.asList("TLSv1.3"), + "15s", + true, + false + ); + + when(sslConfig.keystore()).thenReturn(keystoreConfig); + + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(sslConfig); + + SecretsProvider result = new SidecarClientSecretsProvider(sidecarConfiguration); + + assertThat(result).isNotNull(); + assertThat(result.keyStoreType()).isEqualTo("JKS"); + assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray()); + } + + @Test + void testSecretsProviderWithTruststoreOnly() + { + // SslConfig validation requires keystore password when any SSL config is provided + // This test validates that truststore-only configuration is rejected + KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( + "/path/to/truststore.jks", + "truststorePassword", + "PKCS12" + ); + + SslConfiguration sslConfig = mockSslConfiguration( + true, + true, + "NONE", + Collections.emptyList(), + Arrays.asList("TLSv1.2", "TLSv1.3"), + "20s", + false, + true + ); + + when(sslConfig.truststore()).thenReturn(truststoreConfig); + + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(sslConfig); + + IllegalArgumentException exception = org.junit.jupiter.api.Assertions.assertThrows( + IllegalArgumentException.class, + () -> new SidecarClientSecretsProvider(sidecarConfiguration) + ); + + assertThat(exception.getMessage()).contains("KEYSTORE_PASSWORD"); + } + + @Test + void testSecretsProviderWithBothKeystoreAndTruststore() + { + KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( + "/path/to/keystore.p12", + "keystorePass123", + "PKCS12" + ); + + KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( + "/path/to/truststore.p12", + "truststorePass456", + "PKCS12" + ); + + SslConfiguration sslConfig = mockSslConfiguration( + true, + true, + "REQUIRED", + Arrays.asList("TLS_ECDHE_RSA", "TLS_AES_256"), + Arrays.asList("TLSv1.2", "TLSv1.3"), + "30s", + true, + true + ); + + when(sslConfig.keystore()).thenReturn(keystoreConfig); + when(sslConfig.truststore()).thenReturn(truststoreConfig); + + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(sslConfig); + + SecretsProvider result = new SidecarClientSecretsProvider(sidecarConfiguration); + + assertThat(result).isNotNull(); + assertThat(result.keyStoreType()).isEqualTo("PKCS12"); + assertThat(result.keyStorePassword()).isEqualTo("keystorePass123".toCharArray()); + assertThat(result.trustStoreType()).isEqualTo("PKCS12"); + assertThat(result.trustStorePassword()).isEqualTo("truststorePass456".toCharArray()); + } + + @Test + void testSecretsProviderUsesCorrectSslConfigKeys() + { + KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( + "/path/to/keystore.jks", + "keystorePassword", + "JKS" + ); + + KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( + "/path/to/truststore.jks", + "truststorePassword", + "PKCS12" + ); + + SslConfiguration sslConfig = mockSslConfiguration( + true, + false, + "REQUIRED", + Collections.emptyList(), + Arrays.asList("TLSv1.2"), + "10s", + true, + true + ); + + when(sslConfig.keystore()).thenReturn(keystoreConfig); + when(sslConfig.truststore()).thenReturn(truststoreConfig); + + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(sslConfig); + + SecretsProvider result = new SidecarClientSecretsProvider(sidecarConfiguration); + + assertThat(result).isNotNull(); + assertThat(result).isInstanceOf(SidecarClientSecretsProvider.class); + + assertThat(result.keyStoreType()).isEqualTo("JKS"); + assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray()); + + assertThat(result.trustStoreType()).isEqualTo("PKCS12"); + assertThat(result.trustStorePassword()).isEqualTo("truststorePassword".toCharArray()); + } + + private SidecarConfiguration mockSidecarConfiguration(SslConfiguration sslConfiguration) + { + SidecarConfiguration sidecarConfiguration = mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS); + when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfiguration); + return sidecarConfiguration; + } + + private SslConfiguration mockSslConfiguration(boolean enabled, + boolean preferOpenSSL, + String clientAuth, + java.util.List<String> cipherSuites, + java.util.List<String> secureTransportProtocols, + String handshakeTimeout, + boolean keystoreConfigured, + boolean truststoreConfigured) + { + SslConfiguration sslConfig = mock(SslConfiguration.class, RETURNS_DEEP_STUBS); + when(sslConfig.enabled()).thenReturn(enabled); + when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL); + when(sslConfig.clientAuth()).thenReturn(clientAuth); + when(sslConfig.cipherSuites()).thenReturn(cipherSuites); + when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols); + + SecondBoundConfiguration durationSpec = mock(SecondBoundConfiguration.class); + when(durationSpec.toString()).thenReturn(handshakeTimeout); + when(sslConfig.handshakeTimeout()).thenReturn(durationSpec); + + when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured); + when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured); + + return sslConfig; + } + + private KeyStoreConfiguration mockKeystoreConfiguration(String path, String password, String type) + { + KeyStoreConfiguration config = mock(KeyStoreConfiguration.class); + when(config.path()).thenReturn(path); + when(config.password()).thenReturn(password); + when(config.type()).thenReturn(type); + return config; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
