This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1dbfbf0 CASSSIDECAR-308 CDC: Add end-to-end CDC integration tests 
(#317)
c1dbfbf0 is described below

commit c1dbfbf0abd58f685758de57ff324fde88c15485
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Feb 17 16:14:47 2026 -0800

    CASSSIDECAR-308 CDC: Add end-to-end CDC integration tests (#317)
    
     Patch by Jyothsna Konisa; Reviewed by Bernardo Botella and Josh McKenzie 
for CASSSIDECAR-308
---
 CHANGES.txt                                        |   1 +
 gradle.properties                                  |   2 +-
 integration-framework/build.gradle                 |   7 +
 .../cassandra/sidecar/testing/MtlsTestHelper.java  |  85 +++++++++
 .../testing/SharedClusterIntegrationTestBase.java  |  78 ++++-----
 .../cassandra/sidecar/testing/TestCdcConfig.java   | 154 ++++++++++++++++
 .../sidecar/testing/TestCdcEventConsumer.java      |  59 +++++++
 .../sidecar/testing/TestCdcPublisher.java          | 111 ++++++++++++
 .../org/apache/cassandra/testing/TlsTestUtils.java |   3 +-
 .../cassandra/sidecar/cdc/CdcIntegrationTest.java  |  98 +++++++++++
 .../SidecarPeerDownDetectorIntegrationTest.java    |  41 +----
 ...SharedClusterCdcSidecarIntegrationTestBase.java | 195 +++++++++++++++++++++
 .../cassandra/sidecar/modules/CdcModule.java       |  51 +++---
 13 files changed, 780 insertions(+), 105 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7e60e49c..b5400d6a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
  * SchemaStorePublisherFactory should be Injectable in CachingSchemaStore 
(CASSSIDECAR-408)
  * Fix StorageClientTest Docker API compatibility and improve CI test 
reporting (CASSSIDECAR-410)
  * Incorrect SSL Configuration Keys in CdcPublisher.secretsProvider() 
(CASSSIDECAR-401)
diff --git a/gradle.properties b/gradle.properties
index 515c9f7f..75279628 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -46,5 +46,5 @@ swaggerVersion=2.2.21
 kryoVersion=4.0.2
 # OSHI dependencies
 oshiVersion=6.9.0
-analyticsVersion=0.2.0
+analyticsVersion=0.3.0
 kafkaClientVersion=3.7.0
diff --git a/integration-framework/build.gradle 
b/integration-framework/build.gradle
index 62248946..29b9c942 100644
--- a/integration-framework/build.gradle
+++ b/integration-framework/build.gradle
@@ -65,9 +65,16 @@ dependencies {
     api("io.vertx:vertx-junit5:${project.vertxVersion}")
     // The server itself
     api(project(path: ":server"))
+    api(testFixtures(project(path: ":server")))
     api(project(path: ":server-common"))
+
+    // CDC dependencies
+    api(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc_spark3_2.12", version: "${project.analyticsVersion}")
+    api(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc-sidecar_spark3_2.12", version: 
"${project.analyticsVersion}")
+    api "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}"
 }
 
 compileJava.onlyIf { !skipIntegrationTest }
 compileTestJava.onlyIf { !skipIntegrationTest }
 javadoc.onlyIf { !skipIntegrationTest }
+
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
index d3d6e067..1bef05aa 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
@@ -22,6 +22,16 @@ import java.nio.file.Path;
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
 import org.apache.cassandra.testing.utils.tls.CertificateBuilder;
 import org.apache.cassandra.testing.utils.tls.CertificateBundle;
 
@@ -30,6 +40,7 @@ import 
org.apache.cassandra.testing.utils.tls.CertificateBundle;
  */
 public class MtlsTestHelper
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MtlsTestHelper.class);
     public static final String PASSWORD_STRING = "cassandra";
     public static final char[] PASSWORD = PASSWORD_STRING.toCharArray();
     /**
@@ -142,4 +153,78 @@ public class MtlsTestHelper
     {
         return "PKCS12";
     }
+
+    /**
+     * Creates SSL configuration with the specified keystore and shared 
truststore.
+     *
+     * @param keyStorePath the path to the keystore
+     * @param keyStorePassword the keystore password
+     * @param keyStoreType the keystore type
+     * @return SslConfiguration with the provided keystore and shared 
truststore
+     */
+    private SslConfiguration createSslConfiguration(String keyStorePath,
+                                                    String keyStorePassword,
+                                                    String keyStoreType)
+    {
+        KeyStoreConfiguration truststoreConfiguration =
+        new KeyStoreConfigurationImpl(trustStorePath(),
+                                      trustStorePassword(),
+                                      trustStoreType(),
+                                      SecondBoundConfiguration.parse("60s"));
+
+        KeyStoreConfiguration keyStoreConfiguration =
+        new KeyStoreConfigurationImpl(keyStorePath,
+                                      keyStorePassword,
+                                      keyStoreType,
+                                      SecondBoundConfiguration.parse("60s"));
+
+        return SslConfigurationImpl.builder()
+                                   .enabled(true)
+                                   .keystore(keyStoreConfiguration)
+                                   .truststore(truststoreConfiguration)
+                                   .build();
+    }
+
+    /**
+     * Creates SSL configuration for the Sidecar server with mTLS settings if 
enabled.
+     *
+     * @return SslConfiguration with server keystore/truststore, or null if 
mTLS is not enabled
+     */
+    public SslConfiguration createServerSslConfiguration()
+    {
+        if (!isEnabled())
+        {
+            LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if 
you would like mTLS enabled.",
+                        CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+            return null;
+        }
+
+        LOGGER.info("Enabling test mTLS certificate/keystore for server.");
+        return createSslConfiguration(serverKeyStorePath(),
+                                      serverKeyStorePassword(),
+                                      serverKeyStoreType());
+    }
+
+    /**
+     * Creates a SidecarClientConfiguration with mTLS settings if mTLS is 
enabled.
+     *
+     * @return a SidecarClientConfiguration with mTLS settings, or null if 
mTLS is not enabled
+     */
+    public SidecarClientConfiguration createSidecarClientConfiguration()
+    {
+        if (!isEnabled())
+        {
+            LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if 
you would like mTLS enabled.",
+                        CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+            return new SidecarClientConfigurationImpl(null);
+        }
+
+        LOGGER.info("Enabling test mTLS certificate/keystore for client.");
+        SslConfiguration clientSslConfiguration =
+        createSslConfiguration(clientKeyStorePath(),
+                              clientKeyStorePassword(),
+                              serverKeyStoreType());
+
+        return new SidecarClientConfigurationImpl(clientSslConfiguration);
+    }
 }
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 2f20ed7e..7cad8510 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -88,18 +89,15 @@ import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration
 import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
 import org.apache.cassandra.sidecar.config.JmxConfiguration;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
 import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.config.SslConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
 import org.apache.cassandra.sidecar.lifecycle.InJvmDTestLifecycleProvider;
 import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
@@ -116,7 +114,6 @@ import org.apache.cassandra.testing.TestVersion;
 import org.apache.cassandra.testing.TestVersionSupplier;
 
 import static 
org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT;
-import static 
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
 import static org.apache.cassandra.testing.DriverTestUtils.buildContactPoints;
 import static 
org.apache.cassandra.testing.utils.IInstanceUtils.tryGetIntConfig;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -440,6 +437,30 @@ public abstract class SharedClusterIntegrationTestBase
         .isTrue();
     }
 
