This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1dbfbf0 CASSSIDECAR-308 CDC: Add end-to-end CDC integration tests
(#317)
c1dbfbf0 is described below
commit c1dbfbf0abd58f685758de57ff324fde88c15485
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Feb 17 16:14:47 2026 -0800
CASSSIDECAR-308 CDC: Add end-to-end CDC integration tests (#317)
Patch by Jyothsna Konisa; Reviewed by Bernardo Botella and Josh McKenzie
for CASSSIDECAR-308
---
CHANGES.txt | 1 +
gradle.properties | 2 +-
integration-framework/build.gradle | 7 +
.../cassandra/sidecar/testing/MtlsTestHelper.java | 85 +++++++++
.../testing/SharedClusterIntegrationTestBase.java | 78 ++++-----
.../cassandra/sidecar/testing/TestCdcConfig.java | 154 ++++++++++++++++
.../sidecar/testing/TestCdcEventConsumer.java | 59 +++++++
.../sidecar/testing/TestCdcPublisher.java | 111 ++++++++++++
.../org/apache/cassandra/testing/TlsTestUtils.java | 3 +-
.../cassandra/sidecar/cdc/CdcIntegrationTest.java | 98 +++++++++++
.../SidecarPeerDownDetectorIntegrationTest.java | 41 +----
...SharedClusterCdcSidecarIntegrationTestBase.java | 195 +++++++++++++++++++++
.../cassandra/sidecar/modules/CdcModule.java | 51 +++---
13 files changed, 780 insertions(+), 105 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e60e49c..b5400d6a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore
(CASSSIDECAR-408)
* Fix StorageClientTest Docker API compatibility and improve CI test
reporting (CASSSIDECAR-410)
* Incorrect SSL Configuration Keys in CdcPublisher.secretsProvider()
(CASSSIDECAR-401)
diff --git a/gradle.properties b/gradle.properties
index 515c9f7f..75279628 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -46,5 +46,5 @@ swaggerVersion=2.2.21
kryoVersion=4.0.2
# OSHI dependencies
oshiVersion=6.9.0
-analyticsVersion=0.2.0
+analyticsVersion=0.3.0
kafkaClientVersion=3.7.0
diff --git a/integration-framework/build.gradle
b/integration-framework/build.gradle
index 62248946..29b9c942 100644
--- a/integration-framework/build.gradle
+++ b/integration-framework/build.gradle
@@ -65,9 +65,16 @@ dependencies {
api("io.vertx:vertx-junit5:${project.vertxVersion}")
// The server itself
api(project(path: ":server"))
+ api(testFixtures(project(path: ":server")))
api(project(path: ":server-common"))
+
+ // CDC dependencies
+ api(group: "org.apache.cassandra", name:
"cassandra-analytics-cdc_spark3_2.12", version: "${project.analyticsVersion}")
+ api(group: "org.apache.cassandra", name:
"cassandra-analytics-cdc-sidecar_spark3_2.12", version:
"${project.analyticsVersion}")
+ api "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}"
}
compileJava.onlyIf { !skipIntegrationTest }
compileTestJava.onlyIf { !skipIntegrationTest }
javadoc.onlyIf { !skipIntegrationTest }
+
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
index d3d6e067..1bef05aa 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java
@@ -22,6 +22,16 @@ import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
import org.apache.cassandra.testing.utils.tls.CertificateBuilder;
import org.apache.cassandra.testing.utils.tls.CertificateBundle;
@@ -30,6 +40,7 @@ import
org.apache.cassandra.testing.utils.tls.CertificateBundle;
*/
public class MtlsTestHelper
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MtlsTestHelper.class);
public static final String PASSWORD_STRING = "cassandra";
public static final char[] PASSWORD = PASSWORD_STRING.toCharArray();
/**
@@ -142,4 +153,78 @@ public class MtlsTestHelper
{
return "PKCS12";
}
+
+ /**
+ * Creates SSL configuration with the specified keystore and shared
truststore.
+ *
+ * @param keyStorePath the path to the keystore
+ * @param keyStorePassword the keystore password
+ * @param keyStoreType the keystore type
+ * @return SslConfiguration with the provided keystore and shared
truststore
+ */
+ private SslConfiguration createSslConfiguration(String keyStorePath,
+ String keyStorePassword,
+ String keyStoreType)
+ {
+ KeyStoreConfiguration truststoreConfiguration =
+ new KeyStoreConfigurationImpl(trustStorePath(),
+ trustStorePassword(),
+ trustStoreType(),
+ SecondBoundConfiguration.parse("60s"));
+
+ KeyStoreConfiguration keyStoreConfiguration =
+ new KeyStoreConfigurationImpl(keyStorePath,
+ keyStorePassword,
+ keyStoreType,
+ SecondBoundConfiguration.parse("60s"));
+
+ return SslConfigurationImpl.builder()
+ .enabled(true)
+ .keystore(keyStoreConfiguration)
+ .truststore(truststoreConfiguration)
+ .build();
+ }
+
+ /**
+ * Creates SSL configuration for the Sidecar server with mTLS settings if
enabled.
+ *
+ * @return SslConfiguration with server keystore/truststore, or null if
mTLS is not enabled
+ */
+ public SslConfiguration createServerSslConfiguration()
+ {
+ if (!isEnabled())
+ {
+ LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if
you would like mTLS enabled.",
+ CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+ return null;
+ }
+
+ LOGGER.info("Enabling test mTLS certificate/keystore for server.");
+ return createSslConfiguration(serverKeyStorePath(),
+ serverKeyStorePassword(),
+ serverKeyStoreType());
+ }
+
+ /**
+ * Creates a SidecarClientConfiguration with mTLS settings if mTLS is
enabled.
+ *
+ * @return a SidecarClientConfiguration with mTLS settings, or null if
mTLS is not enabled
+ */
+ public SidecarClientConfiguration createSidecarClientConfiguration()
+ {
+ if (!isEnabled())
+ {
+ LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if
you would like mTLS enabled.",
+ CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+ return new SidecarClientConfigurationImpl(null);
+ }
+
+ LOGGER.info("Enabling test mTLS certificate/keystore for client.");
+ SslConfiguration clientSslConfiguration =
+ createSslConfiguration(clientKeyStorePath(),
+ clientKeyStorePassword(),
+ serverKeyStoreType());
+
+ return new SidecarClientConfigurationImpl(clientSslConfiguration);
+ }
}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 2f20ed7e..7cad8510 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -88,18 +89,15 @@ import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration
import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
import org.apache.cassandra.sidecar.config.JmxConfiguration;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
import org.apache.cassandra.sidecar.lifecycle.InJvmDTestLifecycleProvider;
import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
@@ -116,7 +114,6 @@ import org.apache.cassandra.testing.TestVersion;
import org.apache.cassandra.testing.TestVersionSupplier;
import static
org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT;
-import static
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
import static org.apache.cassandra.testing.DriverTestUtils.buildContactPoints;
import static
org.apache.cassandra.testing.utils.IInstanceUtils.tryGetIntConfig;
import static org.assertj.core.api.Assertions.assertThat;
@@ -440,6 +437,30 @@ public abstract class SharedClusterIntegrationTestBase
.isTrue();
}
+ /**
+ * Polls a condition until it returns true or timeout is reached.
+ * Uses System.nanoTime() for accurate timing and Uninterruptibles for
consistent sleep behavior.
+ *
+ * @param condition the condition to check
+ * @param timeoutSeconds maximum time to wait in seconds
+ * @param pollIntervalMillis interval between checks in milliseconds
+ * @throws AssertionError if timeout is reached before condition is met
+ */
+ protected void waitUntil(BooleanSupplier condition, long timeoutSeconds,
long pollIntervalMillis)
+ {
+ long startTime = System.nanoTime();
+ long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
+
+ while (!condition.getAsBoolean())
+ {
+ if (System.nanoTime() - startTime > timeoutNanos)
+ {
+ throw new AssertionError("Condition not met within " +
timeoutSeconds + " seconds");
+ }
+ Uninterruptibles.sleepUninterruptibly(pollIntervalMillis,
TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Stops the Sidecar service
*
@@ -775,48 +796,23 @@ public abstract class SharedClusterIntegrationTestBase
}
public static SidecarConfigurationImpl.Builder
defaultConfigurationBuilder(
- MtlsTestHelper mtlsTestHelper,
Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder>
configurationOverrides)
+ MtlsTestHelper mtlsTestHelper,
+ Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides)
{
- ServiceConfiguration conf = ServiceConfigurationImpl.builder()
-
.host("0.0.0.0") // binds to all interfaces, potential security issue if left
running for long
- .port(0) //
let the test find an available port
+ ServiceConfiguration conf = TestServiceConfiguration.builder()
.schemaKeyspaceConfiguration(SchemaKeyspaceConfigurationImpl.builder()
.isEnabled(true)
.build())
.build();
-
- SslConfiguration sslConfiguration = null;
- if (mtlsTestHelper.isEnabled())
- {
- LOGGER.info("Enabling test mTLS certificate/keystore.");
-
- KeyStoreConfiguration truststoreConfiguration =
- new KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
-
mtlsTestHelper.trustStorePassword(),
- mtlsTestHelper.trustStoreType(),
-
SecondBoundConfiguration.parse("60s"));
-
- KeyStoreConfiguration keyStoreConfiguration =
- new
KeyStoreConfigurationImpl(mtlsTestHelper.serverKeyStorePath(),
-
mtlsTestHelper.serverKeyStorePassword(),
-
mtlsTestHelper.serverKeyStoreType(),
-
SecondBoundConfiguration.parse("60s"));
-
- sslConfiguration = SslConfigurationImpl.builder()
- .enabled(true)
-
.keystore(keyStoreConfiguration)
-
.truststore(truststoreConfiguration)
- .build();
- }
- else
- {
- LOGGER.info("Not enabling mTLS for testing purposes. Set '{}'
to 'true' if you would " +
- "like mTLS enabled.",
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
- }
- S3ClientConfiguration s3ClientConfig = new
S3ClientConfigurationImpl("s3-client", 4, SecondBoundConfiguration.parse("60s"),
-
5242880, DEFAULT_API_CALL_TIMEOUT,
-
buildTestS3ProxyConfig());
+ SslConfiguration sslConfiguration =
mtlsTestHelper.createServerSslConfiguration();
+ S3ClientConfiguration s3ClientConfig =
+ new S3ClientConfigurationImpl("s3-client",
+ 4,
+
SecondBoundConfiguration.parse("60s"),
+ 5242880,
+ DEFAULT_API_CALL_TIMEOUT,
+ buildTestS3ProxyConfig());
SidecarConfigurationImpl.Builder builder =
SidecarConfigurationImpl.builder()
.serviceConfiguration(conf)
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
new file mode 100644
index 00000000..02a286b9
--- /dev/null
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+/**
+ * Test implementation of {@link CdcConfig} for integration tests.
+ * Provides hardcoded configuration values suitable for testing CDC
functionality
+ * without requiring external Kafka infrastructure.
+ */
+public class TestCdcConfig implements CdcConfig
+{
+ @Override
+ public String env()
+ {
+ return "test";
+ }
+
+ @Override
+ public String kafkaTopic()
+ {
+ return "test-topic";
+ }
+
+ @Override
+ public TopicFormatType topicFormat()
+ {
+ return TopicFormatType.KEYSPACETABLE;
+ }
+
+ @Override
+ public boolean cdcEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public String jobId()
+ {
+ return "test-job-id";
+ }
+
+ @Override
+ public Map<String, Object> kafkaConfigs()
+ {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<String, Object> cdcConfigs()
+ {
+ return new HashMap<>();
+ }
+
+ @Override
+ public boolean logOnly()
+ {
+ return true;
+ }
+
+ @Override
+ public String datacenter()
+ {
+ return "datacenter1";
+ }
+
+ @Override
+ public SecondBoundConfiguration watermarkWindow()
+ {
+ return new SecondBoundConfiguration(3, TimeUnit.DAYS);
+ }
+
+ @Override
+ public int maxRecordSizeBytes()
+ {
+ return -1;
+ }
+
+ @Override
+ public String compression()
+ {
+ return null;
+ }
+
+ @Override
+ public MillisecondBoundConfiguration minDelayBetweenMicroBatches()
+ {
+ return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int maxCommitLogsPerInstance()
+ {
+ return 4;
+ }
+
+ @Override
+ public int maxWatermarkerSize()
+ {
+ return 400000;
+ }
+
+ @Override
+ public boolean persistEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean failOnRecordTooLargeError()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean failOnKafkaError()
+ {
+ return true;
+ }
+
+ @Override
+ public MillisecondBoundConfiguration persistDelay()
+ {
+ return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isConfigReady()
+ {
+ return true;
+ }
+}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
new file mode 100644
index 00000000..2836cbae
--- /dev/null
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.inject.Singleton;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+
+/**
+ * Test implementation of EventConsumer for CDC integration tests.
+ * Stores CDC events in a concurrent queue that can be accessed for test
assertions.
+ */
+@Singleton
+public class TestCdcEventConsumer implements EventConsumer
+{
+ private final Queue<CdcEvent> events = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void accept(CdcEvent event)
+ {
+ events.offer(event);
+ }
+
+ /**
+ * @return all CDC events captured so far as a list
+ */
+ public List<CdcEvent> getEvents()
+ {
+ return new ArrayList<>(events);
+ }
+
+ /**
+ * Clear all captured events
+ */
+ public void clear()
+ {
+ events.clear();
+ }
+}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
new file mode 100644
index 00000000..4388c3e6
--- /dev/null
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.testing;
+
+import com.google.inject.Provider;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import org.apache.cassandra.sidecar.cdc.CdcPublisher;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Test implementation of CdcPublisher that uses an in-memory event consumer
+ * instead of publishing to Kafka. This allows integration tests to capture
+ * and verify CDC events without requiring a Kafka infrastructure.
+ */
+public class TestCdcPublisher extends CdcPublisher
+{
+ private final TestCdcEventConsumer testEventConsumer = new
TestCdcEventConsumer();
+ private final CdcDatabaseAccessor databaseAccessor;
+
+ public TestCdcPublisher(Vertx vertx,
+ SidecarConfiguration sidecarConfiguration,
+ ExecutorPools executorPools,
+ ClusterConfigProvider clusterConfigProvider,
+ SchemaSupplier schemaSupplier,
+ CdcSidecarInstancesProvider
sidecarInstancesProvider,
+ SidecarCdcClient.ClientConfig clientConfig,
+ InstanceMetadataFetcher instanceMetadataFetcher,
+ CdcConfig conf,
+ CdcDatabaseAccessor databaseAccessor,
+ ICdcStats cdcStats,
+ VirtualTablesDatabaseAccessor virtualTables,
+ SidecarCdcStats sidecarCdcStats,
+ Serializer<CdcEvent> avroSerializer,
+ Provider<RangeManager> rangeManagerProvider)
+ {
+ super(vertx, sidecarConfiguration, executorPools,
clusterConfigProvider,
+ schemaSupplier, sidecarInstancesProvider, clientConfig,
+ instanceMetadataFetcher, conf, databaseAccessor, cdcStats,
+ virtualTables, sidecarCdcStats, avroSerializer,
rangeManagerProvider);
+ this.databaseAccessor = databaseAccessor;
+ }
+
+ @Override
+ public EventConsumer eventConsumer(CdcConfig conf, Serializer<CdcEvent>
avroSerializer)
+ {
+ return testEventConsumer;
+ }
+
+ /**
+ * Override scheduleDecision to execute in tests when database is ready,
+ * bypassing the initialization and cache warming checks required in
production.
+ */
+ @Override
+ public ScheduleDecision scheduleDecision()
+ {
+ // If already running, skip to avoid redundant execution attempts
+ if (isRunning())
+ {
+ return ScheduleDecision.SKIP;
+ }
+
+ // Only execute if the database accessor is available
+ // This prevents CassandraUnavailableException during test startup
+ if (databaseAccessor.isAvailable())
+ {
+ return ScheduleDecision.EXECUTE;
+ }
+
+ // Database not ready yet, skip this iteration and retry later
+ return ScheduleDecision.SKIP;
+ }
+
+ /**
+ * @return the test CDC event consumer for test assertions
+ */
+ public TestCdcEventConsumer getTestEventConsumer()
+ {
+ return testEventConsumer;
+ }
+}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
index ae802bb1..92d3800a 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java
@@ -87,7 +87,8 @@ public class TlsTestUtils
InetAddress address = nativeInetSocketAddress.getAddress();
com.datastax.driver.core.Cluster.Builder builder =
com.datastax.driver.core.Cluster.builder()
-
.withLoadBalancingPolicy(new
DCAwareRoundRobinPolicy.Builder().build())
+
.withLoadBalancingPolicy(
+
new DCAwareRoundRobinPolicy.Builder().build())
.withSSL(sslOptions)
.withoutJMXReporting()
.withAuthProvider(new PlainTextAuthProvider(username, password))
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
new file mode 100644
index 00000000..961a5224
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import
org.apache.cassandra.sidecar.testing.SharedClusterCdcSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.TestCdcEventConsumer;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for CDC functionality.
+ * Tests that mutations on CDC-enabled tables are captured and published to
TestCdcEventConsumer.
+ */
+public class CdcIntegrationTest extends
SharedClusterCdcSidecarIntegrationTestBase
+{
+ private static final QualifiedName CDC_TEST_TABLE = new
QualifiedName("cdc_test_ks", "cdc_test_table");
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(CDC_TEST_TABLE, DC1_RF1);
+
+ String createTableStatement = "CREATE TABLE IF NOT EXISTS %s " +
+ "(id int PRIMARY KEY, value int) " +
+ "WITH cdc = true";
+ createTestTable(CDC_TEST_TABLE, createTableStatement);
+ }
+
+ @Override
+ protected void beforeTestStart()
+ {
+ waitForSchemaReady(30, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testCdcEventsPublishedToInMemory()
+ {
+ // Write mutations into the test table
+ int mutationCount = 100;
+ Map<Integer, Integer> expectedMutations = new HashMap<>();
+ for (int i = 1; i <= mutationCount; i++)
+ {
+ String query = String.format("INSERT INTO %s (id, value) VALUES
(%d, %d)", CDC_TEST_TABLE, i, i);
+ cluster.getFirstRunningInstance()
+ .coordinator()
+ .execute(query, ConsistencyLevel.ONE);
+ expectedMutations.put(i, i);
+ }
+
+ TestCdcEventConsumer consumer = getTestEventConsumer();
+ waitUntil(() -> consumer.getEvents().size() >= mutationCount, 120,
1000);
+ assertThat(consumer.getEvents().size())
+ .as("All CDC events should be published to in-memory consumer")
+ .isGreaterThanOrEqualTo(mutationCount);
+
+ // Verify all the mutations with expected values
+ List<CdcEvent> events = consumer.getEvents();
+ for (CdcEvent cdcEvent : events)
+ {
+ assertThat(cdcEvent.keyspace).isEqualTo(CDC_TEST_TABLE.keyspace());
+ assertThat(cdcEvent.table).isEqualTo(CDC_TEST_TABLE.table());
+ assertThat(cdcEvent.getKind()).isEqualTo(CdcEvent.Kind.INSERT);
+
assertThat(cdcEvent.getValueColumns().get(0).columnName).isEqualTo("value");
+
+ int value =
ByteBuffer.wrap(Objects.requireNonNull(cdcEvent.getValueColumns().get(0).getBytes())).getInt();
+ expectedMutations.remove(value);
+ }
+ assertThat(expectedMutations).isEmpty();
+ }
+}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
index 6adb51b0..aff265b1 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
@@ -38,20 +38,14 @@ import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
-import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SslConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
import
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
@@ -63,8 +57,6 @@ import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestB
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
-import static
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
-import static
org.apache.cassandra.sidecar.testing.MtlsTestHelper.PASSWORD_STRING;
import static
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.cassandraInstanceHostname;
import static
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.defaultConfigurationBuilder;
import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
@@ -230,38 +222,7 @@ class SidecarPeerDownDetectorIntegrationTest extends
SharedClusterSidecarIntegra
.schemaKeyspaceConfiguration(SCHEMA_KEYSPACE_CONFIG)
.build();
- // We need to provide mTLS configuration for the Sidecar
client so it can talk to
- // other sidecars using mTLS
- SslConfiguration clientSslConfiguration = null;
- if (mtlsTestHelper.isEnabled())
- {
- LOGGER.info("Enabling test mTLS certificate/keystore.");
-
- KeyStoreConfiguration truststoreConfiguration =
- new
KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
-
mtlsTestHelper.trustStorePassword(),
-
mtlsTestHelper.trustStoreType(),
-
SecondBoundConfiguration.parse("60s"));
-
- KeyStoreConfiguration keyStoreConfiguration =
- new
KeyStoreConfigurationImpl(mtlsTestHelper.clientKeyStorePath(),
- PASSWORD_STRING,
-
mtlsTestHelper.serverKeyStoreType(), // server and client keystore types are
the same
-
SecondBoundConfiguration.parse("60s"));
-
- clientSslConfiguration = SslConfigurationImpl.builder()
- .enabled(true)
-
.keystore(keyStoreConfiguration)
-
.truststore(truststoreConfiguration)
- .build();
- }
- else
- {
- LOGGER.info("Not enabling mTLS for testing purposes. Set
'{}' to 'true' if you would " +
- "like mTLS enabled.",
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
- }
-
- SidecarClientConfiguration sidecarClientConfiguration = new
SidecarClientConfigurationImpl(clientSslConfiguration);
+ SidecarClientConfiguration sidecarClientConfiguration =
mtlsTestHelper.createSidecarClientConfiguration();
// Let's run this very frequently for testing purposes
SidecarPeerHealthConfigurationImpl
sidecarPeerHealthConfiguration
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
new file mode 100644
index 00000000..6f411000
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.AfterEach;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import org.apache.cassandra.sidecar.cdc.CdcPublisher;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Base class for CDC integration tests. Extends
SharedClusterIntegrationTestBase with
+ * CDC-specific configuration and setup, including:
+ * - CDC-enabled Cassandra cluster configuration
+ * - TestCdcPublisher with TestCdcEventConsumer
+ * - Cassandra 4.1 version requirement
+ * - Helper methods to access CDC components
+ */
+public abstract class SharedClusterCdcSidecarIntegrationTestBase extends
SharedClusterIntegrationTestBase
+{
+ @AfterEach
+ void cleanupCdcConsumerAfterEachTest()
+ {
+ TestCdcPublisher testCdcPublisher = (TestCdcPublisher)
serverWrapper.injector.getInstance(CdcPublisher.class);
+ if (testCdcPublisher != null)
+ {
+ TestCdcEventConsumer consumer =
testCdcPublisher.getTestEventConsumer();
+ if (consumer != null)
+ {
+ consumer.clear();
+ }
+ }
+ }
+
+ @Override
+ protected void beforeClusterProvisioning()
+ {
+ // The current CDC implementation cannot read 5.x commitlogs, so
verify Cassandra version is 4.x
+ SimpleCassandraVersion version =
SimpleCassandraVersion.create(testVersion.version());
+ assumeThat(version.major)
+ .as("Current CDC implementation cannot read 5.x commitlogs,
requires Cassandra 4.x")
+ .isEqualTo(4);
+ }
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .dcCount(1)
+ .nodesPerDc(1)
+ .additionalInstanceConfig(Map.of("cdc_enabled", true));
+ }
+
+ @Override
+ protected Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides()
+ {
+ return builder -> {
+ // Override service configuration to use specific port for CDC
tests
+ ServiceConfiguration existingConfig =
builder.build().serviceConfiguration();
+ ServiceConfiguration cdcServiceConfig =
ServiceConfigurationImpl.builder()
+
.host(existingConfig.host())
+
.port(9043) // TODO: Make this port dynamically allocated
+
.schemaKeyspaceConfiguration(existingConfig.schemaKeyspaceConfiguration())
+
.build();
+ builder.serviceConfiguration(cdcServiceConfig);
+
+ // Configure sidecar client for mTLS if enabled
+ SidecarClientConfiguration clientConfig =
mtlsTestHelper.createSidecarClientConfiguration();
+ if (clientConfig != null)
+ {
+ builder.sidecarClientConfiguration(clientConfig);
+ }
+ return builder;
+ };
+ }
+
+ @Override
+ protected void startSidecar(ICluster<? extends IInstance> cluster) throws
InterruptedException
+ {
+ AbstractModule cdcModule = new CdcTestModule();
+ serverWrapper = startSidecarWithInstances(cluster, cdcModule);
+ }
+
+ /**
+ * @return the TestCdcPublisher instance for test access
+ */
+ protected TestCdcPublisher getCdcPublisher()
+ {
+ return (TestCdcPublisher)
serverWrapper.injector.getInstance(CdcPublisher.class);
+ }
+
+ /**
+ * @return the TestCdcEventConsumer for test assertions
+ */
+ protected TestCdcEventConsumer getTestEventConsumer()
+ {
+ TestCdcPublisher publisher = getCdcPublisher();
+ return publisher != null ? publisher.getTestEventConsumer() : null;
+ }
+
+ /**
+ * CDC-specific Guice module that provides test implementations for CDC
components.
+ */
+ private static class CdcTestModule extends AbstractModule
+ {
+ @Provides
+ @Singleton
+ CdcPublisher cdcPublisher(Vertx vertx,
+ SidecarConfiguration sidecarConfiguration,
+ ExecutorPools executorPools,
+ ClusterConfigProvider clusterConfigProvider,
+ SchemaSupplier schemaSupplier,
+ CdcSidecarInstancesProvider
sidecarInstancesProvider,
+ SidecarCdcClient.ClientConfig clientConfig,
+ InstanceMetadataFetcher
instanceMetadataFetcher,
+ CdcConfig conf,
+ CdcDatabaseAccessor databaseAccessor,
+ ICdcStats cdcStats,
+ VirtualTablesDatabaseAccessor virtualTables,
+ SidecarCdcStats sidecarCdcStats,
+ Serializer<CdcEvent> avroSerializer,
+ TokenRingProvider tokenRingProvider)
+ {
+ return new TestCdcPublisher(vertx,
+ sidecarConfiguration,
+ executorPools,
+ clusterConfigProvider,
+ schemaSupplier,
+ sidecarInstancesProvider,
+ clientConfig,
+ instanceMetadataFetcher,
+ conf,
+ databaseAccessor,
+ cdcStats,
+ virtualTables,
+ sidecarCdcStats,
+ avroSerializer,
+ () -> new
ContentionFreeRangeManager(vertx, tokenRingProvider));
+ }
+
+ @Provides
+ @Singleton
+ public CdcConfig cdcConfig()
+ {
+ return new TestCdcConfig();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 3d369c5e..b6fc14e4 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -377,28 +377,21 @@ public class CdcModule extends AbstractModule
@Provides
@Singleton
- public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration
configuration)
- {
- return new CdcStatesSchema(configuration);
- }
-
- @ProvidesIntoMap
- @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
- PeriodicTask cdcPublisherTask(Vertx vertx,
- SidecarConfiguration sidecarConfiguration,
- ExecutorPools executorPools,
- ClusterConfigProvider clusterConfigProvider,
- SchemaSupplier schemaSupplier,
- CdcSidecarInstancesProvider
sidecarInstancesProvider,
- SidecarCdcClient.ClientConfig clientConfig,
- InstanceMetadataFetcher
instanceMetadataFetcher,
- CdcConfig conf,
- CdcDatabaseAccessor databaseAccessor,
- TokenRingProvider tokenRingProvider,
- ICdcStats cdcStats,
- VirtualTablesDatabaseAccessor virtualTables,
- SidecarCdcStats sidecarCdcStats,
- Serializer<CdcEvent> avroSerializer)
+ CdcPublisher cdcPublisher(Vertx vertx,
+ SidecarConfiguration sidecarConfiguration,
+ ExecutorPools executorPools,
+ ClusterConfigProvider clusterConfigProvider,
+ SchemaSupplier schemaSupplier,
+ CdcSidecarInstancesProvider
sidecarInstancesProvider,
+ SidecarCdcClient.ClientConfig clientConfig,
+ InstanceMetadataFetcher instanceMetadataFetcher,
+ CdcConfig conf,
+ CdcDatabaseAccessor databaseAccessor,
+ TokenRingProvider tokenRingProvider,
+ ICdcStats cdcStats,
+ VirtualTablesDatabaseAccessor virtualTables,
+ SidecarCdcStats sidecarCdcStats,
+ Serializer<CdcEvent> avroSerializer)
{
return new CdcPublisher(vertx,
sidecarConfiguration,
@@ -417,6 +410,20 @@ public class CdcModule extends AbstractModule
() -> new ContentionFreeRangeManager(vertx,
tokenRingProvider));
}
+ @Provides
+ @Singleton
+ public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration
configuration)
+ {
+ return new CdcStatesSchema(configuration);
+ }
+
+ @ProvidesIntoMap
+ @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
+ PeriodicTask cdcPublisherTask(CdcPublisher cdcPublisher)
+ {
+ return cdcPublisher;
+ }
+
@Singleton
@ProvidesIntoMap
@KeyClassMapKey(PeriodicTaskMapKeys.CdcConfigRefresherNotifierKey.class)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]