This is an automated email from the ASF dual-hosted git repository.
pauloricardomg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new a420412d CASSSIDECAR-426: Add FileBasedConfigurationProvider for
file-based overlay persistence (#357)
a420412d is described below
commit a420412d6bf50c52cac0441b8c7142b613cd5c64
Author: Paulo Motta <[email protected]>
AuthorDate: Thu May 28 17:50:07 2026 -0400
CASSSIDECAR-426: Add FileBasedConfigurationProvider for file-based overlay
persistence (#357)
Implement the default ConfigurationProvider that persists overlays as
JSON files under per-instance directories
({configDir}/{instanceId}/overlay.json).
Writes are atomic via temp file + rename. Hash-based optimistic concurrency
control rejects stale updates. Add toJson()/fromJson() to
ConfigurationOverlaySnapshot and fromJson() to CassandraConfigurationOverlay
for JSON (de)serialization of persisted overlays.
Patch by Paulo Motta; Reviewed by Francisco Guerrero for CASSSIDECAR-426
---
CHANGES.txt | 1 +
.../CassandraConfigurationOverlay.java | 23 ++
.../ConfigurationOverlaySnapshot.java | 27 ++
.../FileBasedConfigurationProvider.java | 145 ++++++++++
.../FileBasedConfigurationProviderTest.java | 318 +++++++++++++++++++++
5 files changed, 514 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d68339f..2761042b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Add FileBasedConfigurationProvider for file-based overlay persistence
(CASSSIDECAR-426)
* Avoid blocking the event loop in CdcPublisher event-bus handler
(CASSSIDECAR-452)
* Added validations for Live migration configuration (CASSSIDECAR-470)
* Route CDC events to topics by corresponding topic format configuration
(CASSSIDECAR-453)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/CassandraConfigurationOverlay.java
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/CassandraConfigurationOverlay.java
index 577bc8b0..90f5babe 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/CassandraConfigurationOverlay.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/CassandraConfigurationOverlay.java
@@ -93,6 +93,29 @@ public class CassandraConfigurationOverlay
.put("extraJvmOpts", new JsonObject(new
LinkedHashMap<>(extraJvmOpts)));
}
+    /**
+     * Rebuilds a {@link CassandraConfigurationOverlay} from the JSON form that holds the
+     * {@code cassandraYaml} and {@code extraJvmOpts} members.
+     *
+     * <p>When {@code extraJvmOpts} is absent the overlay is constructed with a {@code null}
+     * options map; otherwise every option value is converted with {@link String#valueOf(Object)},
+     * preserving the iteration order of the JSON object.
+     *
+     * @param json the JSON object containing {@code cassandraYaml} and {@code extraJvmOpts}
+     * @return a new overlay instance
+     */
+    @NotNull
+    public static CassandraConfigurationOverlay fromJson(@NotNull JsonObject json)
+    {
+        JsonObject yamlSection = json.getJsonObject("cassandraYaml");
+        JsonObject jvmSection = json.getJsonObject("extraJvmOpts");
+        if (jvmSection == null)
+        {
+            return new CassandraConfigurationOverlay(yamlSection, null);
+        }
+
+        Map<String, String> jvmOptions = new LinkedHashMap<>();
+        jvmSection.forEach(option -> jvmOptions.put(option.getKey(), String.valueOf(option.getValue())));
+        return new CassandraConfigurationOverlay(yamlSection, jvmOptions);
+    }
+
/**
* Returns a new overlay with the given updates applied. The current
instance is not modified.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/ConfigurationOverlaySnapshot.java
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/ConfigurationOverlaySnapshot.java
index 984cac07..c36326f3 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/ConfigurationOverlaySnapshot.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/ConfigurationOverlaySnapshot.java
@@ -75,6 +75,33 @@ public class ConfigurationOverlaySnapshot
return overlay;
}
+    /**
+     * Serializes this snapshot into a JSON object that can be written to persistent storage.
+     *
+     * <p>The result has two members: {@code lastModified}, the textual form of the
+     * modification instant, and {@code overlay}, the JSON form of the wrapped overlay.
+     *
+     * @return a new {@link JsonObject} representing this snapshot
+     */
+    @NotNull
+    public JsonObject toJson()
+    {
+        JsonObject snapshotJson = new JsonObject();
+        snapshotJson.put("lastModified", lastModified.toString());
+        snapshotJson.put("overlay", overlay.toJson());
+        return snapshotJson;
+    }
+
+    /**
+     * Rebuilds a {@link ConfigurationOverlaySnapshot} from the JSON form that holds the
+     * {@code lastModified} and {@code overlay} members.
+     *
+     * <p>{@code lastModified} is parsed with {@link Instant#parse(CharSequence)} and
+     * {@code overlay} is handed to {@link CassandraConfigurationOverlay#fromJson(JsonObject)}.
+     *
+     * @param json the JSON object containing {@code lastModified} and {@code overlay}
+     * @return a new snapshot instance
+     */
+    @NotNull
+    public static ConfigurationOverlaySnapshot fromJson(@NotNull JsonObject json)
+    {
+        String lastModifiedText = json.getString("lastModified");
+        Instant modifiedAt = Instant.parse(lastModifiedText);
+        JsonObject overlayJson = json.getJsonObject("overlay");
+        return new ConfigurationOverlaySnapshot(modifiedAt, CassandraConfigurationOverlay.fromJson(overlayJson));
+    }
+
private String computeHash()
{
try
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProvider.java
new file mode 100644
index 00000000..b9d48d29
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProvider.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.configmanagement;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.vertx.core.json.JsonObject;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File-based implementation of {@link ConfigurationProvider} that persists configuration
+ * overlays as JSON files within a configuration store directory.
+ *
+ * <p>Each instance's overlay is stored at {@code {configDir}/{instanceId}/overlay.json}.
+ * Writes are atomic (write to temp file, then rename) to prevent corruption from crashes.
+ *
+ * <p>Concurrency is handled via {@link ConcurrentHashMap#compute}, which provides per-key
+ * mutual exclusion. The optimistic hash check, the disk write and the cache update for an
+ * instance all happen inside the same {@code compute} invocation, so accepted updates for one
+ * instance reach the disk in the order in which they were accepted, and a failed write leaves
+ * the cached snapshot untouched.
+ */
+public class FileBasedConfigurationProvider implements ConfigurationProvider
+{
+    private static final String CONFIG_FILE_NAME = "overlay.json";
+
+    private final Path configDir;
+    // Last snapshot successfully written (or first read) per instance id; used as the
+    // "current" value for the optimistic hash check in storeOverlay.
+    private final ConcurrentHashMap<Integer, ConfigurationOverlaySnapshot> overlays = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a provider rooted at the given configuration store directory.
+     *
+     * @param configDir the directory under which per-instance directories are created
+     * @throws NullPointerException if {@code configDir} is {@code null}
+     */
+    public FileBasedConfigurationProvider(Path configDir)
+    {
+        this.configDir = Objects.requireNonNull(configDir, "configDir must not be null");
+    }
+
+    /**
+     * Reads the overlay for the given instance from its file on every call.
+     *
+     * @param instance the instance whose overlay is requested
+     * @return the persisted snapshot, or {@code null} when no overlay file exists
+     * @throws UncheckedIOException if the overlay file exists but cannot be read
+     */
+    @Override
+    @Nullable
+    public ConfigurationOverlaySnapshot getOverlay(InstanceMetadata instance)
+    {
+        return readFromDisk(instance);
+    }
+
+    /**
+     * Stores {@code newSnapshot} for the instance if {@code originalHash} matches the current state.
+     *
+     * <p>The update is accepted when either no overlay exists yet and {@code originalHash} is
+     * {@code null}, or an overlay exists and its hash equals {@code originalHash}. In every other
+     * case nothing is written and {@code false} is returned.
+     *
+     * @param instance     the instance the overlay belongs to
+     * @param originalHash the hash of the overlay the caller based its update on, or {@code null}
+     *                     when the caller expects no overlay to exist
+     * @param newSnapshot  the snapshot to persist
+     * @return {@code true} if the snapshot was written, {@code false} if the update was rejected
+     * @throws NullPointerException if {@code newSnapshot} is {@code null}
+     * @throws UncheckedIOException if reading the current overlay or writing the new one fails;
+     *                              the cached snapshot is left unchanged in that case
+     */
+    @Override
+    public boolean storeOverlay(InstanceMetadata instance,
+                                @Nullable String originalHash,
+                                @NotNull ConfigurationOverlaySnapshot newSnapshot)
+    {
+        Objects.requireNonNull(newSnapshot, "newSnapshot must not be null");
+        // Single-element holder so the lambda can report its decision explicitly; comparing the
+        // compute() result by identity would misreport a rejected update as stored whenever the
+        // current snapshot happens to be the very same object as newSnapshot.
+        boolean[] stored = { false };
+        overlays.compute(instance.id(), (id, cached) -> {
+            ConfigurationOverlaySnapshot current = cached != null ? cached : readFromDisk(instance);
+
+            boolean accepted = current == null
+                               ? originalHash == null
+                               : current.hash().equals(originalHash);
+            if (!accepted)
+            {
+                // Keep whatever is current (possibly nothing) and reject the stale update.
+                return current;
+            }
+
+            // Persist while still holding the per-key lock: a later accepted update can never be
+            // overwritten on disk by an earlier one, and if the write throws, compute() rethrows
+            // and leaves the existing mapping as it was.
+            writeToDisk(instance, newSnapshot);
+            stored[0] = true;
+            return newSnapshot;
+        });
+        return stored[0];
+    }
+
+    /**
+     * Loads and parses the overlay file of the given instance.
+     *
+     * @param instance the instance whose overlay file is read
+     * @return the parsed snapshot, or {@code null} when the file does not exist
+     * @throws UncheckedIOException if the file exists but cannot be read
+     */
+    @Nullable
+    private ConfigurationOverlaySnapshot readFromDisk(InstanceMetadata instance)
+    {
+        Path configFile = resolveInstanceDir(instance).resolve(CONFIG_FILE_NAME);
+        if (!Files.exists(configFile))
+        {
+            return null;
+        }
+        try
+        {
+            String content = Files.readString(configFile, StandardCharsets.UTF_8);
+            return ConfigurationOverlaySnapshot.fromJson(new JsonObject(content));
+        }
+        catch (IOException e)
+        {
+            throw new UncheckedIOException("Failed to read configuration overlay for instance " + instance.id(), e);
+        }
+    }
+
+    /**
+     * Writes the snapshot to the instance's overlay file via a temp file in the same directory
+     * followed by a move, creating the instance directory if needed.
+     *
+     * @param instance the instance whose overlay file is written
+     * @param snapshot the snapshot to serialize
+     * @throws UncheckedIOException if any file operation fails; the temp file is deleted on a
+     *                              best-effort basis before the exception is thrown
+     */
+    private void writeToDisk(InstanceMetadata instance, ConfigurationOverlaySnapshot snapshot)
+    {
+        Path instanceDir = resolveInstanceDir(instance);
+        Path configFile = instanceDir.resolve(CONFIG_FILE_NAME);
+        Path tempFile = null;
+        try
+        {
+            Files.createDirectories(instanceDir);
+            // The temp file lives next to the target so the move stays within one directory.
+            tempFile = Files.createTempFile(instanceDir, "config", ".tmp");
+            Files.writeString(tempFile, snapshot.toJson().encodePrettily(), StandardCharsets.UTF_8);
+            Files.move(tempFile, configFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+        }
+        catch (IOException e)
+        {
+            if (tempFile != null)
+            {
+                try
+                {
+                    Files.deleteIfExists(tempFile);
+                }
+                catch (IOException suppressed)
+                {
+                    // Report the cleanup failure without hiding the original cause.
+                    e.addSuppressed(suppressed);
+                }
+            }
+            throw new UncheckedIOException("Failed to store configuration overlay for instance " + instance.id(), e);
+        }
+    }
+
+    /**
+     * Returns the directory that holds the files of the given instance, named after its id.
+     */
+    private Path resolveInstanceDir(InstanceMetadata instance)
+    {
+        return configDir.resolve(String.valueOf(instance.id()));
+    }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProviderTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProviderTest.java
new file mode 100644
index 00000000..68ca3998
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/configmanagement/FileBasedConfigurationProviderTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.configmanagement;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.json.JsonObject;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FileBasedConfigurationProvider}
+ */
+class FileBasedConfigurationProviderTest
+{
+ @TempDir
+ Path tempDir;
+
+ private FileBasedConfigurationProvider provider;
+ private InstanceMetadata instance1;
+ private InstanceMetadata instance2;
+
+ @BeforeEach
+ void setUp()
+ {
+ provider = new FileBasedConfigurationProvider(tempDir);
+ instance1 = mockInstance(1);
+ instance2 = mockInstance(2);
+ }
+
+ @Test
+ void testGetReturnsNullForUnknownInstance()
+ {
+ assertThat(provider.getOverlay(instance1)).isNull();
+ }
+
+ @Test
+ void testStoreAndGet()
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("concurrent_reads", 64);
+
+ boolean stored = provider.storeOverlay(instance1, null, snapshot);
+
+ assertThat(stored).isTrue();
+ ConfigurationOverlaySnapshot fetched = provider.getOverlay(instance1);
+ assertThat(fetched).isNotNull();
+ assertThat(fetched.overlay()).isEqualTo(snapshot.overlay());
+ assertThat(fetched.lastModified()).isEqualTo(snapshot.lastModified());
+ assertThat(fetched.hash()).isEqualTo(snapshot.hash());
+ }
+
+ @Test
+ void testStoreReturnsTrueOnSuccess()
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("memtable_flush_writers", 8);
+
+ assertThat(provider.storeOverlay(instance1, null, snapshot)).isTrue();
+ }
+
+ @Test
+ void testStoreReturnsFalseOnHashMismatch()
+ {
+ ConfigurationOverlaySnapshot initial =
createSnapshot("concurrent_reads", 32);
+ provider.storeOverlay(instance1, null, initial);
+
+ ConfigurationOverlaySnapshot update =
createSnapshot("concurrent_reads", 64);
+ assertThat(provider.storeOverlay(instance1, "sha256:stale",
update)).isFalse();
+
+ // Original is preserved
+ ConfigurationOverlaySnapshot fetched = provider.getOverlay(instance1);
+ assertThat(fetched.overlay()).isEqualTo(initial.overlay());
+ }
+
+ @Test
+ void testStoreReturnsFalseWhenNoOverlayButHashProvided()
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("concurrent_reads", 32);
+ assertThat(provider.storeOverlay(instance1, "sha256:unexpected",
snapshot)).isFalse();
+ assertThat(provider.getOverlay(instance1)).isNull();
+ }
+
+ @Test
+ void testStoreReturnsFalseWhenOverlayExistsButNullHashProvided()
+ {
+ ConfigurationOverlaySnapshot initial =
createSnapshot("concurrent_reads", 32);
+ provider.storeOverlay(instance1, null, initial);
+
+ ConfigurationOverlaySnapshot update =
createSnapshot("concurrent_reads", 64);
+ assertThat(provider.storeOverlay(instance1, null, update)).isFalse();
+
+ // Original is preserved
+ ConfigurationOverlaySnapshot fetched = provider.getOverlay(instance1);
+ assertThat(fetched.overlay()).isEqualTo(initial.overlay());
+ }
+
+ @Test
+ void testInstanceIsolation()
+ {
+ ConfigurationOverlaySnapshot snap1 =
createSnapshot("concurrent_reads", 32);
+ ConfigurationOverlaySnapshot snap2 =
createSnapshot("concurrent_reads", 64);
+
+ provider.storeOverlay(instance1, null, snap1);
+ provider.storeOverlay(instance2, null, snap2);
+
+ ConfigurationOverlaySnapshot fetched1 = provider.getOverlay(instance1);
+ ConfigurationOverlaySnapshot fetched2 = provider.getOverlay(instance2);
+ assertThat(fetched1.overlay()).isEqualTo(snap1.overlay());
+ assertThat(fetched2.overlay()).isEqualTo(snap2.overlay());
+ }
+
+    /**
+     * Stores overlays for ten distinct instances from ten threads released at the same time and
+     * verifies that every store succeeds and every overlay can be read back afterwards.
+     */
+    @Test
+    void testConcurrentStoresDifferentInstances() throws Exception
+    {
+        int instanceCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(instanceCount);
+        try
+        {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            List<Future<Boolean>> futures = new ArrayList<>();
+
+            for (int i = 0; i < instanceCount; i++)
+            {
+                int instanceId = i;
+                futures.add(executor.submit(() ->
+                {
+                    // Hold every worker until all tasks are queued so the stores overlap.
+                    startLatch.await();
+                    InstanceMetadata instance = mockInstance(instanceId);
+                    ConfigurationOverlaySnapshot snapshot = createSnapshot("concurrent_reads", instanceId * 10);
+                    return provider.storeOverlay(instance, null, snapshot);
+                }));
+            }
+
+            startLatch.countDown();
+            for (Future<Boolean> future : futures)
+            {
+                assertThat(future.get(5, TimeUnit.SECONDS)).isTrue();
+            }
+
+            for (int i = 0; i < instanceCount; i++)
+            {
+                assertThat(provider.getOverlay(mockInstance(i))).isNotNull();
+            }
+        }
+        finally
+        {
+            // Release the pool even when an assertion or a timeout fails, so a failing run does
+            // not leave worker threads behind.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Races ten updates against the same instance, all based on the same original hash, and
+     * verifies that exactly one is accepted while the other nine are rejected as conflicts.
+     */
+    @Test
+    void testConcurrentStoresSameInstance() throws Exception
+    {
+        ConfigurationOverlaySnapshot initial = createSnapshot("concurrent_reads", 32);
+        provider.storeOverlay(instance1, null, initial);
+        String hashBeforeRace = initial.hash();
+
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        try
+        {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            AtomicInteger successes = new AtomicInteger(0);
+            AtomicInteger conflicts = new AtomicInteger(0);
+            List<Future<?>> futures = new ArrayList<>();
+
+            for (int i = 0; i < threadCount; i++)
+            {
+                int value = (i + 1) * 100;
+                futures.add(executor.submit(() ->
+                {
+                    try
+                    {
+                        // Hold every worker until all tasks are queued so the stores overlap.
+                        startLatch.await();
+                        ConfigurationOverlaySnapshot snapshot = createSnapshot("concurrent_reads", value);
+                        boolean stored = provider.storeOverlay(instance1, hashBeforeRace, snapshot);
+                        if (stored)
+                        {
+                            successes.incrementAndGet();
+                        }
+                        else
+                        {
+                            conflicts.incrementAndGet();
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }));
+            }
+
+            startLatch.countDown();
+            for (Future<?> future : futures)
+            {
+                future.get(5, TimeUnit.SECONDS);
+            }
+
+            assertThat(successes.get()).isEqualTo(1);
+            assertThat(conflicts.get()).isEqualTo(threadCount - 1);
+        }
+        finally
+        {
+            // Release the pool even when an assertion or a timeout fails, so a failing run does
+            // not leave worker threads behind.
+            executor.shutdownNow();
+        }
+    }
+
+ @Test
+ void testPersistenceAcrossProviderInstances()
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("concurrent_reads", 64);
+ provider.storeOverlay(instance1, null, snapshot);
+
+ FileBasedConfigurationProvider newProvider = new
FileBasedConfigurationProvider(tempDir);
+ ConfigurationOverlaySnapshot fetched =
newProvider.getOverlay(instance1);
+
+ assertThat(fetched).isNotNull();
+ assertThat(fetched.overlay()).isEqualTo(snapshot.overlay());
+ assertThat(fetched.lastModified()).isEqualTo(snapshot.lastModified());
+ assertThat(fetched.hash()).isEqualTo(snapshot.hash());
+ }
+
+ @Test
+ void testCreatesDirectoryStructure()
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("concurrent_reads", 64);
+ provider.storeOverlay(instance1, null, snapshot);
+
+ Path instanceDir = tempDir.resolve("1");
+ assertThat(instanceDir).isDirectory();
+ assertThat(instanceDir.resolve("overlay.json")).isRegularFile();
+ }
+
+ @Test
+ void testAtomicWriteNoPartialFile() throws IOException
+ {
+ ConfigurationOverlaySnapshot snapshot =
createSnapshot("concurrent_reads", 64);
+ provider.storeOverlay(instance1, null, snapshot);
+
+ Path instanceDir = tempDir.resolve("1");
+ try (Stream<Path> files = Files.list(instanceDir))
+ {
+ List<String> fileNames = files.map(p ->
p.getFileName().toString()).sorted().collect(Collectors.toList());
+ assertThat(fileNames).containsExactly("overlay.json");
+ }
+ }
+
+ @Test
+ void testOverwriteExistingOverlay()
+ {
+ ConfigurationOverlaySnapshot initial =
createSnapshot("concurrent_reads", 32);
+ provider.storeOverlay(instance1, null, initial);
+ String hash = initial.hash();
+
+ ConfigurationOverlaySnapshot updated =
createSnapshot("concurrent_reads", 128);
+ assertThat(provider.storeOverlay(instance1, hash, updated)).isTrue();
+
+ ConfigurationOverlaySnapshot fetched = provider.getOverlay(instance1);
+ assertThat(fetched.overlay()).isEqualTo(updated.overlay());
+ assertThat(fetched.hash()).isEqualTo(updated.hash());
+ }
+
+ @Test
+ void testStoreWithExtraJvmOpts()
+ {
+ JsonObject yaml = new JsonObject().put("concurrent_reads", 64);
+ CassandraConfigurationOverlay overlay = new
CassandraConfigurationOverlay(yaml,
+
Collections.singletonMap("-Xmx", "4G"));
+ ConfigurationOverlaySnapshot snapshot = new
ConfigurationOverlaySnapshot(Instant.now(), overlay);
+
+ provider.storeOverlay(instance1, null, snapshot);
+
+ ConfigurationOverlaySnapshot fetched = provider.getOverlay(instance1);
+ assertThat(fetched).isNotNull();
+ assertThat(fetched.overlay().extraJvmOpts()).containsEntry("-Xmx",
"4G");
+
assertThat(fetched.overlay().cassandraYaml().getInteger("concurrent_reads")).isEqualTo(64);
+ }
+
+    /**
+     * Builds a snapshot, stamped with the current instant, whose overlay carries a single
+     * {@code cassandraYaml} entry and no extra JVM options.
+     */
+    private static ConfigurationOverlaySnapshot createSnapshot(String field, int value)
+    {
+        JsonObject singleEntryYaml = new JsonObject();
+        singleEntryYaml.put(field, value);
+        CassandraConfigurationOverlay yamlOnlyOverlay = new CassandraConfigurationOverlay(singleEntryYaml, null);
+        Instant now = Instant.now();
+        return new ConfigurationOverlaySnapshot(now, yamlOnlyOverlay);
+    }
+
+    /**
+     * Creates a Mockito mock of {@link InstanceMetadata} whose {@code id()} returns the given id.
+     */
+    private static InstanceMetadata mockInstance(int id)
+    {
+        InstanceMetadata mocked = mock(InstanceMetadata.class);
+        when(mocked.id()).thenReturn(id);
+        return mocked;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]