This is an automated email from the ASF dual-hosted git repository.

jyothsnakonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git

commit 4c09917c0b630e3f1917eac0bb81dee30e12cbca
Author: jkonisa <[email protected]>
AuthorDate: Mon May 4 11:41:24 2026 -0700

    CASSSIDECAR-455: Fix breaking changes for Analytics 0.4.0
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-455
---
 CHANGES.txt                                        |   1 +
 gradle.properties                                  |   2 +-
 integration-framework/build.gradle                 |   1 +
 .../sidecar/testing/TestCdcPublisher.java          |  50 ++--
 .../cassandra/sidecar/cdc/CdcIntegrationTest.java  |   3 +
 ...SharedClusterCdcSidecarIntegrationTestBase.java |  49 ++--
 server/build.gradle                                |   4 +
 .../cassandra/bridge/CassandraBridgeFactory.java   |  18 +-
 .../cassandra/sidecar/cdc/CachingSchemaStore.java  |  63 +++--
 .../cassandra/sidecar/cdc/CdcAvroSerializer.java   |  42 ---
 .../apache/cassandra/sidecar/cdc/CdcConfig.java    |  16 ++
 .../apache/cassandra/sidecar/cdc/CdcPublisher.java | 199 +++++++--------
 .../cassandra/sidecar/cdc/SidecarCdcOptions.java   |   9 +
 .../cassandra/sidecar/modules/CdcModule.java       | 104 ++++++--
 .../cassandra/sidecar/cdc/CdcPublisherTests.java   | 282 +++------------------
 15 files changed, 354 insertions(+), 489 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 90c2f10d..78c4e547 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455)
  * Support IAM instance profile credentials for S3 restore jobs 
(CASSSIDECAR-415)
  * Adding endpoint for verifying files post data copy during live migration 
(CASSSIDECAR-226)
  * SAI support in Sidecar (CASSSIDECAR-422)
diff --git a/gradle.properties b/gradle.properties
index e7822d72..e9157f44 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -46,5 +46,5 @@ swaggerVersion=2.2.21
 kryoVersion=4.0.2
 # OSHI dependencies
 oshiVersion=6.9.0
-analyticsVersion=0.3.0
+analyticsVersion=0.4.0
 kafkaClientVersion=3.7.0
