Copilot commented on code in PR #2221:
URL: https://github.com/apache/fluss/pull/2221#discussion_r2638404191
##########
fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java:
##########
@@ -171,6 +177,334 @@ void testAddSnapshotFailedShouldNotRemoveOldOnes() {
assertThat(completedSnapshotStore.getLatestSnapshot().get().getSnapshotID()).isOne();
}
+ @Test
+ void testConcurrentAdds() throws Exception {
+ final CompletedSnapshotStore completedSnapshotStore =
+ createCompletedSnapshotStore(10, defaultHandleStore,
Collections.emptyList());
+
+ final int numThreads = 10;
+ final int snapshotsPerThread = 5;
+ final ExecutorService testExecutor =
+ Executors.newFixedThreadPool(
+ numThreads, new
ExecutorThreadFactory("concurrent-add-thread"));
+
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(numThreads);
+ AtomicInteger exceptionCount = new AtomicInteger(0);
+
+ // Spin up threads to add snapshots concurrently
+ for (int threadId = 0; threadId < numThreads; threadId++) {
+ final int finalThreadId = threadId;
+ testExecutor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < snapshotsPerThread; i++) {
+ long snapshotId =
+ (long) finalThreadId *
snapshotsPerThread + i + 1;
+ CompletedSnapshot snapshot =
getSnapshot(snapshotId);
+ completedSnapshotStore.add(snapshot);
+ }
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+ boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+ assertThat(completed).as("All threads should complete").isTrue();
+
+ // Ensure time for async cleanup to finish
+ Thread.sleep(100);
+
+ assertThat(exceptionCount.get()).as("No exceptions should
occur").isEqualTo(0);
+
+ List<CompletedSnapshot> allSnapshots =
completedSnapshotStore.getAllSnapshots();
+ assertThat(allSnapshots.size())
+ .as("Should retain at most maxNumberOfSnapshotsToRetain
snapshots")
+ .isLessThanOrEqualTo(10);
+
+ Set<Long> snapshotIds = new HashSet<>();
+ for (CompletedSnapshot snapshot : allSnapshots) {
+ assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+ .as("Snapshot IDs should be unique (no corruption)")
+ .isTrue();
+ }
+
+ long numSnapshots = completedSnapshotStore.getNumSnapshots();
+ assertThat(numSnapshots)
+ .as("getNumSnapshots() should match
getAllSnapshots().size()")
+ .isEqualTo(allSnapshots.size());
+
+ if (!allSnapshots.isEmpty()) {
+ Optional<CompletedSnapshot> latest =
completedSnapshotStore.getLatestSnapshot();
+ assertThat(latest).as("Latest snapshot should be
present").isPresent();
+ assertThat(latest.get())
+ .as("Latest snapshot should match last in
getAllSnapshots()")
+ .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+ }
+ } finally {
+ testExecutor.shutdown();
+ }
+ }
+
+ @Test
+ void testConcurrentReadsAndWrites() throws Exception {
+ final CompletedSnapshotStore completedSnapshotStore =
+ createCompletedSnapshotStore(5, defaultHandleStore,
Collections.emptyList());
+
+ final int numWriterThreads = 5;
+ final int numReaderThreads = 3;
+ final int snapshotsPerWriter = 3;
+ final ExecutorService testExecutor =
+ Executors.newFixedThreadPool(
+ numWriterThreads + numReaderThreads,
+ new ExecutorThreadFactory("concurrent-read-thread"));
+
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch =
+ new CountDownLatch(numWriterThreads + numReaderThreads);
+ AtomicInteger exceptionCount = new AtomicInteger(0);
+
+ // Spin up snapshot writer threads
+ for (int threadId = 0; threadId < numWriterThreads; threadId++) {
+ final int finalThreadId = threadId;
+ testExecutor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < snapshotsPerWriter; i++) {
+ long snapshotId =
+ (long) finalThreadId *
snapshotsPerWriter + i + 1;
+ CompletedSnapshot snapshot =
getSnapshot(snapshotId);
+ completedSnapshotStore.add(snapshot);
+ }
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Spin up snapshot reader threads (during writes)
+ for (int threadId = 0; threadId < numReaderThreads; threadId++) {
+ testExecutor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < 50; i++) {
+ // Read operations
+ completedSnapshotStore.getNumSnapshots();
+ completedSnapshotStore.getAllSnapshots();
+ completedSnapshotStore.getLatestSnapshot();
+ // Introduce tiny wait to intersperse
reads/writes
+ Thread.sleep(2);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ exceptionCount.incrementAndGet();
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+ boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+ assertThat(completed).as("All threads should complete").isTrue();
+
+ // Ensure time for async cleanup to finish
+ Thread.sleep(100);
+
+ assertThat(exceptionCount.get()).as("No exceptions should
occur").isEqualTo(0);
+
+ long numSnapshots = completedSnapshotStore.getNumSnapshots();
+ List<CompletedSnapshot> allSnapshots =
completedSnapshotStore.getAllSnapshots();
+
+ assertThat(numSnapshots)
+ .as("getNumSnapshots() should match
getAllSnapshots().size()")
+ .isEqualTo(allSnapshots.size());
+
+ assertThat(numSnapshots)
+ .as("Should retain at most maxNumberOfSnapshotsToRetain
snapshots")
+ .isLessThanOrEqualTo(5);
+
+ if (!allSnapshots.isEmpty()) {
+ Set<Long> snapshotIds = new HashSet<>();
+ for (CompletedSnapshot snapshot : allSnapshots) {
+ assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+ .as("Snapshot IDs should be unique (no
corruption)")
+ .isTrue();
+ }
+ }
+
+ if (!allSnapshots.isEmpty()) {
+ Optional<CompletedSnapshot> latest =
completedSnapshotStore.getLatestSnapshot();
+ assertThat(latest).as("Latest snapshot should be
present").isPresent();
+ assertThat(latest.get())
+ .as("Latest snapshot should match last in
getAllSnapshots()")
+ .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+ }
+ } finally {
+ testExecutor.shutdown();
+ }
+ }
+
+ @Test
+ void testConcurrentAddsWithSnapshotRetention() throws Exception {
+ final int maxRetain = 3;
+ final CompletedSnapshotStore completedSnapshotStore =
+ createCompletedSnapshotStore(
+ maxRetain, defaultHandleStore,
Collections.emptyList());
+
+ final int numThreads = 5;
+ final int snapshotsPerThread = 3;
+ final ExecutorService testExecutor =
+ Executors.newFixedThreadPool(
+ numThreads, new
ExecutorThreadFactory("concurrent-add-retention-thread"));
+
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(numThreads);
+ AtomicInteger exceptionCount = new AtomicInteger(0);
+
+ // Spin up threads to add snapshots concurrently
+ for (int threadId = 0; threadId < numThreads; threadId++) {
+ final int finalThreadId = threadId;
+ testExecutor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < snapshotsPerThread; i++) {
+ long snapshotId =
+ (long) finalThreadId *
snapshotsPerThread + i + 1;
+ CompletedSnapshot snapshot =
getSnapshot(snapshotId);
+ completedSnapshotStore.add(snapshot);
+ }
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+ boolean completed = completionLatch.await(30, TimeUnit.SECONDS);
+ assertThat(completed).as("All threads should complete").isTrue();
+
+ // Ensure time for async cleanup to finish
+ Thread.sleep(100);
+
+ assertThat(exceptionCount.get()).as("No exceptions should
occur").isEqualTo(0);
+
+ List<CompletedSnapshot> allSnapshots =
completedSnapshotStore.getAllSnapshots();
+
+ assertThat(allSnapshots.size())
+ .as("Should retain at most maxNumberOfSnapshotsToRetain
snapshots")
+ .isLessThanOrEqualTo(maxRetain);
+
+ Set<Long> snapshotIds = new HashSet<>();
+ for (CompletedSnapshot snapshot : allSnapshots) {
+ assertThat(snapshotIds.add(snapshot.getSnapshotID()))
+ .as("Snapshot IDs should be unique (no corruption)")
+ .isTrue();
+ }
+
+ long numSnapshots = completedSnapshotStore.getNumSnapshots();
+ assertThat(numSnapshots)
+ .as("getNumSnapshots() should match
getAllSnapshots().size()")
+ .isEqualTo(allSnapshots.size());
+
+ if (!allSnapshots.isEmpty()) {
+ Optional<CompletedSnapshot> latest =
completedSnapshotStore.getLatestSnapshot();
+ assertThat(latest).as("Latest snapshot should be
present").isPresent();
+ assertThat(latest.get())
+ .as("Latest snapshot should match last in
getAllSnapshots()")
+ .isEqualTo(allSnapshots.get(allSnapshots.size() - 1));
+ }
+ } finally {
+ testExecutor.shutdown();
+ }
+ }
+
+ @Test
+ void testConcurrentGetNumSnapshotsAccuracy() throws Exception {
+ final CompletedSnapshotStore completedSnapshotStore =
+ createCompletedSnapshotStore(10, defaultHandleStore,
Collections.emptyList());
+
+ final int numOperations = 30;
+ final ExecutorService testExecutor =
+ Executors.newFixedThreadPool(
+ 10, new
ExecutorThreadFactory("concurrent-read-thread"));
+
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(numOperations);
+ AtomicInteger exceptionCount = new AtomicInteger(0);
+
+ // Spin up various different snapshot operations
+ for (int i = 0; i < numOperations; i++) {
+ final int operationId = i;
+ testExecutor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ if (operationId % 2 == 0) {
+ // Add snapshot
+ CompletedSnapshot snapshot =
getSnapshot(operationId + 1);
+ completedSnapshotStore.add(snapshot);
+ } else {
+ // Read reapshot
Review Comment:
Typo in this code comment: 'reapshot' should be 'snapshot'.
```suggestion
// Read snapshot
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]