This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 097d22180 [server] Added thread-safety to CoordinatorEventProcessor for Completed Snapshots (#2221)
097d22180 is described below

commit 097d221808fa17c0250c2d14ef3fe9ac68af511a
Author: Rion Williams <[email protected]>
AuthorDate: Wed Dec 24 10:24:50 2025 -0600

    [server] Added thread-safety to CoordinatorEventProcessor for Completed Snapshots (#2221)
---
 .../server/kv/snapshot/CompletedSnapshotStore.java |  76 +++--
 .../kv/snapshot/CompletedSnapshotStoreTest.java    | 334 +++++++++++++++++++++
 2 files changed, 382 insertions(+), 28 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
index 3e93cc46a..b67f98466 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
@@ -26,6 +26,8 @@ import org.apache.fluss.metadata.TableBucket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,8 +37,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
 /* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -47,6 +51,7 @@ import static org.apache.fluss.utils.Preconditions.checkNotNull;
  * managing the completed snapshots including store/subsume/get completed 
snapshots for single one
  * table bucket.
  */
+@ThreadSafe
 public class CompletedSnapshotStore {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompletedSnapshotStore.class);
@@ -62,6 +67,8 @@ public class CompletedSnapshotStore {
     private final Executor ioExecutor;
     private final SnapshotsCleaner snapshotsCleaner;
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     /**
      * Local copy of the completed snapshots in snapshot store. This is 
restored from snapshot
      * handel store when recovering.
@@ -84,7 +91,11 @@ public class CompletedSnapshotStore {
     }
 
     public void add(final CompletedSnapshot completedSnapshot) throws 
Exception {
-        addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, () 
-> {});
+        inLock(
+                lock,
+                () ->
+                        addSnapshotAndSubsumeOldestOne(
+                                completedSnapshot, snapshotsCleaner, () -> 
{}));
     }
 
     public long getPhysicalStorageRemoteKvSize() {
@@ -92,7 +103,7 @@ public class CompletedSnapshotStore {
     }
 
     public long getNumSnapshots() {
-        return completedSnapshots.size();
+        return inLock(lock, () -> completedSnapshots.size());
     }
 
     /**
@@ -117,34 +128,43 @@ public class CompletedSnapshotStore {
                 snapshot.getTableBucket(), snapshot.getSnapshotID(), 
completedSnapshotHandle);
 
         // Now add the new one. If it fails, we don't want to lose existing 
data.
-        completedSnapshots.addLast(snapshot);
-
-        // Remove completed snapshot from queue and snapshotStateHandleStore, 
not discard.
-        Optional<CompletedSnapshot> subsume =
-                subsume(
-                        completedSnapshots,
-                        maxNumberOfSnapshotsToRetain,
-                        completedSnapshot -> {
-                            remove(
-                                    completedSnapshot.getTableBucket(),
-                                    completedSnapshot.getSnapshotID());
-                            
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
-                        });
-
-        findLowest(completedSnapshots)
-                .ifPresent(
-                        id -> {
-                            // unregister the unused kv file, which will then 
cause the kv file
-                            // deletion
-                            sharedKvFileRegistry.unregisterUnusedKvFile(id);
-                            snapshotsCleaner.cleanSubsumedSnapshots(
-                                    id, Collections.emptySet(), postCleanup, 
ioExecutor);
-                        });
-        return subsume.orElse(null);
+        return inLock(
+                lock,
+                () -> {
+                    completedSnapshots.addLast(snapshot);
+
+                    // Remove completed snapshot from queue and 
snapshotStateHandleStore, not
+                    // discard.
+                    Optional<CompletedSnapshot> subsume =
+                            subsume(
+                                    completedSnapshots,
+                                    maxNumberOfSnapshotsToRetain,
+                                    completedSnapshot -> {
+                                        remove(
+                                                
completedSnapshot.getTableBucket(),
+                                                
completedSnapshot.getSnapshotID());
+                                        
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
+                                    });
+
+                    findLowest(completedSnapshots)
+                            .ifPresent(
+                                    id -> {
+                                        // unregister the unused kv file, 
which will then cause the
+                                        // kv file
+                                        // deletion
+                                        
sharedKvFileRegistry.unregisterUnusedKvFile(id);
+                                        
snapshotsCleaner.cleanSubsumedSnapshots(
+                                                id,
+                                                Collections.emptySet(),
+                                                postCleanup,
+                                                ioExecutor);
+                                    });
+                    return subsume.orElse(null);
+                });
     }
 
     public List<CompletedSnapshot> getAllSnapshots() {
-        return new ArrayList<>(completedSnapshots);
+        return inLock(lock, () -> new ArrayList<>(completedSnapshots));
     }
 
     private static Optional<CompletedSnapshot> subsume(
@@ -211,7 +231,7 @@ public class CompletedSnapshotStore {
      * added.
      */
     public Optional<CompletedSnapshot> getLatestSnapshot() {
-        return Optional.ofNullable(completedSnapshots.peekLast());
+        return inLock(lock, () -> 
Optional.ofNullable(completedSnapshots.peekLast()));
     }
 
     /**
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
index 8d45628d8..1ab73cfee 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
@@ -34,12 +34,16 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
@@ -53,11 +57,13 @@ class CompletedSnapshotStoreTest {
 
     private ExecutorService executorService;
     private TestCompletedSnapshotHandleStore.Builder builder;
+    private TestCompletedSnapshotHandleStore defaultHandleStore;
     private @TempDir Path tempDir;
 
     @BeforeEach
     void setup() {
         builder = TestCompletedSnapshotHandleStore.newBuilder();
+        defaultHandleStore = builder.build();
         executorService = Executors.newFixedThreadPool(2, new 
ExecutorThreadFactory("IO-Executor"));
     }
 
@@ -171,6 +177,334 @@ class CompletedSnapshotStoreTest {
         
assertThat(completedSnapshotStore.getLatestSnapshot().get().getSnapshotID()).isOne();
     }
 
+    @Test
+    void testConcurrentAdds() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(10, defaultHandleStore, 
Collections.emptyList());
+
+        final int numThreads = 10;
+        final int snapshotsPerThread = 5;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new 
ExecutorThreadFactory("concurrent-add-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up threads to add snapshots concurrently
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerThread + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(10);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentReadsAndWrites() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(5, defaultHandleStore, 
Collections.emptyList());
+
+        final int numWriterThreads = 5;
+        final int numReaderThreads = 3;
+        final int snapshotsPerWriter = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numWriterThreads + numReaderThreads,
+                        new ExecutorThreadFactory("concurrent-read-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch =
+                    new CountDownLatch(numWriterThreads + numReaderThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up snapshot writer threads
+            for (int threadId = 0; threadId < numWriterThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerWriter; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerWriter + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Spin up snapshot reader threads (during writes)
+            for (int threadId = 0; threadId < numReaderThreads; threadId++) {
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < 50; i++) {
+                                    // Read operations
+                                    completedSnapshotStore.getNumSnapshots();
+                                    completedSnapshotStore.getAllSnapshots();
+                                    completedSnapshotStore.getLatestSnapshot();
+                                    // Introduce tiny wait to intersperse 
reads/writes
+                                    Thread.sleep(2);
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                exceptionCount.incrementAndGet();
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            assertThat(numSnapshots)
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(5);
+
+            if (!allSnapshots.isEmpty()) {
+                Set<Long> snapshotIds = new HashSet<>();
+                for (CompletedSnapshot snapshot : allSnapshots) {
+                    assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                            .as("Snapshot IDs should be unique (no 
corruption)")
+                            .isTrue();
+                }
+            }
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentAddsWithSnapshotRetention() throws Exception {
+        final int maxRetain = 3;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, 
Collections.emptyList());
+
+        final int numThreads = 5;
+        final int snapshotsPerThread = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new 
ExecutorThreadFactory("concurrent-add-retention-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up threads to add snapshots concurrently
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerThread + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(maxRetain);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentGetNumSnapshotsAccuracy() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(10, defaultHandleStore, 
Collections.emptyList());
+
+        final int numOperations = 30;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        10, new 
ExecutorThreadFactory("concurrent-read-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numOperations);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up various different snapshot operations
+            for (int i = 0; i < numOperations; i++) {
+                final int operationId = i;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                if (operationId % 2 == 0) {
+                                    // Add snapshot
+                                    CompletedSnapshot snapshot = 
getSnapshot(operationId + 1);
+                                    completedSnapshotStore.add(snapshot);
+                                } else {
+                                    // Read snapshot
+                                    long numSnapshots = 
completedSnapshotStore.getNumSnapshots();
+                                    List<CompletedSnapshot> allSnapshots =
+                                            
completedSnapshotStore.getAllSnapshots();
+                                    assertThat(numSnapshots)
+                                            .as(
+                                                    "getNumSnapshots() should 
match getAllSnapshots().size()")
+                                            .isEqualTo(allSnapshots.size());
+                                }
+                            } catch (AssertionError e) {
+                                throw e;
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all operations simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All operations should 
complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+            assertThat(numSnapshots)
+                    .as("Final getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
     private List<CompletedSnapshot> mapToCompletedSnapshot(
             List<Tuple2<CompletedSnapshotHandle, String>> snapshotHandles) {
         return snapshotHandles.stream()

Reply via email to