diff --git a/integration-framework/build.gradle 
b/integration-framework/build.gradle
index 29b9c942..e6edbe1c 100644
--- a/integration-framework/build.gradle
+++ b/integration-framework/build.gradle
@@ -70,6 +70,7 @@ dependencies {
 
     // CDC dependencies
     api(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc_spark3_2.12", version: "${project.analyticsVersion}")
+    api(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc-codec_spark3_2.12", version: 
"${project.analyticsVersion}")
     api(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc-sidecar_spark3_2.12", version: 
"${project.analyticsVersion}")
     api "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}"
 }
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
index 4388c3e6..a47135ff 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
@@ -19,24 +19,26 @@ package org.apache.cassandra.sidecar.testing;
 
 import com.google.inject.Provider;
 import io.vertx.core.Vertx;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
-import org.apache.cassandra.cdc.msg.CdcEvent;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.kafka.KafkaProducerFactory;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.sidecar.cdc.CachingSchemaStore;
 import org.apache.cassandra.sidecar.cdc.CdcConfig;
 import org.apache.cassandra.sidecar.cdc.CdcPublisher;
 import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
-import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.coordination.RangeManager;
 import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
 import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-import org.apache.kafka.common.serialization.Serializer;
+
+import static org.mockito.Mockito.mock;
 
 /**
  * Test implementation of CdcPublisher that uses an in-memory event consumer
@@ -49,30 +51,32 @@ public class TestCdcPublisher extends CdcPublisher
     private final CdcDatabaseAccessor databaseAccessor;
 
     public TestCdcPublisher(Vertx vertx,
-                           SidecarConfiguration sidecarConfiguration,
-                           ExecutorPools executorPools,
-                           ClusterConfigProvider clusterConfigProvider,
-                           SchemaSupplier schemaSupplier,
-                           CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                           SidecarCdcClient.ClientConfig clientConfig,
-                           InstanceMetadataFetcher instanceMetadataFetcher,
-                           CdcConfig conf,
-                           CdcDatabaseAccessor databaseAccessor,
-                           ICdcStats cdcStats,
-                           VirtualTablesDatabaseAccessor virtualTables,
-                           SidecarCdcStats sidecarCdcStats,
-                           Serializer<CdcEvent> avroSerializer,
-                           Provider<RangeManager> rangeManagerProvider)
+                            ExecutorPools executorPools,
+                            ClusterConfigProvider clusterConfigProvider,
+                            SchemaSupplier schemaSupplier,
+                            InstanceMetadataFetcher instanceMetadataFetcher,
+                            CdcConfig conf,
+                            CdcDatabaseAccessor databaseAccessor,
+                            ICdcStats cdcStats,
+                            VirtualTablesDatabaseAccessor virtualTables,
+                            SidecarCdcStats sidecarCdcStats,
+                            Provider<RangeManager> rangeManagerProvider,
+                            CassandraBridgeFactory cassandraBridgeFactory,
+                            Provider<SidecarCdcClient> 
sidecarCdcClientProvider,
+                            CdcOptions cdcOptions)
     {
-        super(vertx, sidecarConfiguration, executorPools, 
clusterConfigProvider,
-              schemaSupplier, sidecarInstancesProvider, clientConfig,
-              instanceMetadataFetcher, conf, databaseAccessor, cdcStats,
-              virtualTables, sidecarCdcStats, avroSerializer, 
rangeManagerProvider);
+        super(vertx, executorPools, clusterConfigProvider,
+              schemaSupplier, instanceMetadataFetcher, conf, databaseAccessor, 
cdcStats,
+              virtualTables, sidecarCdcStats, rangeManagerProvider,
+              cassandraBridgeFactory, sidecarCdcClientProvider,
+              mock(CachingSchemaStore.class),
+              mock(KafkaProducerFactory.class),
+              cdcOptions);
         this.databaseAccessor = databaseAccessor;
     }
 
     @Override
-    public EventConsumer eventConsumer(CdcConfig conf, Serializer<CdcEvent> 
avroSerializer)
+    public EventConsumer eventConsumer(CdcConfig conf)
     {
         return testEventConsumer;
     }
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
index 961a5224..ed228c2b 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
@@ -75,6 +75,9 @@ public class CdcIntegrationTest extends 
SharedClusterCdcSidecarIntegrationTestBa
             expectedMutations.put(i, i);
         }
 
+        // Seal the active commit log segment so CDC can find the mutations in 
cdc_raw
+        cluster.getFirstRunningInstance().flush(CDC_TEST_TABLE.keyspace());
+
         TestCdcEventConsumer consumer = getTestEventConsumer();
         waitUntil(() -> consumer.getEvents().size() >= mutationCount, 120, 
1000);
         assertThat(consumer.getEvents().size())
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
index 56b68df2..f1f0c71e 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
@@ -24,12 +24,13 @@ import java.util.function.Function;
 import org.junit.jupiter.api.AfterEach;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import io.vertx.core.Vertx;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
-import org.apache.cassandra.cdc.msg.CdcEvent;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.ICdcStats;
@@ -41,7 +42,6 @@ import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
@@ -52,7 +52,6 @@ import 
org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
-import org.apache.kafka.common.serialization.Serializer;
 
 import static org.assertj.core.api.Assumptions.assumeThat;
 
@@ -61,11 +60,23 @@ import static org.assertj.core.api.Assumptions.assumeThat;
  * CDC-specific configuration and setup, including:
  * - CDC-enabled Cassandra cluster configuration
  * - TestCdcPublisher with TestCdcEventConsumer
- * - Cassandra 4.1 version requirement
+ * - Cassandra 4.0 through 5.0 version support (analytics 0.4.0 does not 
support 5.1+)
  * - Helper methods to access CDC components
  */
 public abstract class SharedClusterCdcSidecarIntegrationTestBase extends 
SharedClusterIntegrationTestBase
 {
+    // Analytics 0.4.0 supports up to Cassandra 5.0 (majorVersion=50). 
Cassandra 5.1+ requires a newer analytics version.
+    private static final SimpleCassandraVersion MAX_SUPPORTED_CDC_VERSION = 
SimpleCassandraVersion.create("5.0.99");
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        SimpleCassandraVersion version = 
SimpleCassandraVersion.create(testVersion.version());
+        assumeThat(version)
+        .as("CDC is not supported for Cassandra %s; analytics 0.4.0 supports 
up to 5.0.x", version)
+        .isLessThanOrEqualTo(MAX_SUPPORTED_CDC_VERSION);
+    }
+
     @AfterEach
     void cleanupCdcConsumerAfterEachTest()
     {
@@ -80,16 +91,6 @@ public abstract class 
SharedClusterCdcSidecarIntegrationTestBase extends SharedC
         }
     }
 
-    @Override
-    protected void beforeClusterProvisioning()
-    {
-        // The current CDC implementation cannot read 5.x commitlogs, so 
verify Cassandra version is 4.x
-        SimpleCassandraVersion version = 
SimpleCassandraVersion.create(testVersion.version());
-        assumeThat(version.major)
-                .as("Current CDC implementation cannot read 5.x commitlogs, 
requires Cassandra 4.x")
-                .isEqualTo(4);
-    }
-
     @Override
     protected ClusterBuilderConfiguration testClusterConfiguration()
     {
@@ -154,37 +155,35 @@ public abstract class 
SharedClusterCdcSidecarIntegrationTestBase extends SharedC
         @Provides
         @Singleton
         CdcPublisher cdcPublisher(Vertx vertx,
-                                  SidecarConfiguration sidecarConfiguration,
                                   ExecutorPools executorPools,
                                   ClusterConfigProvider clusterConfigProvider,
                                   SchemaSupplier schemaSupplier,
-                                  CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                                  SidecarCdcClient.ClientConfig clientConfig,
                                   InstanceMetadataFetcher 
instanceMetadataFetcher,
                                   CdcConfig conf,
                                   CdcDatabaseAccessor databaseAccessor,
                                   ICdcStats cdcStats,
                                   VirtualTablesDatabaseAccessor virtualTables,
                                   SidecarCdcStats sidecarCdcStats,
-                                  Serializer<CdcEvent> avroSerializer,
-                                  TokenRingProvider tokenRingProvider)
+                                  TokenRingProvider tokenRingProvider,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory,
+                                  Provider<SidecarCdcClient> 
sidecarCdcClientProvider,
+                                  CdcOptions cdcOptions)
         {
             RangeManager rangeManager = new ContentionFreeRangeManager(vertx, 
tokenRingProvider);
             return new TestCdcPublisher(vertx,
-                                       sidecarConfiguration,
                                        executorPools,
                                        clusterConfigProvider,
                                        schemaSupplier,
-                                       sidecarInstancesProvider,
-                                       clientConfig,
                                        instanceMetadataFetcher,
                                        conf,
                                        databaseAccessor,
                                        cdcStats,
                                        virtualTables,
                                        sidecarCdcStats,
-                                       avroSerializer,
-                                       () -> rangeManager);
+                                       () -> rangeManager,
+                                       cassandraBridgeFactory,
+                                       sidecarCdcClientProvider,
+                                       cdcOptions);
         }
 
         @Provides
diff --git a/server/build.gradle b/server/build.gradle
index 8b2ae702..ba71370c 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -151,6 +151,10 @@ dependencies {
     implementation(group: "org.apache.cassandra", name: 
"cassandra-avro-converter_spark3_2.12", version: 
"${[project.analyticsVersion]}")
     implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}")
     implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc-sidecar_spark3_2.12", version: 
"${[project.analyticsVersion]}")
+    // compileOnly: CdcModule directly instantiates SidecarCdcClient, 
requiring sidecar-client types at
+    // compile time. The jar is already on the runtime classpath transitively 
via cdc-sidecar, so
+    // implementation would duplicate it.
+    compileOnly(group: "org.apache.cassandra", name: 
"cassandra-analytics-sidecar-client", version: "${[project.analyticsVersion]}")
 
     implementation 
"org.apache.kafka:kafka-clients:${project.kafkaClientVersion}"
     implementation "com.esotericsoftware:kryo-shaded:${kryoVersion}"
