This is an automated email from the ASF dual-hosted git repository.

jyothsnakonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git

commit c2f352f2eac1b9b99adc0cf569370d6a8d2f35c8
Author: jkonisa <[email protected]>
AuthorDate: Tue Mar 31 13:36:16 2026 -0700

    CASSSIDECAR-456: Fix CDC resource leaks: thread leaks, Kafka resource 
cleanup, singleton SidecarCdcClient
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-456
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/cdc/CdcConsumerEntry.java    |  61 +++++
 .../apache/cassandra/sidecar/cdc/CdcManager.java   | 176 ++++++--------
 .../sidecar/cdc/SidecarClientSecretsProvider.java  |  65 ++++++
 .../sidecar/cdc/CdcConsumerEntryTest.java          |  73 ++++++
 .../cassandra/sidecar/cdc/CdcManagerTest.java      |  76 +++----
 .../cdc/SidecarClientSecretsProviderTests.java     | 252 +++++++++++++++++++++
 7 files changed, 556 insertions(+), 148 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 78c4e547..c2e668b0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Fix CDC resource leaks: thread leaks, Kafka resource cleanup, singleton 
SidecarCdcClient (CASSSIDECAR-456)
  * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455)
  * Support IAM instance profile credentials for S3 restore jobs 
(CASSSIDECAR-415)
  * Adding endpoint for verifying files post data copy during live migration 
(CASSSIDECAR-226)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java
new file mode 100644
index 00000000..2e33982f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntry.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
+
+/**
+ * Pairs a {@link SidecarCdc} consumer with its {@link SidecarStatePersister} 
so they are
+ * always stopped together in the correct order: consumer first (blocking), 
then persister flush.
+ */
+class CdcConsumerEntry
+{
+    private final SidecarCdc consumer;
+    private final SidecarStatePersister persister;
+
+    CdcConsumerEntry(SidecarCdc consumer, SidecarStatePersister persister)
+    {
+        this.consumer = consumer;
+        this.persister = persister;
+    }
+
+    SidecarCdc consumer()
+    {
+        return consumer;
+    }
+
+    SidecarStatePersister persister()
+    {
+        return persister;
+    }
+
+    void start()
+    {
+        persister.start();
+        consumer.initSchema();
+        consumer.start();
+    }
+
+    void stop()
+    {
+        consumer.stop();           // blocking — waits for any active run() to 
complete
+        persister.stop(true);      // flush buffered state to Cassandra, then 
cancel timer
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
index d5bb24e4..ef7802ac 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.cdc;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,14 +32,12 @@ import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
 import org.apache.cassandra.cdc.api.TokenRangeSupplier;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdc;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcStats;
 import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.coordination.RangeManager;
@@ -83,13 +80,12 @@ public class CdcManager
     private final EventConsumer eventConsumer;
     private final SchemaSupplier schemaSupplier;
     private final ClusterConfigProvider clusterConfigProvider;
-    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
-    private final SecretsProvider secretsProvider;
-    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final SidecarCdcClient sidecarCdcClient;
     private final ICdcStats cdcStats;
-    private List<SidecarCdc> consumers = new ArrayList<>();
-    private final TaskExecutorPool taskExecutorPool;
-    private final CdcDatabaseAccessor cdcDatabaseAccessor;
+    private List<CdcConsumerEntry> entries = new ArrayList<>();
+    private final CdcOptions cdcOptions;
+    private final AsyncExecutor asyncExecutor;
+    private final StateSidecarCdcCassandraClient cassandraClient;
 
 
     public CdcManager(EventConsumer eventConsumer,
@@ -98,12 +94,11 @@ public class CdcManager
                       RangeManager rangeManager,
                       InstanceMetadataFetcher instanceFetcher,
                       ClusterConfigProvider clusterConfigProvider,
-                      CdcSidecarInstancesProvider sidecarInstancesProvider,
-                      SecretsProvider secretsProvider,
-                      SidecarCdcClient.ClientConfig clientConfig,
+                      SidecarCdcClient sidecarCdcClient,
                       ICdcStats cdcStats,
                       TaskExecutorPool taskExecutorPool,
-                      CdcDatabaseAccessor cdcDatabaseAccessor)
+                      CdcDatabaseAccessor cdcDatabaseAccessor,
+                      CdcOptions cdcOptions)
     {
         this.eventConsumer = eventConsumer;
         this.schemaSupplier = schemaSupplier;
@@ -111,15 +106,14 @@ public class CdcManager
         this.rangeManager = rangeManager;
         this.instanceFetcher = instanceFetcher;
         this.clusterConfigProvider = clusterConfigProvider;
-        this.sidecarInstancesProvider = sidecarInstancesProvider;
-        this.secretsProvider = secretsProvider;
-        this.clientConfig = clientConfig;
+        this.sidecarCdcClient = sidecarCdcClient;
         this.cdcStats = cdcStats;
-        this.taskExecutorPool = taskExecutorPool;
-        this.cdcDatabaseAccessor = cdcDatabaseAccessor;
+        this.cdcOptions = cdcOptions;
+        this.asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool);
+        this.cassandraClient = new 
StateSidecarCdcCassandraClient(cdcDatabaseAccessor);
     }
 
