This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new dc6e74785 [test] Support manually trigger and wait for KV snapshots 
for FlussClusterExtension (#2429)
dc6e74785 is described below

commit dc6e747857514d733960678f6e13d9b4159cc6c7
Author: Jark Wu <[email protected]>
AuthorDate: Wed Jan 21 23:30:24 2026 +0800

    [test] Support manually trigger and wait for KV snapshots for 
FlussClusterExtension (#2429)
---
 .../client/admin/ClientToServerITCaseBase.java     |  2 -
 .../fluss/client/admin/FlussAdminITCase.java       |  6 +-
 .../security/acl/FlussAuthorizationITCase.java     |  2 -
 .../batch/KvSnapshotBatchScannerITCase.java        | 15 +---
 .../fluss/flink/metrics/FlinkMetricsITCase.java    |  3 -
 .../flink/procedure/FlinkProcedureITCase.java      |  2 -
 .../security/acl/FlinkAuthorizationITCase.java     |  2 -
 .../source/FlinkTableSourceFailOverITCase.java     |  3 -
 .../fluss/flink/source/FlinkTableSourceITCase.java | 79 ++++---------------
 .../enumerator/FlinkSourceEnumeratorTest.java      |  2 +-
 .../source/reader/FlinkSourceSplitReaderTest.java  |  4 +-
 .../fluss/flink/tiering/FlinkTieringTestBase.java  | 11 +--
 .../fluss/flink/tiering/TieringFailoverITCase.java |  2 +-
 .../tiering/source/TieringSplitReaderTest.java     |  6 +-
 .../flink/tiering/source/TieringTestBase.java      | 28 ++++---
 .../enumerator/TieringSourceEnumeratorTest.java    | 10 +--
 .../apache/fluss/flink/utils/FlinkTestBase.java    | 10 ---
 .../testutils/FlinkIcebergTieringTestBase.java     | 10 +--
 .../lake/iceberg/tiering/IcebergTieringITCase.java |  2 +-
 .../lance/testutils/FlinkLanceTieringTestBase.java | 12 +--
 .../testutils/FlinkPaimonTieringTestBase.java      | 12 +--
 .../lake/paimon/tiering/PaimonTieringITCase.java   |  6 +-
 .../tiering/ReCreateSameTableAfterTieringTest.java |  4 +-
 .../org/apache/fluss/server/SequenceIDCounter.java |  3 +
 .../server/kv/snapshot/KvTabletSnapshotTarget.java | 11 +++
 .../kv/snapshot/PeriodicSnapshotManager.java       | 12 ++-
 .../org/apache/fluss/server/replica/Replica.java   |  6 ++
 .../fluss/server/zk/ZkSequenceIDCounter.java       |  5 ++
 .../kv/autoinc/TestingSequenceIDCounter.java       |  5 ++
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    |  5 ++
 .../kv/snapshot/PeriodicSnapshotManagerTest.java   | 10 +++
 .../server/testutils/FlussClusterExtension.java    | 90 +++++++++++++++++++++-
 32 files changed, 208 insertions(+), 172 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 311d5a94f..5714ddf67 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -113,8 +113,6 @@ public abstract class ClientToServerITCaseBase {
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
-        // set a shorter interval for testing purpose
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
         // set default datalake format for the cluster and enable datalake 
tables
         conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index aaf06592e..64cd85e9f 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1001,8 +1001,8 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
             Map<Integer, CompletedSnapshot> expectedSnapshots = new 
HashMap<>();
             for (int bucket = 0; bucket < bucketNum; bucket++) {
                 CompletedSnapshot completedSnapshot =
-                        FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(
-                                new TableBucket(tableId, bucket), 0);
+                        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(
+                                new TableBucket(tableId, bucket));
                 expectedSnapshots.put(bucket, completedSnapshot);
             }
 
@@ -1018,7 +1018,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
             TableBucket tb = new TableBucket(snapshots.getTableId(), 0);
             // wait until the snapshot finish
             expectedSnapshots.put(
-                    tb.getBucket(), 
FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tb, 1));
+                    tb.getBucket(), 
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb));
 
             // check snapshot
             snapshots = admin.getLatestKvSnapshots(tablePath1).get();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index ba1fb7fe2..cdb92ab95 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -1023,8 +1023,6 @@ public class FlussAuthorizationITCase {
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
-        // set a shorter interval for testing purpose
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
         // set a shorter max lag time to make tests in 
FlussFailServerTableITCase faster
         conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, 
Duration.ofSeconds(10));
         // set default datalake format for the cluster and enable datalake 