diff --git 
a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java 
b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
index 9c99430a..61b88431 100644
--- 
a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
+++ 
b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
@@ -21,11 +21,8 @@ package org.apache.cassandra.bridge;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -119,21 +116,8 @@ public class CassandraBridgeFactory
 
     public ClassLoader buildClassLoader(String... resourceNames)
     {
-        URL[] urls = Arrays.stream(resourceNames)
-                           
.map(BaseCassandraBridgeFactory::copyClassResourceToFile)
-                           .map(jar -> {
-                               try
-                               {
-                                   return jar.toURI().toURL();
-                               }
-                               catch (MalformedURLException e)
-                               {
-                                   throw new RuntimeException(e);
-                               }
-                           }).toArray(URL[]::new);
-
         return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) 
() ->
-                                                                             
new PostDelegationClassLoader(urls, 
Thread.currentThread().getContextClassLoader()));
+                BaseCassandraBridgeFactory.buildClassLoader(resourceNames));
     }
 
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
index 8795d2ed..e57aa4e9 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
@@ -39,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 
+import org.apache.cassandra.cdc.avro.AvroSchemaUtils;
 import org.apache.cassandra.cdc.avro.AvroSchemas;
 import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
 import org.apache.cassandra.cdc.kafka.KafkaOptions;
@@ -56,8 +57,21 @@ import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
 
 /**
- * Schemas cache to be used by CDC event serialization. It contains a map of 
table schemas
- * using TableIdentifier as key.
+ * Schemas cache for CDC event serialization, keyed by TableIdentifier.
+ *
+ * <p>{@link #getSchema}, {@link #getWriter}, and {@link #getReader} all 
return the
+ * <em>payload</em> (table-column) Avro schema — not the CDC envelope. Every 
caller in the
+ * analytics codec uses these to build and encode per-row payload records 
(column field lookup,
+ * range predicate encoding, byte-record encoding). Returning the merged 
envelope schema here
+ * would break those callers because table columns are not top-level fields of 
the envelope.
+ *
+ * <p>The merged CDC envelope schema is constructed on-the-fly in {@link 
#publishSchemas} and
+ * is never stored in the cache. The sidecar remains the authoritative source 
for building the
+ * merged schema (via {@link #buildMergedSchema}); it is just not exposed 
through
+ * {@link SchemaStore#getSchema}.
+ *
+ * <p>Schema version UUIDs ({@link #getVersion}) are derived from the CQL 
{@code CREATE TABLE}
+ * statement, consistent with the analytics {@code CachingSchemaStore} 
implementation.
  */
 @Singleton
 public class CachingSchemaStore implements SchemaStore
@@ -91,13 +105,17 @@ public class CachingSchemaStore implements SchemaStore
         this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
         this.sidecarSchema = sidecarSchema;
         this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
-        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
-        AvroSchemas.registerLogicalTypes();
-        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
         this.vertx = vertx;
         this.cdcConfig = cdcConfig;
         this.sidecarCdcStats = sidecarCdcStats;
         this.schemaStorePublisherFactory = schemaStorePublisherFactory;
+        // cdcConfig and schemaStorePublisherFactory must be assigned before 
the calls below.
+        // createSchemaCache and addSchemaChangeListener can fire 
onSchemaChanged synchronously,
+        // which calls loadPublisher() — accessing both fields — and would 
throw NPE if they
+        // were still null at that point.
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
 
         if (cdcConfig.cdcEnabled())
         {
@@ -146,20 +164,32 @@ public class CachingSchemaStore implements SchemaStore
             TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
             avroSchemasCache.compute(tableIdentifier, (k, v) ->
             {
+                Schema payloadSchema = 
cqlToAvroSchemaConverter.convert(cqlTable);
                 if (null != publisher)
                 {
-                    Schema schema = cqlToAvroSchemaConverter.convert(cqlTable);
+                    // Merged schema is only needed for Kafka publishing — 
build on-the-fly,
+                    // not stored in the cache entry 
(getSchema/getWriter/getReader use payload).
+                    Schema mergedSchema = buildMergedSchema(payloadSchema, 
cqlTable);
                     TableSchemaPublisher.SchemaPublishMetadata metadata = new 
TableSchemaPublisher.SchemaPublishMetadata();
                     metadata.put(METADATA_NAME_KEY, cqlTable.table());
                     metadata.put(METADATA_NAMESPACE_KEY, cqlTable.keyspace());
-                    publisher.publishSchema(schema.toString(false), metadata);
+                    publisher.publishSchema(mergedSchema.toString(false), 
metadata);
                     sidecarCdcStats.capturePublishedSchema();
                 }
-                return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                return new SchemaCacheEntry(cqlTable, payloadSchema);
             });
         }
     }
 
+    private Schema buildMergedSchema(Schema payloadSchema, CqlTable cqlTable)
+    {
+        String prefix = cdcConfig.schemaNamespacePrefix();
+        String namespace = prefix.isEmpty()
+                           ? cqlTable.keyspace()
+                           : prefix + '.' + cqlTable.keyspace();
+        return AvroSchemaUtils.buildMergedSchema(payloadSchema, 
cqlTable.table(), namespace);
+    }
+
     @VisibleForTesting
     void onSchemaChanged()
     {
@@ -176,7 +206,7 @@ public class CachingSchemaStore implements SchemaStore
                         
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
                     }
                     LOGGER.info("Re-generating Avro Schema after schema change 
keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table());
-                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter.convert(cqlTable));
                 }
                 return v;
             });
@@ -186,7 +216,9 @@ public class CachingSchemaStore implements SchemaStore
         // Remove any old schema entries for deleted tables, this operation 
can be done in the end as this is
         // only for removing stale entries and no one is going to use these 
entries once the table is removed.
         // This doesn't have to be an atomic operation.
-        
avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream().map(cqlTable -> 
TableIdentifier.of(cqlTable.keyspace(), 
cqlTable.table())).collect(Collectors.toList()));
+        avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream()
+                                                               .map(cqlTable 
-> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()))
+                                                               
.collect(Collectors.toList()));
         vertx.eventBus().publish(ON_CDC_CACHE_WARMED_UP.address(), "Cdc cache 
warmed up");
     }
 
@@ -236,7 +268,7 @@ public class CachingSchemaStore implements SchemaStore
 
         return cdcTables.stream()
                         .collect(Collectors.toMap(cqlTable -> 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()),
-                                                  cqlTable -> new 
SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter))
+                                                  cqlTable -> new 
SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter.convert(cqlTable)))
                         );
     }
 
