This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfface1b CASSANALYTICS-142 : SidecarCdcClient should be passed as a
constructor parameter for SidecarCdc to avoid thread/resource leaks (#188)
dfface1b is described below
commit dfface1be7a0d70e745b6e0732e681bde237508c
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Mar 31 14:27:41 2026 -0700
CASSANALYTICS-142 : SidecarCdcClient should be passed as a constructor
parameter for SidecarCdc to avoid thread/resource leaks (#188)
Patch by Jyothsna Konisa; Reviewed by Yifan Cai for CASSANALYTICS-142
---
CHANGES.txt | 1 +
.../apache/cassandra/cdc/sidecar/SidecarCdc.java | 33 ++++++++++----
.../cassandra/cdc/sidecar/SidecarCdcBuilder.java | 50 +---------------------
.../cassandra/cdc/sidecar/SidecarCdcClient.java | 43 +++++++++++++++++--
.../cassandra/cdc/sidecar/SidecarCdcTest.java | 17 ++------
5 files changed, 69 insertions(+), 75 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 35afa6cd..ce89887d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142)
* Support extended deletion time in CDC for Cassandra 5.0
* Flush event consumer before persisting CDC state to prevent data loss on
failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error
termination in BufferingCommitLogReader (CASSANALYTICS-129)
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
index e5ac33b6..f7e0513c 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.cdc.sidecar;
-import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
@@ -30,7 +29,6 @@ import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.api.TokenRangeSupplier;
import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.utils.FutureUtils;
@@ -50,6 +48,27 @@ public class SidecarCdc extends Cdc
initSchema();
}
+ /**
+ * Creates a new {@link SidecarCdcBuilder} pre-configured with the supplied parameters.
+ *
+ * <p><b>Lifecycle of {@code sidecarCdcClient}:</b> the supplied {@link SidecarCdcClient} is treated as
+ * an externally managed singleton. Neither the returned builder nor the {@link SidecarCdc} instance it
+ * produces will close the client. The caller is solely responsible for closing the
+ * {@code SidecarCdcClient} (e.g. during application shutdown) to release underlying resources such as
+ * thread pools and HTTP connections.
+ *
+ * @param jobId unique identifier for the CDC job
+ * @param partitionId partition index within the job
+ * @param cdcOptions CDC processing options
+ * @param clusterConfigProvider provider for cluster configuration (e.g. datacenter, hosts)
+ * @param eventConsumer consumer that receives CDC change events
+ * @param schemaSupplier supplier for CDC-enabled table schemas
+ * @param tokenRangeSupplier supplier for the token ranges assigned to this partition
+ * @param sidecarCdcClient externally managed Sidecar HTTP client; <em>not</em> closed by
+ * {@code SidecarCdc} or {@code SidecarCdcBuilder}
+ * @param cdcStats CDC statistics collector
+ * @return a new {@link SidecarCdcBuilder}
+ */
public static SidecarCdcBuilder builder(@NotNull String jobId,
int partitionId,
CdcOptions cdcOptions,
@@ -57,10 +76,8 @@ public class SidecarCdc extends Cdc
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier
tokenRangeSupplier,
- CdcSidecarInstancesProvider sidecarInstancesProvider,
- SidecarCdcClient.ClientConfig clientConfig,
- SecretsProvider secretsProvider,
- ICdcStats cdcStats) throws IOException
+ SidecarCdcClient sidecarCdcClient,
+ ICdcStats cdcStats)
{
return new SidecarCdcBuilder(jobId,
partitionId,
@@ -69,9 +86,7 @@ public class SidecarCdc extends Cdc
eventConsumer,
schemaSupplier,
tokenRangeSupplier,
- sidecarInstancesProvider,
- clientConfig,
- secretsProvider,
+ sidecarCdcClient,
cdcStats);
}
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
index ecc5fa93..4027db2d 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
@@ -19,22 +19,14 @@
package org.apache.cassandra.cdc.sidecar;
-import java.io.IOException;
-import java.util.stream.Collectors;
-
import com.google.common.base.Preconditions;
-import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
-import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.cdc.CdcBuilder;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.api.TokenRangeSupplier;
import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.secrets.SecretsProvider;
-import o.a.c.sidecar.client.shaded.client.SidecarClient;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.jetbrains.annotations.NotNull;
@@ -56,42 +48,12 @@ public class SidecarCdcBuilder extends CdcBuilder
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
- CdcSidecarInstancesProvider sidecarInstancesProvider,
- SidecarCdcClient.ClientConfig clientConfig,
- SecretsProvider secretsProvider,
- ICdcStats cdcStats) throws IOException
- {
- this(
- jobId,
- partitionId,
- cdcOptions,
- clusterConfigProvider,
- eventConsumer,
- schemaSupplier,
- tokenRangeSupplier,
- clientConfig,
- Sidecar.from(new SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream()
- .map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
- .collect(Collectors.toList())),
- clientConfig.toGenericSidecarConfig(), secretsProvider),
- cdcStats
- );
- }
-
- SidecarCdcBuilder(@NotNull String jobId,
- int partitionId,
- CdcOptions cdcOptions,
- ClusterConfigProvider clusterConfigProvider,
- EventConsumer eventConsumer,
- SchemaSupplier schemaSupplier,
- TokenRangeSupplier tokenRangeSupplier,
- SidecarCdcClient.ClientConfig clientConfig,
- SidecarClient sidecarClient,
+ SidecarCdcClient sidecarCdcClient,
ICdcStats cdcStats)
{
super(jobId, partitionId, eventConsumer, schemaSupplier);
this.clusterConfigProvider = clusterConfigProvider;
- this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats);
+ this.sidecarCdcClient = sidecarCdcClient;
withCdcOptions(cdcOptions);
withTokenRangeSupplier(tokenRangeSupplier);
}
@@ -108,14 +70,6 @@ public class SidecarCdcBuilder extends CdcBuilder
return this;
}
- public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig clientConfig,
- SidecarClient sidecarClient,
- ICdcStats cdcStats)
- {
- this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats);
- return this;
- }
-
public SidecarCdcBuilder
withReplicationFactorSupplier(ReplicationFactorSupplier
replicationFactorSupplier)
{
this.replicationFactorSupplier = replicationFactorSupplier;
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
index 2dc98803..d8aa09ef 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.cdc.sidecar;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
@@ -32,8 +33,11 @@ import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.secrets.SecretsProvider;
import o.a.c.sidecar.client.shaded.client.SidecarClient;
import o.a.c.sidecar.client.shaded.client.SidecarInstance;
+import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
+import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
import o.a.c.sidecar.client.shaded.client.StreamBuffer;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
@@ -52,21 +56,52 @@ import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MILLIS_TO_SLEE
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;
-public class SidecarCdcClient
+public class SidecarCdcClient implements AutoCloseable
{
final ClientConfig config;
final SidecarClient sidecarClient;
final ICdcStats stats;
- public SidecarCdcClient(ClientConfig config,
- SidecarClient sidecarClient,
- ICdcStats stats)
+ public SidecarCdcClient(ClientConfig clientConfig,
+ CdcSidecarInstancesProvider instancesProvider,
+ SecretsProvider secretsProvider,
+ ICdcStats cdcStats) throws IOException
+ {
+ this(clientConfig,
+ Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances()
+ .stream()
+ .map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
+ .collect(Collectors.toList())),
+ clientConfig.toGenericSidecarConfig(),
+ secretsProvider),
+ cdcStats);
+ }
+
+ private SidecarCdcClient(ClientConfig config,
+ SidecarClient sidecarClient,
+ ICdcStats stats)
{
this.config = config;
this.sidecarClient = sidecarClient;
this.stats = stats;
}
+ /**
+ * Closes the underlying {@link SidecarClient} and releases associated resources (e.g. thread pools,
+ * HTTP connections).
+ *
+ * <p>{@code SidecarCdcClient} is intended to be used as a singleton whose lifecycle is managed by the
+ * enclosing component. Callers should not create per-request instances; instead, a single instance
+ * should be constructed at startup and closed during shutdown to avoid thread and resource leaks.
+ *
+ * @throws Exception if the underlying client throws while closing
+ */
+ @Override
+ public void close() throws Exception
+ {
+ sidecarClient.close();
+ }
+
public CompletableFuture<List<CommitLog>>
listCdcCommitLogSegments(CassandraInstance instance)
{
return sidecarClient.listCdcSegments(toSidecarInstance(instance))
diff --git
a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
index 3ad306d8..b8913492 100644
---
a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
+++
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.cdc.sidecar;
import org.junit.jupiter.api.Test;
-import o.a.c.sidecar.client.shaded.client.SidecarClient;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
@@ -46,8 +45,7 @@ public class SidecarCdcTest
EventConsumer eventConsumer = mock(EventConsumer.class);
SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
- SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create();
- SidecarClient mockSidecarClient = mock(SidecarClient.class);
+ SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class);
ICdcStats cdcStats = mock(ICdcStats.class);
SidecarCdcBuilder builder = new SidecarCdcBuilder(
@@ -58,22 +56,13 @@ public class SidecarCdcTest
eventConsumer,
schemaSupplier,
tokenRangeSupplier,
- clientConfig,
- mockSidecarClient,
+ mockSidecarCdcClient,
cdcStats
);
- // Verify the builder is properly created and configured
assertThat(builder).isNotNull();
assertThat(builder).isInstanceOf(SidecarCdcBuilder.class);
-
- // Verify the builder has the cluster config provider set
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);
-
- // Verify the builder has a sidecar CDC client configured
- assertThat(builder.sidecarCdcClient).isNotNull();
- assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient);
- assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig);
- assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats);
+ assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]