Copilot commented on code in PR #2221:
URL: https://github.com/apache/fluss/pull/2221#discussion_r2638404191


##########
fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java:
##########
@@ -171,6 +177,334 @@ void testAddSnapshotFailedShouldNotRemoveOldOnes() {
         
assertThat(completedSnapshotStore.getLatestSnapshot().get().getSnapshotID()).isOne();
     }
 
+    @Test
+    void testConcurrentAdds() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(10, defaultHandleStore, 
Collections.emptyList());
+
+        final int numThreads = 10;
+        final int snapshotsPerThread = 5;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new 
ExecutorThreadFactory("concurrent-add-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up threads to add snapshots concurrently
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerThread + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(10);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentReadsAndWrites() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(5, defaultHandleStore, 
Collections.emptyList());
+
+        final int numWriterThreads = 5;
+        final int numReaderThreads = 3;
+        final int snapshotsPerWriter = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numWriterThreads + numReaderThreads,
+                        new ExecutorThreadFactory("concurrent-read-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch =
+                    new CountDownLatch(numWriterThreads + numReaderThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up snapshot writer threads
+            for (int threadId = 0; threadId < numWriterThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerWriter; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerWriter + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Spin up snapshot reader threads (during writes)
+            for (int threadId = 0; threadId < numReaderThreads; threadId++) {
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < 50; i++) {
+                                    // Read operations
+                                    completedSnapshotStore.getNumSnapshots();
+                                    completedSnapshotStore.getAllSnapshots();
+                                    completedSnapshotStore.getLatestSnapshot();
+                                    // Introduce tiny wait to intersperse 
reads/writes
+                                    Thread.sleep(2);
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                exceptionCount.incrementAndGet();
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            assertThat(numSnapshots)
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(5);
+
+            if (!allSnapshots.isEmpty()) {
+                Set<Long> snapshotIds = new HashSet<>();
+                for (CompletedSnapshot snapshot : allSnapshots) {
+                    assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                            .as("Snapshot IDs should be unique (no 
corruption)")
+                            .isTrue();
+                }
+            }
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentAddsWithSnapshotRetention() throws Exception {
+        final int maxRetain = 3;
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(
+                        maxRetain, defaultHandleStore, 
Collections.emptyList());
+
+        final int numThreads = 5;
+        final int snapshotsPerThread = 3;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        numThreads, new 
ExecutorThreadFactory("concurrent-add-retention-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up threads to add snapshots concurrently
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int finalThreadId = threadId;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                for (int i = 0; i < snapshotsPerThread; i++) {
+                                    long snapshotId =
+                                            (long) finalThreadId * 
snapshotsPerThread + i + 1;
+                                    CompletedSnapshot snapshot = 
getSnapshot(snapshotId);
+                                    completedSnapshotStore.add(snapshot);
+                                }
+                            } catch (Exception e) {
+                                exceptionCount.incrementAndGet();
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+            }
+
+            // Start all threads simultaneously
+            startLatch.countDown();
+            boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+            assertThat(completed).as("All threads should complete").isTrue();
+
+            // Ensure time for async cleanup to finish
+            Thread.sleep(100);
+
+            assertThat(exceptionCount.get()).as("No exceptions should 
occur").isEqualTo(0);
+
+            List<CompletedSnapshot> allSnapshots = 
completedSnapshotStore.getAllSnapshots();
+
+            assertThat(allSnapshots.size())
+                    .as("Should retain at most maxNumberOfSnapshotsToRetain 
snapshots")
+                    .isLessThanOrEqualTo(maxRetain);
+
+            Set<Long> snapshotIds = new HashSet<>();
+            for (CompletedSnapshot snapshot : allSnapshots) {
+                assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+                        .as("Snapshot IDs should be unique (no corruption)")
+                        .isTrue();
+            }
+
+            long numSnapshots = completedSnapshotStore.getNumSnapshots();
+            assertThat(numSnapshots)
+                    .as("getNumSnapshots() should match 
getAllSnapshots().size()")
+                    .isEqualTo(allSnapshots.size());
+
+            if (!allSnapshots.isEmpty()) {
+                Optional<CompletedSnapshot> latest = 
completedSnapshotStore.getLatestSnapshot();
+                assertThat(latest).as("Latest snapshot should be 
present").isPresent();
+                assertThat(latest.get())
+                        .as("Latest snapshot should match last in 
getAllSnapshots()")
+                        .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+            }
+        } finally {
+            testExecutor.shutdown();
+        }
+    }
+
+    @Test
+    void testConcurrentGetNumSnapshotsAccuracy() throws Exception {
+        final CompletedSnapshotStore completedSnapshotStore =
+                createCompletedSnapshotStore(10, defaultHandleStore, 
Collections.emptyList());
+
+        final int numOperations = 30;
+        final ExecutorService testExecutor =
+                Executors.newFixedThreadPool(
+                        10, new 
ExecutorThreadFactory("concurrent-read-thread"));
+
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numOperations);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+
+            // Spin up various different snapshot operations
+            for (int i = 0; i < numOperations; i++) {
+                final int operationId = i;
+                testExecutor.submit(
+                        () -> {
+                            try {
+                                startLatch.await();
+                                if (operationId % 2 == 0) {
+                                    // Add snapshot
+                                    CompletedSnapshot snapshot = 
getSnapshot(operationId + 1);
+                                    completedSnapshotStore.add(snapshot);
+                                } else {
+                                    // Read reapshot

Review Comment:
   Typo in this comment: 'reapshot' should be 'snapshot'.
   ```suggestion
                                       // Read snapshot
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to