This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 097d22180 [server] Added thread-safety to CoordinatorEventProcessor
for Completed Snapshots (#2221)
097d22180 is described below
commit 097d221808fa17c0250c2d14ef3fe9ac68af511a
Author: Rion Williams <[email protected]>
AuthorDate: Wed Dec 24 10:24:50 2025 -0600
[server] Added thread-safety to CoordinatorEventProcessor for Completed
Snapshots (#2221)
---
.../server/kv/snapshot/CompletedSnapshotStore.java | 76 +++--
.../kv/snapshot/CompletedSnapshotStoreTest.java | 334 +++++++++++++++++++++
2 files changed, 382 insertions(+), 28 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
index 3e93cc46a..b67f98466 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
@@ -26,6 +26,8 @@ import org.apache.fluss.metadata.TableBucket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,8 +37,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -47,6 +51,7 @@ import static
org.apache.fluss.utils.Preconditions.checkNotNull;
* managing the completed snapshots including store/subsume/get completed
snapshots for single one
* table bucket.
*/
+@ThreadSafe
public class CompletedSnapshotStore {
private static final Logger LOG =
LoggerFactory.getLogger(CompletedSnapshotStore.class);
@@ -62,6 +67,8 @@ public class CompletedSnapshotStore {
private final Executor ioExecutor;
private final SnapshotsCleaner snapshotsCleaner;
+ private final ReentrantLock lock = new ReentrantLock();
+
/**
* Local copy of the completed snapshots in snapshot store. This is
restored from snapshot
* handel store when recovering.
@@ -84,7 +91,11 @@ public class CompletedSnapshotStore {
}
public void add(final CompletedSnapshot completedSnapshot) throws
Exception {
- addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, ()
-> {});
+ inLock(
+ lock,
+ () ->
+ addSnapshotAndSubsumeOldestOne(
+ completedSnapshot, snapshotsCleaner, () ->
{}));
}
public long getPhysicalStorageRemoteKvSize() {
@@ -92,7 +103,7 @@ public class CompletedSnapshotStore {
}
public long getNumSnapshots() {
- return completedSnapshots.size();
+ return inLock(lock, () -> completedSnapshots.size());
}
/**
@@ -117,34 +128,43 @@ public class CompletedSnapshotStore {
snapshot.getTableBucket(), snapshot.getSnapshotID(),
completedSnapshotHandle);
// Now add the new one. If it fails, we don't want to lose existing
data.
- completedSnapshots.addLast(snapshot);
-
- // Remove completed snapshot from queue and snapshotStateHandleStore,
not discard.
- Optional<CompletedSnapshot> subsume =
- subsume(
- completedSnapshots,
- maxNumberOfSnapshotsToRetain,
- completedSnapshot -> {
- remove(
- completedSnapshot.getTableBucket(),
- completedSnapshot.getSnapshotID());
-
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
- });
-
- findLowest(completedSnapshots)
- .ifPresent(
- id -> {
- // unregister the unused kv file, which will then
cause the kv file
- // deletion
- sharedKvFileRegistry.unregisterUnusedKvFile(id);
- snapshotsCleaner.cleanSubsumedSnapshots(
- id, Collections.emptySet(), postCleanup,
ioExecutor);
- });
- return subsume.orElse(null);
+ return inLock(
+ lock,
+ () -> {
+ completedSnapshots.addLast(snapshot);
+
+ // Remove completed snapshot from queue and
snapshotStateHandleStore, not
+ // discard.
+ Optional<CompletedSnapshot> subsume =
+ subsume(
+ completedSnapshots,
+ maxNumberOfSnapshotsToRetain,
+ completedSnapshot -> {
+ remove(
+
completedSnapshot.getTableBucket(),
+
completedSnapshot.getSnapshotID());
+
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
+ });
+
+ findLowest(completedSnapshots)
+ .ifPresent(
+ id -> {
+ // unregister the unused kv file,
which will then cause the
+ // kv file
+ // deletion
+
sharedKvFileRegistry.unregisterUnusedKvFile(id);
+
snapshotsCleaner.cleanSubsumedSnapshots(
+ id,
+ Collections.emptySet(),
+ postCleanup,
+ ioExecutor);
+ });
+ return subsume.orElse(null);
+ });
}
public List<CompletedSnapshot> getAllSnapshots() {
- return new ArrayList<>(completedSnapshots);
+ return inLock(lock, () -> new ArrayList<>(completedSnapshots));
}
private static Optional<CompletedSnapshot> subsume(
@@ -211,7 +231,7 @@ public class CompletedSnapshotStore {
* added.
*/
public Optional<CompletedSnapshot> getLatestSnapshot() {
- return Optional.ofNullable(completedSnapshots.peekLast());
+ return inLock(lock, () ->
Optional.ofNullable(completedSnapshots.peekLast()));
}
/**
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
index 8d45628d8..1ab73cfee 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
@@ -34,12 +34,16 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@@ -53,11 +57,13 @@ class CompletedSnapshotStoreTest {
private ExecutorService executorService;
private TestCompletedSnapshotHandleStore.Builder builder;
+ private TestCompletedSnapshotHandleStore defaultHandleStore;
private @TempDir Path tempDir;
@BeforeEach
void setup() {
builder = TestCompletedSnapshotHandleStore.newBuilder();
+ defaultHandleStore = builder.build();
executorService = Executors.newFixedThreadPool(2, new
ExecutorThreadFactory("IO-Executor"));
}
@@ -171,6 +177,334 @@ class CompletedSnapshotStoreTest {
assertThat(completedSnapshotStore.getLatestSnapshot().get().getSnapshotID()).isOne();
}
+    /**
+     * Adds snapshots from many threads at once and verifies that the store ends up consistent:
+     * every add completed, retention is honoured, no snapshot is duplicated, and the read
+     * accessors agree with each other.
+     */
+    @Test
+    void testConcurrentAdds() throws Exception {
+        final int maxRetain = 10;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, Collections.emptyList());
+
+        final int numThreads = 10;
+        final int snapshotsPerThread = 5;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new ExecutorThreadFactory("concurrent-add-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger addedCount = new AtomicInteger(0);
+            // Keep the actual throwables (not just a count) so a failure reports its cause.
+            List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+            // Spin up threads to add snapshots concurrently; snapshot IDs are disjoint per thread.
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * snapshotsPerThread + i + 1;
+                                    completedSnapshotStore.add(getSnapshot(snapshotId));
+                                    addedCount.incrementAndGet();
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                failures.add(e);
+                            } catch (Throwable t) {
+                                failures.add(t);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // No sleep needed: add() mutates the in-memory snapshot deque synchronously under the
+            // store lock, and only that state is asserted below.
+            assertThat(failures).as("No exceptions should occur").isEmpty();
+            assertThat(addedCount.get())
+                    .as("Every add() call should have completed")
+                    .isEqualTo(numThreads * snapshotsPerThread);
+
+            List<CompletedSnapshot> allSnapshots = completedSnapshotStore.getAllSnapshots();
+            assertThat(allSnapshots)
+                    .as("Snapshots should be retained after successful adds")
+                    .isNotEmpty();
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain snapshots")
+                    .isLessThanOrEqualTo(maxRetain);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            Optional<CompletedSnapshot> latest = completedSnapshotStore.getLatestSnapshot();
+            assertThat(latest).as("Latest snapshot should be present").isPresent();
+            assertThat(latest.get())
+                    .as("Latest snapshot should match last in getAllSnapshots()")
+                    .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+        } finally {
+            // Interrupt any worker still blocked if an assertion above failed early.
+            testExecutor.shutdownNow();
+        }
+    }
+
+    /**
+     * Runs writer threads that add snapshots concurrently with reader threads that call the read
+     * accessors. Every individual read is validated, as well as the final state.
+     */
+    @Test
+    void testConcurrentReadsAndWrites() throws Exception {
+        final int maxRetain = 5;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, Collections.emptyList());
+
+        final int numWriterThreads = 5;
+        final int numReaderThreads = 3;
+        final int snapshotsPerWriter = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numWriterThreads + numReaderThreads,
+                        new ExecutorThreadFactory("concurrent-read-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch =
+                    new CountDownLatch(numWriterThreads + numReaderThreads);
+            AtomicInteger addedCount = new AtomicInteger(0);
+            // Keep the actual throwables (not just a count) so a failure reports its cause.
+            List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+            // Spin up snapshot writer threads; snapshot IDs are disjoint per thread.
+            for (int threadId = 0; threadId < numWriterThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerWriter; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * snapshotsPerWriter + i + 1;
+                                    completedSnapshotStore.add(getSnapshot(snapshotId));
+                                    addedCount.incrementAndGet();
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                failures.add(e);
+                            } catch (Throwable t) {
+                                failures.add(t);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Spin up snapshot reader threads (during writes)
+            for (int threadId = 0; threadId < numReaderThreads; threadId++) {
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < 50; i++) {
+                                    // Each accessor takes the lock on its own, so only check
+                                    // invariants that hold for a single read.
+                                    assertThat(completedSnapshotStore.getNumSnapshots())
+                                            .as("getNumSnapshots() should respect retention")
+                                            .isLessThanOrEqualTo(maxRetain);
+                                    List<CompletedSnapshot> snapshots =
+                                            completedSnapshotStore.getAllSnapshots();
+                                    assertThat(snapshots.size())
+                                            .as("getAllSnapshots() should respect retention")
+                                            .isLessThanOrEqualTo(maxRetain);
+                                    assertThat(
+                                                    snapshots.stream()
+                                                            .map(CompletedSnapshot::getSnapshotID)
+                                                            .distinct()
+                                                            .count())
+                                            .as("A single read should never see duplicate IDs")
+                                            .isEqualTo(snapshots.size());
+                                    completedSnapshotStore.getLatestSnapshot();
+                                    // Introduce tiny wait to intersperse reads/writes
+                                    Thread.sleep(2);
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                failures.add(e);
+                            } catch (Throwable t) {
+                                failures.add(t);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // No sleep needed: add() mutates the in-memory snapshot deque synchronously under the
+            // store lock, and only that state is asserted below.
+            assertThat(failures).as("No exceptions should occur").isEmpty();
+            assertThat(addedCount.get())
+                    .as("Every add() call should have completed")
+                    .isEqualTo(numWriterThreads * snapshotsPerWriter);
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            List<CompletedSnapshot> allSnapshots = completedSnapshotStore.getAllSnapshots();
+
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            assertThat(numSnapshots)
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain snapshots")
+                    .isLessThanOrEqualTo(maxRetain);
+
+            assertThat(allSnapshots)
+                    .as("Snapshots should be retained after successful adds")
+                    .isNotEmpty();
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            Optional<CompletedSnapshot> latest = completedSnapshotStore.getLatestSnapshot();
+            assertThat(latest).as("Latest snapshot should be present").isPresent();
+            assertThat(latest.get())
+                    .as("Latest snapshot should match last in getAllSnapshots()")
+                    .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+        } finally {
+            // Interrupt any worker still blocked if an assertion above failed early.
+            testExecutor.shutdownNow();
+        }
+    }
+
+    /**
+     * Adds many more snapshots than the retention limit from several threads at once and
+     * verifies that retention is still honoured and the store remains consistent.
+     */
+    @Test
+    void testConcurrentAddsWithSnapshotRetention() throws Exception {
+        final int maxRetain = 3;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, Collections.emptyList());
+
+        final int numThreads = 5;
+        final int snapshotsPerThread = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new ExecutorThreadFactory("concurrent-add-retention-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger addedCount = new AtomicInteger(0);
+            // Keep the actual throwables (not just a count) so a failure reports its cause.
+            List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+            // Spin up threads to add snapshots concurrently; snapshot IDs are disjoint per thread.
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * snapshotsPerThread + i + 1;
+                                    completedSnapshotStore.add(getSnapshot(snapshotId));
+                                    addedCount.incrementAndGet();
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                failures.add(e);
+                            } catch (Throwable t) {
+                                failures.add(t);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // No sleep needed: add() mutates the in-memory snapshot deque synchronously under the
+            // store lock, and only that state is asserted below.
+            assertThat(failures).as("No exceptions should occur").isEmpty();
+            assertThat(addedCount.get())
+                    .as("Every add() call should have completed")
+                    .isEqualTo(numThreads * snapshotsPerThread);
+
+            List<CompletedSnapshot> allSnapshots = completedSnapshotStore.getAllSnapshots();
+            assertThat(allSnapshots)
+                    .as("Snapshots should be retained after successful adds")
+                    .isNotEmpty();
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain snapshots")
+                    .isLessThanOrEqualTo(maxRetain);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            Optional<CompletedSnapshot> latest = completedSnapshotStore.getLatestSnapshot();
+            assertThat(latest).as("Latest snapshot should be present").isPresent();
+            assertThat(latest.get())
+                    .as("Latest snapshot should match last in getAllSnapshots()")
+                    .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+        } finally {
+            // Interrupt any worker still blocked if an assertion above failed early.
+            testExecutor.shutdownNow();
+        }
+    }
+
+    /**
+     * Interleaves adds and reads on a shared pool. Reader tasks validate invariants of each
+     * individual read.
+     *
+     * <p>Failures inside tasks are collected and asserted on the test thread. A throwable that
+     * escapes a task submitted via {@link ExecutorService#submit} is captured in the returned
+     * (and here discarded) {@code Future}, so it would otherwise never fail the test.
+     */
+    @Test
+    void testConcurrentGetNumSnapshotsAccuracy() throws Exception {
+        final int maxRetain = 10;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, Collections.emptyList());
+
+        final int numOperations = 30;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        10, new ExecutorThreadFactory("concurrent-accuracy-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numOperations);
+            AtomicInteger addedCount = new AtomicInteger(0);
+            List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+            // Spin up various different snapshot operations: even IDs add, odd IDs read.
+            for (int i = 0; i < numOperations; i++) {
+                final int operationId = i;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                if (operationId % 2 == 0) {
+                                    // Add snapshot
+                                    completedSnapshotStore.add(getSnapshot(operationId + 1));
+                                    addedCount.incrementAndGet();
+                                } else {
+                                    // Read snapshots. The two calls take the lock separately, so
+                                    // an add() may land in between and asserting equality here
+                                    // would be racy. Check per-read invariants instead.
+                                    long numSnapshots = completedSnapshotStore.getNumSnapshots();
+                                    List<CompletedSnapshot> allSnapshots =
+                                            completedSnapshotStore.getAllSnapshots();
+                                    assertThat(numSnapshots)
+                                            .as("getNumSnapshots() should respect retention")
+                                            .isLessThanOrEqualTo(maxRetain);
+                                    assertThat(allSnapshots.size())
+                                            .as("getAllSnapshots() should respect retention")
+                                            .isLessThanOrEqualTo(maxRetain);
+                                    assertThat(
+                                                    allSnapshots.stream()
+                                                            .map(CompletedSnapshot::getSnapshotID)
+                                                            .distinct()
+                                                            .count())
+                                            .as("A single read should never see duplicate IDs")
+                                            .isEqualTo(allSnapshots.size());
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                failures.add(e);
+                            } catch (Throwable t) {
+                                // Includes AssertionError from the reader checks above.
+                                failures.add(t);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all operations simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All operations should complete").isTrue();
+
+            // No sleep needed: add() mutates the in-memory snapshot deque synchronously under the
+            // store lock, and only that state is asserted below.
+            assertThat(failures).as("No exceptions or assertion failures should occur").isEmpty();
+            assertThat(addedCount.get())
+                    .as("Every add() call should have completed")
+                    .isEqualTo((numOperations + 1) / 2);
+
+            // With all tasks finished there is no concurrent writer, so equality is now exact.
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            List<CompletedSnapshot> allSnapshots = completedSnapshotStore.getAllSnapshots();
+            assertThat(numSnapshots)
+                    .as("Final getNumSnapshots() should match getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+        } finally {
+            // Interrupt any worker still blocked if an assertion above failed early.
+            testExecutor.shutdownNow();
+        }
+    }
+
private List<CompletedSnapshot> mapToCompletedSnapshot(
List<Tuple2<CompletedSnapshotHandle, String>> snapshotHandles) {
return snapshotHandles.stream()