-    List<SidecarCdc> buildCdcConsumers()
+    List<CdcConsumerEntry> buildCdcConsumers()
     {
         Map<String, Set<TokenRange>> ownedRanges = 
rangeManager.ownedTokenRanges();
         if (ownedRanges == null || ownedRanges.isEmpty())
@@ -127,10 +121,12 @@ public class CdcManager
             throw new IllegalStateException("No owned token ranges right now, 
cql session may still be initializing.");
         }
 
-        // NEW: Deduplicate by (instanceId, tokenRange) to prevent duplicate 
consumers
-        Map<String, SidecarCdc> uniqueConsumers = new 
HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum());
+        // Deduplicate by (instanceId, tokenRange) to prevent duplicate 
consumers
+        Map<String, CdcConsumerEntry> uniqueCdcConsumers = new 
HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum());
 
-        ownedRanges.entrySet().stream()
+        try
+        {
+            ownedRanges.entrySet().stream()
                    .flatMap(entry ->
                             entry.getValue().stream().map(range -> {
                                 Integer instanceId = 
getInstanceId(entry.getKey());
@@ -141,69 +137,38 @@ public class CdcManager
                                                                  
range.startAsBigInt(),
                                                                  
range.endAsBigInt());
 
-                                // Only create consumer if not already created 
for this (instance, range)
-                                return 
uniqueConsumers.computeIfAbsent(uniqueKey, k -> {
-                                    try
-                                    {
-                                        return 
loadOrBuildCdcConsumer(instanceId,
-                                                                      
clusterConfigProvider,
-                                                                      
eventConsumer,
-                                                                      
schemaSupplier,
-                                                                      () -> 
org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(), 
range.endAsBigInt()),
-                                                                      
sidecarInstancesProvider,
-                                                                      
secretsProvider,
-                                                                      
clientConfig,
-                                                                      conf,
-                                                                      cdcStats,
-                                                                      
taskExecutorPool);
-                                    }
-                                    catch (IOException e)
-                                    {
-                                        throw new RuntimeException(e);
-                                    }
-                                });
+                                return 
uniqueCdcConsumers.computeIfAbsent(uniqueKey, k ->
+                                        buildConsumer(conf.jobId(),
+                                                      instanceId,
+                                                      clusterConfigProvider,
+                                                      eventConsumer,
+                                                      schemaSupplier,
+                                                      () -> 
org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(), 
range.endAsBigInt()),
+                                                      sidecarCdcClient,
+                                                      cdcStats));
                             }))
                    .collect(Collectors.toList());
 