@@ -254,14 +286,13 @@ public class CachingSchemaStore implements SchemaStore
         private final GenericDatumWriter<GenericRecord> writer;
         private final GenericDatumReader<GenericRecord> reader;
 
-        private SchemaCacheEntry(CqlTable table,
-                                 CqlToAvroSchemaConverter 
cqlToAvroSchemaConverter)
+        private SchemaCacheEntry(CqlTable table, Schema payloadSchema)
         {
             this.table = table;
-            this.schema = cqlToAvroSchemaConverter.convert(table);
+            this.schema = payloadSchema;
             this.schemaUuid = 
UUID.nameUUIDFromBytes(table.createStatement().getBytes(StandardCharsets.UTF_8)).toString();
-            this.writer = new GenericDatumWriter<>(schema);
-            this.reader = new GenericDatumReader<>(schema);
+            this.writer = new GenericDatumWriter<>(payloadSchema);
+            this.reader = new GenericDatumReader<>(payloadSchema);
         }
 
         public String tableSchema()
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java
deleted file mode 100644
index 1acc7c4b..00000000
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.cdc;
-
-import org.apache.cassandra.bridge.CassandraBridgeFactory;
-import org.apache.cassandra.cdc.TypeCache;
-import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer;
-import org.apache.cassandra.cdc.schemastore.SchemaStore;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-
-/**
- * Serializer to convert Cassandra CDC events into Avro GenericRecord objects.
- */
-public class CdcAvroSerializer extends AvroGenericRecordSerializer
-{
-    public CdcAvroSerializer(SchemaStore schemaStore,
-                             InstanceMetadataFetcher instanceMetadataFetcher,
-                             CassandraBridgeFactory cassandraBridgeFactory)
-    {
-        super(schemaStore, key ->
-                           TypeCache.get(cassandraBridgeFactory
-                                         
.get(instanceMetadataFetcher.callOnFirstAvailableInstance(instance->
-                                                                               
                    
instance.delegate().nodeSettings()).releaseVersion()).getVersion())
-                                    .getType(key.keyspace, key.type), 
"org.apache.cassandra");
-    }
-}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
index 776d494b..18014d8d 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
@@ -138,6 +138,22 @@ public interface CdcConfig
      */
     MillisecondBoundConfiguration persistDelay();
 
