This is an automated email from the ASF dual-hosted git repository. jyothsnakonisa pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
commit 4c09917c0b630e3f1917eac0bb81dee30e12cbca Author: jkonisa <[email protected]> AuthorDate: Mon May 4 11:41:24 2026 -0700 CASSSIDECAR-455: Fix breaking changes for Analytics 0.4.0 Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSSIDECAR-455 --- CHANGES.txt | 1 + gradle.properties | 2 +- integration-framework/build.gradle | 1 + .../sidecar/testing/TestCdcPublisher.java | 50 ++-- .../cassandra/sidecar/cdc/CdcIntegrationTest.java | 3 + ...SharedClusterCdcSidecarIntegrationTestBase.java | 49 ++-- server/build.gradle | 4 + .../cassandra/bridge/CassandraBridgeFactory.java | 18 +- .../cassandra/sidecar/cdc/CachingSchemaStore.java | 63 +++-- .../cassandra/sidecar/cdc/CdcAvroSerializer.java | 42 --- .../apache/cassandra/sidecar/cdc/CdcConfig.java | 16 ++ .../apache/cassandra/sidecar/cdc/CdcPublisher.java | 199 +++++++-------- .../cassandra/sidecar/cdc/SidecarCdcOptions.java | 9 + .../cassandra/sidecar/modules/CdcModule.java | 104 ++++++-- .../cassandra/sidecar/cdc/CdcPublisherTests.java | 282 +++------------------ 15 files changed, 354 insertions(+), 489 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 90c2f10d..78c4e547 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Fix breaking changes for Analytics 0.4.0 (CASSSIDECAR-455) * Support IAM instance profile credentials for S3 restore jobs (CASSSIDECAR-415) * Adding endpoint for verifying files post data copy during live migration (CASSSIDECAR-226) * SAI support in Sidecar (CASSSIDECAR-422) diff --git a/gradle.properties b/gradle.properties index e7822d72..e9157f44 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,5 +46,5 @@ swaggerVersion=2.2.21 kryoVersion=4.0.2 # OSHI dependencies oshiVersion=6.9.0 -analyticsVersion=0.3.0 +analyticsVersion=0.4.0 kafkaClientVersion=3.7.0 diff --git a/integration-framework/build.gradle b/integration-framework/build.gradle index 29b9c942..e6edbe1c 100644 --- a/integration-framework/build.gradle +++ b/integration-framework/build.gradle @@ -70,6 +70,7 @@ dependencies { // CDC dependencies api(group: "org.apache.cassandra", name: "cassandra-analytics-cdc_spark3_2.12", version: "${project.analyticsVersion}") + api(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-codec_spark3_2.12", version: "${project.analyticsVersion}") api(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-sidecar_spark3_2.12", version: "${project.analyticsVersion}") api "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}" } diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java index 4388c3e6..a47135ff 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java @@ -19,24 +19,26 @@ package org.apache.cassandra.sidecar.testing; import com.google.inject.Provider; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; -import org.apache.cassandra.cdc.msg.CdcEvent; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; +import org.apache.cassandra.cdc.kafka.KafkaProducerFactory; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.sidecar.cdc.CachingSchemaStore; import org.apache.cassandra.sidecar.cdc.CdcConfig; import org.apache.cassandra.sidecar.cdc.CdcPublisher; import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.coordination.RangeManager; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; import org.apache.cassandra.sidecar.tasks.ScheduleDecision; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import org.apache.kafka.common.serialization.Serializer; + +import static org.mockito.Mockito.mock; /** * Test implementation of CdcPublisher that uses an in-memory event consumer @@ -49,30 +51,32 @@ public class TestCdcPublisher extends CdcPublisher private final CdcDatabaseAccessor databaseAccessor; public TestCdcPublisher(Vertx vertx, - SidecarConfiguration sidecarConfiguration, - ExecutorPools executorPools, - ClusterConfigProvider clusterConfigProvider, - SchemaSupplier schemaSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - InstanceMetadataFetcher instanceMetadataFetcher, - CdcConfig conf, - CdcDatabaseAccessor databaseAccessor, - ICdcStats cdcStats, - VirtualTablesDatabaseAccessor virtualTables, - SidecarCdcStats sidecarCdcStats, - Serializer<CdcEvent> avroSerializer, - Provider<RangeManager> rangeManagerProvider) + ExecutorPools executorPools, + ClusterConfigProvider clusterConfigProvider, + SchemaSupplier schemaSupplier, + InstanceMetadataFetcher instanceMetadataFetcher, + CdcConfig conf, + CdcDatabaseAccessor databaseAccessor, + ICdcStats cdcStats, + VirtualTablesDatabaseAccessor virtualTables, + SidecarCdcStats sidecarCdcStats, + Provider<RangeManager> rangeManagerProvider, + CassandraBridgeFactory cassandraBridgeFactory, + Provider<SidecarCdcClient> sidecarCdcClientProvider, + CdcOptions cdcOptions) { - super(vertx, sidecarConfiguration, executorPools, clusterConfigProvider, - schemaSupplier, sidecarInstancesProvider, clientConfig, - instanceMetadataFetcher, conf, databaseAccessor, cdcStats, - virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider); + super(vertx, executorPools, clusterConfigProvider, + schemaSupplier, instanceMetadataFetcher, conf, databaseAccessor, cdcStats, + virtualTables, sidecarCdcStats, rangeManagerProvider, + cassandraBridgeFactory, sidecarCdcClientProvider, + mock(CachingSchemaStore.class), + mock(KafkaProducerFactory.class), + cdcOptions); this.databaseAccessor = databaseAccessor; } @Override - public EventConsumer eventConsumer(CdcConfig conf, Serializer<CdcEvent> avroSerializer) + public EventConsumer eventConsumer(CdcConfig conf) { return testEventConsumer; } diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java index 961a5224..ed228c2b 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java @@ -75,6 +75,9 @@ public class CdcIntegrationTest extends SharedClusterCdcSidecarIntegrationTestBa expectedMutations.put(i, i); } + // Seal the active commit log segment so CDC can find the mutations in cdc_raw + cluster.getFirstRunningInstance().flush(CDC_TEST_TABLE.keyspace()); + TestCdcEventConsumer consumer = getTestEventConsumer(); waitUntil(() -> consumer.getEvents().size() >= mutationCount, 120, 1000); assertThat(consumer.getEvents().size()) diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java index 56b68df2..f1f0c71e 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java @@ -24,12 +24,13 @@ import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import com.google.inject.AbstractModule; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.Singleton; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.SchemaSupplier; -import org.apache.cassandra.cdc.msg.CdcEvent; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.stats.ICdcStats; @@ -41,7 +42,6 @@ import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager; @@ -52,7 +52,6 @@ import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import org.apache.kafka.common.serialization.Serializer; import static org.assertj.core.api.Assumptions.assumeThat; @@ -61,11 +60,23 @@ import static org.assertj.core.api.Assumptions.assumeThat; * CDC-specific configuration and setup, including: * - CDC-enabled Cassandra cluster configuration * - TestCdcPublisher with TestCdcEventConsumer - * - Cassandra 4.1 version requirement + * - Cassandra 4.0 through 5.0 version support (analytics 0.4.0 does not support 5.1+) * - Helper methods to access CDC components */ public abstract class SharedClusterCdcSidecarIntegrationTestBase extends SharedClusterIntegrationTestBase { + // Analytics 0.4.0 supports up to Cassandra 5.0 (majorVersion=50). Cassandra 5.1+ requires a newer analytics version. + private static final SimpleCassandraVersion MAX_SUPPORTED_CDC_VERSION = SimpleCassandraVersion.create("5.0.99"); + + @Override + protected void beforeClusterProvisioning() + { + SimpleCassandraVersion version = SimpleCassandraVersion.create(testVersion.version()); + assumeThat(version) + .as("CDC is not supported for Cassandra %s; analytics 0.4.0 supports up to 5.0.x", version) + .isLessThanOrEqualTo(MAX_SUPPORTED_CDC_VERSION); + } + @AfterEach void cleanupCdcConsumerAfterEachTest() { @@ -80,16 +91,6 @@ public abstract class SharedClusterCdcSidecarIntegrationTestBase extends SharedC } } - @Override - protected void beforeClusterProvisioning() - { - // The current CDC implementation cannot read 5.x commitlogs, so verify Cassandra version is 4.x - SimpleCassandraVersion version = SimpleCassandraVersion.create(testVersion.version()); - assumeThat(version.major) - .as("Current CDC implementation cannot read 5.x commitlogs, requires Cassandra 4.x") - .isEqualTo(4); - } - @Override protected ClusterBuilderConfiguration testClusterConfiguration() { @@ -154,37 +155,35 @@ public abstract class SharedClusterCdcSidecarIntegrationTestBase extends SharedC @Provides @Singleton CdcPublisher cdcPublisher(Vertx vertx, - SidecarConfiguration sidecarConfiguration, ExecutorPools executorPools, ClusterConfigProvider clusterConfigProvider, SchemaSupplier schemaSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, InstanceMetadataFetcher instanceMetadataFetcher, CdcConfig conf, CdcDatabaseAccessor databaseAccessor, ICdcStats cdcStats, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, - Serializer<CdcEvent> avroSerializer, - TokenRingProvider tokenRingProvider) + TokenRingProvider tokenRingProvider, + CassandraBridgeFactory cassandraBridgeFactory, + Provider<SidecarCdcClient> sidecarCdcClientProvider, + CdcOptions cdcOptions) { RangeManager rangeManager = new ContentionFreeRangeManager(vertx, tokenRingProvider); return new TestCdcPublisher(vertx, - sidecarConfiguration, executorPools, clusterConfigProvider, schemaSupplier, - sidecarInstancesProvider, - clientConfig, instanceMetadataFetcher, conf, databaseAccessor, cdcStats, virtualTables, sidecarCdcStats, - avroSerializer, - () -> rangeManager); + () -> rangeManager, + cassandraBridgeFactory, + sidecarCdcClientProvider, + cdcOptions); } @Provides diff --git a/server/build.gradle b/server/build.gradle index 8b2ae702..ba71370c 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -151,6 +151,10 @@ dependencies { implementation(group: "org.apache.cassandra", name: "cassandra-avro-converter_spark3_2.12", version: "${[project.analyticsVersion]}") implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}") implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-sidecar_spark3_2.12", version: "${[project.analyticsVersion]}") + // compileOnly: CdcModule directly instantiates SidecarCdcClient, requiring sidecar-client types at + // compile time. The jar is already on the runtime classpath transitively via cdc-sidecar, so + // implementation would duplicate it. + compileOnly(group: "org.apache.cassandra", name: "cassandra-analytics-sidecar-client", version: "${[project.analyticsVersion]}") implementation "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}" implementation "com.esotericsoftware:kryo-shaded:${kryoVersion}" diff --git a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java index 9c99430a..61b88431 100644 --- a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java +++ b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java @@ -21,11 +21,8 @@ package org.apache.cassandra.bridge; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.net.MalformedURLException; -import java.net.URL; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -119,21 +116,8 @@ public class CassandraBridgeFactory public ClassLoader buildClassLoader(String... resourceNames) { - URL[] urls = Arrays.stream(resourceNames) - .map(BaseCassandraBridgeFactory::copyClassResourceToFile) - .map(jar -> { - try - { - return jar.toURI().toURL(); - } - catch (MalformedURLException e) - { - throw new RuntimeException(e); - } - }).toArray(URL[]::new); - return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () -> - new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader())); + BaseCassandraBridgeFactory.buildClassLoader(resourceNames)); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java index 8795d2ed..e57aa4e9 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java @@ -39,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.cassandra.cdc.avro.AvroSchemaUtils; import org.apache.cassandra.cdc.avro.AvroSchemas; import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; import org.apache.cassandra.cdc.kafka.KafkaOptions; @@ -56,8 +57,21 @@ import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_ import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED; /** - * Schemas cache to be used by CDC event serialization. It contains a map of table schemas - * using TableIdentifier as key. + * Schemas cache for CDC event serialization, keyed by TableIdentifier. + * + * <p>{@link #getSchema}, {@link #getWriter}, and {@link #getReader} all return the + * <em>payload</em> (table-column) Avro schema — not the CDC envelope. Every caller in the + * analytics codec uses these to build and encode per-row payload records (column field lookup, + * range predicate encoding, byte-record encoding). Returning the merged envelope schema here + * would break those callers because table columns are not top-level fields of the envelope. + * + * <p>The merged CDC envelope schema is constructed on-the-fly in {@link #publishSchemas} and + * is never stored in the cache. The sidecar remains the authoritative source for building the + * merged schema (via {@link #buildMergedSchema}); it is just not exposed through + * {@link SchemaStore#getSchema}. + * + * <p>Schema version UUIDs ({@link #getVersion}) are derived from the CQL {@code CREATE TABLE} + * statement, consistent with the analytics {@code CachingSchemaStore} implementation. */ @Singleton public class CachingSchemaStore implements SchemaStore @@ -91,13 +105,17 @@ public class CachingSchemaStore implements SchemaStore this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor; this.sidecarSchema = sidecarSchema; this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter; - this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables())); - AvroSchemas.registerLogicalTypes(); - cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged); this.vertx = vertx; this.cdcConfig = cdcConfig; this.sidecarCdcStats = sidecarCdcStats; this.schemaStorePublisherFactory = schemaStorePublisherFactory; + // cdcConfig and schemaStorePublisherFactory must be assigned before the calls below. + // createSchemaCache and addSchemaChangeListener can fire onSchemaChanged synchronously, + // which calls loadPublisher() — accessing both fields — and would throw NPE if they + // were still null at that point. + this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables())); + AvroSchemas.registerLogicalTypes(); + cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged); if (cdcConfig.cdcEnabled()) { @@ -146,20 +164,32 @@ public class CachingSchemaStore implements SchemaStore TableIdentifier tableIdentifier = TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()); avroSchemasCache.compute(tableIdentifier, (k, v) -> { + Schema payloadSchema = cqlToAvroSchemaConverter.convert(cqlTable); if (null != publisher) { - Schema schema = cqlToAvroSchemaConverter.convert(cqlTable); + // Merged schema is only needed for Kafka publishing — build on-the-fly, + // not stored in the cache entry (getSchema/getWriter/getReader use payload). + Schema mergedSchema = buildMergedSchema(payloadSchema, cqlTable); TableSchemaPublisher.SchemaPublishMetadata metadata = new TableSchemaPublisher.SchemaPublishMetadata(); metadata.put(METADATA_NAME_KEY, cqlTable.table()); metadata.put(METADATA_NAMESPACE_KEY, cqlTable.keyspace()); - publisher.publishSchema(schema.toString(false), metadata); + publisher.publishSchema(mergedSchema.toString(false), metadata); sidecarCdcStats.capturePublishedSchema(); } - return new SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter); + return new SchemaCacheEntry(cqlTable, payloadSchema); }); } } + private Schema buildMergedSchema(Schema payloadSchema, CqlTable cqlTable) + { + String prefix = cdcConfig.schemaNamespacePrefix(); + String namespace = prefix.isEmpty() + ? cqlTable.keyspace() + : prefix + '.' + cqlTable.keyspace(); + return AvroSchemaUtils.buildMergedSchema(payloadSchema, cqlTable.table(), namespace); + } + @VisibleForTesting void onSchemaChanged() { @@ -176,7 +206,7 @@ public class CachingSchemaStore implements SchemaStore tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), cqlTable.table(), cqlTable.createStatement()); } LOGGER.info("Re-generating Avro Schema after schema change keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table()); - return new SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter); + return new SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter.convert(cqlTable)); } return v; }); @@ -186,7 +216,9 @@ public class CachingSchemaStore implements SchemaStore // Remove any old schema entries for deleted tables, this operation can be done in the end as this is // only for removing stale entries and no one is going to use these entries once the table is removed. // This doesn't have to be an atomic operation. - avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream().map(cqlTable -> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table())).collect(Collectors.toList())); + avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream() + .map(cqlTable -> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table())) + .collect(Collectors.toList())); vertx.eventBus().publish(ON_CDC_CACHE_WARMED_UP.address(), "Cdc cache warmed up"); } @@ -236,7 +268,7 @@ public class CachingSchemaStore implements SchemaStore return cdcTables.stream() .collect(Collectors.toMap(cqlTable -> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()), - cqlTable -> new SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter)) + cqlTable -> new SchemaCacheEntry(cqlTable, cqlToAvroSchemaConverter.convert(cqlTable))) ); } @@ -254,14 +286,13 @@ public class CachingSchemaStore implements SchemaStore private final GenericDatumWriter<GenericRecord> writer; private final GenericDatumReader<GenericRecord> reader; - private SchemaCacheEntry(CqlTable table, - CqlToAvroSchemaConverter cqlToAvroSchemaConverter) + private SchemaCacheEntry(CqlTable table, Schema payloadSchema) { this.table = table; - this.schema = cqlToAvroSchemaConverter.convert(table); + this.schema = payloadSchema; this.schemaUuid = UUID.nameUUIDFromBytes(table.createStatement().getBytes(StandardCharsets.UTF_8)).toString(); - this.writer = new GenericDatumWriter<>(schema); - this.reader = new GenericDatumReader<>(schema); + this.writer = new GenericDatumWriter<>(payloadSchema); + this.reader = new GenericDatumReader<>(payloadSchema); } public String tableSchema() diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java deleted file mode 100644 index 1acc7c4b..00000000 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.sidecar.cdc; - -import org.apache.cassandra.bridge.CassandraBridgeFactory; -import org.apache.cassandra.cdc.TypeCache; -import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer; -import org.apache.cassandra.cdc.schemastore.SchemaStore; -import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; - -/** - * Serializer to convert Cassandra CDC events into Avro GenericRecord objects. - */ -public class CdcAvroSerializer extends AvroGenericRecordSerializer -{ - public CdcAvroSerializer(SchemaStore schemaStore, - InstanceMetadataFetcher instanceMetadataFetcher, - CassandraBridgeFactory cassandraBridgeFactory) - { - super(schemaStore, key -> - TypeCache.get(cassandraBridgeFactory - .get(instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> - instance.delegate().nodeSettings()).releaseVersion()).getVersion()) - .getType(key.keyspace, key.type), "org.apache.cassandra"); - } -} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java index 776d494b..18014d8d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java @@ -138,6 +138,22 @@ public interface CdcConfig */ MillisecondBoundConfiguration persistDelay(); + /** + * Returns the namespace prefix used when building Avro schemas for CDC events. + * This is an extension point for schema registry integration: implementations that + * register schemas with a shared registry should override this to return a globally + * unique prefix so that schema names do not collide across clusters or services. + * The prefix is combined with the Cassandra keyspace to form the Avro schema namespace: + * {@code <prefix>.<keyspace>}. + * + * <p>Defaults to empty string, in which case the keyspace name is used directly as + * the Avro namespace. + */ + default String schemaNamespacePrefix() + { + return ""; + } + /** * Topic format */ diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java index 75aa6667..2650bd7d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java @@ -18,14 +18,9 @@ package org.apache.cassandra.sidecar.cdc; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -33,35 +28,29 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cdc.CdcLogMode; +import org.apache.cassandra.cdc.TypeCache; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; +import org.apache.cassandra.cdc.kafka.KafkaProducerFactory; import org.apache.cassandra.cdc.kafka.KafkaPublisher; import org.apache.cassandra.cdc.kafka.TopicSupplier; -import org.apache.cassandra.cdc.msg.CdcEvent; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; -import org.apache.cassandra.cdc.sidecar.SidecarCdc; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.secrets.SecretsProvider; -import org.apache.cassandra.secrets.SslConfig; -import org.apache.cassandra.secrets.SslConfigSecretsProvider; import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; -import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; -import org.apache.cassandra.sidecar.config.SslConfiguration; import org.apache.cassandra.sidecar.coordination.RangeManager; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.tasks.ScheduleDecision; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.Serializer; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED; @@ -85,50 +74,51 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask private final VirtualTablesDatabaseAccessor virtualTables; private final SidecarCdcStats sidecarCdcStats; private final SchemaSupplier schemaSupplier; - private final CdcSidecarInstancesProvider sidecarInstancesProvider; private final InstanceMetadataFetcher instanceMetadataFetcher; private final ClusterConfigProvider clusterConfigProvider; - private final SidecarCdcClient.ClientConfig clientConfig; private final ICdcStats cdcStats; - private final SidecarConfiguration sidecarConfiguration; private CdcManager cdcManager; - private final Serializer<CdcEvent> avroSerializer; private final Provider<RangeManager> rangeManagerProvider; - KafkaProducer<String, byte[]> producer; - KafkaPublisher kafkaPublisher; + private final CassandraBridgeFactory cassandraBridgeFactory; + private final CachingSchemaStore schemaStore; + private KafkaPublisher<?> kafkaPublisher; + private final Provider<SidecarCdcClient> sidecarCdcClientProvider; + private final KafkaProducerFactory kafkaProducerFactory; + private final CdcOptions cdcOptions; @Inject public CdcPublisher(Vertx vertx, - SidecarConfiguration sidecarConfiguration, ExecutorPools executorPools, ClusterConfigProvider clusterConfigProvider, SchemaSupplier schemaSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, InstanceMetadataFetcher instanceMetadataFetcher, CdcConfig conf, CdcDatabaseAccessor databaseAccessor, ICdcStats cdcStats, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, - Serializer<CdcEvent> avroSerializer, - Provider<RangeManager> rangeManagerProvider) + Provider<RangeManager> rangeManagerProvider, + CassandraBridgeFactory cassandraBridgeFactory, + Provider<SidecarCdcClient> sidecarCdcClientProvider, + CachingSchemaStore schemaStore, + KafkaProducerFactory kafkaProducerFactory, + CdcOptions cdcOptions) { this.sidecarCdcStats = sidecarCdcStats; this.executorPools = executorPools.internal(); this.conf = conf; this.databaseAccessor = databaseAccessor; this.virtualTables = virtualTables; - this.schemaSupplier = schemaSupplier; - this.sidecarInstancesProvider = sidecarInstancesProvider; this.instanceMetadataFetcher = instanceMetadataFetcher; this.clusterConfigProvider = clusterConfigProvider; - this.clientConfig = clientConfig; this.cdcStats = cdcStats; - this.sidecarConfiguration = sidecarConfiguration; - this.avroSerializer = avroSerializer; this.rangeManagerProvider = rangeManagerProvider; + this.cassandraBridgeFactory = cassandraBridgeFactory; + this.sidecarCdcClientProvider = sidecarCdcClientProvider; + this.schemaStore = schemaStore; + this.kafkaProducerFactory = kafkaProducerFactory; + this.cdcOptions = cdcOptions; if (conf.cdcEnabled()) { @@ -141,56 +131,23 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask } } - public SecretsProvider secretsProvider() + public EventConsumer eventConsumer(CdcConfig conf) { - SslConfiguration sslConfiguration = sidecarConfiguration.sidecarClientConfiguration().sslConfiguration(); - - if (sslConfiguration == null || !sslConfiguration.enabled()) - { - return null; - } - - Map<String, String> sslConfigMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - - if (sslConfiguration.isKeystoreConfigured()) - { - KeyStoreConfiguration keystore = sslConfiguration.keystore(); - sslConfigMap.put(SslConfig.KEYSTORE_PATH, keystore.path()); - sslConfigMap.put(SslConfig.KEYSTORE_PASSWORD, keystore.password()); - sslConfigMap.put(SslConfig.KEYSTORE_TYPE, keystore.type()); - } - - if (sslConfiguration.isTrustStoreConfigured()) - { - KeyStoreConfiguration truststore = sslConfiguration.truststore(); - sslConfigMap.put(SslConfig.TRUSTSTORE_PATH, truststore.path()); - sslConfigMap.put(SslConfig.TRUSTSTORE_PASSWORD, truststore.password()); - sslConfigMap.put(SslConfig.TRUSTSTORE_TYPE, truststore.type()); - } - - SslConfig sslConfig = SslConfig.create(sslConfigMap); - return new SslConfigSecretsProvider(sslConfig); - } - - public EventConsumer eventConsumer(CdcConfig conf, - Serializer<CdcEvent> avroSerializer) - { - if (this.producer != null) - { - this.producer.close(); - } - if (this.kafkaPublisher != null) - { - this.kafkaPublisher.close(); - } - this.producer = new KafkaProducer<>(conf.kafkaConfigs()); - this.kafkaPublisher = new KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()), - producer, - avroSerializer, - conf.maxRecordSizeBytes(), - conf.failOnRecordTooLargeError(), - conf.failOnKafkaError(), - CdcLogMode.FULL); + CassandraVersion version = cassandraBridgeFactory.get( + instanceMetadataFetcher.callOnFirstAvailableInstance(instance -> + instance.delegate().nodeSettings()).releaseVersion() + ).getVersion(); + this.kafkaPublisher = KafkaPublisher.create(version, + TopicSupplier.staticTopicSupplier(conf.kafkaTopic()), + conf.kafkaConfigs(), + kafkaProducerFactory, + schemaStore, + key -> TypeCache.get(version).getType(key.keyspace, key.type), + conf.schemaNamespacePrefix(), + conf.maxRecordSizeBytes(), + conf.failOnRecordTooLargeError(), + conf.failOnKafkaError(), + CdcLogMode.FULL); return new CdcEventConsumer(kafkaPublisher); } @@ -216,27 +173,38 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask } databaseAccessor.session(); - cdcManager = new CdcManager(eventConsumer(conf, avroSerializer), - schemaSupplier, - conf, - rangeManagerProvider.get(), - instanceMetadataFetcher, - clusterConfigProvider, - sidecarInstancesProvider, - secretsProvider(), - clientConfig, - cdcStats, - this.executorPools, - databaseAccessor); - - List<SidecarCdc> consumers = cdcManager.buildCdcConsumers(); - cdcManager.startConsumers(); - LOGGER.info("{} CDC iterators started successfully", consumers.size()); - isRunning = true; - sidecarCdcStats.captureCdcStarted(consumers.size()); + try + { + cdcManager = new CdcManager(eventConsumer(conf), + schemaSupplier, + conf, + rangeManagerProvider.get(), + instanceMetadataFetcher, + clusterConfigProvider, + sidecarCdcClientProvider.get(), + cdcStats, + this.executorPools, + databaseAccessor, + cdcOptions); + int consumerCount = cdcManager.buildCdcConsumers().size(); + cdcManager.startConsumers(); + LOGGER.info("{} CDC iterators started successfully", consumerCount); + isRunning = true; + sidecarCdcStats.captureCdcStarted(consumerCount); + } + catch (Exception e) + { + LOGGER.error("Failed to start CDC consumers, cleaning up resources", e); + if (cdcManager != null) + { + cdcManager.stopConsumers(); + } + closeKafkaResources(); + throw e; + } } - protected synchronized void restart() + private synchronized void restart() { try { @@ -258,7 +226,7 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask return isRunning; } - public synchronized void stop() + private synchronized void stop() { if (!isRunning) { @@ -279,6 +247,23 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask { isRunning = false; isInitialized = false; + closeKafkaResources(); + } + } + + private void closeKafkaResources() + { + if (kafkaPublisher != null) + { + try + { + kafkaPublisher.close(); + } + catch (Exception e) + { + LOGGER.warn("Error closing KafkaPublisher", e); + } + kafkaPublisher = null; } } @@ -384,8 +369,16 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask @Override public void execute(Promise<Void> promise) { - run(); - promise.complete(); + try + { + run(); + promise.complete(); + } + catch (Exception e) + { + LOGGER.error("CDC run failed", e); + promise.fail(e); + } } @Override diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java index a2968719..10dcac16 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java @@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.cdc; import java.util.Map; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.data.ReplicationFactor; @@ -50,4 +51,12 @@ public class SidecarCdcOptions implements CdcOptions { return instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings().datacenter()); } + + @Override + public CassandraVersion version() + { + String releaseVersion = instanceMetadataFetcher.callOnFirstAvailableInstance( + instance -> instance.delegate().nodeSettings().releaseVersion()); + return CassandraVersion.fromVersion(releaseVersion).orElse(CassandraVersion.FOURZERO); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java index fec382a4..303ce488 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java @@ -18,7 +18,15 @@ package org.apache.cassandra.sidecar.modules; +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.inject.AbstractModule; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.multibindings.ProvidesIntoMap; @@ -28,24 +36,27 @@ import jakarta.ws.rs.GET; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; -import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.kafka.KafkaProducerFactory; import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory; import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.stats.CdcStats; import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.secrets.SecretsProvider; import org.apache.cassandra.sidecar.cdc.CachingSchemaStore; -import org.apache.cassandra.sidecar.cdc.CdcAvroSerializer; import org.apache.cassandra.sidecar.cdc.CdcConfig; import org.apache.cassandra.sidecar.cdc.CdcConfigImpl; import org.apache.cassandra.sidecar.cdc.CdcDynamicSidecarInstancesProvider; import org.apache.cassandra.sidecar.cdc.CdcLogCache; import org.apache.cassandra.sidecar.cdc.CdcPublisher; import org.apache.cassandra.sidecar.cdc.CdcSchemaSupplier; +import org.apache.cassandra.sidecar.cdc.SidecarCdcOptions; import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; +import org.apache.cassandra.sidecar.cdc.SidecarClientSecretsProvider; import org.apache.cassandra.sidecar.cdc.SidecarClusterConfigProvider; import org.apache.cassandra.sidecar.cdc.SidecarCqlToAvroSchemaConverter; import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; @@ -58,6 +69,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SslConfiguration; import org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider; import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager; import org.apache.cassandra.sidecar.coordination.DynamicSidecarInstancesProvider; @@ -97,19 +109,22 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.SidecarClientProvider; import org.apache.cassandra.sidecar.utils.TokenSplitUtil; -import org.apache.kafka.common.serialization.Serializer; import org.eclipse.microprofile.openapi.annotations.Operation; import org.eclipse.microprofile.openapi.annotations.enums.SchemaType; import org.eclipse.microprofile.openapi.annotations.media.Content; import org.eclipse.microprofile.openapi.annotations.media.Schema; import org.eclipse.microprofile.openapi.annotations.responses.APIResponse; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP; + /** * Provides Cassandra change-data capture (CDC) publishing capability */ @Path("/") public class CdcModule extends AbstractModule { + private static final Logger LOGGER = LoggerFactory.getLogger(CdcModule.class); + @ProvidesIntoMap @KeyClassMapKey(PeriodicTaskMapKeys.SidecarPeerHealthMonitorTaskKey.class) PeriodicTask sidecarPeerHealthMonitorTask(SidecarPeerHealthMonitorTask task) @@ -133,7 +148,9 @@ public class CdcModule extends AbstractModule SidecarConfiguration configuration, CassandraBridgeFactory cassandraBridgeFactory) { - return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, databaseAccessor, driverUnsupportedSchemaCache, configuration, cassandraBridgeFactory); + return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, databaseAccessor, + driverUnsupportedSchemaCache, configuration, + cassandraBridgeFactory); } @ProvidesIntoMap @@ -330,11 +347,9 @@ public class CdcModule extends AbstractModule @Provides @Singleton - public Serializer<CdcEvent> getSerializer(CachingSchemaStore schemaStore, - InstanceMetadataFetcher instanceMetadataFetcher, - CassandraBridgeFactory cassandraBridgeFactory) + public KafkaProducerFactory kafkaProducerFactory() { - return new CdcAvroSerializer(schemaStore, instanceMetadataFetcher, cassandraBridgeFactory); + return KafkaProducerFactory.DEFAULT; } @Provides @@ -379,6 +394,50 @@ public class CdcModule extends AbstractModule sidecarClientConfiguration.retryDelay().toIntMillis()); } + @Provides + @Singleton + public SidecarCdcClient sidecarCdcClient(Vertx vertx, + SidecarCdcClient.ClientConfig clientConfig, + CdcSidecarInstancesProvider cdcSidecarInstancesProvider, + @Nullable SecretsProvider secretsProvider, + ICdcStats cdcStats) + { + try + { + SidecarCdcClient sidecarCdcClient = new SidecarCdcClient(clientConfig, cdcSidecarInstancesProvider, secretsProvider, cdcStats); + vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), message -> { + try + { + sidecarCdcClient.close(); + } + catch (Exception e) + { + LOGGER.warn("Error closing SidecarCdcClient", e); + } + }); + return sidecarCdcClient; + } + catch (IOException e) + { + throw new RuntimeException("Failed to create SidecarCdcClient", e); + } + } + + @Nullable + @Provides + @Singleton + public SecretsProvider secretsProvider(SidecarConfiguration sidecarConfiguration) + { + SslConfiguration sslConfiguration = sidecarConfiguration.sidecarClientConfiguration().sslConfiguration(); + + if (sslConfiguration == null || !sslConfiguration.enabled()) + { + return null; + } + + return new SidecarClientSecretsProvider(sidecarConfiguration); + } + @Provides @Singleton RangeManager rangeManager(Vertx vertx, TokenRingProvider tokenRingProvider) @@ -389,36 +448,45 @@ public class CdcModule extends AbstractModule @Provides @Singleton CdcPublisher cdcPublisher(Vertx vertx, - SidecarConfiguration sidecarConfiguration, ExecutorPools executorPools, ClusterConfigProvider clusterConfigProvider, SchemaSupplier schemaSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, InstanceMetadataFetcher instanceMetadataFetcher, CdcConfig conf, CdcDatabaseAccessor databaseAccessor, ICdcStats cdcStats, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, - Serializer<CdcEvent> avroSerializer, - RangeManager rangeManager) + RangeManager rangeManager, + CassandraBridgeFactory cassandraBridgeFactory, + Provider<SidecarCdcClient> sidecarCdcClientProvider, + CachingSchemaStore schemaStore, + KafkaProducerFactory kafkaProducerFactory, + CdcOptions cdcOptions) { return new CdcPublisher(vertx, - sidecarConfiguration, executorPools, clusterConfigProvider, schemaSupplier, - sidecarInstancesProvider, - clientConfig, instanceMetadataFetcher, conf, databaseAccessor, cdcStats, virtualTables, sidecarCdcStats, - avroSerializer, - () -> rangeManager); + () -> rangeManager, + cassandraBridgeFactory, + sidecarCdcClientProvider, + schemaStore, + kafkaProducerFactory, + cdcOptions); + } + + @Provides + @Singleton + public CdcOptions cdcOptions(InstanceMetadataFetcher instanceMetadataFetcher) + { + return new SidecarCdcOptions(instanceMetadataFetcher); } @Provides diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java index 4665e555..490d6352 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java @@ -18,41 +18,41 @@ package org.apache.cassandra.sidecar.cdc; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.google.inject.Provider; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; -import org.apache.cassandra.cdc.msg.CdcEvent; -import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; +import org.apache.cassandra.cdc.kafka.KafkaProducerFactory; import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.secrets.SecretsProvider; -import org.apache.cassandra.secrets.SslConfigSecretsProvider; -import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; -import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; -import org.apache.cassandra.sidecar.config.SslConfiguration; import org.apache.cassandra.sidecar.coordination.RangeManager; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -72,10 +72,6 @@ public class CdcPublisherTests @Mock private SchemaSupplier schemaSupplier; @Mock - private CdcSidecarInstancesProvider sidecarInstancesProvider; - @Mock - private SidecarCdcClient.ClientConfig clientConfig; - @Mock private InstanceMetadataFetcher instanceMetadataFetcher; @Mock private CdcDatabaseAccessor databaseAccessor; @@ -86,11 +82,18 @@ public class CdcPublisherTests @Mock private SidecarCdcStats sidecarCdcStats; @Mock - private Serializer<CdcEvent> avroSerializer; + private KafkaProducerFactory kafkaProducerFactory; + @Mock + private CachingSchemaStore schemaStore; @Mock private Provider<RangeManager> rangeManager; + @Mock + private CassandraBridgeFactory cassandraBridgeFactory; + @Mock + private SidecarCdcClient sidecarCdcClient; + @Mock + private CdcOptions cdcOptions; - private SidecarConfiguration sidecarConfiguration; private CdcConfig cdcConfig; private CdcPublisher cdcPublisher; @@ -99,8 +102,6 @@ public class CdcPublisherTests { MockitoAnnotations.openMocks(this); - // Mock deep stubs for complex configuration objects - sidecarConfiguration = mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS); cdcConfig = mock(CdcConfig.class, RETURNS_DEEP_STUBS); // Mock ExecutorPools behavior @@ -111,204 +112,22 @@ public class CdcPublisherTests cdcPublisher = new CdcPublisher( vertx, - sidecarConfiguration, executorPools, clusterConfigProvider, schemaSupplier, - sidecarInstancesProvider, - clientConfig, instanceMetadataFetcher, cdcConfig, databaseAccessor, cdcStats, virtualTables, sidecarCdcStats, - avroSerializer, - rangeManager - ); - } - - - @Test - void testSecretsProviderReturnsNullWhenSslDisabled() - { - SslConfiguration sslConfig = mock(SslConfiguration.class); - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - when(sslConfig.enabled()).thenReturn(false); - - SecretsProvider result = cdcPublisher.secretsProvider(); - - assertThat(result).isNull(); - } - - @Test - void testSecretsProviderWithSslEnabledNoKeystoreNoTruststore() - { - SslConfiguration sslConfig = mockSslConfiguration( - true, // enabled - true, // preferOpenSSL - "REQUIRED", // clientAuth - Arrays.asList("TLS_RSA_128"), // cipherSuites - Arrays.asList("TLSv1.2"), // secureTransportProtocols - "10s", // handshakeTimeout - false, // keystoreConfigured - false // truststoreConfigured - ); - - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - - SecretsProvider result = cdcPublisher.secretsProvider(); - - assertThat(result).isNotNull(); - } - - @Test - void testSecretsProviderWithKeystoreOnly() - { - KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( - "/path/to/keystore.jks", - "keystorePassword", - "JKS" + rangeManager, + cassandraBridgeFactory, + () -> sidecarCdcClient, + schemaStore, + kafkaProducerFactory, + cdcOptions ); - - SslConfiguration sslConfig = mockSslConfiguration( - true, - false, - "OPTIONAL", - Arrays.asList("TLS_RSA_256"), - Arrays.asList("TLSv1.3"), - "15s", - true, - false - ); - - when(sslConfig.keystore()).thenReturn(keystoreConfig); - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - - SecretsProvider result = cdcPublisher.secretsProvider(); - - assertThat(result).isNotNull(); - assertThat(result.keyStoreType()).isEqualTo("JKS"); - assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray()); - } - - @Test - void testSecretsProviderWithTruststoreOnly() - { - // SslConfig validation requires keystore password to always be provided - // This test validates that truststore-only configuration is rejected - KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( - "/path/to/truststore.jks", - "truststorePassword", - "PKCS12" - ); - - SslConfiguration sslConfig = mockSslConfiguration( - true, - true, - "NONE", - Collections.emptyList(), - Arrays.asList("TLSv1.2", "TLSv1.3"), - "20s", - false, - true - ); - - when(sslConfig.truststore()).thenReturn(truststoreConfig); - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - - // SslConfig.create() validates and requires keystore password when any SSL config is provided - IllegalArgumentException exception = org.junit.jupiter.api.Assertions.assertThrows( - IllegalArgumentException.class, - () -> cdcPublisher.secretsProvider() - ); - - assertThat(exception.getMessage()).contains("KEYSTORE_PASSWORD"); - } - - @Test - void testSecretsProviderWithBothKeystoreAndTruststore() - { - KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( - "/path/to/keystore.p12", - "keystorePass123", - "PKCS12" - ); - - KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( - "/path/to/truststore.p12", - "truststorePass456", - "PKCS12" - ); - - SslConfiguration sslConfig = mockSslConfiguration( - true, - true, - "REQUIRED", - Arrays.asList("TLS_ECDHE_RSA", "TLS_AES_256"), - Arrays.asList("TLSv1.2", "TLSv1.3"), - "30s", - true, - true - ); - - when(sslConfig.keystore()).thenReturn(keystoreConfig); - when(sslConfig.truststore()).thenReturn(truststoreConfig); - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - - SecretsProvider result = cdcPublisher.secretsProvider(); - - assertThat(result).isNotNull(); - assertThat(result.keyStoreType()).isEqualTo("PKCS12"); - assertThat(result.keyStorePassword()).isEqualTo("keystorePass123".toCharArray()); - assertThat(result.trustStoreType()).isEqualTo("PKCS12"); - assertThat(result.trustStorePassword()).isEqualTo("truststorePass456".toCharArray()); - } - - @Test - void testSecretsProviderUsesCorrectSslConfigKeys() - { - // This test validates that CdcPublisher uses SslConfig constants with MapUtils.lowerCaseKey() - KeyStoreConfiguration keystoreConfig = mockKeystoreConfiguration( - "/path/to/keystore.jks", - "keystorePassword", - "JKS" - ); - - KeyStoreConfiguration truststoreConfig = mockKeystoreConfiguration( - "/path/to/truststore.jks", - "truststorePassword", - "PKCS12" - ); - - SslConfiguration sslConfig = mockSslConfiguration( - true, - false, - "REQUIRED", - Collections.emptyList(), - Arrays.asList("TLSv1.2"), - "10s", - true, - true - ); - - when(sslConfig.keystore()).thenReturn(keystoreConfig); - when(sslConfig.truststore()).thenReturn(truststoreConfig); - when(sidecarConfiguration.sidecarClientConfiguration().sslConfiguration()).thenReturn(sslConfig); - - SecretsProvider result = cdcPublisher.secretsProvider(); - - // Validate that the SecretsProvider was created successfully using the correct keys - assertThat(result).isNotNull(); - assertThat(result).isInstanceOf(SslConfigSecretsProvider.class); - - // Verify keystore configuration is accessible - assertThat(result.keyStoreType()).isEqualTo("JKS"); - assertThat(result.keyStorePassword()).isEqualTo("keystorePassword".toCharArray()); - - // Verify truststore configuration is accessible - assertThat(result.trustStoreType()).isEqualTo("PKCS12"); - assertThat(result.trustStorePassword()).isEqualTo("truststorePassword".toCharArray()); } @Test @@ -325,45 +144,20 @@ public class CdcPublisherTests when(cdcConfig.failOnRecordTooLargeError()).thenReturn(false); when(cdcConfig.failOnKafkaError()).thenReturn(true); - EventConsumer result = cdcPublisher.eventConsumer(cdcConfig, avroSerializer); + InstanceMetadata mockInstance = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS); + when(mockInstance.delegate().nodeSettings().releaseVersion()).thenReturn("4.1.0"); + doAnswer(invocation -> { + Function<InstanceMetadata, Object> fn = invocation.getArgument(0); + return fn.apply(mockInstance); + }).when(instanceMetadataFetcher).callOnFirstAvailableInstance(any()); + CassandraBridge mockBridge = mock(CassandraBridge.class); + when(mockBridge.getVersion()).thenReturn(CassandraVersion.FOURONE); + when(cassandraBridgeFactory.get(anyString())).thenReturn(mockBridge); + when(kafkaProducerFactory.create(any())).thenReturn(mock(KafkaProducer.class)); + + EventConsumer result = cdcPublisher.eventConsumer(cdcConfig); assertThat(result).isNotNull(); assertThat(result).isInstanceOf(CdcEventConsumer.class); } - - - private SslConfiguration mockSslConfiguration(boolean enabled, - boolean preferOpenSSL, - String clientAuth, - java.util.List<String> cipherSuites, - java.util.List<String> secureTransportProtocols, - String handshakeTimeout, - boolean keystoreConfigured, - boolean truststoreConfigured) - { - SslConfiguration sslConfig = mock(SslConfiguration.class, RETURNS_DEEP_STUBS); - when(sslConfig.enabled()).thenReturn(enabled); - when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL); - when(sslConfig.clientAuth()).thenReturn(clientAuth); - when(sslConfig.cipherSuites()).thenReturn(cipherSuites); - when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols); - - SecondBoundConfiguration durationSpec = mock(SecondBoundConfiguration.class); - when(durationSpec.toString()).thenReturn(handshakeTimeout); - when(sslConfig.handshakeTimeout()).thenReturn(durationSpec); - - when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured); - when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured); - - return sslConfig; - } - - private KeyStoreConfiguration mockKeystoreConfiguration(String path, String password, String type) - { - KeyStoreConfiguration config = mock(KeyStoreConfiguration.class); - when(config.path()).thenReturn(path); - when(config.password()).thenReturn(password); - when(config.type()).thenReturn(type); - return config; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
