This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 275a2ec88 [test] Fix unstable test ReplicaTest.testRestore (#2469)
275a2ec88 is described below

commit 275a2ec88a677376145c59f128adcf71d6fca94c
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jan 26 16:04:23 2026 +0800

    [test] Fix unstable test ReplicaTest.testRestore (#2469)
---
 .../kv/snapshot/PeriodicSnapshotManager.java       | 16 ++++++-
 .../apache/fluss/server/replica/ReplicaTest.java   | 53 ++++++----------------
 .../ManuallyTriggeredScheduledExecutorService.java | 26 +++++++++--
 3 files changed, 51 insertions(+), 44 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index caa1a531c..c6b42f0ef 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -79,6 +80,14 @@ public class PeriodicSnapshotManager implements Closeable {
     /** Whether snapshot is started. */
     private volatile boolean started = false;
 
+    /**
+     * The scheduled snapshot task.
+     *
+     * <p>Since all reads and writes of {@code scheduledTask} are guarded by {@code synchronized},
+     * the {@code volatile} modifier is not necessary here.
+     */
+    private ScheduledFuture<?> scheduledTask = null;
+
     private final long initialDelay;
     /** The table bucket that the snapshot manager is for. */
     private final TableBucket tableBucket;
@@ -164,7 +173,8 @@ public class PeriodicSnapshotManager implements Closeable {
                     "TableBucket {} schedules the next snapshot in {} seconds",
                     tableBucket,
                     delay / 1000);
-            periodicExecutor.schedule(this::triggerSnapshot, delay, TimeUnit.MILLISECONDS);
+            scheduledTask =
+                    periodicExecutor.schedule(this::triggerSnapshot, delay, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -360,6 +370,10 @@ public class PeriodicSnapshotManager implements Closeable {
         synchronized (this) {
             // do-nothing, please make the periodicExecutor will be closed by external
             started = false;
+            // cancel the scheduled task if not completed yet
+            if (scheduledTask != null && !scheduledTask.isDone()) {
+                scheduledTask.cancel(true);
+            }
         }
     }
 
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index fd487a0bc..8bdd46db6 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -450,8 +450,8 @@ final class ReplicaTest extends ReplicaTestBase {
                         Tuple2.of("k2", new Object[] {3, "b1"}));
         putRecordsToLeader(kvReplica, kvRecords);
 
-        // trigger one snapshot,
-        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        // trigger one snapshot (task has been scheduled after becoming leader)
+        scheduledExecutorService.triggerAllNonPeriodicTasks();
 
         // wait until the snapshot 0 success
         CompletedSnapshot completedSnapshot0 =
@@ -472,7 +472,7 @@ final class ReplicaTest extends ReplicaTestBase {
         putRecordsToLeader(kvReplica, kvRecords);
 
         // trigger next checkpoint
-        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
         // wait until the snapshot 1 success
         CompletedSnapshot completedSnapshot1 =
                 kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
@@ -512,8 +512,8 @@ final class ReplicaTest extends ReplicaTestBase {
                         Tuple2.of("k3", new Object[] {5, "k3"}));
         putRecordsToLeader(kvReplica, kvRecords);
 
-        // trigger another one snapshot,
-        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        // trigger another snapshot (task has been scheduled after becoming leader)
+        scheduledExecutorService.triggerAllNonPeriodicTasks();
         //  wait until the snapshot 2 success
         CompletedSnapshot completedSnapshot2 =
                 kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
@@ -583,8 +583,8 @@ final class ReplicaTest extends ReplicaTestBase {
                         Tuple2.of("k2", new Object[] {2, "b"}));
         putRecordsToLeader(kvReplica, kvRecords);
 
-        // trigger first snapshot
-        triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+        // trigger first snapshot (task has been scheduled after becoming leader)
+        scheduledExecutorService.triggerAllNonPeriodicTasks();
         kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
 
         // put more data and create second snapshot
@@ -594,8 +594,8 @@ final class ReplicaTest extends ReplicaTestBase {
                         Tuple2.of("k3", new Object[] {4, "d"}));
         putRecordsToLeader(kvReplica, kvRecords);
 
-        // trigger second snapshot
-        triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+        // trigger second snapshot (may need to wait for the task to be scheduled)
+        scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
         kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
 
         // put more data and create third snapshot (this will be the broken one)
@@ -605,8 +605,8 @@ final class ReplicaTest extends ReplicaTestBase {
                         Tuple2.of("k5", new Object[] {6, "f"}));
         putRecordsToLeader(kvReplica, kvRecords);
 
-        // trigger third snapshot
-        triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+        // trigger third snapshot (may need to wait for the task to be scheduled)
+        scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
         CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
 
         // verify that snapshot2 is the latest one before we break it
@@ -688,12 +688,8 @@ final class ReplicaTest extends ReplicaTestBase {
                 getKeyValuePairs(genKvRecords(new Object[] {1, "a"}, new Object[] {2, "b"}));
         verifyGetKeyValues(kvTablet, expectedKeyValues);
 
-        // We have to remove the first scheduled snapshot task since it's for the previous kv tablet
-        // whose rocksdb has been dropped.
-        scheduledExecutorService.removeNonPeriodicScheduledTask();
-
-        // trigger one snapshot,
-        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        // trigger first snapshot (task has been scheduled after becoming leader)
+        scheduledExecutorService.triggerAllNonPeriodicTasks();
         // wait until the snapshot success
         kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
 
@@ -715,9 +711,8 @@ final class ReplicaTest extends ReplicaTestBase {
 
         // test recover with schema evolution.
         short newSchemaId = 2;
-        // trigger one snapshot.
-        scheduledExecutorService.removeNonPeriodicScheduledTask();
-        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        // trigger second snapshot (task has been scheduled after becoming leader again)
+        scheduledExecutorService.triggerAllNonPeriodicTasks();
         // wait until the snapshot success
         kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
         // write data with old schema
@@ -918,22 +913,4 @@ final class ReplicaTest extends ReplicaTestBase {
             isScheduled = false;
         }
     }
-
-    /** A helper function with support for retries for flaky triggering operations. */
-    private static void triggerSnapshotTaskWithRetry(
-            ManuallyTriggeredScheduledExecutorService scheduledExecutorService, int maxRetries)
-            throws Exception {
-        for (int i = 0; i < maxRetries; i++) {
-            try {
-                scheduledExecutorService.triggerNonPeriodicScheduledTask();
-                return;
-            } catch (java.util.NoSuchElementException e) {
-                if (i == maxRetries - 1) {
-                    throw e;
-                }
-
-                Thread.sleep(50);
-            }
-        }
-    }
 }
diff --git a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
index 3e065a4f7..baaf0e2fb 100644
--- a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
+++ b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
@@ -19,6 +19,7 @@ package org.apache.fluss.testutils.common;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,9 +27,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -45,11 +47,11 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
 
     private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
 
-    private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
-            new ConcurrentLinkedQueue<>();
+    private final BlockingQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
+            new LinkedBlockingQueue<>();
 
-    private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks =
-            new ConcurrentLinkedQueue<>();
+    private final BlockingQueue<ScheduledTask<?>> periodicScheduledTasks =
+            new LinkedBlockingQueue<>();
 
     private boolean shutdown;
 
@@ -255,6 +257,20 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
         }
     }
 
+    /**
+     * Triggers the next non-periodically scheduled task. If there is none yet, blocks until one
+     * is available or the given timeout elapses; throws {@link NoSuchElementException} on timeout.
+     */
+    public void triggerNextNonPeriodicScheduledTask(Duration timeout) throws InterruptedException {
+        final ScheduledTask<?> task =
+                nonPeriodicScheduledTasks.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        if (task != null) {
+            task.execute();
+        } else {
+            throw new NoSuchElementException("No scheduled task available within the timeout.");
+        }
+    }
+
     /** Remove the first non-periodically scheduled task. */
     public ScheduledTask<?> removeNonPeriodicScheduledTask() {
         return nonPeriodicScheduledTasks.poll();

Reply via email to