+    /**
+     * Returns the namespace prefix used when building Avro schemas for CDC 
events.
+     * This is an extension point for schema registry integration: 
implementations that
+     * register schemas with a shared registry should override this to return 
a globally
+     * unique prefix so that schema names do not collide across clusters or 
services.
+     * The prefix is combined with the Cassandra keyspace to form the Avro 
schema namespace:
+     * {@code <prefix>.<keyspace>}.
+     *
+     * <p>Defaults to empty string, in which case the keyspace name is used 
directly as
+     * the Avro namespace.
+     */
+    default String schemaNamespacePrefix()
+    {
+        return "";
+    }
+
     /**
      * Topic format
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
index 75aa6667..2650bd7d 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
@@ -18,14 +18,9 @@
 
 package org.apache.cassandra.sidecar.cdc;
 
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -33,35 +28,29 @@ import io.vertx.core.Handler;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.TypeCache;
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaProducerFactory;
 import org.apache.cassandra.cdc.kafka.KafkaPublisher;
 import org.apache.cassandra.cdc.kafka.TopicSupplier;
-import org.apache.cassandra.cdc.msg.CdcEvent;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
-import org.apache.cassandra.cdc.sidecar.SidecarCdc;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
-import org.apache.cassandra.secrets.SslConfig;
-import org.apache.cassandra.secrets.SslConfigSecretsProvider;
 import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SslConfiguration;
 import org.apache.cassandra.sidecar.coordination.RangeManager;
 import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.Serializer;
 
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
@@ -85,50 +74,51 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
     private final VirtualTablesDatabaseAccessor virtualTables;
     private final SidecarCdcStats sidecarCdcStats;
     private final SchemaSupplier schemaSupplier;
-    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
     private final InstanceMetadataFetcher instanceMetadataFetcher;
     private final ClusterConfigProvider clusterConfigProvider;
-    private final SidecarCdcClient.ClientConfig clientConfig;
     private final ICdcStats cdcStats;
-    private final SidecarConfiguration sidecarConfiguration;
     private CdcManager cdcManager;
-    private final Serializer<CdcEvent> avroSerializer;
     private final Provider<RangeManager> rangeManagerProvider;
-    KafkaProducer<String, byte[]> producer;
-    KafkaPublisher kafkaPublisher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+    private final CachingSchemaStore schemaStore;
+    private KafkaPublisher<?> kafkaPublisher;
+    private final Provider<SidecarCdcClient> sidecarCdcClientProvider;
+    private final KafkaProducerFactory kafkaProducerFactory;
+    private final CdcOptions cdcOptions;
 
     @Inject
     public CdcPublisher(Vertx vertx,
-                        SidecarConfiguration sidecarConfiguration,
                         ExecutorPools executorPools,
                         ClusterConfigProvider clusterConfigProvider,
                         SchemaSupplier schemaSupplier,
-                        CdcSidecarInstancesProvider sidecarInstancesProvider,
-                        SidecarCdcClient.ClientConfig clientConfig,
                         InstanceMetadataFetcher instanceMetadataFetcher,
                         CdcConfig conf,
                         CdcDatabaseAccessor databaseAccessor,
                         ICdcStats cdcStats,
                         VirtualTablesDatabaseAccessor virtualTables,
                         SidecarCdcStats sidecarCdcStats,
-                        Serializer<CdcEvent> avroSerializer,
-                        Provider<RangeManager> rangeManagerProvider)
+                        Provider<RangeManager> rangeManagerProvider,
+                        CassandraBridgeFactory cassandraBridgeFactory,
+                        Provider<SidecarCdcClient> sidecarCdcClientProvider,
+                        CachingSchemaStore schemaStore,
+                        KafkaProducerFactory kafkaProducerFactory,
+                        CdcOptions cdcOptions)
     {
         this.sidecarCdcStats = sidecarCdcStats;
         this.executorPools = executorPools.internal();
         this.conf = conf;
         this.databaseAccessor = databaseAccessor;
         this.virtualTables = virtualTables;
-
         this.schemaSupplier = schemaSupplier;
-        this.sidecarInstancesProvider = sidecarInstancesProvider;
         this.instanceMetadataFetcher = instanceMetadataFetcher;
         this.clusterConfigProvider = clusterConfigProvider;
-        this.clientConfig = clientConfig;
         this.cdcStats = cdcStats;
-        this.sidecarConfiguration = sidecarConfiguration;
-        this.avroSerializer = avroSerializer;
         this.rangeManagerProvider = rangeManagerProvider;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+        this.sidecarCdcClientProvider = sidecarCdcClientProvider;
+        this.schemaStore = schemaStore;
+        this.kafkaProducerFactory = kafkaProducerFactory;
+        this.cdcOptions = cdcOptions;
 
         if (conf.cdcEnabled())
         {
@@ -141,56 +131,23 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         }
     }
 
-    public SecretsProvider secretsProvider()
+    public EventConsumer eventConsumer(CdcConfig conf)
     {
-        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
-
-        if (sslConfiguration == null || !sslConfiguration.enabled())
-        {
-            return null;
-        }
-
-        Map<String, String> sslConfigMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
-        if (sslConfiguration.isKeystoreConfigured())
-        {
-            KeyStoreConfiguration keystore = sslConfiguration.keystore();
-            sslConfigMap.put(SslConfig.KEYSTORE_PATH, keystore.path());
-            sslConfigMap.put(SslConfig.KEYSTORE_PASSWORD, keystore.password());
-            sslConfigMap.put(SslConfig.KEYSTORE_TYPE, keystore.type());
-        }
-
-        if (sslConfiguration.isTrustStoreConfigured())
-        {
-            KeyStoreConfiguration truststore = sslConfiguration.truststore();
-            sslConfigMap.put(SslConfig.TRUSTSTORE_PATH, truststore.path());
-            sslConfigMap.put(SslConfig.TRUSTSTORE_PASSWORD, 
truststore.password());
-            sslConfigMap.put(SslConfig.TRUSTSTORE_TYPE, truststore.type());
-        }
-
-        SslConfig sslConfig = SslConfig.create(sslConfigMap);
-        return new SslConfigSecretsProvider(sslConfig);
-    }
-
-    public EventConsumer eventConsumer(CdcConfig conf,
-                                       Serializer<CdcEvent> avroSerializer)
-    {
-        if (this.producer != null)
-        {
-            this.producer.close();
-        }
-        if (this.kafkaPublisher != null)
-        {
-            this.kafkaPublisher.close();
-        }
-        this.producer = new KafkaProducer<>(conf.kafkaConfigs());
-        this.kafkaPublisher = new 
KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
-                                                           producer,
-                                                           avroSerializer,
-                                                           
conf.maxRecordSizeBytes(),
-                                                           
conf.failOnRecordTooLargeError(),
-                                                           
conf.failOnKafkaError(),
-                                                           CdcLogMode.FULL);
+        CassandraVersion version = cassandraBridgeFactory.get(
+            instanceMetadataFetcher.callOnFirstAvailableInstance(instance ->
+                instance.delegate().nodeSettings()).releaseVersion()
+        ).getVersion();
+        this.kafkaPublisher = KafkaPublisher.create(version,
+                                                    
TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
+                                                    conf.kafkaConfigs(),
+                                                    kafkaProducerFactory,
+                                                    schemaStore,
+                                                    key -> 
TypeCache.get(version).getType(key.keyspace, key.type),
+                                                    
conf.schemaNamespacePrefix(),
+                                                    conf.maxRecordSizeBytes(),
+                                                    
conf.failOnRecordTooLargeError(),
+                                                    conf.failOnKafkaError(),
+                                                    CdcLogMode.FULL);
         return new CdcEventConsumer(kafkaPublisher);
     }
 
@@ -216,27 +173,38 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         }
         databaseAccessor.session();
 
-        cdcManager = new CdcManager(eventConsumer(conf, avroSerializer),
-                                    schemaSupplier,
-                                    conf,
-                                    rangeManagerProvider.get(),
-                                    instanceMetadataFetcher,
-                                    clusterConfigProvider,
-                                    sidecarInstancesProvider,
-                                    secretsProvider(),
-                                    clientConfig,
-                                    cdcStats,
-                                    this.executorPools,
-                                    databaseAccessor);
-
-        List<SidecarCdc> consumers = cdcManager.buildCdcConsumers();
-        cdcManager.startConsumers();
-        LOGGER.info("{} CDC iterators started successfully", consumers.size());
-        isRunning = true;
-        sidecarCdcStats.captureCdcStarted(consumers.size());
+        try
+        {
+            cdcManager = new CdcManager(eventConsumer(conf),
+                                        schemaSupplier,
+                                        conf,
+                                        rangeManagerProvider.get(),
+                                        instanceMetadataFetcher,
+                                        clusterConfigProvider,
+                                        sidecarCdcClientProvider.get(),
+                                        cdcStats,
+                                        this.executorPools,
+                                        databaseAccessor,
+                                        cdcOptions);
+            int consumerCount = cdcManager.buildCdcConsumers().size();
+            cdcManager.startConsumers();
+            LOGGER.info("{} CDC iterators started successfully", 
consumerCount);
+            isRunning = true;
+            sidecarCdcStats.captureCdcStarted(consumerCount);
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to start CDC consumers, cleaning up 
resources", e);
+            if (cdcManager != null)
+            {
+                cdcManager.stopConsumers();
+            }
+            closeKafkaResources();
+            throw e;
+        }
     }
 
-    protected synchronized void restart()
+    private synchronized void restart()
     {
         try
         {
@@ -258,7 +226,7 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         return isRunning;
     }
 
-    public synchronized void stop()
+    private synchronized void stop()
     {
         if (!isRunning)
         {
@@ -279,6 +247,23 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         {
             isRunning = false;
             isInitialized = false;
+            closeKafkaResources();
+        }
+    }
+
+    private void closeKafkaResources()
+    {
+        if (kafkaPublisher != null)
+        {
+            try
+            {
+                kafkaPublisher.close();
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Error closing KafkaPublisher", e);
+            }
+            kafkaPublisher = null;
         }
     }
 
@@ -384,8 +369,16 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
     @Override
     public void execute(Promise<Void> promise)
     {
-        run();
-        promise.complete();
+        try
+        {
+            run();
+            promise.complete();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("CDC run failed", e);
+            promise.fail(e);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java
index a2968719..10dcac16 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.cdc;
 
 import java.util.Map;
 
+import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -50,4 +51,12 @@ public class SidecarCdcOptions implements CdcOptions
     {
         return instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings().datacenter());
     }
+
+    @Override
+    public CassandraVersion version()
+    {
+        String releaseVersion = 
instanceMetadataFetcher.callOnFirstAvailableInstance(
+                instance -> 
instance.delegate().nodeSettings().releaseVersion());
+        return 
CassandraVersion.fromVersion(releaseVersion).orElse(CassandraVersion.FOURZERO);
+    }
 }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index fec382a4..303ce488 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -18,7 +18,15 @@
 
 package org.apache.cassandra.sidecar.modules;
 
+import java.io.IOException;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.ProvidesIntoMap;
@@ -28,24 +36,27 @@ import jakarta.ws.rs.GET;
 import jakarta.ws.rs.PUT;
 import jakarta.ws.rs.Path;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
 import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
-import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.kafka.KafkaProducerFactory;
 import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
 import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.CdcStats;
 import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.sidecar.cdc.CachingSchemaStore;
-import org.apache.cassandra.sidecar.cdc.CdcAvroSerializer;
 import org.apache.cassandra.sidecar.cdc.CdcConfig;
 import org.apache.cassandra.sidecar.cdc.CdcConfigImpl;
 import org.apache.cassandra.sidecar.cdc.CdcDynamicSidecarInstancesProvider;
 import org.apache.cassandra.sidecar.cdc.CdcLogCache;
 import org.apache.cassandra.sidecar.cdc.CdcPublisher;
 import org.apache.cassandra.sidecar.cdc.CdcSchemaSupplier;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcOptions;
 import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.cdc.SidecarClientSecretsProvider;
 import org.apache.cassandra.sidecar.cdc.SidecarClusterConfigProvider;
 import org.apache.cassandra.sidecar.cdc.SidecarCqlToAvroSchemaConverter;
 import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
@@ -58,6 +69,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
 import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
 import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
 import 
org.apache.cassandra.sidecar.coordination.DynamicSidecarInstancesProvider;
@@ -97,19 +109,22 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
 import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
-import org.apache.kafka.common.serialization.Serializer;
 import org.eclipse.microprofile.openapi.annotations.Operation;
 import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
 import org.eclipse.microprofile.openapi.annotations.media.Content;
 import org.eclipse.microprofile.openapi.annotations.media.Schema;
 import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
 
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
 /**
  * Provides Cassandra change-data capture (CDC) publishing capability
  */
 @Path("/")
 public class CdcModule extends AbstractModule
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcModule.class);
+
     @ProvidesIntoMap
     @KeyClassMapKey(PeriodicTaskMapKeys.SidecarPeerHealthMonitorTaskKey.class)
     PeriodicTask sidecarPeerHealthMonitorTask(SidecarPeerHealthMonitorTask 
task)
@@ -133,7 +148,9 @@ public class CdcModule extends AbstractModule
                                                                          