tables
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index ea1aaf237..2ab2f3726 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -45,7 +45,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
@@ -104,7 +103,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
         Map<TableBucket, List<InternalRow>> expectedRowByBuckets = 
putRows(tableId, tablePath, 10);
 
         // wait snapshot finish
-        waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
+        
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
 
         // test read snapshot
         testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -113,7 +112,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
         expectedRowByBuckets = putRows(tableId, tablePath, 20);
 
         // wait snapshot finish
-        waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
+        
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
 
         // test read snapshot
         testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -126,7 +125,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
 
         // put into values with old schema.
         Map<TableBucket, List<InternalRow>> oldSchemaRowByBuckets = 
putRows(tableId, tablePath, 10);
-        waitUntilAllSnapshotFinished(oldSchemaRowByBuckets.keySet(), 0);
+        
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(oldSchemaRowByBuckets.keySet());
 
         // add a new column and rename an existing column
         admin.alterTable(
@@ -175,7 +174,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
         }
 
         // wait snapshot finish
-        waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
+        
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
 
         // test read snapshot with new Schema
         testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -239,10 +238,4 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
         byte[] key = keyEncoder.encodeKey(row);
         return function.bucketing(key, DEFAULT_BUCKET_NUM);
     }
-
-    private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, 
long snapshotId) {
-        for (TableBucket tableBucket : tableBuckets) {
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
index 596ad7d1d..f13e4860e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
@@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
@@ -67,8 +66,6 @@ abstract class FlinkMetricsITCase {
             FlussClusterExtension.builder()
                     .setClusterConf(
                             new org.apache.fluss.config.Configuration()
-                                    // set snapshot interval to 1s for testing 
purposes
-                                    .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofSeconds(1))
                                     // not to clean snapshots for test purpose
                                     .set(
                                             
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index c1231c6e9..5138ca869 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -782,8 +782,6 @@ public abstract class FlinkProcedureITCase {
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
-        // set a shorter interval for testing purpose
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
         // set a shorter max lag time to make tests in 
FlussFailServerTableITCase faster
         conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, 
Duration.ofSeconds(10));
         // set default datalake format for the cluster and enable datalake 
tables
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 96e2ed199..2fd889989 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -456,8 +456,6 @@ abstract class FlinkAuthorizationITCase extends 
AbstractTestBase {
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
-        // set a shorter interval for testing purpose
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
         // set a shorter max lag time to make tests in 
FlussFailServerTableITCase faster
         conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, 
Duration.ofSeconds(10));
         // set default datalake format for the cluster and enable datalake 
tables
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index 916b10654..80dfbcd4c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.io.TempDir;
 import javax.annotation.Nullable;
 
 import java.io.File;
-import java.time.Duration;
 import java.time.Year;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -76,8 +75,6 @@ abstract class FlinkTableSourceFailOverITCase {
             FlussClusterExtension.builder()
                     .setClusterConf(
                             new org.apache.fluss.config.Configuration()
-                                    // set snapshot interval to 1s for testing 
purposes
-                                    .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofSeconds(1))
                                     // not to clean snapshots for test purpose
                                     .set(
                                             
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 4d056a12d..5d903cb8d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.source;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
-import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.ConfigOptions;
@@ -78,7 +77,6 @@ import static 
org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
 import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -91,8 +89,6 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
             FlussClusterExtension.builder()
                     .setClusterConf(
                             new Configuration()
-                                    // set snapshot interval to 1s for testing 
purposes
-                                    .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofSeconds(1))
                                     // not to clean snapshots for test purpose
                                     .set(
                                             
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
@@ -161,8 +157,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
 
         // write records
         writeRows(conn, tablePath, rows, false);
-
-        waitUntilAllBucketFinishSnapshot(admin, tablePath);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", 
"+I[3, v3]");
 
@@ -181,7 +176,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         // write records
         writeRows(conn, tablePath, rows, false);
 
-        waitUntilAllBucketFinishSnapshot(admin, tablePath);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", 
"+I[3, v3]");
 
@@ -288,7 +283,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 // write records and wait snapshot before collect job start,
                 // to make sure reading from kv snapshot
                 writeRows(conn, tablePath, rows, false);
-                waitUntilAllBucketFinishSnapshot(admin, 
TablePath.of(DEFAULT_DB, tableName));
+                
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(TablePath.of(DEFAULT_DB, 
tableName));
             }
         } else {
             writeRows(conn, tablePath, rows, true);
@@ -336,7 +331,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         // write records
         writeRows(conn, tablePath, rows, false);
 
-        waitUntilAllBucketFinishSnapshot(admin, tablePath);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", 
"+I[3, v3]");
 
@@ -461,7 +456,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
 
         // write records and wait generate snapshot.
         writeRows(conn, tablePath, rows1, false);
-        waitUntilAllBucketFinishSnapshot(admin, tablePath);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         List<InternalRow> rows2 = Arrays.asList(row(1, "v11"), row(2, "v22"), 
row(4, "v4"));
 
@@ -531,12 +526,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
 
         // write records and wait generate snapshot.
         writeRows(conn, tablePath, rows1, false);
-        if (partitionName == null) {
-            waitUntilAllBucketFinishSnapshot(admin, tablePath);
-        } else {
-            waitUntilAllBucketFinishSnapshot(
-                    admin, tablePath, Collections.singleton(partitionName));
-        }
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
         CLOCK.advanceTime(Duration.ofMillis(100));
 
         List<InternalRow> rows2 =
@@ -605,7 +596,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
 
         List<String> expectedRowValues =
                 writeRowsToPartition(conn, tablePath, 
partitionNameById.values());
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
partitionNameById.values());
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         // This test requires dynamically discovering newly created 
partitions, so
         // 'scan.partition.discovery.interval' needs to be set to 2s (default 
is 1 minute),
@@ -988,7 +979,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 writeRowsToPartition(conn, tablePath, Arrays.asList("2025", 
"2026")).stream()
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String plan = tEnv.explainSql("select * from partitioned_table where c 
='2025'");
         assertThat(plan)
@@ -1079,8 +1070,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                         .stream()
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(
-                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String plan = tEnv.explainSql("select * from multi_partitioned_table 
where c ='2025'");
         assertThat(plan)
@@ -1107,7 +1097,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                         .stream()
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025$3", "2026$2"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
 
         String plan2 =
@@ -1155,7 +1145,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         }
 
         writeRows(conn, tablePath, rows, false);
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String plan =
                 tEnv.explainSql(
@@ -1203,7 +1193,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
 
         List<String> expectedRowValues =
                 writeRowsToPartition(conn, tablePath, Arrays.asList("2025", 
"2026"));
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
                 tEnv.executeSql("select * from 
partitioned_table_no_filter").collect();
@@ -1249,7 +1239,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                         .stream()
                         .filter(s -> s.contains("2025") || s.contains("2026"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "2027"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String query1 = "select * from partitioned_table_in where c in 
('2025','2026')";
         String plan = tEnv.explainSql(query1);
@@ -1300,7 +1290,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         }
 
         writeRows(conn, tablePath, rows, false);
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "2027"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String query1 =
                 "select a,c,d from combined_filters_table_in where c in 
('2025','2026') and d % 200 = 0";
@@ -1339,7 +1329,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 allData.stream()
                         .filter(s -> s.contains("2025") || s.contains("2026"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "3026"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String query1 = "select * from partitioned_table_like where c like 
'202%'";
         String plan = tEnv.explainSql(query1);
@@ -1392,8 +1382,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 allData.stream()
                         .filter(s -> s.contains("v3") && !s.contains("2025, 
2"))
                         .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(
-                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
         String query =
                 "select * from partitioned_table_complex where  a = 3\n"
@@ -1544,40 +1533,6 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         return tableName;
     }
 
-    private void waitUntilAllBucketFinishSnapshot(Admin admin, TablePath 
tablePath) {
-        waitUntil(
-                () -> {
-                    KvSnapshots snapshots = 
admin.getLatestKvSnapshots(tablePath).get();
-                    for (int bucketId : snapshots.getBucketIds()) {
-                        if (!snapshots.getSnapshotId(bucketId).isPresent()) {
-                            return false;
-                        }
-                    }
-                    return true;
-                },
-                Duration.ofMinutes(1),
-                "Fail to wait until all bucket finish snapshot");
-    }
-
-    private void waitUntilAllBucketFinishSnapshot(
-            Admin admin, TablePath tablePath, Collection<String> partitions) {
-        waitUntil(
-                () -> {
-                    for (String partition : partitions) {
-                        KvSnapshots snapshots =
-                                admin.getLatestKvSnapshots(tablePath, 
partition).get();
-                        for (int bucketId : snapshots.getBucketIds()) {
-                            if 
(!snapshots.getSnapshotId(bucketId).isPresent()) {
-                                return false;
-                            }
-                        }
-                    }
-                    return true;
-                },
-                Duration.ofMinutes(1),
-                "Fail to wait until all bucket finish snapshot");
-    }
-
     private GenericRow rowWithPartition(Object[] values, @Nullable String 
partition) {
         if (partition == null) {
             return row(values);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index b18adfc45..238accf0b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -146,7 +146,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
         // write data and wait snapshot finish to make sure
         // we can hava snapshot split
         Map<Integer, Integer> bucketIdToNumRecords = 
putRows(DEFAULT_TABLE_PATH, 10);
-        waitUntilSnapshot(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(DEFAULT_TABLE_PATH);
 
         try (MockSplitEnumeratorContext<SourceSplitBase> context =
                 new MockSplitEnumeratorContext<>(numSubtasks)) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index ffdd95652..59f1116ed 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -144,7 +144,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
             Map<TableBucket, List<InternalRow>> rows = putRows(tableId, 
tablePath, 10);
 
             // check the expected records
-            waitUntilSnapshot(tableId, 0);
+            FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
             hybridSnapshotLogSplits = getHybridSnapshotLogSplits(tablePath);
 
@@ -252,7 +252,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
             Map<TableBucket, List<InternalRow>> rows = putRows(tableId, 
tablePath, 10);
 
             // check the expected records
-            waitUntilSnapshot(tableId, 0);
+            FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
             List<SourceSplitBase> totalSplits =
                     new ArrayList<>(getHybridSnapshotLogSplits(tablePath));
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
index c87078d7f..130a30812 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
@@ -67,8 +67,7 @@ class FlinkTieringTestBase {
 
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
+        conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
 
         // Configure the tiering sink to be Lance
         conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE);
@@ -150,14 +149,6 @@ class FlinkTieringTestBase {
         }
     }
 
-    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
-        for (int i = 0; i < bucketNum; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
-
-    @SuppressWarnings("resource")
     public List<InternalRow> getValuesRecords(TablePath tablePath) {
         return TestingValuesLake.getResults(tablePath.toString());
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
index a8ee17a6d..847031225 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
@@ -84,7 +84,7 @@ class TieringFailoverITCase extends FlinkTieringTestBase {
         List<InternalRow> rows = Arrays.asList(row(1, "i1"), row(2, "i2"), 
row(3, "i3"));
         List<InternalRow> expectedRows = new ArrayList<>(rows);
         writeRows(t1, rows);
-        waitUntilSnapshot(t1Id, 1, 0);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(t1);
 
         // fail the first write to the pk table
         TestingValuesLake.failWhen(t1.toString()).failWriteOnce();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
index 4ae4cf55c..fdf7a8477 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
@@ -103,7 +103,7 @@ class TieringSplitReaderTest extends FlinkTestBase {
             Map<TableBucket, List<InternalRow>> firstRows = putRows(tableId, 
tablePath, 10);
 
             // check the expected records
-            waitUntilSnapshot(tableId, 0);
+            FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
 
             splitsAddition =
                     new SplitsAddition<>(
@@ -168,8 +168,8 @@ class TieringSplitReaderTest extends FlinkTestBase {
                         createTieringReader(connection)) {
             Map<TableBucket, List<InternalRow>> table0Rows = putRows(tableId0, 
tablePath0, 10);
             Map<TableBucket, List<InternalRow>> table1Rows = putRows(tableId1, 
tablePath1, 10);
-            waitUntilSnapshot(tableId0, 0);
-            waitUntilSnapshot(tableId1, 0);
+            FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath0);
+            FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath1);
 
             // first add snapshot split of bucket 0, bucket 1 of table id 0
             SplitsAddition<TieringSplit> splitsAddition =
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
index 756ed240e..4475ba983 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
@@ -52,7 +52,9 @@ import javax.annotation.Nullable;
 
 import java.nio.file.Files;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -179,8 +181,6 @@ public class TieringTestBase extends AbstractTestBase {
 
     private static Configuration flussClusterConfig() {
         Configuration conf = new Configuration();
-        // set snapshot interval to 1s for testing purposes
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
         // not to clean snapshots for test purpose
         conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
 
@@ -219,25 +219,23 @@ public class TieringTestBase extends AbstractTestBase {
         return admin.getTableInfo(tablePath).get().getTableId();
     }
 
-    protected void waitUntilSnapshot(long tableId, long snapshotId) {
-        waitUntilSnapshot(tableId, null, snapshotId);
+    protected void triggerAndWaitSnapshot(long tableId) {
+        List<TableBucket> allBuckets = new ArrayList<>();
+        for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
+            allBuckets.add(new TableBucket(tableId, null, i));
+        }
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(allBuckets);
     }
 
-    protected void waitUntilPartitionTableSnapshot(
-            long tableId, Map<String, Long> partitionNameByIds, long 
snapshotId) {
+    protected void triggerAndWaitUntilPartitionTableSnapshot(
+            long tableId, Map<String, Long> partitionNameByIds) {
+        List<TableBucket> allBuckets = new ArrayList<>();
         for (Map.Entry<String, Long> entry : partitionNameByIds.entrySet()) {
             for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
-                TableBucket tableBucket = new TableBucket(tableId, 
entry.getValue(), i);
-                FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
+                allBuckets.add(new TableBucket(tableId, entry.getValue(), i));
             }
         }
-    }
-
-    protected void waitUntilSnapshot(long tableId, @Nullable Long partitionId, 
long snapshotId) {
-        for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(allBuckets);
     }
 
     protected static Map<Long, Map<Integer, Long>> 
upsertRowForPartitionedTable(
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 84e5bd5db..15f62f3d1 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -111,8 +111,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
 
             Map<Integer, Long> bucketOffsetOfSecondWrite =
                     upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10);
-            long snapshotId = 0;
-            waitUntilSnapshot(tableId, snapshotId);
+            triggerAndWaitSnapshot(tableId);
 
             // request tiering table splits
             for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
@@ -150,7 +149,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         final Map<Integer, Long> bucketOffsetOfInitialWrite =
                 upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10);
         long snapshotId = 0;
-        waitUntilSnapshot(tableId, snapshotId);
+        triggerAndWaitSnapshot(tableId);
 
         int expectNumberOfSplits = 3;
 
@@ -203,8 +202,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
 
             Map<Integer, Long> bucketOffsetOfSecondWrite =
                     upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 10, 20);
-            snapshotId = 1;
-            waitUntilSnapshot(tableId, snapshotId);
+            triggerAndWaitSnapshot(tableId);
 
             // request tiering table splits
             for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
@@ -392,7 +390,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
                     upsertRowForPartitionedTable(
                             tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 
partitionNameByIds, 10, 20);
-            waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, 0);
+            triggerAndWaitUntilPartitionTableSnapshot(tableId, 
partitionNameByIds);
 
             // request tiering table splits
             for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 32088399e..bfa814ce2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -30,7 +30,6 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -70,8 +69,6 @@ public class FlinkTestBase extends AbstractTestBase {
             FlussClusterExtension.builder()
                     .setClusterConf(
                             new Configuration()
-                                    // set snapshot interval to 1s for testing 
purposes
-                                    .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofSeconds(1))
                                     // not to clean snapshots for test purpose
                                     .set(
                                             
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
@@ -181,13 +178,6 @@ public class FlinkTestBase extends AbstractTestBase {
         return admin.getTableInfo(tablePath).get().getTableId();
     }
 
-    protected void waitUntilSnapshot(long tableId, long snapshotId) {
-        for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
-
     /**
      * Wait until the default number of partitions is created. Return the map 
from partition id to
      * partition name.
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 99fd340cf..5bd021b9a 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -108,8 +108,7 @@ public class FlinkIcebergTieringTestBase {
 
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
+        conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
 
         // Configure the tiering sink to be Iceberg
         conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
@@ -319,13 +318,6 @@ public class FlinkIcebergTieringTestBase {
         }
     }
 
-    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
-        for (int i = 0; i < bucketNum; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
-
     protected List<Record> getIcebergRecords(TablePath tablePath) throws 
IOException {
         List<Record> icebergRecords = new ArrayList<>();
         try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 11de6baed..0e1c80eea 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -172,7 +172,7 @@ class IcebergTieringITCase extends 
FlinkIcebergTieringTestBase {
                                 BinaryString.fromString("abc"),
                                 new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
         writeRows(t1, rows, false);
-        waitUntilSnapshot(t1Id, 1, 0);
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(t1);
 
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 0b94cedb5..8a429764d 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -78,9 +78,8 @@ public class FlinkLanceTieringTestBase {
 
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-                // not to clean snapshots for test purpose
-                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
+        // not to clean snapshots for test purpose
+        conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
         conf.setString("datalake.format", "lance");
         try {
             warehousePath =
@@ -210,11 +209,4 @@ public class FlinkLanceTieringTestBase {
                         DataLakeFormat.LANCE.toString())
                 .build();
     }
-
-    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
-        for (int i = 0; i < bucketNum; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index e98596e9e..90ab136fd 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -93,9 +93,8 @@ public abstract class FlinkPaimonTieringTestBase {
 
     protected static Configuration initConfig() {
         Configuration conf = new Configuration();
-        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-                // not to clean snapshots for test purpose
-                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
+        // not to clean snapshots for test purpose
+        conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
         conf.setString("datalake.format", "paimon");
         conf.setString("datalake.paimon.metastore", "filesystem");
         try {
@@ -163,11 +162,12 @@ public abstract class FlinkPaimonTieringTestBase {
         return admin.getTableInfo(tablePath).get().getTableId();
     }
 
-    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
+    protected void triggerAndWaitSnapshot(long tableId, int bucketNum) {
+        List<TableBucket> allBuckets = new ArrayList<>();
         for (int i = 0; i < bucketNum; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            getFlussClusterExtension().waitUntilSnapshotFinished(tableBucket, 
snapshotId);
+            allBuckets.add(new TableBucket(tableId, i));
         }
+        getFlussClusterExtension().triggerAndWaitSnapshots(allBuckets);
     }
 
     protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 19cbeb558..bf92cd5f5 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -104,7 +104,7 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         // write records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
         writeRows(t1, rows, false);
-        waitUntilSnapshot(t1Id, 1, 0);
+        triggerAndWaitSnapshot(t1Id, 1);
 
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
@@ -339,7 +339,7 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         // write records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
         writeRows(t1, rows, false);
-        waitUntilSnapshot(t1Id, 1, 0);
+        triggerAndWaitSnapshot(t1Id, 1);
 
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
@@ -445,7 +445,7 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         // write records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
         writeRows(t1, rows, false);
-        waitUntilSnapshot(t1Id, 1, 0);
+        triggerAndWaitSnapshot(t1Id, 1);
 
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index 6520770c3..76248aab9 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -66,7 +66,7 @@ class ReCreateSameTableAfterTieringTest extends 
FlinkPaimonTieringTestBase {
         // write records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
         writeRows(t1, rows, false);
-        waitUntilSnapshot(t1Id, 1, 0);
+        triggerAndWaitSnapshot(t1Id, 1);
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
 
@@ -84,7 +84,7 @@ class ReCreateSameTableAfterTieringTest extends 
FlinkPaimonTieringTestBase {
         List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"));
         writeRows(t1, newRows, false);
         // new table, so the snapshot id should be 0
-        waitUntilSnapshot(t2Id, 1, 0);
+        triggerAndWaitSnapshot(t2Id, 1);
         // check the status of replica after synced
         assertReplicaStatus(t2Bucket, 2);
         // check data in paimon
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java 
b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
index a5d3ff025..fa058f6bc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
@@ -20,6 +20,9 @@ package org.apache.fluss.server;
 /** A counter for generating unique sequence IDs. */
 public interface SequenceIDCounter {
 
+    /** Gets the current sequence ID without incrementing it. */
+    long getCurrent() throws Exception;
+
     /**
      * Atomically increments the sequence ID.
      *
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index c84a26b70..e61a4999a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.snapshot;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
@@ -155,6 +156,16 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
         this.snapshotsCleaner = new SnapshotsCleaner();
     }
 
+    @Override
+    public long currentSnapshotId() {
+        try {
+            return snapshotIdCounter.getCurrent();
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    "Failed to get current snapshot ID for TableBucket " + 
tableBucket + ".", e);
+        }
+    }
+
     @Override
     public Optional<PeriodicSnapshotManager.SnapshotRunnable> initSnapshot() 
throws Exception {
         long logOffset = logOffsetSupplier.get();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index 0c4314469..caa1a531c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -168,6 +168,11 @@ public class PeriodicSnapshotManager implements Closeable {
         }
     }
 
+    @VisibleForTesting
+    public long currentSnapshotId() {
+        return target.currentSnapshotId();
+    }
+
     public void triggerSnapshot() {
         // todo: consider shrink the scope
         // of using guardedExecutor
@@ -229,8 +234,9 @@ public class PeriodicSnapshotManager implements Closeable {
                                             snapshotLocation,
                                             snapshotResult);
                                     LOG.info(
-                                            "TableBucket {} snapshot finished 
successfully, cost {} ms.",
+                                            "TableBucket {} snapshot {} 
finished successfully, cost {} ms.",
                                             tableBucket,
+                                            snapshotId,
                                             System.currentTimeMillis() - 
triggerTime);
                                 } catch (Throwable t) {
                                     LOG.warn(
@@ -312,6 +318,10 @@ public class PeriodicSnapshotManager implements Closeable {
     /** {@link SnapshotRunnable} provider and consumer. */
     @NotThreadSafe
     public interface SnapshotTarget {
+
+        /** Gets current snapshot id. */
+        long currentSnapshotId();
+
         /**
          * Initialize kv snapshot.
          *
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e0f3d3933..7a2003786 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1932,4 +1932,10 @@ public final class Replica {
     public SchemaGetter getSchemaGetter() {
         return schemaGetter;
     }
+
+    @VisibleForTesting
+    @Nullable
+    public PeriodicSnapshotManager getKvSnapshotManager() {
+        return kvSnapshotManager;
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
index 7781340bc..ceec056da 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
@@ -47,6 +47,11 @@ public class ZkSequenceIDCounter implements 
SequenceIDCounter {
                                 BASE_SLEEP_MS, MAX_SLEEP_MS, RETRY_TIMES));
     }
 
+    @Override
+    public long getCurrent() throws Exception {
+        return sequenceIdCounter.get().postValue();
+    }
+
     /**
      * Atomically increments the current sequence ID.
      *
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
index 72f10f5d4..7ec78165f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
@@ -39,6 +39,11 @@ public class TestingSequenceIDCounter implements 
SequenceIDCounter {
         this.failedTrigger = failedTrigger;
     }
 
+    @Override
+    public long getCurrent() {
+        return idGenerator.get();
+    }
+
     @Override
     public long getAndIncrement() {
         return idGenerator.getAndIncrement();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9f26bc772..4077e0e65 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -655,6 +655,11 @@ class KvTabletSnapshotTargetTest {
 
     private class TestingSnapshotIDCounter implements SequenceIDCounter {
 
+        @Override
+        public long getCurrent() {
+            return snapshotIdGenerator.get();
+        }
+
         @Override
         public long getAndIncrement() {
             return snapshotIdGenerator.getAndIncrement();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
index 774232ed9..31e53490d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
@@ -157,6 +157,11 @@ class PeriodicSnapshotManagerTest {
     private static class NopSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotTarget {
         private static final NopSnapshotTarget INSTANCE = new 
NopSnapshotTarget();
 
+        @Override
+        public long currentSnapshotId() {
+            return 0;
+        }
+
         @Override
         public Optional<PeriodicSnapshotManager.SnapshotRunnable> 
initSnapshot() {
             return Optional.empty();
@@ -205,6 +210,11 @@ class PeriodicSnapshotManagerTest {
             }
         }
 
+        @Override
+        public long currentSnapshotId() {
+            return 0;
+        }
+
         @Override
         public Optional<PeriodicSnapshotManager.SnapshotRunnable> 
initSnapshot() {
             RunnableFuture<SnapshotResult> runnableFuture =
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index b63138ea8..43b12943d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -51,6 +51,7 @@ import 
org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
+import org.apache.fluss.server.kv.snapshot.PeriodicSnapshotManager;
 import org.apache.fluss.server.metadata.ServerInfo;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.replica.Replica;
@@ -65,6 +66,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
 import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.server.zk.data.TableRegistration;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.SystemClock;
@@ -103,6 +105,7 @@ import static 
org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
 import static org.apache.fluss.utils.function.FunctionUtils.uncheckedFunction;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * A Junit {@link Extension} which starts a Fluss Cluster.
@@ -681,7 +684,90 @@ public final class FlussClusterExtension
                 });
     }
 
-    public CompletedSnapshot waitUntilSnapshotFinished(TableBucket 
tableBucket, long snapshotId) {
+    public void triggerAndWaitSnapshot(TablePath tablePath) throws Exception {
+        Optional<TableRegistration> table = 
zooKeeperClient.getTable(tablePath);
+        //noinspection SimplifyOptionalCallChains (Java 8 compatibility)
+        if (!table.isPresent()) {
+            throw new IllegalStateException("Table not found for table path " 
+ tablePath);
+        }
+
+        TableRegistration tableRegistration = table.get();
+        long tableId = tableRegistration.tableId;
+        int bucketCount = tableRegistration.bucketCount;
+
+        List<TableBucket> tableBuckets = new ArrayList<>();
+        if (!tableRegistration.isPartitioned()) {
+            for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+                tableBuckets.add(new TableBucket(tableId, null, bucketId));
+            }
+        } else {
+            Map<String, Long> partitions = 
zooKeeperClient.getPartitionNameAndIds(tablePath);
+            for (Long partitionId : partitions.values()) {
+                for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+                    tableBuckets.add(new TableBucket(tableId, partitionId, 
bucketId));
+                }
+            }
+        }
+
+        // trigger all snapshots and wait until they are finished
+        triggerAndWaitSnapshots(tableBuckets);
+    }
+
+    public void triggerAndWaitSnapshots(Collection<TableBucket> tableBuckets) {
+        Map<TableBucket, Long> snapshotMap = new HashMap<>();
+        for (TableBucket tableBucket : tableBuckets) {
+            Long snapshotId = triggerSnapshot(tableBucket);
+            if (snapshotId != null) {
+                snapshotMap.put(tableBucket, snapshotId);
+            }
+        }
+        // wait until all snapshots are finished
+        for (Map.Entry<TableBucket, Long> entry : snapshotMap.entrySet()) {
+            waitUntilSnapshotFinished(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
+        Long snapshotId = triggerSnapshot(tableBucket);
+        if (snapshotId != null) {
+            return waitUntilSnapshotFinished(tableBucket, snapshotId);
+        } else {
+            fail("No new snapshot triggered for table bucket " + tableBucket);
+            return null;
+        }
+    }
+
+    private Long triggerSnapshot(TableBucket tableBucket) {
+        Long snapshotId = null;
+        Long nextSnapshotId = null;
+        for (TabletServer ts : tabletServers.values()) {
+            ReplicaManager.HostedReplica replica = 
ts.getReplicaManager().getReplica(tableBucket);
+            if (replica instanceof ReplicaManager.OnlineReplica) {
+                Replica r = ((ReplicaManager.OnlineReplica) 
replica).getReplica();
+                PeriodicSnapshotManager kvSnapshotManager = 
r.getKvSnapshotManager();
+                if (r.isLeader() && kvSnapshotManager != null) {
+                    snapshotId = kvSnapshotManager.currentSnapshotId();
+                    kvSnapshotManager.triggerSnapshot();
+                    nextSnapshotId = kvSnapshotManager.currentSnapshotId();
+                    break;
+                }
+            }
+        }
+
+        if (snapshotId != null) {
+            if (nextSnapshotId > snapshotId) {
+                // only when a new snapshot was triggered do we return the 
snapshot id
+                return snapshotId;
+            } else {
+                return null;
+            }
+        } else {
+            fail("No KV snapshot manager found for table bucket " + 
tableBucket);
+            return null;
+        }
+    }
+
+    private CompletedSnapshot waitUntilSnapshotFinished(TableBucket 
tableBucket, long snapshotId) {
         ZooKeeperClient zkClient = getZooKeeperClient();
         return waitValue(
                 () -> {
@@ -693,7 +779,7 @@ public final class FlussClusterExtension
                                     uncheckedFunction(
                                             
CompletedSnapshotHandle::retrieveCompleteSnapshot));
                 },
-                Duration.ofMinutes(2),
+                Duration.ofSeconds(30),
                 String.format(
                         "Fail to wait bucket %s snapshot %d finished", 
tableBucket, snapshotId));
     }

Reply via email to