-        consumers = new ArrayList<>(uniqueConsumers.values());
-        return consumers;
-    }
-
-    SidecarCdc loadOrBuildCdcConsumer(Integer instanceId,
-                                      ClusterConfigProvider 
clusterConfigProvider,
-                                      EventConsumer eventConsumer,
-                                      SchemaSupplier schemaSupplier,
-                                      TokenRangeSupplier tokenRangeSupplier,
-                                      CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                                      SecretsProvider secretsProvider,
-                                      SidecarCdcClient.ClientConfig 
clientConfig,
-                                      CdcConfig conf,
-                                      ICdcStats cdcStats,
-                                      TaskExecutorPool taskExecutorPool) 
throws IOException
-    {
-        return buildConsumer(conf.jobId(),
-                             instanceId,
-                             new SidecarCdcOptions(instanceFetcher),
-                             clusterConfigProvider,
-                             eventConsumer,
-                             schemaSupplier,
-                             tokenRangeSupplier,
-                             sidecarInstancesProvider,
-                             clientConfig,
-                             secretsProvider,
-                             cdcStats,
-                             taskExecutorPool);
+            entries = new ArrayList<>(uniqueCdcConsumers.values());
+            return entries;
+        }
+        catch (RuntimeException e)
+        {
+            // Stop any already-built consumers/persisters so timers and 
threads don't leak
+            uniqueCdcConsumers.values().forEach(CdcConsumerEntry::stop);
+            throw e;
+        }
     }
 
     public void startConsumers()
     {
-        consumers.forEach(SidecarCdc::initSchema);
-        consumers.forEach(SidecarCdc::start);
+        entries.forEach(CdcConsumerEntry::start);
     }
 
     public void stopConsumers()
     {
-        consumers.forEach(SidecarCdc::stop);
+        entries.forEach(CdcConsumerEntry::stop);
+        entries.clear();
     }
 
     @VisibleForTesting
@@ -221,44 +186,37 @@ public class CdcManager
     }
 
 
-    public SidecarCdc buildConsumer(@NotNull String jobId,
-                                    int partitionId,
-                                    CdcOptions cdcOptions,
-                                    ClusterConfigProvider 
clusterConfigProvider,
-                                    EventConsumer eventConsumer,
-                                    SchemaSupplier schemaSupplier,
-                                    TokenRangeSupplier tokenRangeSupplier,
-                                    CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                                    SidecarCdcClient.ClientConfig clientConfig,
-                                    SecretsProvider secretsProvider,
-                                    ICdcStats cdcStats,
-                                    TaskExecutorPool taskExecutorPool) throws 
IOException
+    CdcConsumerEntry buildConsumer(@NotNull String jobId,
+                                   int partitionId,
+                                   ClusterConfigProvider clusterConfigProvider,
+                                   EventConsumer eventConsumer,
+                                   SchemaSupplier schemaSupplier,
+                                   TokenRangeSupplier tokenRangeSupplier,
+                                   SidecarCdcClient sidecarCdcClient,
+                                   ICdcStats cdcStats)
     {
-
-        AsyncExecutor asyncExecutor = new 
ExecutorPoolsExecutor(taskExecutorPool);
-
-        final SidecarStatePersister sidecarStatePersister = 
getSidecarStatePersister(cdcOptions, asyncExecutor);
-        return (SidecarCdc) SidecarCdc.builder(jobId,
-                                               partitionId,
-                                               cdcOptions,
-                                               clusterConfigProvider,
-                                               eventConsumer,
-                                               schemaSupplier,
-                                               tokenRangeSupplier,
-                                               sidecarInstancesProvider,
-                                               clientConfig,
-                                               secretsProvider,
-                                               
cdcStats).withExecutor(asyncExecutor).withStatePersister(sidecarStatePersister).build();
+        SidecarStatePersister persister = getSidecarStatePersister();
+        SidecarCdc consumer = SidecarCdc.builder(jobId,
+                                                  partitionId,
+                                                  cdcOptions,
+                                                  clusterConfigProvider,
+                                                  eventConsumer,
+                                                  schemaSupplier,
+                                                  tokenRangeSupplier,
+                                                  sidecarCdcClient,
+                                                  cdcStats)
+                                         .withExecutor(asyncExecutor)
+                                         .withSidecarStatePersister(persister)
+                                         .build();
+        return new CdcConsumerEntry(consumer, persister);
     }
 