+    /**
+     * Polls a condition until it returns true or timeout is reached.
+     * Uses System.nanoTime() for accurate timing and Uninterruptibles for 
consistent sleep behavior.
+     *
+     * @param condition the condition to check
+     * @param timeoutSeconds maximum time to wait in seconds
+     * @param pollIntervalMillis interval between checks in milliseconds
+     * @throws AssertionError if timeout is reached before condition is met
+     */
+    protected void waitUntil(BooleanSupplier condition, long timeoutSeconds, 
long pollIntervalMillis)
+    {
+        long startTime = System.nanoTime();
+        long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
+
+        while (!condition.getAsBoolean())
+        {
+            if (System.nanoTime() - startTime > timeoutNanos)
+            {
+                throw new AssertionError("Condition not met within " + 
timeoutSeconds + " seconds");
+            }
+            Uninterruptibles.sleepUninterruptibly(pollIntervalMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
     /**
      * Stops the Sidecar service
      *
@@ -775,48 +796,23 @@ public abstract class SharedClusterIntegrationTestBase
         }
 
         public static SidecarConfigurationImpl.Builder 
defaultConfigurationBuilder(
-        MtlsTestHelper mtlsTestHelper, 
Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> 
configurationOverrides)
+                MtlsTestHelper mtlsTestHelper,
+                Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides)
         {
-            ServiceConfiguration conf = ServiceConfigurationImpl.builder()
-                                                                
.host("0.0.0.0") // binds to all interfaces, potential security issue if left 
running for long
-                                                                .port(0) // 
let the test find an available port
+            ServiceConfiguration conf = TestServiceConfiguration.builder()
                                                                 
.schemaKeyspaceConfiguration(SchemaKeyspaceConfigurationImpl.builder()
                                                                                
                                             .isEnabled(true)
                                                                                
                                             .build())
                                                                 .build();
 
-
-            SslConfiguration sslConfiguration = null;
-            if (mtlsTestHelper.isEnabled())
-            {
-                LOGGER.info("Enabling test mTLS certificate/keystore.");
-
-                KeyStoreConfiguration truststoreConfiguration =
-                new KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
-                                              
mtlsTestHelper.trustStorePassword(),
-                                              mtlsTestHelper.trustStoreType(),
-                                              
SecondBoundConfiguration.parse("60s"));
-
-                KeyStoreConfiguration keyStoreConfiguration =
-                new 
KeyStoreConfigurationImpl(mtlsTestHelper.serverKeyStorePath(),
-                                              
mtlsTestHelper.serverKeyStorePassword(),
-                                              
mtlsTestHelper.serverKeyStoreType(),
-                                              
SecondBoundConfiguration.parse("60s"));
-
-                sslConfiguration = SslConfigurationImpl.builder()
-                                                       .enabled(true)
-                                                       
.keystore(keyStoreConfiguration)
-                                                       
.truststore(truststoreConfiguration)
-                                                       .build();
-            }
-            else
-            {
-                LOGGER.info("Not enabling mTLS for testing purposes. Set '{}' 
to 'true' if you would " +
-                            "like mTLS enabled.", 
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
-            }
-            S3ClientConfiguration s3ClientConfig = new 
S3ClientConfigurationImpl("s3-client", 4, SecondBoundConfiguration.parse("60s"),
-                                                                               
  5242880, DEFAULT_API_CALL_TIMEOUT,
-                                                                               
  buildTestS3ProxyConfig());
+            SslConfiguration sslConfiguration = 
mtlsTestHelper.createServerSslConfiguration();
+            S3ClientConfiguration s3ClientConfig =
+            new S3ClientConfigurationImpl("s3-client",
+                                          4,
+                                          
SecondBoundConfiguration.parse("60s"),
+                                          5242880,
+                                          DEFAULT_API_CALL_TIMEOUT,
+                                          buildTestS3ProxyConfig());
 
             SidecarConfigurationImpl.Builder builder = 
SidecarConfigurationImpl.builder()
                                                                                
.serviceConfiguration(conf)
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
new file mode 100644
index 00000000..02a286b9
--- /dev/null
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+/**
+ * Test implementation of {@link CdcConfig} for integration tests.
+ * Provides hardcoded configuration values suitable for testing CDC 
functionality
+ * without requiring external Kafka infrastructure.
+ */
+public class TestCdcConfig implements CdcConfig
+{
+    @Override
+    public String env()
+    {
+        return "test";
+    }
+
+    @Override
+    public String kafkaTopic()
+    {
+        return "test-topic";
+    }
+
+    @Override
+    public TopicFormatType topicFormat()
+    {
+        return TopicFormatType.KEYSPACETABLE;
+    }
+
+    @Override
+    public boolean cdcEnabled()
+    {
+        return true;
+    }
+
+    @Override
+    public String jobId()
+    {
+        return "test-job-id";
+    }
+
+    @Override
+    public Map<String, Object> kafkaConfigs()
+    {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<String, Object> cdcConfigs()
+    {
+        return new HashMap<>();
+    }
+
+    @Override
+    public boolean logOnly()
+    {
+        return true;
+    }
+
+    @Override
+    public String datacenter()
+    {
+        return "datacenter1";
+    }
+
+    @Override
+    public SecondBoundConfiguration watermarkWindow()
+    {
+        return new SecondBoundConfiguration(3, TimeUnit.DAYS);
+    }
+
+    @Override
+    public int maxRecordSizeBytes()
+    {
+        return -1;
+    }
+
+    @Override
+    public String compression()
+    {
+        return null;
+    }
+
+    @Override
+    public MillisecondBoundConfiguration minDelayBetweenMicroBatches()
+    {
+        return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int maxCommitLogsPerInstance()
+    {
+        return 4;
+    }
+
+    @Override
+    public int maxWatermarkerSize()
+    {
+        return 400000;
+    }
+
+    @Override
+    public boolean persistEnabled()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean failOnRecordTooLargeError()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean failOnKafkaError()
+    {
+        return true;
+    }
+
+    @Override
+    public MillisecondBoundConfiguration persistDelay()
+    {
+        return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean isConfigReady()
+    {
+        return true;
+    }
+}
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
new file mode 100644
index 00000000..2836cbae
--- /dev/null
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.inject.Singleton;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+
+/**
+ * Test implementation of EventConsumer for CDC integration tests.
+ * Stores CDC events in a concurrent queue that can be accessed for test 
assertions.
+ */
+@Singleton
+public class TestCdcEventConsumer implements EventConsumer
+{
+    private final Queue<CdcEvent> events = new ConcurrentLinkedQueue<>();
+
+    @Override
+    public void accept(CdcEvent event)
+    {
+        events.offer(event);
+    }
+
+    /**
+     * @return all CDC events captured so far as a list
+     */
+    public List<CdcEvent> getEvents()
+    {
+        return new ArrayList<>(events);
+    }
+
+    /**
+     * Clear all captured events
+     */
+    public void clear()
+    {
+        events.clear();
+    }
+}
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
new file mode 100644
index 00000000..4388c3e6
--- /dev/null
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import com.google.inject.Provider;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import org.apache.cassandra.sidecar.cdc.CdcPublisher;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Test implementation of CdcPublisher that uses an in-memory event consumer
+ * instead of publishing to Kafka. This allows integration tests to capture
+ * and verify CDC events without requiring a Kafka infrastructure.
+ */
+public class TestCdcPublisher extends CdcPublisher
+{
+    private final TestCdcEventConsumer testEventConsumer = new 
TestCdcEventConsumer();
+    private final CdcDatabaseAccessor databaseAccessor;
+
+    public TestCdcPublisher(Vertx vertx,
+                           SidecarConfiguration sidecarConfiguration,
+                           ExecutorPools executorPools,
+                           ClusterConfigProvider clusterConfigProvider,
+                           SchemaSupplier schemaSupplier,
+                           CdcSidecarInstancesProvider 
sidecarInstancesProvider,
+                           SidecarCdcClient.ClientConfig clientConfig,
+                           InstanceMetadataFetcher instanceMetadataFetcher,
+                           CdcConfig conf,
+                           CdcDatabaseAccessor databaseAccessor,
+                           ICdcStats cdcStats,
+                           VirtualTablesDatabaseAccessor virtualTables,
+                           SidecarCdcStats sidecarCdcStats,
+                           Serializer<CdcEvent> avroSerializer,
+                           Provider<RangeManager> rangeManagerProvider)
+    {
+        super(vertx, sidecarConfiguration, executorPools, 
clusterConfigProvider,
+              schemaSupplier, sidecarInstancesProvider, clientConfig,
+              instanceMetadataFetcher, conf, databaseAccessor, cdcStats,
+              virtualTables, sidecarCdcStats, avroSerializer, 
rangeManagerProvider);
+        this.databaseAccessor = databaseAccessor;
+    }
+
+    @Override
+    public EventConsumer eventConsumer(CdcConfig conf, Serializer<CdcEvent> 
avroSerializer)
+    {
+        return testEventConsumer;
+    }
+
+    /**
+     * Override scheduleDecision to execute in tests when database is ready,
+     * bypassing the initialization and cache warming checks required in 
production.
+     */
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        // If already running, skip to avoid redundant execution attempts
+        if (isRunning())
+        {
+            return ScheduleDecision.SKIP;
+        }
+
+        // Only execute if the database accessor is available
+        // This prevents CassandraUnavailableException during test startup
+        if (databaseAccessor.isAvailable())
+        {
+            return ScheduleDecision.EXECUTE;
+        }
+
+        // Database not ready yet, skip this iteration and retry later
+        return ScheduleDecision.SKIP;
+    }
+
+    /**
+     * @return the test CDC event consumer for test assertions
+     */
+    public TestCdcEventConsumer getTestEventConsumer()
+    {
+        return testEventConsumer;
+    }
+}
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
 
b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
index ae802bb1..92d3800a 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
@@ -87,7 +87,8 @@ public class TlsTestUtils
         InetAddress address = nativeInetSocketAddress.getAddress();
 
         com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder()
-                                                                               
            .withLoadBalancingPolicy(new 
DCAwareRoundRobinPolicy.Builder().build())
+                                                                               
            .withLoadBalancingPolicy(
+                                                                               
            new DCAwareRoundRobinPolicy.Builder().build())
                                                                                
            .withSSL(sslOptions)
                                                                                
            .withoutJMXReporting()
                                                                                
            .withAuthProvider(new PlainTextAuthProvider(username, password))
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
new file mode 100644
index 00000000..961a5224
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterCdcSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.TestCdcEventConsumer;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for CDC functionality.
+ * Tests that mutations on CDC-enabled tables are captured and published to 
TestCdcEventConsumer.
+ */
+public class CdcIntegrationTest extends 
SharedClusterCdcSidecarIntegrationTestBase
+{
+    private static final QualifiedName CDC_TEST_TABLE = new 
QualifiedName("cdc_test_ks", "cdc_test_table");
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(CDC_TEST_TABLE, DC1_RF1);
+
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS %s " +
+                "(id int PRIMARY KEY, value int) " +
+                "WITH cdc = true";
+        createTestTable(CDC_TEST_TABLE, createTableStatement);
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        waitForSchemaReady(30, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testCdcEventsPublishedToInMemory()
+    {
+        // Write mutations into the test table
+        int mutationCount = 100;
+        Map<Integer, Integer> expectedMutations = new HashMap<>();
+        for (int i = 1; i <= mutationCount; i++)
+        {
+            String query = String.format("INSERT INTO %s (id, value) VALUES 
(%d, %d)", CDC_TEST_TABLE, i, i);
+            cluster.getFirstRunningInstance()
+                    .coordinator()
+                    .execute(query, ConsistencyLevel.ONE);
+            expectedMutations.put(i, i);
+        }
+
+        TestCdcEventConsumer consumer = getTestEventConsumer();
+        waitUntil(() -> consumer.getEvents().size() >= mutationCount, 120, 
1000);
+        assertThat(consumer.getEvents().size())
+                .as("All CDC events should be published to in-memory consumer")
+                .isGreaterThanOrEqualTo(mutationCount);
+
+        // Verify all the mutations with expected values
+        List<CdcEvent> events = consumer.getEvents();
+        for (CdcEvent cdcEvent : events)
+        {
+            assertThat(cdcEvent.keyspace).isEqualTo(CDC_TEST_TABLE.keyspace());
+            assertThat(cdcEvent.table).isEqualTo(CDC_TEST_TABLE.table());
+            assertThat(cdcEvent.getKind()).isEqualTo(CdcEvent.Kind.INSERT);
+            
assertThat(cdcEvent.getValueColumns().get(0).columnName).isEqualTo("value");
+
+            int value = 
ByteBuffer.wrap(Objects.requireNonNull(cdcEvent.getValueColumns().get(0).getBytes())).getInt();
+            expectedMutations.remove(value);
+        }
+        assertThat(expectedMutations).isEmpty();
+    }
+}
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
index 6adb51b0..aff265b1 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
@@ -38,20 +38,14 @@ import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
 import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SslConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
 import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
 import 
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
@@ -63,8 +57,6 @@ import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestB
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static 
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
-import static 
org.apache.cassandra.sidecar.testing.MtlsTestHelper.PASSWORD_STRING;
 import static 
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.cassandraInstanceHostname;
 import static 
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.defaultConfigurationBuilder;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
@@ -230,38 +222,7 @@ class SidecarPeerDownDetectorIntegrationTest extends 
SharedClusterSidecarIntegra
                                                                     
.schemaKeyspaceConfiguration(SCHEMA_KEYSPACE_CONFIG)
                                                                     .build();
 
-                // We need to provide mTLS configuration for the Sidecar 
client so it can talk to
-                // other sidecars using mTLS
-                SslConfiguration clientSslConfiguration = null;
-                if (mtlsTestHelper.isEnabled())
-                {
-                    LOGGER.info("Enabling test mTLS certificate/keystore.");
-
-                    KeyStoreConfiguration truststoreConfiguration =
-                    new 
KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
-                                                  
mtlsTestHelper.trustStorePassword(),
-                                                  
mtlsTestHelper.trustStoreType(),
-                                                  
SecondBoundConfiguration.parse("60s"));
-
-                    KeyStoreConfiguration keyStoreConfiguration =
-                    new 
KeyStoreConfigurationImpl(mtlsTestHelper.clientKeyStorePath(),
-                                                  PASSWORD_STRING,
-                                                  
mtlsTestHelper.serverKeyStoreType(), // server and client keystore types are 
the same
-                                                  
SecondBoundConfiguration.parse("60s"));
-
-                    clientSslConfiguration = SslConfigurationImpl.builder()
-                                                                 .enabled(true)
-                                                                 
.keystore(keyStoreConfiguration)
-                                                                 
.truststore(truststoreConfiguration)
-                                                                 .build();
-                }
-                else
-                {
-                    LOGGER.info("Not enabling mTLS for testing purposes. Set 
'{}' to 'true' if you would " +
-                                "like mTLS enabled.", 
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
-                }
-
-                SidecarClientConfiguration sidecarClientConfiguration = new 
SidecarClientConfigurationImpl(clientSslConfiguration);
+                SidecarClientConfiguration sidecarClientConfiguration = 
mtlsTestHelper.createSidecarClientConfiguration();
 
                 // Let's run this very frequently for testing purposes
                 SidecarPeerHealthConfigurationImpl 
sidecarPeerHealthConfiguration
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
new file mode 100644
index 00000000..6f411000
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.AfterEach;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import org.apache.cassandra.sidecar.cdc.CdcPublisher;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Base class for CDC integration tests. Extends 
SharedClusterIntegrationTestBase with
+ * CDC-specific configuration and setup, including:
+ * - CDC-enabled Cassandra cluster configuration
+ * - TestCdcPublisher with TestCdcEventConsumer
+ * - Cassandra 4.1 version requirement
+ * - Helper methods to access CDC components
+ */
+public abstract class SharedClusterCdcSidecarIntegrationTestBase extends 
SharedClusterIntegrationTestBase
+{
+    @AfterEach
+    void cleanupCdcConsumerAfterEachTest()
+    {
+        TestCdcPublisher testCdcPublisher = (TestCdcPublisher) 
serverWrapper.injector.getInstance(CdcPublisher.class);
+        if (testCdcPublisher != null)
+        {
+            TestCdcEventConsumer consumer = 
testCdcPublisher.getTestEventConsumer();
+            if (consumer != null)
+            {
+                consumer.clear();
+            }
+        }
+    }
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        // The current CDC implementation cannot read 5.x commitlogs, so 
verify Cassandra version is 4.x
+        SimpleCassandraVersion version = 
SimpleCassandraVersion.create(testVersion.version());
+        assumeThat(version.major)
+                .as("Current CDC implementation cannot read 5.x commitlogs, 
requires Cassandra 4.x")
+                .isEqualTo(4);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                .dcCount(1)
+                .nodesPerDc(1)
+                .additionalInstanceConfig(Map.of("cdc_enabled", true));
+    }
+
+    @Override
+    protected Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides()
+    {
+        return builder -> {
+            // Override service configuration to use specific port for CDC 
tests
+            ServiceConfiguration existingConfig = 
builder.build().serviceConfiguration();
+            ServiceConfiguration cdcServiceConfig = 
ServiceConfigurationImpl.builder()
+                                                                            
.host(existingConfig.host())
+                                                                            
.port(9043)  // TODO: Make this port dynamically allocated
+                                                                            
.schemaKeyspaceConfiguration(existingConfig.schemaKeyspaceConfiguration())
+                                                                            
.build();
+            builder.serviceConfiguration(cdcServiceConfig);
+
+            // Configure sidecar client for mTLS if enabled
+            SidecarClientConfiguration clientConfig = 
mtlsTestHelper.createSidecarClientConfiguration();
+            if (clientConfig != null)
+            {
+                builder.sidecarClientConfiguration(clientConfig);
+            }
+            return builder;
+        };
+    }
+
+    @Override
+    protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
+    {
+        AbstractModule cdcModule = new CdcTestModule();
+        serverWrapper = startSidecarWithInstances(cluster, cdcModule);
+    }
+
+    /**
+     * @return the TestCdcPublisher instance for test access
+     */
+    protected TestCdcPublisher getCdcPublisher()
+    {
+        return (TestCdcPublisher) 
serverWrapper.injector.getInstance(CdcPublisher.class);
+    }
+
+    /**
+     * @return the TestCdcEventConsumer for test assertions
+     */
+    protected TestCdcEventConsumer getTestEventConsumer()
+    {
+        TestCdcPublisher publisher = getCdcPublisher();
+        return publisher != null ? publisher.getTestEventConsumer() : null;
+    }
+
+    /**
+     * CDC-specific Guice module that provides test implementations for CDC 
components.
+     */
+    private static class CdcTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        CdcPublisher cdcPublisher(Vertx vertx,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  ExecutorPools executorPools,
+                                  ClusterConfigProvider clusterConfigProvider,
+                                  SchemaSupplier schemaSupplier,
+                                  CdcSidecarInstancesProvider 
sidecarInstancesProvider,
+                                  SidecarCdcClient.ClientConfig clientConfig,
+                                  InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                  CdcConfig conf,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  ICdcStats cdcStats,
+                                  VirtualTablesDatabaseAccessor virtualTables,
+                                  SidecarCdcStats sidecarCdcStats,
+                                  Serializer<CdcEvent> avroSerializer,
+                                  TokenRingProvider tokenRingProvider)
+        {
+            return new TestCdcPublisher(vertx,
+                                       sidecarConfiguration,
+                                       executorPools,
+                                       clusterConfigProvider,
+                                       schemaSupplier,
+                                       sidecarInstancesProvider,
+                                       clientConfig,
+                                       instanceMetadataFetcher,
+                                       conf,
+                                       databaseAccessor,
+                                       cdcStats,
+                                       virtualTables,
+                                       sidecarCdcStats,
+                                       avroSerializer,
+                                       () -> new 
ContentionFreeRangeManager(vertx, tokenRingProvider));
+        }
+
+        @Provides
+        @Singleton
+        public CdcConfig cdcConfig()
+        {
+            return new TestCdcConfig();
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 3d369c5e..b6fc14e4 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -377,28 +377,21 @@ public class CdcModule extends AbstractModule
 
     @Provides
     @Singleton
-    public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration 
configuration)
-    {
-        return new CdcStatesSchema(configuration);
-    }
-
-    @ProvidesIntoMap
-    @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
-    PeriodicTask cdcPublisherTask(Vertx vertx,
-                                  SidecarConfiguration sidecarConfiguration,
-                                  ExecutorPools executorPools,
-                                  ClusterConfigProvider clusterConfigProvider,
-                                  SchemaSupplier schemaSupplier,
-                                  CdcSidecarInstancesProvider 
sidecarInstancesProvider,
-                                  SidecarCdcClient.ClientConfig clientConfig,
-                                  InstanceMetadataFetcher 
instanceMetadataFetcher,
-                                  CdcConfig conf,
-                                  CdcDatabaseAccessor databaseAccessor,
-                                  TokenRingProvider tokenRingProvider,
-                                  ICdcStats cdcStats,
-                                  VirtualTablesDatabaseAccessor virtualTables,
-                                  SidecarCdcStats sidecarCdcStats,
-                                  Serializer<CdcEvent> avroSerializer)
+    CdcPublisher cdcPublisher(Vertx vertx,
+                              SidecarConfiguration sidecarConfiguration,
+                              ExecutorPools executorPools,
+                              ClusterConfigProvider clusterConfigProvider,
+                              SchemaSupplier schemaSupplier,
+                              CdcSidecarInstancesProvider 
sidecarInstancesProvider,
+                              SidecarCdcClient.ClientConfig clientConfig,
+                              InstanceMetadataFetcher instanceMetadataFetcher,
+                              CdcConfig conf,
+                              CdcDatabaseAccessor databaseAccessor,
+                              TokenRingProvider tokenRingProvider,
+                              ICdcStats cdcStats,
+                              VirtualTablesDatabaseAccessor virtualTables,
+                              SidecarCdcStats sidecarCdcStats,
+                              Serializer<CdcEvent> avroSerializer)
     {
         return new CdcPublisher(vertx,
                                 sidecarConfiguration,
@@ -417,6 +410,20 @@ public class CdcModule extends AbstractModule
                                 () -> new ContentionFreeRangeManager(vertx, 
tokenRingProvider));
     }
 
+    @Provides
+    @Singleton
+    public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration 
configuration)
+    {
+        return new CdcStatesSchema(configuration);
+    }
+
+    @ProvidesIntoMap
+    @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
+    PeriodicTask cdcPublisherTask(CdcPublisher cdcPublisher)
+    {
+        return cdcPublisher;
+    }
+
     @Singleton
     @ProvidesIntoMap
     @KeyClassMapKey(PeriodicTaskMapKeys.CdcConfigRefresherNotifierKey.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to