SidecarConfiguration configuration,
                                                                          
CassandraBridgeFactory cassandraBridgeFactory)
     {
-        return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, 
databaseAccessor, driverUnsupportedSchemaCache, configuration, 
cassandraBridgeFactory);
+        return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, 
databaseAccessor,
+                                                  
driverUnsupportedSchemaCache, configuration,
+                                                  cassandraBridgeFactory);
     }
 
     @ProvidesIntoMap
@@ -330,11 +347,9 @@ public class CdcModule extends AbstractModule
 
     @Provides
     @Singleton
-    public Serializer<CdcEvent> getSerializer(CachingSchemaStore schemaStore,
-                                              InstanceMetadataFetcher 
instanceMetadataFetcher,
-                                              CassandraBridgeFactory 
cassandraBridgeFactory)
+    public KafkaProducerFactory kafkaProducerFactory()
     {
-        return new CdcAvroSerializer(schemaStore, instanceMetadataFetcher, 
cassandraBridgeFactory);
+        return KafkaProducerFactory.DEFAULT;
     }
 
     @Provides
@@ -379,6 +394,50 @@ public class CdcModule extends AbstractModule
                                                     
sidecarClientConfiguration.retryDelay().toIntMillis());
     }
 
+    @Provides
+    @Singleton
+    public SidecarCdcClient sidecarCdcClient(Vertx vertx,
+                                             SidecarCdcClient.ClientConfig 
clientConfig,
+                                             CdcSidecarInstancesProvider 
cdcSidecarInstancesProvider,
+                                             @Nullable SecretsProvider 
secretsProvider,
+                                             ICdcStats cdcStats)
+    {
+        try
+        {
+            SidecarCdcClient sidecarCdcClient = new 
SidecarCdcClient(clientConfig, cdcSidecarInstancesProvider, secretsProvider, 
cdcStats);
+            vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), message 
-> {
+                try
+                {
+                    sidecarCdcClient.close();
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Error closing SidecarCdcClient", e);
+                }
+            });
+            return sidecarCdcClient;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Failed to create SidecarCdcClient", e);
+        }
+    }
+
+    @Nullable
+    @Provides
+    @Singleton
+    public SecretsProvider secretsProvider(SidecarConfiguration 
sidecarConfiguration)
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        if (sslConfiguration == null || !sslConfiguration.enabled())
+        {
+            return null;
+        }
+
+        return new SidecarClientSecretsProvider(sidecarConfiguration);
+    }
+
     @Provides
     @Singleton
     RangeManager rangeManager(Vertx vertx, TokenRingProvider tokenRingProvider)