-    private @NotNull SidecarStatePersister getSidecarStatePersister(CdcOptions 
cdcOptions, AsyncExecutor asyncExecutor)
+    private @NotNull SidecarStatePersister getSidecarStatePersister()
     {
-        SidecarStatePersister sidecarStatePersister = new 
SidecarStatePersister(org.apache.cassandra.cdc.sidecar.SidecarCdcOptions.DEFAULT,
-                                                                               
 cdcOptions,
-                                                                               
 SidecarCdcStats.STUB,
-                                                                               
 new StateSidecarCdcCassandraClient(cdcDatabaseAccessor),
-                                                                               
 asyncExecutor);
-        sidecarStatePersister.start();
-        return sidecarStatePersister;
+        return new 
SidecarStatePersister(org.apache.cassandra.cdc.sidecar.SidecarCdcOptions.DEFAULT,
+                                         cdcOptions,
+                                         SidecarCdcStats.STUB,
+                                         cassandraClient,
+                                         asyncExecutor);
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java
new file mode 100644
index 00000000..051ed3e9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+
+/**
+ * A {@link SslConfigSecretsProvider} that reads SSL configuration from {@link 
SidecarConfiguration}
+ * to provide keystore and truststore secrets for the sidecar client.
+ */
+public class SidecarClientSecretsProvider extends SslConfigSecretsProvider
+{
+    public SidecarClientSecretsProvider(SidecarConfiguration 
sidecarConfiguration)
+    {
+        super(SslConfig.create(buildSslConfigMap(sidecarConfiguration)));
+    }
+
+    private static Map<String, String> buildSslConfigMap(SidecarConfiguration 
sidecarConfiguration)
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        Map<String, String> sslConfigMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+        if (sslConfiguration.isKeystoreConfigured())
+        {
+            KeyStoreConfiguration keystore = sslConfiguration.keystore();
+            sslConfigMap.put(SslConfig.KEYSTORE_PATH, keystore.path());
+            sslConfigMap.put(SslConfig.KEYSTORE_PASSWORD, keystore.password());
+            sslConfigMap.put(SslConfig.KEYSTORE_TYPE, keystore.type());
+        }
+
+        if (sslConfiguration.isTrustStoreConfigured())
+        {
+            KeyStoreConfiguration truststore = sslConfiguration.truststore();
+            sslConfigMap.put(SslConfig.TRUSTSTORE_PATH, truststore.path());
+            sslConfigMap.put(SslConfig.TRUSTSTORE_PASSWORD, 
truststore.password());
+            sslConfigMap.put(SslConfig.TRUSTSTORE_TYPE, truststore.type());
+        }
+
+        return sslConfigMap;
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java
new file mode 100644
index 00000000..9891905f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConsumerEntryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
+import org.mockito.InOrder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
+/** Unit tests for {@link CdcConsumerEntry}. */
+public class CdcConsumerEntryTest
+{
+    @Test
+    void startCallsPersisterBeforeConsumer()
+    {
+        SidecarCdc consumer = mock(SidecarCdc.class);
+        SidecarStatePersister persister = mock(SidecarStatePersister.class);
+        CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister);
+
+        entry.start();
+
+        InOrder order = inOrder(persister, consumer);
+        order.verify(persister).start();
+        order.verify(consumer).initSchema();
+        order.verify(consumer).start();
+    }
+
+    @Test
+    void stopCallsConsumerBeforePersister()
+    {
+        SidecarCdc consumer = mock(SidecarCdc.class);
+        SidecarStatePersister persister = mock(SidecarStatePersister.class);
+        CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister);
+
+        entry.stop();
+
+        InOrder order = inOrder(consumer, persister);
+        order.verify(consumer).stop();
+        order.verify(persister).stop(true);
+    }
+
+    @Test
+    void accessorsReturnConstructorArguments()
+    {
+        SidecarCdc consumer = mock(SidecarCdc.class);
+        SidecarStatePersister persister = mock(SidecarStatePersister.class);
+        CdcConsumerEntry entry = new CdcConsumerEntry(consumer, persister);
+
+        assertThat(entry.consumer()).isSameAs(consumer);
+        assertThat(entry.persister()).isSameAs(persister);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java
index 48fe70b5..55d7c169 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java
@@ -32,14 +32,14 @@ import java.util.Set;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdc;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
@@ -80,17 +80,15 @@ public class CdcManagerTest
     @Mock
     private ClusterConfigProvider clusterConfigProvider;
     @Mock
-    private CdcSidecarInstancesProvider sidecarInstancesProvider;
-    @Mock
-    private SecretsProvider secretsProvider;
-    @Mock
-    private SidecarCdcClient.ClientConfig clientConfig;
+    private SidecarCdcClient sidecarCdcClient;
     @Mock
     private ICdcStats cdcStats;
     @Mock
     private TaskExecutorPool taskExecutorPool;
     @Mock
     private CdcDatabaseAccessor cdcDatabaseAccessor;
+    @Mock
+    private CdcOptions cdcOptions;
 
     private CdcManager cdcManager;
 
@@ -106,12 +104,11 @@ public class CdcManagerTest
             rangeManager,
             instanceFetcher,
             clusterConfigProvider,
-            sidecarInstancesProvider,
-            secretsProvider,
-            clientConfig,
+            sidecarCdcClient,
             cdcStats,
             taskExecutorPool,
-            cdcDatabaseAccessor
+            cdcDatabaseAccessor,
+            cdcOptions
         );
     }
 
