This is an automated email from the ASF dual-hosted git repository.
yifan-c pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new a69cd7a3 CASSSIDECAR-447: Scope all CDC dependencies exclusively to
CdcModule (#341)
a69cd7a3 is described below
commit a69cd7a3146912e7b1d746ce2a33d01905c03c0a
Author: Yifan Cai <[email protected]>
AuthorDate: Tue May 26 13:50:01 2026 -0700
CASSSIDECAR-447: Scope all CDC dependencies exclusively to CdcModule (#341)
Patch by Yifan Cai; Reviewed by Francisco Guerrero, Jyothsna Konisa for
CASSSIDECAR-447
---
CHANGES.txt | 1 +
.../sidecar/testing/TestCdcPublisher.java | 6 +-
.../db/SystemDatabaseAccessorIntegrationTest.java | 2 +-
...SharedClusterCdcSidecarIntegrationTestBase.java | 6 +-
.../cassandra/sidecar/CassandraSidecarDaemon.java | 8 +-
.../sidecar/bridge/CassandraBridgeFactory.java | 5 +-
.../cassandra/sidecar/cdc/CachingSchemaStore.java | 9 +-
.../cassandra/sidecar/cdc/CdcConfigImpl.java | 2 -
.../apache/cassandra/sidecar/cdc/CdcManager.java | 48 ++++-----
.../apache/cassandra/sidecar/cdc/CdcPublisher.java | 12 +--
.../cassandra/sidecar/cdc/CdcSchemaSupplier.java | 6 +-
.../codecs/DcLocalTopologyChangeEventCodec.java | 4 +-
.../CassandraClientTokenRingProvider.java | 2 -
.../coordination/ContentionFreeRangeManager.java | 5 +-
.../InnerDcTokenAdjacentPeerProvider.java | 2 -
.../sidecar/coordination/RangeManager.java | 6 +-
.../coordination/SidecarHttpHealthProvider.java | 2 -
.../coordination/SidecarPeerHealthMonitorTask.java | 2 -
.../cassandra/sidecar/db/CdcConfigAccessor.java | 2 -
.../cassandra/sidecar/db/CdcDatabaseAccessor.java | 2 -
...or.java => CdcSystemViewsDatabaseAccessor.java} | 37 ++++---
.../cassandra/sidecar/db/KafkaConfigAccessor.java | 2 -
.../sidecar/db/TokenSplitConfigAccessor.java | 4 +-
.../sidecar/db/VirtualTablesDatabaseAccessor.java | 61 ------------
.../sidecar/handlers/LifecycleUpdateHandler.java | 3 +-
.../cassandra/sidecar/modules/CdcModule.java | 49 +++++----
.../sidecar/modules/ConfigurationModule.java | 31 +++---
.../cassandra/sidecar/modules/SidecarModules.java | 109 ++++++++++++++++-----
.../cassandra/sidecar/modules/SysInfoModule.java | 5 +-
.../cassandra/sidecar/modules/UtilitiesModule.java | 12 +++
.../sidecar/modules/guice-best-practice.md | 55 +++++++++++
.../tasks/CassandraClusterSchemaMonitor.java | 2 -
.../tasks/CdcConfigRefresherNotifierTask.java | 5 +-
.../sidecar/tasks/CdcRawDirectorySpaceCleaner.java | 8 +-
.../sidecar/tasks/ClusterTopologyMonitor.java | 6 +-
.../sidecar/CassandraSidecarDaemonTest.java | 4 +-
.../cassandra/sidecar/cdc/CdcPublisherTests.java | 6 +-
.../tasks/CdcRawDirectorySpaceCleanerTest.java | 28 +++---
38 files changed, 306 insertions(+), 253 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e00877c..54ee2d2b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Scope all CDC dependencies exclusively to CdcModule (CASSSIDECAR-447)
* Add ConfigurationProvider interfaces for pluggable overlay storage
(CASSSIDECAR-424)
* Refactor OperationalJob to have data separate from execution logic
(CASSSIDECAR-460)
* Sidecar’s CassandraBridgeFactory FQCN colliding with the Cassandra
analytics class (CASSSIDECAR-467)
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
index 1604077a..8399a892 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -58,7 +58,7 @@ public class TestCdcPublisher extends CdcPublisher
CdcConfig conf,
CdcDatabaseAccessor databaseAccessor,
ICdcStats cdcStats,
- VirtualTablesDatabaseAccessor virtualTables,
+ CdcSystemViewsDatabaseAccessor cdcSystemViews,
SidecarCdcStats sidecarCdcStats,
Provider<RangeManager> rangeManagerProvider,
CassandraBridgeFactory cassandraBridgeFactory,
@@ -67,7 +67,7 @@ public class TestCdcPublisher extends CdcPublisher
{
super(vertx, executorPools, clusterConfigProvider,
schemaSupplier, instanceMetadataFetcher, conf, databaseAccessor,
cdcStats,
- virtualTables, sidecarCdcStats, rangeManagerProvider,
+ cdcSystemViews, sidecarCdcStats, rangeManagerProvider,
cassandraBridgeFactory, sidecarCdcClientProvider,
mock(CachingSchemaStore.class),
mock(KafkaProducerFactory.class),
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/db/SystemDatabaseAccessorIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/db/SystemDatabaseAccessorIntegrationTest.java
index 43c58f68..3a341009 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/db/SystemDatabaseAccessorIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/db/SystemDatabaseAccessorIntegrationTest.java
@@ -67,7 +67,7 @@ class SystemDatabaseAccessorIntegrationTest extends
SharedClusterSidecarIntegrat
CQLSessionProvider sessionProvider = cqlSessionProvider(cluster);
SystemViewsSchema systemViewsSchema = new SystemViewsSchema();
systemViewsSchema.initialize(sessionProvider.get(), t -> false);
- SystemViewsDatabaseAccessor accessor = new
SystemViewsDatabaseAccessor(systemViewsSchema, sessionProvider);
+ CdcSystemViewsDatabaseAccessor accessor = new
CdcSystemViewsDatabaseAccessor(systemViewsSchema, sessionProvider);
Long cdcTotalSpaceSettings = accessor.cdcTotalSpaceBytesSetting();
assertThat(cdcTotalSpaceSettings).isNotNull().isEqualTo(CDC_SIZE_LIMIT_IN_MIB *
ONE_MIB);
}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
index 6167b474..70ff33b9 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java
@@ -48,7 +48,7 @@ import
org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
@@ -162,7 +162,7 @@ public abstract class
SharedClusterCdcSidecarIntegrationTestBase extends SharedC
CdcConfig conf,
CdcDatabaseAccessor databaseAccessor,
ICdcStats cdcStats,
- VirtualTablesDatabaseAccessor virtualTables,
+ CdcSystemViewsDatabaseAccessor systemViews,
SidecarCdcStats sidecarCdcStats,
TokenRingProvider tokenRingProvider,
CassandraBridgeFactory
cassandraBridgeFactory,
@@ -178,7 +178,7 @@ public abstract class
SharedClusterCdcSidecarIntegrationTestBase extends SharedC
conf,
databaseAccessor,
cdcStats,
- virtualTables,
+ systemViews,
sidecarCdcStats,
() -> rangeManager,
cassandraBridgeFactory,
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
b/server/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index ddf3c24d..8b1b14a3 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Guice;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
import org.jetbrains.annotations.VisibleForTesting;
@@ -42,9 +44,11 @@ public class CassandraSidecarDaemon
@VisibleForTesting
static Server runningApplication;
- public static void main(String[] args)
+ public static void main(String[] args) throws Exception
{
- Server app =
Guice.createInjector(SidecarModules.all(determineConfigPath()))
+ Path confPath = determineConfigPath();
+ SidecarConfiguration config =
SidecarConfigurationImpl.readYamlConfiguration(confPath);
+ Server app = Guice.createInjector(SidecarModules.all(config))
.getInstance(Server.class);
runningApplication = app;
app.start().onSuccess(deploymentId ->
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/bridge/CassandraBridgeFactory.java
b/server/src/main/java/org/apache/cassandra/sidecar/bridge/CassandraBridgeFactory.java
index 99a26ccf..8dc49350 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/bridge/CassandraBridgeFactory.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/bridge/CassandraBridgeFactory.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import jakarta.inject.Singleton;
import org.apache.cassandra.bridge.BaseCassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
@@ -43,7 +42,6 @@ import static
org.apache.cassandra.bridge.BaseCassandraBridgeFactory.getCassandr
* and provides methods to retrieve bridge instances for specific Cassandra
versions.
* Each bridge is loaded from version-specific JAR resources and instantiated
using reflection.
*/
-@Singleton
public class CassandraBridgeFactory
{
// maps Cassandra version-specific jar name (e.g. 'four-zero') to matching
CassandraBridge
@@ -120,8 +118,7 @@ public class CassandraBridgeFactory
public ClassLoader buildClassLoader(String... resourceNames)
{
- return AccessController.doPrivileged((PrivilegedAction<ClassLoader>)
() ->
- BaseCassandraBridgeFactory.buildClassLoader(resourceNames));
+ return AccessController.doPrivileged((PrivilegedAction<ClassLoader>)
() -> BaseCassandraBridgeFactory.buildClassLoader(resourceNames));
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
index e91bd6b6..47db6ca2 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import org.apache.avro.Schema;
@@ -73,7 +72,6 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR
* <p>Schema version UUIDs ({@link #getVersion}) are derived from the CQL
{@code CREATE TABLE}
* statement, consistent with the analytics {@code CachingSchemaStore}
implementation.
*/
-@Singleton
public class CachingSchemaStore implements SchemaStore
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CachingSchemaStore.class);
@@ -230,9 +228,10 @@ public class CachingSchemaStore implements SchemaStore
// Remove any old schema entries for deleted tables, this operation
can be done in the end as this is
// only for removing stale entries and no one is going to use these
entries once the table is removed.
// This doesn't have to be an atomic operation.
- avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream()
- .map(cqlTable
-> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()))
-
.collect(Collectors.toList()));
+ Set<TableIdentifier> toKeep = refreshedCdcTables.stream()
+ .map(cqlTable ->
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()))
+
.collect(Collectors.toSet());
+ avroSchemasCache.keySet().retainAll(toKeep);
vertx.eventBus().publish(ON_CDC_CACHE_WARMED_UP.address(), "Cdc cache
warmed up");
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
index fa4d7469..3e493aaa 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
@@ -43,7 +42,6 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CON
* Implementation of the interface {@link CdcConfig}, an in-memory
representation holding
* CDC and Kafka configurations from "configs" table inside sidecar internal
keyspace.
*/
-@Singleton
public class CdcConfigImpl implements CdcConfig
{
private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
index 76889ca4..88f05d02 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,28 +128,31 @@ public class CdcManager
try
{
- ownedRanges.entrySet().stream()
- .flatMap(entry ->
- entry.getValue().stream().map(range -> {
- Integer instanceId =
getInstanceId(entry.getKey());
-
- // Create unique key:
"instanceId:rangeStart:rangeEnd"
- String uniqueKey = String.format("%d:%s:%s",
- instanceId,
-
range.startAsBigInt(),
-
range.endAsBigInt());
-
- return
uniqueCdcConsumers.computeIfAbsent(uniqueKey, k ->
- buildConsumer(conf.jobId(),
- instanceId,
- clusterConfigProvider,
- eventConsumer,
- schemaSupplier,
- () ->
org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(),
range.endAsBigInt()),
- sidecarCdcClient,
- cdcStats));
- }))
- .collect(Collectors.toList());
+ for (Map.Entry<String, Set<TokenRange>> entry :
ownedRanges.entrySet())
+ {
+ for (TokenRange range : entry.getValue())
+ {
+ Integer instanceId = getInstanceId(entry.getKey());
+
+ // Create unique key: "instanceId:rangeStart:rangeEnd"
+ String uniqueKey = String.format("%d:%s:%s",
+ instanceId,
+ range.startAsBigInt(),
+ range.endAsBigInt());
+
+ TokenRangeSupplier tokenRangeSupplier = () ->
org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(),
+
range.endAsBigInt());
+ uniqueCdcConsumers.computeIfAbsent(uniqueKey,
+ k ->
buildConsumer(conf.jobId(),
+
instanceId,
+
clusterConfigProvider,
+
eventConsumer,
+
schemaSupplier,
+
tokenRangeSupplier,
+
sidecarCdcClient,
+
cdcStats));
+ }
+ }
entries = new ArrayList<>(uniqueCdcConsumers.values());
return entries;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
index 2ab1a6f7..134fa13c 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
-import com.google.inject.Singleton;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
@@ -47,7 +46,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -59,7 +58,6 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_
/**
* Class that handles CDC life cycle
*/
-@Singleton
public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CdcPublisher.class);
@@ -71,7 +69,7 @@ public class CdcPublisher implements
Handler<Message<Object>>, PeriodicTask
private volatile boolean isInitialized = false;
private volatile boolean cdcCacheWarmedUp = false;
private final CdcDatabaseAccessor databaseAccessor;
- private final VirtualTablesDatabaseAccessor virtualTables;
+ private final CdcSystemViewsDatabaseAccessor systemViews;
private final SidecarCdcStats sidecarCdcStats;
private final SchemaSupplier schemaSupplier;
private final InstanceMetadataFetcher instanceMetadataFetcher;
@@ -95,7 +93,7 @@ public class CdcPublisher implements
Handler<Message<Object>>, PeriodicTask
CdcConfig conf,
CdcDatabaseAccessor databaseAccessor,
ICdcStats cdcStats,
- VirtualTablesDatabaseAccessor virtualTables,
+ CdcSystemViewsDatabaseAccessor systemViews,
SidecarCdcStats sidecarCdcStats,
Provider<RangeManager> rangeManagerProvider,
CassandraBridgeFactory cassandraBridgeFactory,
@@ -108,7 +106,7 @@ public class CdcPublisher implements
Handler<Message<Object>>, PeriodicTask
this.executorPools = executorPools.internal();
this.conf = conf;
this.databaseAccessor = databaseAccessor;
- this.virtualTables = virtualTables;
+ this.systemViews = systemViews;
this.schemaSupplier = schemaSupplier;
this.instanceMetadataFetcher = instanceMetadataFetcher;
this.clusterConfigProvider = clusterConfigProvider;
@@ -276,7 +274,7 @@ public class CdcPublisher implements
Handler<Message<Object>>, PeriodicTask
{
LOGGER.info("Cdc not enabled in this DC localDc={} cdcDc={}",
localDc, conf.datacenter());
}
- else if (virtualTables.isCdcOnRepairEnabled())
+ else if (systemViews.isCdcOnRepairEnabled())
{
LOGGER.warn("Cannot run CDC while cdc on repair is enabled,
disable cdc_on_repair_enabled in the yaml file.");
sidecarCdcStats.captureCdcOnRepairEnabled();
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcSchemaSupplier.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcSchemaSupplier.java
index eaec0259..e2bb55a8 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcSchemaSupplier.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcSchemaSupplier.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
-import com.google.inject.Singleton;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CdcBridge;
import org.apache.cassandra.bridge.CdcBridgeFactory;
@@ -71,7 +70,6 @@ import org.jetbrains.annotations.NotNull;
* @see CdcUtil
* @see CassandraBridge
*/
-@Singleton
public class CdcSchemaSupplier implements SchemaSupplier
{
private final InstanceMetadataFetcher instanceMetadataFetcher;
@@ -79,7 +77,9 @@ public class CdcSchemaSupplier implements SchemaSupplier
private final CdcDatabaseAccessor cdcDatabaseAccessor;
private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new
ConcurrentHashMap<>();
- public CdcSchemaSupplier(InstanceMetadataFetcher instanceMetadataFetcher,
CassandraBridgeFactory cassandraBridgeFactory, CdcDatabaseAccessor
cdcDatabaseAccessor)
+ public CdcSchemaSupplier(InstanceMetadataFetcher instanceMetadataFetcher,
+ CassandraBridgeFactory cassandraBridgeFactory,
+ CdcDatabaseAccessor cdcDatabaseAccessor)
{
this.instanceMetadataFetcher = instanceMetadataFetcher;
this.cassandraBridgeFactory = cassandraBridgeFactory;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/DcLocalTopologyChangeEventCodec.java
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/DcLocalTopologyChangeEventCodec.java
index 775392d8..872795f5 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/DcLocalTopologyChangeEventCodec.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/DcLocalTopologyChangeEventCodec.java
@@ -36,7 +36,9 @@ import
org.apache.cassandra.sidecar.tasks.ClusterTopologyMonitor;
/**
* Message codec for encoding and decoding datacenter-local topology change
events over the Vert.x event bus.
*/
-public class DcLocalTopologyChangeEventCodec implements
MessageCodec<ClusterTopologyMonitor.DcLocalTopologyChangeEvent,
ClusterTopologyMonitor.DcLocalTopologyChangeEvent>
+public class DcLocalTopologyChangeEventCodec
+ implements
MessageCodec<ClusterTopologyMonitor.DcLocalTopologyChangeEvent,
+
ClusterTopologyMonitor.DcLocalTopologyChangeEvent>
{
public static final DcLocalTopologyChangeEventCodec INSTANCE = new
DcLocalTopologyChangeEventCodec();
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java
index ed55de85..7c56ddc3 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java
@@ -46,7 +46,6 @@ import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Token;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
@@ -62,7 +61,6 @@ import
org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
* Class for getting token range related information using cassandra client's
session.
* Token ranges are cached to avoid making dns calls when cluster topology has
not changed.
*/
-@Singleton
public class CassandraClientTokenRingProvider extends TokenRingProvider
implements LocalTokenRangesProvider
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraClientTokenRingProvider.class);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/ContentionFreeRangeManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/ContentionFreeRangeManager.java
index 33035d28..2e8f4365 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/ContentionFreeRangeManager.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/ContentionFreeRangeManager.java
@@ -22,16 +22,15 @@ import java.util.Map;
import java.util.Set;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
/**
- * Stub implementation of the RangeManager that provides contention-free
failover for token ranges without coordinating with other Sidecar instances, at
the cost of consistency.
+ * Stub implementation of the RangeManager that provides contention-free
failover for token ranges without
+ * coordinating with other Sidecar instances, at the cost of consistency.
*/
-@Singleton
public class ContentionFreeRangeManager extends RangeManager
{
@Inject
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
index 9ca0df43..959229d9 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
@@ -42,7 +42,6 @@ import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.common.server.cluster.locator.Token;
@@ -60,7 +59,6 @@ import static
org.apache.cassandra.sidecar.config.yaml.CassandraInputValidationC
/**
* Return Sidecar(s) adjacent to current Sidecar in the token ring within the
same datacenter.
*/
-@Singleton
public class InnerDcTokenAdjacentPeerProvider implements SidecarPeerProvider
{
private static final Logger LOGGER =
LoggerFactory.getLogger(InnerDcTokenAdjacentPeerProvider.class);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/RangeManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/RangeManager.java
index 47e0005a..53ec1378 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/RangeManager.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/RangeManager.java
@@ -253,7 +253,8 @@ public abstract class RangeManager implements
Handler<Message<Object>>
}
else
{
- LOGGER.warn("Failed to release ownership of instance primary
range host={} port={} ranges='{}'", instance.hostname(), instance.port(),
primaryRanges);
+ LOGGER.warn("Failed to release ownership of instance primary
range host={} port={} ranges='{}'",
+ instance.hostname(), instance.port(),
primaryRanges);
}
})
.onFailure(throwable -> LOGGER.warn("Error attempting to release range
ownership", throwable));
@@ -286,7 +287,8 @@ public abstract class RangeManager implements
Handler<Message<Object>>
}
else
{
- LOGGER.warn("Failed to gain ownership of instance primary
range host={} port={} ranges='{}'", instance.hostname(), instance.port(),
primaryRanges);
+ LOGGER.warn("Failed to gain ownership of instance primary
range host={} port={} ranges='{}'",
+ instance.hostname(), instance.port(),
primaryRanges);
}
})
.onFailure(throwable -> LOGGER.warn("Error attempting to gain range
ownership", throwable));
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java
index 008d3871..011cf0e4 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.coordination;
import java.util.concurrent.CompletableFuture;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Future;
import org.apache.cassandra.sidecar.client.SidecarClient;
import org.apache.cassandra.sidecar.client.SidecarInstance;
@@ -33,7 +32,6 @@ import
org.apache.cassandra.sidecar.utils.SidecarClientProvider;
* Provides the health of a Sidecar instance over HTTP API, retrying to
* confirm Sidecar is DOWN for extended period of time.
*/
-@Singleton
public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider
{
private final SidecarClientProvider clientProvider;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
index e2ae016b..77db2108 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -53,7 +52,6 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR
* Pings other 'peer' Sidecar(s) that are relevant to this Sidecar over HTTP
and notifies
* listeners when other Sidecar(s) goes DOWN or OK.
*/
-@Singleton
public class SidecarPeerHealthMonitorTask implements PeriodicTask
{
private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/CdcConfigAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcConfigAccessor.java
index 66206ee3..a0055295 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/CdcConfigAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcConfigAccessor.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.sidecar.db;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.common.request.Service;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
@@ -28,7 +27,6 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
* "configs" table in sidecar keyspace. All the cdc configurations required
for CDC feature are
* stored in this table using this class.
*/
-@Singleton
public class CdcConfigAccessor extends ConfigAccessorImpl
{
@Inject
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java
index 472a4cf8..31c6b046 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java
@@ -37,7 +37,6 @@ import com.datastax.driver.core.Row;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
-import com.google.inject.Singleton;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
@@ -58,7 +57,6 @@ import org.jetbrains.annotations.NotNull;
* operations.
*/
@SuppressWarnings("resource")
-@Singleton
public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CdcDatabaseAccessor.class);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcSystemViewsDatabaseAccessor.java
similarity index 68%
rename from
server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
rename to
server/src/main/java/org/apache/cassandra/sidecar/db/CdcSystemViewsDatabaseAccessor.java
index db79ae24..c21b425c 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/CdcSystemViewsDatabaseAccessor.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema;
import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
@@ -37,19 +36,19 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Database Accessor that queries cassandra to get information maintained
under system_views keyspace.
+ * Database accessor for CDC-related settings in the {@code
system_views.settings} virtual table.
*/
-@Singleton
-public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSchema>
+public class CdcSystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSchema>
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(SystemViewsDatabaseAccessor.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CdcSystemViewsDatabaseAccessor.class);
- private static final String YAML_PROP_IN_MB = "cdc_total_space_in_mb";
- private static final String YAML_PROP_WITH_UNIT = "cdc_total_space"; //
expects value with units e.g. "5MiB"
+ private static final String CDC_TOTAL_SPACE_IN_MB_NAME =
"cdc_total_space_in_mb";
+ private static final String CDC_TOTAL_SPACE_NAME = "cdc_total_space"; //
expects value with units e.g. "5MiB"
+ private static final String CDC_ON_REPAIR_ENABLED_FLAG =
"cdc_on_repair_enabled";
@Inject
- public SystemViewsDatabaseAccessor(SystemViewsSchema systemViewsSchema,
- CQLSessionProvider sessionProvider)
+ public CdcSystemViewsDatabaseAccessor(SystemViewsSchema systemViewsSchema,
+ CQLSessionProvider sessionProvider)
{
super(systemViewsSchema, sessionProvider);
}
@@ -61,17 +60,17 @@ public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSch
@Nullable
public Long cdcTotalSpaceBytesSetting() throws SchemaUnavailableException
{
- // attempt to parse Cassandra v4.0 'cdc_total_space_in_mb' yaml prop
- String[] cdcTotalSpaceSettingNames = { YAML_PROP_IN_MB,
YAML_PROP_WITH_UNIT };
+ // attempt to read Cassandra v4.0 'cdc_total_space_in_mb'
+ String[] cdcTotalSpaceSettingNames = { CDC_TOTAL_SPACE_IN_MB_NAME,
CDC_TOTAL_SPACE_NAME };
Map<String, String> settings = getSettings(cdcTotalSpaceSettingNames);
- String cdcTotalSpaceInMb = settings.get(YAML_PROP_IN_MB);
+ String cdcTotalSpaceInMb = settings.get(CDC_TOTAL_SPACE_IN_MB_NAME);
if (cdcTotalSpaceInMb != null)
{
return FileUtils.mbStringToBytes(cdcTotalSpaceInMb);
}
- // otherwise parse current (v5.0+) 'cdc_total_space' yaml prop
- String storageStringToBytes = settings.get(YAML_PROP_WITH_UNIT);
+ // otherwise read Cassandra v5.0+ 'cdc_total_space'
+ String storageStringToBytes = settings.get(CDC_TOTAL_SPACE_NAME);
if (storageStringToBytes != null)
{
return FileUtils.storageStringToBytes(storageStringToBytes);
@@ -102,4 +101,14 @@ public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSch
row -> row.getString(1))
);
}
+
+ /**
+ * @return {@code true} if {@code cdc_on_repair_enabled} is set to {@code
true} in {@code system_views.settings}
+ * @throws SchemaUnavailableException when the schema is not initialized
+ */
+ public boolean isCdcOnRepairEnabled() throws SchemaUnavailableException
+ {
+ String value =
getSettings(CDC_ON_REPAIR_ENABLED_FLAG).get(CDC_ON_REPAIR_ENABLED_FLAG);
+ return value != null && "true".equalsIgnoreCase(value);
+ }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/KafkaConfigAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/KafkaConfigAccessor.java
index 7047ee69..6c69bce4 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/KafkaConfigAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/KafkaConfigAccessor.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.sidecar.db;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.common.request.Service;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
@@ -28,7 +27,6 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
* "configs" table in sidecar keyspace. All the kafka configurations required
for CDC feature are
* stored in this table using this class.
*/
-@Singleton
public class KafkaConfigAccessor extends ConfigAccessorImpl
{
@Inject
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/TokenSplitConfigAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/TokenSplitConfigAccessor.java
index 22c68c29..4ad54a9f 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/TokenSplitConfigAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/TokenSplitConfigAccessor.java
@@ -20,7 +20,6 @@
package org.apache.cassandra.sidecar.db;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
@@ -59,9 +58,8 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
*
* @see ConfigAccessorImpl
* @see org.apache.cassandra.sidecar.utils.TokenSplitUtil
- * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see CdcDatabaseAccessor
*/
-@Singleton
public class TokenSplitConfigAccessor extends ConfigAccessorImpl
{
@Inject
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/VirtualTablesDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/VirtualTablesDatabaseAccessor.java
deleted file mode 100644
index c814aaae..00000000
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/VirtualTablesDatabaseAccessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.db;
-
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
-import org.apache.cassandra.sidecar.db.schema.TableSchema;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-
-/**
- * Database accessor for querying Cassandra virtual tables in the system_views
keyspace.
- */
-@Singleton
-public class VirtualTablesDatabaseAccessor extends
DatabaseAccessor<TableSchema>
-{
- public static final String SYSTEM_VIEWS_KS = "system_views";
- public static final String SYSTEM_VIEWS_SETTINGS_TBL = "settings";
- public static final String CDC_ON_REPAIR_ENABLED_FLAG =
"cdc_on_repair_enabled";
-
- /**
- * Creates a new virtual tables database accessor.
- */
- @Inject
- public VirtualTablesDatabaseAccessor(TableSchema tableSchema,
CQLSessionProvider sessionProvider)
- {
- super(tableSchema, sessionProvider);
- }
-
- /**
- * Checks if CDC on repair is enabled in the system settings.
- */
- public boolean isCdcOnRepairEnabled()
- {
- Select.Where query = QueryBuilder.select("value")
- .from(SYSTEM_VIEWS_KS,
SYSTEM_VIEWS_SETTINGS_TBL)
- .where(eq("name",
CDC_ON_REPAIR_ENABLED_FLAG));
- Row row = session().execute(query).one();
- return row != null && !row.isNull(0) &&
"true".equalsIgnoreCase(row.getString(0));
- }
-}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
index 6f69fcec..0caeb99a 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
@@ -89,7 +89,8 @@ public class LifecycleUpdateHandler extends
NodeCommandHandler implements Access
response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
break;
default:
- logger.warn("{} request failed
with unexpected result. request={}, remoteAddress={}, instance={},
operationStatus={}",
+ logger.warn("{} request failed
with unexpected result. " +
+ "request={},
remoteAddress={}, instance={}, operationStatus={}",
this.getClass().getSimpleName(), request, remoteAddress, host, info.status());
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index ab00ae12..e49538e1 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
+import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.ProvidesIntoMap;
import io.vertx.core.Vertx;
@@ -59,7 +60,6 @@ import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
import org.apache.cassandra.sidecar.cdc.SidecarClientSecretsProvider;
import org.apache.cassandra.sidecar.cdc.SidecarClusterConfigProvider;
import org.apache.cassandra.sidecar.cdc.SidecarCqlToAvroSchemaConverter;
-import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import
org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload;
@@ -72,7 +72,6 @@ import
org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
-import
org.apache.cassandra.sidecar.coordination.DynamicSidecarInstancesProvider;
import
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider;
@@ -82,10 +81,10 @@ import
org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
import org.apache.cassandra.sidecar.db.CdcConfigAccessor;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.db.KafkaConfigAccessor;
import org.apache.cassandra.sidecar.db.TokenSplitConfigAccessor;
-import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
import org.apache.cassandra.sidecar.db.schema.ConfigsSchema;
import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema;
@@ -125,6 +124,20 @@ public class CdcModule extends AbstractModule
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CdcModule.class);
+ @Override
+ protected void configure()
+ {
+ bind(CachingSchemaStore.class).in(Scopes.SINGLETON);
+ bind(CdcRawDirectorySpaceCleaner.class).in(Scopes.SINGLETON);
+ bind(SidecarPeerHealthMonitorTask.class).in(Scopes.SINGLETON);
+ bind(InnerDcTokenAdjacentPeerProvider.class).in(Scopes.SINGLETON);
+ bind(CdcDatabaseAccessor.class).in(Scopes.SINGLETON);
+ bind(CdcConfigAccessor.class).in(Scopes.SINGLETON);
+ bind(KafkaConfigAccessor.class).in(Scopes.SINGLETON);
+ bind(TokenSplitConfigAccessor.class).in(Scopes.SINGLETON);
+ bind(CdcSystemViewsDatabaseAccessor.class).in(Scopes.SINGLETON);
+ }
+
@ProvidesIntoMap
@KeyClassMapKey(PeriodicTaskMapKeys.SidecarPeerHealthMonitorTaskKey.class)
PeriodicTask sidecarPeerHealthMonitorTask(SidecarPeerHealthMonitorTask
task)
@@ -143,14 +156,14 @@ public class CdcModule extends AbstractModule
@Provides
@Singleton
CassandraClusterSchemaMonitor
cassandraClusterSchemaMonitorInstance(InstanceMetadataFetcher
instanceMetadataFetcher,
-
CdcDatabaseAccessor databaseAccessor,
-
DriverUnsupportedSchemaCache driverUnsupportedSchemaCache,
-
SidecarConfiguration configuration,
-
CassandraBridgeFactory cassandraBridgeFactory)
+
CdcDatabaseAccessor databaseAccessor,
+
DriverUnsupportedSchemaCache driverUnsupportedSchemaCache,
+
SidecarConfiguration configuration,
+
CassandraBridgeFactory cassandraBridgeFactory)
{
return new CassandraClusterSchemaMonitor(instanceMetadataFetcher,
databaseAccessor,
-
driverUnsupportedSchemaCache, configuration,
- cassandraBridgeFactory);
+ driverUnsupportedSchemaCache,
configuration,
+ cassandraBridgeFactory);
}
@ProvidesIntoMap
@@ -329,13 +342,6 @@ public class CdcModule extends AbstractModule
return new CdcDynamicSidecarInstancesProvider(instancesMetadata,
serviceConfiguration);
}
- @Provides
- @Singleton
- public SidecarInstancesProvider sidecarInstancesProvider(InstancesMetadata
instancesMetadata, ServiceConfiguration serviceConfiguration)
- {
- return new DynamicSidecarInstancesProvider(instancesMetadata,
serviceConfiguration);
- }
-
@Provides
@Singleton
public SidecarCdcStats sidecarCdcStats()
@@ -455,7 +461,7 @@ public class CdcModule extends AbstractModule
CdcConfig conf,
CdcDatabaseAccessor databaseAccessor,
ICdcStats cdcStats,
- VirtualTablesDatabaseAccessor virtualTables,
+ CdcSystemViewsDatabaseAccessor systemViews,
SidecarCdcStats sidecarCdcStats,
RangeManager rangeManager,
CassandraBridgeFactory cassandraBridgeFactory,
@@ -472,7 +478,7 @@ public class CdcModule extends AbstractModule
conf,
databaseAccessor,
cdcStats,
- virtualTables,
+ systemViews,
sidecarCdcStats,
() -> rangeManager,
cassandraBridgeFactory,
@@ -489,13 +495,6 @@ public class CdcModule extends AbstractModule
return new SidecarCdcOptions(instanceMetadataFetcher);
}
- @Provides
- @Singleton
- public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration
configuration)
- {
- return new CdcStatesSchema(configuration);
- }
-
@ProvidesIntoMap
@KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
PeriodicTask cdcPublisherTask(CdcPublisher cdcPublisher)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
index 983a6195..f6256405 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.sidecar.modules;
-import java.io.IOException;
-import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@@ -54,7 +52,6 @@ import
org.apache.cassandra.sidecar.config.InstanceConfiguration;
import org.apache.cassandra.sidecar.config.JmxConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache;
import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
@@ -64,6 +61,7 @@ import
org.apache.cassandra.sidecar.modules.multibindings.PeriodicTaskMapKeys;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import org.apache.cassandra.sidecar.utils.EventBusUtils;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
@@ -75,35 +73,30 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_
public class ConfigurationModule extends AbstractModule
{
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurationModule.class);
- protected final Path confPath;
+ @Nullable
+ private final SidecarConfiguration config;
- /**
- * Constructs the Guice main module to run Cassandra Sidecar
- */
- public ConfigurationModule()
- {
- confPath = null;
- }
/**
- * Constructs the Guice main module with the configured yaml {@code
confPath} to run Cassandra Sidecar
+ * Constructs the Guice main module with a pre-parsed {@link
SidecarConfiguration}.
+ * Using this constructor avoids a second YAML parse at injection time.
*
- * @param confPath the path to the yaml configuration file
+ * @param config the already-parsed sidecar configuration
*/
- public ConfigurationModule(Path confPath)
+ public ConfigurationModule(SidecarConfiguration config)
{
- this.confPath = confPath;
+ this.config = config;
}
@Provides
@Singleton
- SidecarConfiguration sidecarConfiguration() throws IOException
+ SidecarConfiguration sidecarConfiguration()
{
- if (confPath == null)
+ if (config != null)
{
- throw new NullPointerException("the YAML configuration path for
Sidecar has not been defined.");
+ return config;
}
- return SidecarConfigurationImpl.readYamlConfiguration(confPath);
+ throw new NullPointerException("Sidecar configuration is not defined");
}
@Provides
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
index 50073b51..7361254d 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
@@ -18,11 +18,15 @@
package org.apache.cassandra.sidecar.modules;
+import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
import com.google.inject.Module;
import com.google.inject.util.Modules;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import
org.apache.cassandra.sidecar.modules.multibindings.MultiBindingTypeResolverModule;
import org.jetbrains.annotations.Nullable;
@@ -37,33 +41,88 @@ public class SidecarModules
}
/**
- * All sidecar modules
- * @param confPath path to the configuration file
- * @return all sidecar modules
+ * All sidecar modules for a pre-parsed {@link SidecarConfiguration}.
+ * {@link CdcModule} is included unless CDC is explicitly disabled in the
configuration.
+ * Passing {@code null} (e.g. in tests that supply bindings via {@code
Modules.override()})
+ * also includes {@link CdcModule}.
+ *
+ * @param config the already-parsed sidecar configuration, or {@code null}
for tests that
+ * configure modules independently
+ * @return all applicable sidecar modules
+ */
+ public static List<Module> all(@Nullable SidecarConfiguration config)
+ {
+ boolean cdcEnabled = config == null ||
config.serviceConfiguration().cdcConfiguration().isEnabled();
+ return new Builder()
+ .add(Modules.disableCircularProxiesModule())
+ .add(new ApiModule())
+ .add(new AuthModule())
+ .add(new CassandraOperationsModule())
+ .add(new ConfigurationModule(config))
+ .add(new CoordinationModule())
+ .add(new HealthCheckModule())
+ .add(new LifecycleModule())
+ .add(new LiveMigrationModule())
+ .add(new MultiBindingTypeResolverModule())
+ .add(new OpenApiModule())
+ .add(new RestoreJobModule())
+ .add(new SchedulingModule())
+ .add(new SchemaReportingModule())
+ .add(new SidecarSchemaModule())
+ .add(new SSTablesAccessModule())
+ .add(new TelemetryModule())
+ .add(new UtilitiesModule())
+ .add(new SysInfoModule())
+ .addIf(cdcEnabled, new CdcModule())
+ .build();
+ }
+
+ private static class Builder
+ {
+ private final List<Module> modules = new ArrayList<>();
+
+ Builder add(Module module)
+ {
+ modules.add(module);
+ return this;
+ }
+
+ Builder addIf(boolean condition, Module module)
+ {
+ if (condition)
+ {
+ modules.add(module);
+ }
+ return this;
+ }
+
+ List<Module> build()
+ {
+ return List.copyOf(modules);
+ }
+ }
+
+ /**
+ * All sidecar modules, loading configuration from the given YAML path.
+ * {@link CdcModule} is included only when CDC is enabled in the parsed
configuration.
+ *
+ * @param confPath path to the configuration file, or {@code null} to omit
path-based config
+ * @return all applicable sidecar modules
*/
public static List<Module> all(@Nullable Path confPath)
{
- // To prevent unexpected circular dependency chains in your code, we
recommend that you disable Guice's circular proxy feature.
- return List.of(Modules.disableCircularProxiesModule(),
- new ApiModule(),
- new AuthModule(),
- new CassandraOperationsModule(),
- new CdcModule(),
- new ConfigurationModule(confPath),
- new CoordinationModule(),
- new HealthCheckModule(),
- new LifecycleModule(),
- new LiveMigrationModule(),
- new MultiBindingTypeResolverModule(),
- new OpenApiModule(),
- new RestoreJobModule(),
- new SchedulingModule(),
- new SchemaReportingModule(),
- new SidecarSchemaModule(),
- new SSTablesAccessModule(),
- new TelemetryModule(),
- new UtilitiesModule(),
- new SysInfoModule());
+ if (confPath == null)
+ {
+ return all((SidecarConfiguration) null);
+ }
+ try
+ {
+ return
all(SidecarConfigurationImpl.readYamlConfiguration(confPath));
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Failed to read configuration
from " + confPath, e);
+ }
}
/**
@@ -72,6 +131,6 @@ public class SidecarModules
*/
public static List<Module> all()
{
- return all(null);
+ return all((SidecarConfiguration) null);
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SysInfoModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SysInfoModule.java
index ed75625d..710453df 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SysInfoModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SysInfoModule.java
@@ -50,7 +50,10 @@ public class SysInfoModule extends AbstractModule
responseCode = "200",
content = @Content(mediaType = "application/json",
schema = @Schema(type = SchemaType.ARRAY, implementation =
DiskInfo.class,
- example =
"[{\"totalSpace\":1000000000000,\"freeSpace\":500000000000,\"usableSpace\":450000000000,\"name\":\"data1\",\"mount\":\"/dev/sda1\",\"type\":\"ext4\"},{\"totalSpace\":2000000000000,\"freeSpace\":1500000000000,\"usableSpace\":1400000000000,\"name\":\"data2\",\"mount\":\"/dev/sdb1\",\"type\":\"xfs\"}]")))
+ example =
"[{\"totalSpace\":1000000000000,\"freeSpace\":500000000000,\"usableSpace\":450000000000,"
+
+
"\"name\":\"data1\",\"mount\":\"/dev/sda1\",\"type\":\"ext4\"}," +
+
"{\"totalSpace\":2000000000000,\"freeSpace\":1500000000000,\"usableSpace\":1400000000000,"
+
+
"\"name\":\"data2\",\"mount\":\"/dev/sdb1\",\"type\":\"xfs\"}]")))
@APIResponse(description = "Unauthorized - requires SYSTEM permission",
responseCode = "401")
@APIResponse(description = "Service unavailable - unable to fetch disk
information",
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/UtilitiesModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/UtilitiesModule.java
index f7dc3430..2a37f425 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/UtilitiesModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/UtilitiesModule.java
@@ -22,11 +22,15 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
+import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
import
org.apache.cassandra.sidecar.config.CassandraInputValidationConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import
org.apache.cassandra.sidecar.coordination.DynamicSidecarInstancesProvider;
import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.DigestAlgorithmProvider;
@@ -88,6 +92,14 @@ public class UtilitiesModule extends AbstractModule
return new SidecarVersionProvider("/sidecar.version");
}
+ @Provides
+ @Singleton
+ SidecarInstancesProvider sidecarInstancesProvider(InstancesMetadata
instancesMetadata,
+ ServiceConfiguration
serviceConfiguration)
+ {
+ return new DynamicSidecarInstancesProvider(instancesMetadata,
serviceConfiguration);
+ }
+
@Provides
@Singleton
CassandraInputValidator
cassandraInputValidatorFactory(SidecarConfiguration configuration)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/guice-best-practice.md
b/server/src/main/java/org/apache/cassandra/sidecar/modules/guice-best-practice.md
index d185a287..2026e5f2 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/guice-best-practice.md
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/guice-best-practice.md
@@ -116,3 +116,58 @@ that can be later switched transparently without affecting
the classes that depe
Prefer creating concrete classes over utility methods. The concrete classes
can be managed as a
[Singleton](https://en.wikipedia.org/wiki/Singleton_pattern) if they do not
have external dependencies that might
change their behavior based on the configuration.
+
+## Scope all dependencies to the feature module that owns them
+
+When a feature has its own module, all bindings required exclusively by that
feature should live in that module, not in a
+shared or base module. This prevents dependency leakage and keeps the module
graph clean.
+
+For example, [CdcModule](CdcModule.java) owns every binding that CDC
components depend on, including database accessors,
+schema stores, and coordination helpers. None of these CDC-specific bindings
appear in general-purpose modules such as
+[UtilitiesModule](UtilitiesModule.java) or
[SchedulingModule](SchedulingModule.java).
+
+The benefit is bi-directional: other modules stay lean (no unused bindings
when the feature is disabled), and the feature
+module documents its own dependency surface without requiring readers to hunt
across multiple modules.
+
+## Conditional module installation
+
+When a feature can be disabled at runtime, its module should be conditionally
installed based on configuration.
+[SidecarModules](SidecarModules.java) illustrates this pattern with `addIf`:
+
+```java
+boolean cdcEnabled = config == null ||
config.serviceConfiguration().cdcConfiguration().isEnabled();
+...
+.addIf(cdcEnabled, new CdcModule())
+```
+
+Because all CDC bindings live in `CdcModule`, conditionally omitting the
module is enough to exclude the entire feature.
+This only works if the previous practice — scope all dependencies to the
feature module — is followed. A conditionally
+installed module must be self-contained: it must not leave unsatisfied
bindings referenced by other always-installed
+modules.
+
+## Declare scope in the module for conditionally installed features
+
+The [Guice wiki on Scopes](https://github.com/google/guice/wiki/Scopes)
recommends placing scope annotations directly on
+the implementation class:
+
+> *"Specify the scope for a type by applying the scope annotation to the
implementation class. As well as being
+> functional, this annotation also serves as documentation."*
+
+This is good general advice, but it has a subtle consequence: a class
annotated with both `@Singleton` and `@Inject` is
+eligible for Guice's just-in-time (JIT) binding. Guice will instantiate and
cache it as a singleton even if the module
+that owns it was never installed. For always-installed modules this is
harmless, but for conditionally installed modules
+it defeats the gate entirely.
+
+Therefore, for classes that belong exclusively to a conditionally installed
module (such as [CdcModule](CdcModule.java)):
+
+- **Do not** place `@Singleton` on the class.
+- **Do** declare the singleton scope in the owning module, either via
`bind(X.class).in(Scopes.SINGLETON)` in
+ `configure()`, or via `@Provides @Singleton` for interface-bound types.
+
+```java
+// In CdcModule.configure() — scope is owned by the module, not the class
+bind(CdcDatabaseAccessor.class).in(Scopes.SINGLETON);
+bind(CachingSchemaStore.class).in(Scopes.SINGLETON);
+```
+
+This ensures the singleton guarantee only exists when `CdcModule` is
installed. No module, no binding, no object.
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
index 0ec22789..4aeff6aa 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java
@@ -33,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Singleton;
import io.vertx.core.Promise;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CdcBridge;
@@ -59,7 +58,6 @@ import org.jetbrains.annotations.NotNull;
* focused on CDC (Change Data Capture) operations. It maintains real-time
awareness of schema changes
* in the Cassandra cluster and manages CDC-enabled table metadata.
*/
-@Singleton
public class CassandraClusterSchemaMonitor implements PeriodicTask
{
// 49sec least-common multiple with 60sec is 49min so offers best monitor
frequency without clashing with 60sec
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java
index 19f1d0b0..bb81b957 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.codecs.CdcConfigMappingsCodec;
@@ -44,7 +43,6 @@ import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR
/**
* Periodic task notifying if the CDC config changed
*/
-@Singleton
public class CdcConfigRefresherNotifierTask implements PeriodicTask
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CdcConfigRefresherNotifierTask.class);
@@ -93,7 +91,8 @@ public class CdcConfigRefresherNotifierTask implements
PeriodicTask
@Override
public ScheduleDecision scheduleDecision()
{
- if
(!sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
|| !sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
+ if
(!sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+ ||
!sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
{
LOGGER.trace("Skipping config refreshing");
return ScheduleDecision.SKIP;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
index 28234689..03ed40a1 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
@@ -39,14 +39,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.server.CdcMetrics;
@@ -64,7 +63,6 @@ import static
org.apache.cassandra.sidecar.utils.CdcUtil.parseSegmentId;
* PeriodTask to monitor and remove the oldest commit log segments in the
`cdc_raw` directory
* when the space used hits the `cdc_total_space` limit set in the yaml file.
*/
-@Singleton
public class CdcRawDirectorySpaceCleaner implements PeriodicTask
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CdcRawDirectorySpaceCleaner.class);
@@ -72,7 +70,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
public static final String CDC_DIR_NAME = "cdc_raw";
private final TimeProvider timeProvider;
- private final SystemViewsDatabaseAccessor systemViewsDatabaseAccessor;
+ private final CdcSystemViewsDatabaseAccessor systemViewsDatabaseAccessor;
private final CdcConfiguration cdcConfiguration;
private final InstancesMetadata instancesMetadata;
private final CdcMetrics cdcMetrics;
@@ -87,7 +85,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
@Inject
public CdcRawDirectorySpaceCleaner(TimeProvider timeProvider,
- SystemViewsDatabaseAccessor
systemViewsDatabaseAccessor,
+ CdcSystemViewsDatabaseAccessor
systemViewsDatabaseAccessor,
ServiceConfiguration
serviceConfiguration,
InstancesMetadata instancesMetadata,
SidecarMetrics metrics)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
index 8b9677a6..e3ddf7eb 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
@@ -304,12 +304,14 @@ public class ClusterTopologyMonitor implements
PeriodicTask
final TokenRange currRange = curr.get(i);
if
(!prevRange.range.lowerEndpoint().equals(currRange.range.lowerEndpoint()))
{
- LOGGER.info("Change in instance lower token instanceId={}
prev={} curr={}", instanceId, prevRange.range.lowerEndpoint(),
currRange.range.lowerEndpoint());
+ LOGGER.info("Change in instance lower token instanceId={}
prev={} curr={}",
+ instanceId, prevRange.range.lowerEndpoint(),
currRange.range.lowerEndpoint());
result = true;
}
if
(!prevRange.range.upperEndpoint().equals(currRange.range.upperEndpoint()))
{
- LOGGER.info("Change in instance upper token instanceId={}
prev={} curr={}", instanceId, prevRange.range.upperEndpoint(),
currRange.range.upperEndpoint());
+ LOGGER.info("Change in instance upper token instanceId={}
prev={} curr={}",
+ instanceId, prevRange.range.upperEndpoint(),
currRange.range.upperEndpoint());
result = true;
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/CassandraSidecarDaemonTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/CassandraSidecarDaemonTest.java
index 714bc5f1..a64a00c9 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/CassandraSidecarDaemonTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/CassandraSidecarDaemonTest.java
@@ -108,7 +108,7 @@ class CassandraSidecarDaemonTest
}
@Test
- void testSuccessfulStartup()
+ void testSuccessfulStartup() throws Exception
{
Path path = Paths.get("../conf/sidecar.yaml");
assertThat(path).exists();
@@ -186,7 +186,7 @@ class CassandraSidecarDaemonTest
}
@Test
- void testLogbackConfiguration()
+ void testLogbackConfiguration() throws Exception
{
Path path = Paths.get("../conf/sidecar.yaml");
assertThat(path).exists();
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
index c5259cf1..180a6ec5 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.mockito.Mock;
@@ -78,7 +78,7 @@ public class CdcPublisherTests
@Mock
private ICdcStats cdcStats;
@Mock
- private VirtualTablesDatabaseAccessor virtualTables;
+ private CdcSystemViewsDatabaseAccessor systemViews;
@Mock
private SidecarCdcStats sidecarCdcStats;
@Mock
@@ -119,7 +119,7 @@ public class CdcPublisherTests
cdcConfig,
databaseAccessor,
cdcStats,
- virtualTables,
+ systemViews,
sidecarCdcStats,
rangeManager,
cassandraBridgeFactory,
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
index 86670fb7..6c0dc734 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
-import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.server.CdcMetrics;
@@ -89,7 +89,7 @@ class CdcRawDirectorySpaceCleanerTest
void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws
IOException
{
TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER;
- SystemViewsDatabaseAccessor systemViewsDatabaseAccessor =
mock(SystemViewsDatabaseAccessor.class);
+ CdcSystemViewsDatabaseAccessor systemViewsDatabaseAccessor =
mock(CdcSystemViewsDatabaseAccessor.class);
when(systemViewsDatabaseAccessor.getSettings(any()))
.thenAnswer((Answer<Map<String, String>>) invocation ->
Map.of("cdc_total_space", "1MiB"));
when(systemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenCallRealMethod();
@@ -145,9 +145,9 @@ class CdcRawDirectorySpaceCleanerTest
{
FakeTimeProvider fakeTimeProvider = new FakeTimeProvider();
InstancesMetadata instancesMetadata = mock(InstancesMetadata.class);
- SystemViewsDatabaseAccessor mockSystemViewsDatabaseAccessor =
mock(SystemViewsDatabaseAccessor.class);
+ CdcSystemViewsDatabaseAccessor mockCdcSystemViewsDatabaseAccessor =
mock(CdcSystemViewsDatabaseAccessor.class);
// First return 1MiB
-
when(mockSystemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenReturn(1L
<< 20)
+
when(mockCdcSystemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenReturn(1L
<< 20)
//
Next return 1GiB
.thenReturn(1L << 30)
//
Next throw an exception when accessing Accessor layer
@@ -163,53 +163,53 @@ class CdcRawDirectorySpaceCleanerTest
CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
fakeTimeProvider,
- mockSystemViewsDatabaseAccessor,
+ mockCdcSystemViewsDatabaseAccessor,
serviceConfiguration,
instancesMetadata,
mockSidecarMetrics
);
// start with no interactions
- verifyNoInteractions(mockSystemViewsDatabaseAccessor);
+ verifyNoInteractions(mockCdcSystemViewsDatabaseAccessor);
assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
assertThat(cleaner.maxUsageBytes()).as("Should read from the cached
value").isEqualTo(1_024L * 1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
// Advance the time provider to 1 millisecond before the cache expires
fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis() -
1, TimeUnit.MILLISECONDS);
// Let's assert it again to ensure we are not reading from Accessor
layer
assertThat(cleaner.maxUsageBytes()).as("Should read from the cached
value").isEqualTo(1_024L * 1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
// Now advance the time provider by the configured cache max usage
fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(),
TimeUnit.MILLISECONDS);
// and we should now read 1GiB
assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(2)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(2)).cdcTotalSpaceBytesSetting();
// Now advance the time provider by the configured cache max usage
fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(),
TimeUnit.MILLISECONDS);
// we should fall back when a SchemaUnavailableException is thrown
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
- verify(mockSystemViewsDatabaseAccessor,
times(3)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(3)).cdcTotalSpaceBytesSetting();
// also fall back when another exception is thrown
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
- verify(mockSystemViewsDatabaseAccessor,
times(4)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(4)).cdcTotalSpaceBytesSetting();
// Finally, we recover and are able to read from accessor. Should read
1GiB
assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
// And read cached value
assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
- verify(mockSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
+ verify(mockCdcSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
}
/* test utils */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]