@@ -389,36 +448,45 @@ public class CdcModule extends AbstractModule
     @Provides
     @Singleton
     CdcPublisher cdcPublisher(Vertx vertx,
-                              SidecarConfiguration sidecarConfiguration,
                               ExecutorPools executorPools,
                               ClusterConfigProvider clusterConfigProvider,
                               SchemaSupplier schemaSupplier,
-                              CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                              SidecarCdcClient.ClientConfig clientConfig,
                               InstanceMetadataFetcher instanceMetadataFetcher,
                               CdcConfig conf,
                               CdcDatabaseAccessor databaseAccessor,
                               ICdcStats cdcStats,
                               VirtualTablesDatabaseAccessor virtualTables,
                               SidecarCdcStats sidecarCdcStats,
-                              Serializer<CdcEvent> avroSerializer,
-                              RangeManager rangeManager)
+                              RangeManager rangeManager,
+                              CassandraBridgeFactory cassandraBridgeFactory,
+                              Provider<SidecarCdcClient> 
sidecarCdcClientProvider,
+                              CachingSchemaStore schemaStore,
+                              KafkaProducerFactory kafkaProducerFactory,
+                              CdcOptions cdcOptions)
     {
         return new CdcPublisher(vertx,
-                                sidecarConfiguration,
                                 executorPools,
                                 clusterConfigProvider,
                                 schemaSupplier,
-                                sidecarInstancesProvider,
-                                clientConfig,
                                 instanceMetadataFetcher,
                                 conf,
                                 databaseAccessor,
                                 cdcStats,
                                 virtualTables,
                                 sidecarCdcStats,
-                                avroSerializer,
-                                () -> rangeManager);
+                                () -> rangeManager,
+                                cassandraBridgeFactory,
+                                sidecarCdcClientProvider,
+                                schemaStore,
+                                kafkaProducerFactory,
+                                cdcOptions);
+    }
+
+    @Provides
+    @Singleton
+    public CdcOptions cdcOptions(InstanceMetadataFetcher 
instanceMetadataFetcher)
+    {
+        return new SidecarCdcOptions(instanceMetadataFetcher);
     }
 
     @Provides
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
index 4665e555..490d6352 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
@@ -18,41 +18,41 @@
 
 package org.apache.cassandra.sidecar.cdc;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.google.inject.Provider;
 import io.vertx.core.Vertx;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
-import org.apache.cassandra.cdc.msg.CdcEvent;
-import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.kafka.KafkaProducerFactory;
 import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
 import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
-import org.apache.cassandra.secrets.SslConfigSecretsProvider;
-import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SslConfiguration;
 import org.apache.cassandra.sidecar.coordination.RangeManager;
 import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -72,10 +72,6 @@ public class CdcPublisherTests
     @Mock
     private SchemaSupplier schemaSupplier;
     @Mock
-    private CdcSidecarInstancesProvider sidecarInstancesProvider;
-    @Mock
-    private SidecarCdcClient.ClientConfig clientConfig;
-    @Mock
     private InstanceMetadataFetcher instanceMetadataFetcher;
     @Mock
     private CdcDatabaseAccessor databaseAccessor;
@@ -86,11 +82,18 @@ public class CdcPublisherTests
     @Mock
     private SidecarCdcStats sidecarCdcStats;
     @Mock
-    private Serializer<CdcEvent> avroSerializer;
+    private KafkaProducerFactory kafkaProducerFactory;
+    @Mock
+    private CachingSchemaStore schemaStore;
     @Mock
     private Provider<RangeManager> rangeManager;
+    @Mock
+    private CassandraBridgeFactory cassandraBridgeFactory;
+    @Mock
+    private SidecarCdcClient sidecarCdcClient;
+    @Mock
+    private CdcOptions cdcOptions;
 
-    private SidecarConfiguration sidecarConfiguration;
     private CdcConfig cdcConfig;
     private CdcPublisher cdcPublisher;
 
@@ -99,8 +102,6 @@ public class CdcPublisherTests
     {
         MockitoAnnotations.openMocks(this);
 
-        // Mock deep stubs for complex configuration objects
-        sidecarConfiguration = mock(SidecarConfiguration.class, 
RETURNS_DEEP_STUBS);
         cdcConfig = mock(CdcConfig.class, RETURNS_DEEP_STUBS);
 
         // Mock ExecutorPools behavior
@@ -111,204 +112,22 @@ public class CdcPublisherTests
 
         cdcPublisher = new CdcPublisher(
             vertx,
-            sidecarConfiguration,
             executorPools,
             clusterConfigProvider,
             schemaSupplier,
-            sidecarInstancesProvider,
-            clientConfig,
             instanceMetadataFetcher,
             cdcConfig,
             databaseAccessor,
             cdcStats,
             virtualTables,
             sidecarCdcStats,
-            avroSerializer,
-            rangeManager
-        );
-    }
-
-
-    @Test
-    void testSecretsProviderReturnsNullWhenSslDisabled()
-    {
-        SslConfiguration sslConfig = mock(SslConfiguration.class);
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-        when(sslConfig.enabled()).thenReturn(false);
-
-        SecretsProvider result = cdcPublisher.secretsProvider();
-
-        assertThat(result).isNull();
-    }
-
-    @Test
-    void testSecretsProviderWithSslEnabledNoKeystoreNoTruststore()
-    {
-        SslConfiguration sslConfig = mockSslConfiguration(
-            true,                           // enabled
-            true,                           // preferOpenSSL
-            "REQUIRED",                     // clientAuth
-            Arrays.asList("TLS_RSA_128"),  // cipherSuites
-            Arrays.asList("TLSv1.2"),      // secureTransportProtocols
-            "10s",                          // handshakeTimeout
-            false,                          // keystoreConfigured
-            false                           // truststoreConfigured
-        );
-
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-
-        SecretsProvider result = cdcPublisher.secretsProvider();
-
-        assertThat(result).isNotNull();
-    }
-
-    @Test
-    void testSecretsProviderWithKeystoreOnly()
-    {
-        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
-            "/path/to/keystore.jks",
-            "keystorePassword",
-            "JKS"
+            rangeManager,
+            cassandraBridgeFactory,
+            () -> sidecarCdcClient,
+            schemaStore,
+            kafkaProducerFactory,
+            cdcOptions
         );