@@ -152,12 +149,12 @@ public class CdcManagerTest
         when(cdcConfig.jobId()).thenReturn("test-job");
 
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer = mock(SidecarCdc.class);
-        doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry).when(spyManager).buildConsumer(
+            any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(1);
     }
@@ -183,13 +180,13 @@ public class CdcManagerTest
         when(cdcConfig.jobId()).thenReturn("test-job");
 
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer1 = mock(SidecarCdc.class);
-        SidecarCdc mockConsumer2 = mock(SidecarCdc.class);
-        doReturn(mockConsumer1, 
mockConsumer2).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry1 = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        CdcConsumerEntry mockEntry2 = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry1, mockEntry2).when(spyManager).buildConsumer(
+            any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(2);
     }
@@ -218,13 +215,13 @@ public class CdcManagerTest
         when(cdcConfig.jobId()).thenReturn("test-job");
 
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer1 = mock(SidecarCdc.class);
-        SidecarCdc mockConsumer2 = mock(SidecarCdc.class);
-        doReturn(mockConsumer1, 
mockConsumer2).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry1 = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        CdcConsumerEntry mockEntry2 = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry1, mockEntry2).when(spyManager).buildConsumer(
+            any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(2);
     }
@@ -251,12 +248,12 @@ public class CdcManagerTest
         when(cdcConfig.jobId()).thenReturn("test-job");
 
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer = mock(SidecarCdc.class);
-        doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry).when(spyManager).buildConsumer(
+            any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(1);
     }
@@ -273,13 +270,14 @@ public class CdcManagerTest
         when(instanceFetcher.instance(unknownIp)).thenThrow(new 
NoSuchCassandraInstanceException("Instance not found: " + unknownIp));
         when(cdcConfig.jobId()).thenReturn("test-job");
 
+        // Spy to mock buildConsumer - will be called with instanceId = -1
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer = mock(SidecarCdc.class);
-        doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry).when(spyManager).buildConsumer(
+            any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(1);
     }
@@ -320,16 +318,16 @@ public class CdcManagerTest
         when(cdcConfig.jobId()).thenReturn("test-job");
 
         CdcManager spyManager = spy(cdcManager);
-        SidecarCdc mockConsumer = mock(SidecarCdc.class);
-        doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
-            anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any()
+        CdcConsumerEntry mockEntry = new 
CdcConsumerEntry(mock(SidecarCdc.class), mock(SidecarStatePersister.class));
+        doReturn(mockEntry).when(spyManager).buildConsumer(
+                any(), anyInt(), any(), any(), any(), any(), any(), any()
         );
 
