This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 275a2ec88 [test] Fix unstable test ReplicaTest.testRestore (#2469)
275a2ec88 is described below
commit 275a2ec88a677376145c59f128adcf71d6fca94c
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jan 26 16:04:23 2026 +0800
[test] Fix unstable test ReplicaTest.testRestore (#2469)
---
.../kv/snapshot/PeriodicSnapshotManager.java | 16 ++++++-
.../apache/fluss/server/replica/ReplicaTest.java | 53 ++++++----------------
.../ManuallyTriggeredScheduledExecutorService.java | 26 +++++++++--
3 files changed, 51 insertions(+), 44 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index caa1a531c..c6b42f0ef 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -79,6 +80,14 @@ public class PeriodicSnapshotManager implements Closeable {
/** Whether snapshot is started. */
private volatile boolean started = false;
+ /**
+ * The scheduled snapshot task.
+ *
+ * <p>Since all reads and writes of {@code scheduledTask} are protected by
synchronized, the
+ * volatile modifier is not necessary here.
+ */
+ private ScheduledFuture<?> scheduledTask = null;
+
private final long initialDelay;
/** The table bucket that the snapshot manager is for. */
private final TableBucket tableBucket;
@@ -164,7 +173,8 @@ public class PeriodicSnapshotManager implements Closeable {
"TableBucket {} schedules the next snapshot in {} seconds",
tableBucket,
delay / 1000);
- periodicExecutor.schedule(this::triggerSnapshot, delay,
TimeUnit.MILLISECONDS);
+ scheduledTask =
+ periodicExecutor.schedule(this::triggerSnapshot, delay,
TimeUnit.MILLISECONDS);
}
}
@@ -360,6 +370,10 @@ public class PeriodicSnapshotManager implements Closeable {
synchronized (this) {
// do-nothing, please make the periodicExecutor will be closed by
external
started = false;
+ // cancel the scheduled task if not completed yet
+ if (scheduledTask != null && !scheduledTask.isDone()) {
+ scheduledTask.cancel(true);
+ }
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index fd487a0bc..8bdd46db6 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -450,8 +450,8 @@ final class ReplicaTest extends ReplicaTestBase {
Tuple2.of("k2", new Object[] {3, "b1"}));
putRecordsToLeader(kvReplica, kvRecords);
- // trigger one snapshot,
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ // trigger one snapshot (task has been scheduled after becoming leader)
+ scheduledExecutorService.triggerAllNonPeriodicTasks();
// wait until the snapshot 0 success
CompletedSnapshot completedSnapshot0 =
@@ -472,7 +472,7 @@ final class ReplicaTest extends ReplicaTestBase {
putRecordsToLeader(kvReplica, kvRecords);
// trigger next checkpoint
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+
scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
// wait until the snapshot 1 success
CompletedSnapshot completedSnapshot1 =
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
@@ -512,8 +512,8 @@ final class ReplicaTest extends ReplicaTestBase {
Tuple2.of("k3", new Object[] {5, "k3"}));
putRecordsToLeader(kvReplica, kvRecords);
- // trigger another one snapshot,
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ // trigger another snapshot (the task has been scheduled after becoming leader)
+ scheduledExecutorService.triggerAllNonPeriodicTasks();
// wait until the snapshot 2 success
CompletedSnapshot completedSnapshot2 =
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
@@ -583,8 +583,8 @@ final class ReplicaTest extends ReplicaTestBase {
Tuple2.of("k2", new Object[] {2, "b"}));
putRecordsToLeader(kvReplica, kvRecords);
- // trigger first snapshot
- triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+ // trigger first snapshot (task has been scheduled after becoming
leader)
+ scheduledExecutorService.triggerAllNonPeriodicTasks();
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
// put more data and create second snapshot
@@ -594,8 +594,8 @@ final class ReplicaTest extends ReplicaTestBase {
Tuple2.of("k3", new Object[] {4, "d"}));
putRecordsToLeader(kvReplica, kvRecords);
- // trigger second snapshot
- triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+ // trigger second snapshot (may need to wait for the task to be scheduled)
+
scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
// put more data and create third snapshot (this will be the broken
one)
@@ -605,8 +605,8 @@ final class ReplicaTest extends ReplicaTestBase {
Tuple2.of("k5", new Object[] {6, "f"}));
putRecordsToLeader(kvReplica, kvRecords);
- // trigger third snapshot
- triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
+ // trigger third snapshot (may need to wait for the task to be scheduled)
+
scheduledExecutorService.triggerNextNonPeriodicScheduledTask(Duration.ofSeconds(30));
CompletedSnapshot snapshot2 =
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
// verify that snapshot2 is the latest one before we break it
@@ -688,12 +688,8 @@ final class ReplicaTest extends ReplicaTestBase {
getKeyValuePairs(genKvRecords(new Object[] {1, "a"}, new
Object[] {2, "b"}));
verifyGetKeyValues(kvTablet, expectedKeyValues);
- // We have to remove the first scheduled snapshot task since it's for
the previous kv tablet
- // whose rocksdb has been dropped.
- scheduledExecutorService.removeNonPeriodicScheduledTask();
-
- // trigger one snapshot,
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ // trigger first snapshot (task has been scheduled after becoming
leader)
+ scheduledExecutorService.triggerAllNonPeriodicTasks();
// wait until the snapshot success
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
@@ -715,9 +711,8 @@ final class ReplicaTest extends ReplicaTestBase {
// test recover with schema evolution.
short newSchemaId = 2;
- // trigger one snapshot.
- scheduledExecutorService.removeNonPeriodicScheduledTask();
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ // trigger second snapshot (task has been scheduled after becoming
leader again)
+ scheduledExecutorService.triggerAllNonPeriodicTasks();
// wait until the snapshot success
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
// write data with old schema
@@ -918,22 +913,4 @@ final class ReplicaTest extends ReplicaTestBase {
isScheduled = false;
}
}
-
- /** A helper function with support for retries for flaky triggering
operations. */
- private static void triggerSnapshotTaskWithRetry(
- ManuallyTriggeredScheduledExecutorService
scheduledExecutorService, int maxRetries)
- throws Exception {
- for (int i = 0; i < maxRetries; i++) {
- try {
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
- return;
- } catch (java.util.NoSuchElementException e) {
- if (i == maxRetries - 1) {
- throw e;
- }
-
- Thread.sleep(50);
- }
- }
- }
}
diff --git
a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
index 3e065a4f7..baaf0e2fb 100644
---
a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
+++
b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/ManuallyTriggeredScheduledExecutorService.java
@@ -19,6 +19,7 @@ package org.apache.fluss.testutils.common;
import javax.annotation.Nonnull;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,9 +27,10 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -45,11 +47,11 @@ public class ManuallyTriggeredScheduledExecutorService
implements ScheduledExecu
private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
- private final ConcurrentLinkedQueue<ScheduledTask<?>>
nonPeriodicScheduledTasks =
- new ConcurrentLinkedQueue<>();
+ private final BlockingQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
+ new LinkedBlockingQueue<>();
- private final ConcurrentLinkedQueue<ScheduledTask<?>>
periodicScheduledTasks =
- new ConcurrentLinkedQueue<>();
+ private final BlockingQueue<ScheduledTask<?>> periodicScheduledTasks =
+ new LinkedBlockingQueue<>();
private boolean shutdown;
@@ -255,6 +257,20 @@ public class ManuallyTriggeredScheduledExecutorService
implements ScheduledExecu
}
}
+ /**
+ * Triggers the next non-periodically scheduled task. If there is no such task, blocks until
+ * one becomes available or the given timeout elapses. Throws {@link NoSuchElementException}
+ * if the timeout elapses before a task becomes available.
+ */
+ public void triggerNextNonPeriodicScheduledTask(Duration timeout) throws
InterruptedException {
+ final ScheduledTask<?> task =
+ nonPeriodicScheduledTasks.poll(timeout.toMillis(),
TimeUnit.MILLISECONDS);
+ if (task != null) {
+ task.execute();
+ } else {
+ throw new NoSuchElementException("No scheduled task available
within the timeout.");
+ }
+ }
+
/** Remove the first non-periodically scheduled task. */
public ScheduledTask<?> removeNonPeriodicScheduledTask() {
return nonPeriodicScheduledTasks.poll();