-
-        SslConfiguration sslConfig = mockSslConfiguration(
-            true,
-            false,
-            "OPTIONAL",
-            Arrays.asList("TLS_RSA_256"),
-            Arrays.asList("TLSv1.3"),
-            "15s",
-            true,
-            false
-        );
-
-        when(sslConfig.keystore()).thenReturn(keystoreConfig);
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-
-        SecretsProvider result = cdcPublisher.secretsProvider();
-
-        assertThat(result).isNotNull();
-        assertThat(result.keyStoreType()).isEqualTo("JKS");
-        assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray());
-    }
-
-    @Test
-    void testSecretsProviderWithTruststoreOnly()
-    {
-        // SslConfig validation requires keystore password to always be provided
-        // This test validates that truststore-only configuration is rejected
-        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
-            "/path/to/truststore.jks",
-            "truststorePassword",
-            "PKCS12"
-        );
-
-        SslConfiguration sslConfig = mockSslConfiguration(
-            true,
-            true,
-            "NONE",
-            Collections.emptyList(),
-            Arrays.asList("TLSv1.2", "TLSv1.3"),
-            "20s",
-            false,
-            true
-        );
-
-        when(sslConfig.truststore()).thenReturn(truststoreConfig);
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-
-        // SslConfig.create() validates and requires keystore password when any SSL config is provided
-        IllegalArgumentException exception = org.junit.jupiter.api.Assertions.assertThrows(
-            IllegalArgumentException.class,
-            () -> cdcPublisher.secretsProvider()
-        );
-
-        assertThat(exception.getMessage()).contains("KEYSTORE_PASSWORD");
-    }
-
-    @Test
-    void testSecretsProviderWithBothKeystoreAndTruststore()
-    {
-        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
-            "/path/to/keystore.p12",
-            "keystorePass123",
-            "PKCS12"
-        );
-
-        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
-            "/path/to/truststore.p12",
-            "truststorePass456",
-            "PKCS12"
-        );
-
-        SslConfiguration sslConfig = mockSslConfiguration(
-            true,
-            true,
-            "REQUIRED",
-            Arrays.asList("TLS_ECDHE_RSA", "TLS_AES_256"),
-            Arrays.asList("TLSv1.2", "TLSv1.3"),
-            "30s",
-            true,
-            true
-        );
-
-        when(sslConfig.keystore()).thenReturn(keystoreConfig);
-        when(sslConfig.truststore()).thenReturn(truststoreConfig);
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-
-        SecretsProvider result = cdcPublisher.secretsProvider();
-
-        assertThat(result).isNotNull();
-        assertThat(result.keyStoreType()).isEqualTo("PKCS12");
-        assertThat(result.keyStorePassword()).isEqualTo("keystorePass123".toCharArray());
-        assertThat(result.trustStoreType()).isEqualTo("PKCS12");
-        assertThat(result.trustStorePassword()).isEqualTo("truststorePass456".toCharArray());
-    }
-
-    @Test
-    void testSecretsProviderUsesCorrectSslConfigKeys()
-    {
-        // This test validates that CdcPublisher uses SslConfig constants with MapUtils.lowerCaseKey()
-        KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration(
-            "/path/to/keystore.jks",
-            "keystorePassword",
-            "JKS"
-        );
-
-        KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration(
-            "/path/to/truststore.jks",
-            "truststorePassword",
-            "PKCS12"
-        );
-
-        SslConfiguration sslConfig = mockSslConfiguration(
-            true,
-            false,
-            "REQUIRED",
-            Collections.emptyList(),
-            Arrays.asList("TLSv1.2"),
-            "10s",
-            true,
-            true
-        );
-
-        when(sslConfig.keystore()).thenReturn(keystoreConfig);
-        when(sslConfig.truststore()).thenReturn(truststoreConfig);
-        when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig);
-
-        SecretsProvider result = cdcPublisher.secretsProvider();
-
-        // Validate that the SecretsProvider was created successfully using the correct keys
-        assertThat(result).isNotNull();
-        assertThat(result).isInstanceOf(SslConfigSecretsProvider.class);
-
-        // Verify keystore configuration is accessible
-        assertThat(result.keyStoreType()).isEqualTo("JKS");
-        assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray());
-
-        // Verify truststore configuration is accessible
-        assertThat(result.trustStoreType()).isEqualTo("PKCS12");
-        assertThat(result.trustStorePassword()).isEqualTo("truststorePassword".toCharArray());
     }
 
     @Test
@@ -325,45 +144,20 @@ public class CdcPublisherTests
         when(cdcConfig.failOnRecordTooLargeError()).thenReturn(false);
         when(cdcConfig.failOnKafkaError()).thenReturn(true);
 
-        EventConsumer result = cdcPublisher.eventConsumer(cdcConfig, avroSerializer);
+        InstanceMetadata mockInstance = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS);
+        when(mockInstance.delegate().nodeSettings().releaseVersion()).thenReturn("4.1.0");
+        doAnswer(invocation -> {
+            Function<InstanceMetadata, Object> fn = invocation.getArgument(0);
+            return fn.apply(mockInstance);
+        }).when(instanceMetadataFetcher).callOnFirstAvailableInstance(any());
+        CassandraBridge mockBridge = mock(CassandraBridge.class);
+        when(mockBridge.getVersion()).thenReturn(CassandraVersion.FOURONE);
+        when(cassandraBridgeFactory.get(anyString())).thenReturn(mockBridge);
+        when(kafkaProducerFactory.create(any())).thenReturn(mock(KafkaProducer.class));
+
+        EventConsumer result = cdcPublisher.eventConsumer(cdcConfig);
 
         assertThat(result).isNotNull();
         assertThat(result).isInstanceOf(CdcEventConsumer.class);
     }
-
-
-    private SslConfiguration mockSslConfiguration(boolean enabled,
-                                                  boolean preferOpenSSL,
-                                                  String clientAuth,
-                                                  java.util.List<String> cipherSuites,
-                                                  java.util.List<String> secureTransportProtocols,
-                                                  String handshakeTimeout,
-                                                  boolean keystoreConfigured,
-                                                  boolean truststoreConfigured)
-    {
-        SslConfiguration sslConfig = mock(SslConfiguration.class, RETURNS_DEEP_STUBS);
-        when(sslConfig.enabled()).thenReturn(enabled);
-        when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL);
-        when(sslConfig.clientAuth()).thenReturn(clientAuth);
-        when(sslConfig.cipherSuites()).thenReturn(cipherSuites);
-        when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols);
-
-        SecondBoundConfiguration durationSpec = mock(SecondBoundConfiguration.class);
-        when(durationSpec.toString()).thenReturn(handshakeTimeout);
-        when(sslConfig.handshakeTimeout()).thenReturn(durationSpec);
-
-        when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured);
-        when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured);
-
-        return sslConfig;
-    }
-
-    private KeyStoreConfiguration mockKeystoreConfiguration(String path, String password, String type)
-    {
-        KeyStoreConfiguration config = mock(KeyStoreConfiguration.class);
-        when(config.path()).thenReturn(path);
-        when(config.password()).thenReturn(password);
-        when(config.type()).thenReturn(type);
-        return config;
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to