-        List<SidecarCdc> consumers = spyManager.buildCdcConsumers();
+        List<CdcConsumerEntry> consumers = spyManager.buildCdcConsumers();
 
         assertThat(consumers).hasSize(1);
-        verify(spyManager).loadOrBuildCdcConsumer(
-            eq(instanceId), any(), any(), any(), any(), any(), any(), any(), 
any(), any(), any()
+        verify(spyManager).buildConsumer(
+            any(), eq(instanceId), any(), any(), any(), any(), any(), any()
         );
     }
 
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java
new file mode 100644
index 00000000..2e671c02
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/SidecarClientSecretsProviderTests.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.secrets.SecretsProvider;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SidecarClientSecretsProvider}
+ */
+public class SidecarClientSecretsProviderTests
+{
+    @Test
+    void testSecretsProviderWithSslEnabledNoKeystoreNoTruststore()
+    {
+        SslConfiguration sslConfig = mockSslConfiguration(
+            true,
+            true,
+            "REQUIRED",
+            Arrays.asList("TLS_RSA_128"),
+            Arrays.asList("TLSv1.2"),
+            "10s",
+            false,
+            false
+        );
+
+        SidecarConfiguration sidecarConfiguration = 
mockSidecarConfiguration(sslConfig);
+
+        SecretsProvider result = new 
SidecarClientSecretsProvider(sidecarConfiguration);
+
+        assertThat(result).isNotNull();
+    }
+
+    @Test
+    void testSecretsProviderWithKeystoreOnly()
+    {
+        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
+            "/path/to/keystore.jks",
+            "keystorePassword",
+            "JKS"
+        );
+
+        SslConfiguration sslConfig = mockSslConfiguration(
+            true,
+            false,
+            "OPTIONAL",
+            Arrays.asList("TLS_RSA_256"),
+            Arrays.asList("TLSv1.3"),
+            "15s",
+            true,
+            false
+        );
+
+        when(sslConfig.keystore()).thenReturn(keystoreConfig);
+
+        SidecarConfiguration sidecarConfiguration = 
mockSidecarConfiguration(sslConfig);
+
+        SecretsProvider result = new 
SidecarClientSecretsProvider(sidecarConfiguration);
+
+        assertThat(result).isNotNull();
+        assertThat(result.keyStoreType()).isEqualTo("JKS");
+        
assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray());
+    }
+
+    @Test
+    void testSecretsProviderWithTruststoreOnly()
+    {
+        // SslConfig validation requires keystore password when any SSL config 
is provided
+        // This test validates that truststore-only configuration is rejected
+        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
+            "/path/to/truststore.jks",
+            "truststorePassword",
+            "PKCS12"
+        );
+
+        SslConfiguration sslConfig = mockSslConfiguration(
+            true,
+            true,
+            "NONE",
+            Collections.emptyList(),
+            Arrays.asList("TLSv1.2", "TLSv1.3"),
+            "20s",
+            false,
+            true
+        );
+
+        when(sslConfig.truststore()).thenReturn(truststoreConfig);
+
+        SidecarConfiguration sidecarConfiguration = 
mockSidecarConfiguration(sslConfig);
+
+        IllegalArgumentException exception = 
org.junit.jupiter.api.Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> new SidecarClientSecretsProvider(sidecarConfiguration)
+        );
+
+        assertThat(exception.getMessage()).contains("KEYSTORE_PASSWORD");
+    }
+
+    @Test
+    void testSecretsProviderWithBothKeystoreAndTruststore()
+    {
+        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
+            "/path/to/keystore.p12",
+            "keystorePass123",
+            "PKCS12"
+        );
+
+        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
+            "/path/to/truststore.p12",
+            "truststorePass456",
+            "PKCS12"
+        );
+
+        SslConfiguration sslConfig = mockSslConfiguration(
+            true,
+            true,
+            "REQUIRED",
+            Arrays.asList("TLS_ECDHE_RSA", "TLS_AES_256"),
+            Arrays.asList("TLSv1.2", "TLSv1.3"),
+            "30s",
+            true,
+            true
+        );
+
+        when(sslConfig.keystore()).thenReturn(keystoreConfig);
+        when(sslConfig.truststore()).thenReturn(truststoreConfig);
+
+        SidecarConfiguration sidecarConfiguration = 
mockSidecarConfiguration(sslConfig);
+
+        SecretsProvider result = new 
SidecarClientSecretsProvider(sidecarConfiguration);
+
+        assertThat(result).isNotNull();
+        assertThat(result.keyStoreType()).isEqualTo("PKCS12");
+        
assertThat(result.keyStorePassword()).isEqualTo("keystorePass123".toCharArray());
+        assertThat(result.trustStoreType()).isEqualTo("PKCS12");
+        
assertThat(result.trustStorePassword()).isEqualTo("truststorePass456".toCharArray());
+    }
+
+    @Test
+    void testSecretsProviderUsesCorrectSslConfigKeys()
+    {
+        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
+            "/path/to/keystore.jks",
+            "keystorePassword",
+            "JKS"
+        );
+
+        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
+            "/path/to/truststore.jks",
+            "truststorePassword",
+            "PKCS12"
+        );
+
+        SslConfiguration sslConfig = mockSslConfiguration(
+            true,
+            false,
+            "REQUIRED",
+            Collections.emptyList(),
+            Arrays.asList("TLSv1.2"),
+            "10s",
+            true,
+            true
+        );
+
+        when(sslConfig.keystore()).thenReturn(keystoreConfig);
+        when(sslConfig.truststore()).thenReturn(truststoreConfig);
+
+        SidecarConfiguration sidecarConfiguration = 
mockSidecarConfiguration(sslConfig);
+
+        SecretsProvider result = new 
SidecarClientSecretsProvider(sidecarConfiguration);
+
+        assertThat(result).isNotNull();
+        assertThat(result).isInstanceOf(SidecarClientSecretsProvider.class);
+
+        assertThat(result.keyStoreType()).isEqualTo("JKS");
+        
assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray());
+
+        assertThat(result.trustStoreType()).isEqualTo("PKCS12");
+        
assertThat(result.trustStorePassword()).isEqualTo("truststorePassword".toCharArray());
+    }
+
+    private SidecarConfiguration mockSidecarConfiguration(SslConfiguration 
sslConfiguration)
+    {
+        SidecarConfiguration sidecarConfiguration = 
mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS);
+        
when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfiguration);
+        return sidecarConfiguration;
+    }
+
+    private SslConfiguration mockSslConfiguration(boolean enabled,
+                                                   boolean preferOpenSSL,
+                                                   String clientAuth,
+                                                   java.util.List<String> 
cipherSuites,
+                                                   java.util.List<String> 
secureTransportProtocols,
+                                                   String handshakeTimeout,
+                                                   boolean keystoreConfigured,
+                                                   boolean 
truststoreConfigured)
+    {
+        SslConfiguration sslConfig = mock(SslConfiguration.class, 
RETURNS_DEEP_STUBS);
+        when(sslConfig.enabled()).thenReturn(enabled);
+        when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL);
+        when(sslConfig.clientAuth()).thenReturn(clientAuth);
+        when(sslConfig.cipherSuites()).thenReturn(cipherSuites);
+        
when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols);
+
+        SecondBoundConfiguration durationSpec = 
mock(SecondBoundConfiguration.class);
+        when(durationSpec.toString()).thenReturn(handshakeTimeout);
+        when(sslConfig.handshakeTimeout()).thenReturn(durationSpec);
+
+        when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured);
+        
when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured);
+
+        return sslConfig;
+    }
+
+    private KeyStoreConfiguration mockKeystoreConfiguration(String path, 
String password, String type)
+    {
+        KeyStoreConfiguration config = mock(KeyStoreConfiguration.class);
+        when(config.path()).thenReturn(path);
+        when(config.password()).thenReturn(password);
+        when(config.type()).thenReturn(type);
+        return config;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to