This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new dc6e74785 [test] Support manually triggering and waiting for KV snapshots
for FlussClusterExtension (#2429)
dc6e74785 is described below
commit dc6e747857514d733960678f6e13d9b4159cc6c7
Author: Jark Wu <[email protected]>
AuthorDate: Wed Jan 21 23:30:24 2026 +0800
[test] Support manually triggering and waiting for KV snapshots for
FlussClusterExtension (#2429)
---
.../client/admin/ClientToServerITCaseBase.java | 2 -
.../fluss/client/admin/FlussAdminITCase.java | 6 +-
.../security/acl/FlussAuthorizationITCase.java | 2 -
.../batch/KvSnapshotBatchScannerITCase.java | 15 +---
.../fluss/flink/metrics/FlinkMetricsITCase.java | 3 -
.../flink/procedure/FlinkProcedureITCase.java | 2 -
.../security/acl/FlinkAuthorizationITCase.java | 2 -
.../source/FlinkTableSourceFailOverITCase.java | 3 -
.../fluss/flink/source/FlinkTableSourceITCase.java | 79 ++++---------------
.../enumerator/FlinkSourceEnumeratorTest.java | 2 +-
.../source/reader/FlinkSourceSplitReaderTest.java | 4 +-
.../fluss/flink/tiering/FlinkTieringTestBase.java | 11 +--
.../fluss/flink/tiering/TieringFailoverITCase.java | 2 +-
.../tiering/source/TieringSplitReaderTest.java | 6 +-
.../flink/tiering/source/TieringTestBase.java | 28 ++++---
.../enumerator/TieringSourceEnumeratorTest.java | 10 +--
.../apache/fluss/flink/utils/FlinkTestBase.java | 10 ---
.../testutils/FlinkIcebergTieringTestBase.java | 10 +--
.../lake/iceberg/tiering/IcebergTieringITCase.java | 2 +-
.../lance/testutils/FlinkLanceTieringTestBase.java | 12 +--
.../testutils/FlinkPaimonTieringTestBase.java | 12 +--
.../lake/paimon/tiering/PaimonTieringITCase.java | 6 +-
.../tiering/ReCreateSameTableAfterTieringTest.java | 4 +-
.../org/apache/fluss/server/SequenceIDCounter.java | 3 +
.../server/kv/snapshot/KvTabletSnapshotTarget.java | 11 +++
.../kv/snapshot/PeriodicSnapshotManager.java | 12 ++-
.../org/apache/fluss/server/replica/Replica.java | 6 ++
.../fluss/server/zk/ZkSequenceIDCounter.java | 5 ++
.../kv/autoinc/TestingSequenceIDCounter.java | 5 ++
.../kv/snapshot/KvTabletSnapshotTargetTest.java | 5 ++
.../kv/snapshot/PeriodicSnapshotManagerTest.java | 10 +++
.../server/testutils/FlussClusterExtension.java | 90 +++++++++++++++++++++-
32 files changed, 208 insertions(+), 172 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 311d5a94f..5714ddf67 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -113,8 +113,6 @@ public abstract class ClientToServerITCaseBase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
- // set a shorter interval for testing purpose
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
// set default datalake format for the cluster and enable datalake
tables
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index aaf06592e..64cd85e9f 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1001,8 +1001,8 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
Map<Integer, CompletedSnapshot> expectedSnapshots = new
HashMap<>();
for (int bucket = 0; bucket < bucketNum; bucket++) {
CompletedSnapshot completedSnapshot =
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(
- new TableBucket(tableId, bucket), 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(
+ new TableBucket(tableId, bucket));
expectedSnapshots.put(bucket, completedSnapshot);
}
@@ -1018,7 +1018,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
TableBucket tb = new TableBucket(snapshots.getTableId(), 0);
// wait until the snapshot finish
expectedSnapshots.put(
- tb.getBucket(),
FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tb, 1));
+ tb.getBucket(),
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb));
// check snapshot
snapshots = admin.getLatestKvSnapshots(tablePath1).get();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index ba1fb7fe2..cdb92ab95 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -1023,8 +1023,6 @@ public class FlussAuthorizationITCase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
- // set a shorter interval for testing purpose
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
// set a shorter max lag time to make tests in
FlussFailServerTableITCase faster
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME,
Duration.ofSeconds(10));
// set default datalake format for the cluster and enable datalake
tables
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index ea1aaf237..2ab2f3726 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -45,7 +45,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
@@ -104,7 +103,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
Map<TableBucket, List<InternalRow>> expectedRowByBuckets =
putRows(tableId, tablePath, 10);
// wait snapshot finish
- waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
// test read snapshot
testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -113,7 +112,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
expectedRowByBuckets = putRows(tableId, tablePath, 20);
// wait snapshot finish
- waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
// test read snapshot
testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -126,7 +125,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
// put into values with old schema.
Map<TableBucket, List<InternalRow>> oldSchemaRowByBuckets =
putRows(tableId, tablePath, 10);
- waitUntilAllSnapshotFinished(oldSchemaRowByBuckets.keySet(), 0);
+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(oldSchemaRowByBuckets.keySet());
// add a new column and rename an existing column
admin.alterTable(
@@ -175,7 +174,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
}
// wait snapshot finish
- waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
// test read snapshot with new Schema
testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -239,10 +238,4 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
byte[] key = keyEncoder.encodeKey(row);
return function.bucketing(key, DEFAULT_BUCKET_NUM);
}
-
- private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets,
long snapshotId) {
- for (TableBucket tableBucket : tableBuckets) {
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
- }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
index 596ad7d1d..f13e4860e 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
@@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -67,8 +66,6 @@ abstract class FlinkMetricsITCase {
FlussClusterExtension.builder()
.setClusterConf(
new org.apache.fluss.config.Configuration()
- // set snapshot interval to 1s for testing
purposes
- .set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofSeconds(1))
// not to clean snapshots for test purpose
.set(
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index c1231c6e9..5138ca869 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -782,8 +782,6 @@ public abstract class FlinkProcedureITCase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
- // set a shorter interval for testing purpose
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
// set a shorter max lag time to make tests in
FlussFailServerTableITCase faster
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME,
Duration.ofSeconds(10));
// set default datalake format for the cluster and enable datalake
tables
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 96e2ed199..2fd889989 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -456,8 +456,6 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
- // set a shorter interval for testing purpose
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
// set a shorter max lag time to make tests in
FlussFailServerTableITCase faster
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME,
Duration.ofSeconds(10));
// set default datalake format for the cluster and enable datalake
tables
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index 916b10654..80dfbcd4c 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
import java.io.File;
-import java.time.Duration;
import java.time.Year;
import java.util.ArrayList;
import java.util.Collections;
@@ -76,8 +75,6 @@ abstract class FlinkTableSourceFailOverITCase {
FlussClusterExtension.builder()
.setClusterConf(
new org.apache.fluss.config.Configuration()
- // set snapshot interval to 1s for testing
purposes
- .set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofSeconds(1))
// not to clean snapshots for test purpose
.set(
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 4d056a12d..5d903cb8d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.source;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
-import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
@@ -78,7 +77,6 @@ import static
org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -91,8 +89,6 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
FlussClusterExtension.builder()
.setClusterConf(
new Configuration()
- // set snapshot interval to 1s for testing
purposes
- .set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofSeconds(1))
// not to clean snapshots for test purpose
.set(
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
@@ -161,8 +157,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
-
- waitUntilAllBucketFinishSnapshot(admin, tablePath);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -181,7 +176,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
- waitUntilAllBucketFinishSnapshot(admin, tablePath);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -288,7 +283,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait snapshot before collect job start,
// to make sure reading from kv snapshot
writeRows(conn, tablePath, rows, false);
- waitUntilAllBucketFinishSnapshot(admin,
TablePath.of(DEFAULT_DB, tableName));
+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(TablePath.of(DEFAULT_DB,
tableName));
}
} else {
writeRows(conn, tablePath, rows, true);
@@ -336,7 +331,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
- waitUntilAllBucketFinishSnapshot(admin, tablePath);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -461,7 +456,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait generate snapshot.
writeRows(conn, tablePath, rows1, false);
- waitUntilAllBucketFinishSnapshot(admin, tablePath);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
List<InternalRow> rows2 = Arrays.asList(row(1, "v11"), row(2, "v22"),
row(4, "v4"));
@@ -531,12 +526,8 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait generate snapshot.
writeRows(conn, tablePath, rows1, false);
- if (partitionName == null) {
- waitUntilAllBucketFinishSnapshot(admin, tablePath);
- } else {
- waitUntilAllBucketFinishSnapshot(
- admin, tablePath, Collections.singleton(partitionName));
- }
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
CLOCK.advanceTime(Duration.ofMillis(100));
List<InternalRow> rows2 =
@@ -605,7 +596,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
List<String> expectedRowValues =
writeRowsToPartition(conn, tablePath,
partitionNameById.values());
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
partitionNameById.values());
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
// This test requires dynamically discovering newly created
partitions, so
// 'scan.partition.discovery.interval' needs to be set to 2s (default
is 1 minute),
@@ -988,7 +979,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
writeRowsToPartition(conn, tablePath, Arrays.asList("2025",
"2026")).stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String plan = tEnv.explainSql("select * from partitioned_table where c
='2025'");
assertThat(plan)
@@ -1079,8 +1070,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(
- admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String plan = tEnv.explainSql("select * from multi_partitioned_table
where c ='2025'");
assertThat(plan)
@@ -1107,7 +1097,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025$3", "2026$2"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
String plan2 =
@@ -1155,7 +1145,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
}
writeRows(conn, tablePath, rows, false);
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String plan =
tEnv.explainSql(
@@ -1203,7 +1193,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
List<String> expectedRowValues =
writeRowsToPartition(conn, tablePath, Arrays.asList("2025",
"2026"));
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from
partitioned_table_no_filter").collect();
@@ -1249,7 +1239,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.stream()
.filter(s -> s.contains("2025") || s.contains("2026"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026", "2027"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String query1 = "select * from partitioned_table_in where c in
('2025','2026')";
String plan = tEnv.explainSql(query1);
@@ -1300,7 +1290,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
}
writeRows(conn, tablePath, rows, false);
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026", "2027"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String query1 =
"select a,c,d from combined_filters_table_in where c in
('2025','2026') and d % 200 = 0";
@@ -1339,7 +1329,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
allData.stream()
.filter(s -> s.contains("2025") || s.contains("2026"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026", "3026"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String query1 = "select * from partitioned_table_like where c like
'202%'";
String plan = tEnv.explainSql(query1);
@@ -1392,8 +1382,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
allData.stream()
.filter(s -> s.contains("v3") && !s.contains("2025,
2"))
.collect(Collectors.toList());
- waitUntilAllBucketFinishSnapshot(
- admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
String query =
"select * from partitioned_table_complex where a = 3\n"
@@ -1544,40 +1533,6 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
return tableName;
}
- private void waitUntilAllBucketFinishSnapshot(Admin admin, TablePath
tablePath) {
- waitUntil(
- () -> {
- KvSnapshots snapshots =
admin.getLatestKvSnapshots(tablePath).get();
- for (int bucketId : snapshots.getBucketIds()) {
- if (!snapshots.getSnapshotId(bucketId).isPresent()) {
- return false;
- }
- }
- return true;
- },
- Duration.ofMinutes(1),
- "Fail to wait until all bucket finish snapshot");
- }
-
- private void waitUntilAllBucketFinishSnapshot(
- Admin admin, TablePath tablePath, Collection<String> partitions) {
- waitUntil(
- () -> {
- for (String partition : partitions) {
- KvSnapshots snapshots =
- admin.getLatestKvSnapshots(tablePath,
partition).get();
- for (int bucketId : snapshots.getBucketIds()) {
- if
(!snapshots.getSnapshotId(bucketId).isPresent()) {
- return false;
- }
- }
- }
- return true;
- },
- Duration.ofMinutes(1),
- "Fail to wait until all bucket finish snapshot");
- }
-
private GenericRow rowWithPartition(Object[] values, @Nullable String
partition) {
if (partition == null) {
return row(values);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index b18adfc45..238accf0b 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -146,7 +146,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
// write data and wait snapshot finish to make sure
// we can hava snapshot split
Map<Integer, Integer> bucketIdToNumRecords =
putRows(DEFAULT_TABLE_PATH, 10);
- waitUntilSnapshot(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(DEFAULT_TABLE_PATH);
try (MockSplitEnumeratorContext<SourceSplitBase> context =
new MockSplitEnumeratorContext<>(numSubtasks)) {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index ffdd95652..59f1116ed 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -144,7 +144,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
Map<TableBucket, List<InternalRow>> rows = putRows(tableId,
tablePath, 10);
// check the expected records
- waitUntilSnapshot(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
hybridSnapshotLogSplits = getHybridSnapshotLogSplits(tablePath);
@@ -252,7 +252,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
Map<TableBucket, List<InternalRow>> rows = putRows(tableId,
tablePath, 10);
// check the expected records
- waitUntilSnapshot(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
List<SourceSplitBase> totalSplits =
new ArrayList<>(getHybridSnapshotLogSplits(tablePath));
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
index c87078d7f..130a30812 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
@@ -67,8 +67,7 @@ class FlinkTieringTestBase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+ conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
// Configure the tiering sink to be Lance
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE);
@@ -150,14 +149,6 @@ class FlinkTieringTestBase {
}
}
- protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
- for (int i = 0; i < bucketNum; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
- }
-
- @SuppressWarnings("resource")
public List<InternalRow> getValuesRecords(TablePath tablePath) {
return TestingValuesLake.getResults(tablePath.toString());
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
index a8ee17a6d..847031225 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
@@ -84,7 +84,7 @@ class TieringFailoverITCase extends FlinkTieringTestBase {
List<InternalRow> rows = Arrays.asList(row(1, "i1"), row(2, "i2"),
row(3, "i3"));
List<InternalRow> expectedRows = new ArrayList<>(rows);
writeRows(t1, rows);
- waitUntilSnapshot(t1Id, 1, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(t1);
// fail the first write to the pk table
TestingValuesLake.failWhen(t1.toString()).failWriteOnce();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
index 4ae4cf55c..fdf7a8477 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
@@ -103,7 +103,7 @@ class TieringSplitReaderTest extends FlinkTestBase {
Map<TableBucket, List<InternalRow>> firstRows = putRows(tableId,
tablePath, 10);
// check the expected records
- waitUntilSnapshot(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
splitsAddition =
new SplitsAddition<>(
@@ -168,8 +168,8 @@ class TieringSplitReaderTest extends FlinkTestBase {
createTieringReader(connection)) {
Map<TableBucket, List<InternalRow>> table0Rows = putRows(tableId0,
tablePath0, 10);
Map<TableBucket, List<InternalRow>> table1Rows = putRows(tableId1,
tablePath1, 10);
- waitUntilSnapshot(tableId0, 0);
- waitUntilSnapshot(tableId1, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath1);
// first add snapshot split of bucket 0, bucket 1 of table id 0
SplitsAddition<TieringSplit> splitsAddition =
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
index 756ed240e..4475ba983 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java
@@ -52,7 +52,9 @@ import javax.annotation.Nullable;
import java.nio.file.Files;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -179,8 +181,6 @@ public class TieringTestBase extends AbstractTestBase {
private static Configuration flussClusterConfig() {
Configuration conf = new Configuration();
- // set snapshot interval to 1s for testing purposes
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
// not to clean snapshots for test purpose
conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
@@ -219,25 +219,23 @@ public class TieringTestBase extends AbstractTestBase {
return admin.getTableInfo(tablePath).get().getTableId();
}
- protected void waitUntilSnapshot(long tableId, long snapshotId) {
- waitUntilSnapshot(tableId, null, snapshotId);
+ protected void triggerAndWaitSnapshot(long tableId) {
+ List<TableBucket> allBuckets = new ArrayList<>();
+ for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
+ allBuckets.add(new TableBucket(tableId, null, i));
+ }
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(allBuckets);
}
- protected void waitUntilPartitionTableSnapshot(
- long tableId, Map<String, Long> partitionNameByIds, long
snapshotId) {
+ protected void triggerAndWaitUntilPartitionTableSnapshot(
+ long tableId, Map<String, Long> partitionNameByIds) {
+ List<TableBucket> allBuckets = new ArrayList<>();
for (Map.Entry<String, Long> entry : partitionNameByIds.entrySet()) {
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
- TableBucket tableBucket = new TableBucket(tableId,
entry.getValue(), i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
+ allBuckets.add(new TableBucket(tableId, entry.getValue(), i));
}
}
- }
-
- protected void waitUntilSnapshot(long tableId, @Nullable Long partitionId,
long snapshotId) {
- for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
- TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(allBuckets);
}
protected static Map<Long, Map<Integer, Long>>
upsertRowForPartitionedTable(
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 84e5bd5db..15f62f3d1 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -111,8 +111,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Integer, Long> bucketOffsetOfSecondWrite =
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10);
- long snapshotId = 0;
- waitUntilSnapshot(tableId, snapshotId);
+ triggerAndWaitSnapshot(tableId);
// request tiering table splits
for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
@@ -150,7 +149,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
final Map<Integer, Long> bucketOffsetOfInitialWrite =
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10);
long snapshotId = 0;
- waitUntilSnapshot(tableId, snapshotId);
+ triggerAndWaitSnapshot(tableId);
int expectNumberOfSplits = 3;
@@ -203,8 +202,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Integer, Long> bucketOffsetOfSecondWrite =
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 10, 20);
- snapshotId = 1;
- waitUntilSnapshot(tableId, snapshotId);
+ triggerAndWaitSnapshot(tableId);
// request tiering table splits
for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
@@ -392,7 +390,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
upsertRowForPartitionedTable(
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR,
partitionNameByIds, 10, 20);
- waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, 0);
+ triggerAndWaitUntilPartitionTableSnapshot(tableId,
partitionNameByIds);
// request tiering table splits
for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 32088399e..bfa814ce2 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -30,7 +30,6 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -70,8 +69,6 @@ public class FlinkTestBase extends AbstractTestBase {
FlussClusterExtension.builder()
.setClusterConf(
new Configuration()
- // set snapshot interval to 1s for testing
purposes
- .set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofSeconds(1))
// not to clean snapshots for test purpose
.set(
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
@@ -181,13 +178,6 @@ public class FlinkTestBase extends AbstractTestBase {
return admin.getTableInfo(tablePath).get().getTableId();
}
- protected void waitUntilSnapshot(long tableId, long snapshotId) {
- for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
- }
-
/**
* Wait until the default number of partitions is created. Return the map
from partition id to
* partition name.
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 99fd340cf..5bd021b9a 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -108,8 +108,7 @@ public class FlinkIcebergTieringTestBase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+ conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
// Configure the tiering sink to be Iceberg
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
@@ -319,13 +318,6 @@ public class FlinkIcebergTieringTestBase {
}
}
- protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
- for (int i = 0; i < bucketNum; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
- }
-
protected List<Record> getIcebergRecords(TablePath tablePath) throws
IOException {
List<Record> icebergRecords = new ArrayList<>();
try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 11de6baed..0e1c80eea 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -172,7 +172,7 @@ class IcebergTieringITCase extends
FlinkIcebergTieringTestBase {
BinaryString.fromString("abc"),
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
writeRows(t1, rows, false);
- waitUntilSnapshot(t1Id, 1, 0);
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(t1);
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 0b94cedb5..8a429764d 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -78,9 +78,8 @@ public class FlinkLanceTieringTestBase {
private static Configuration initConfig() {
Configuration conf = new Configuration();
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- // not to clean snapshots for test purpose
- .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+ // not to clean snapshots for test purpose
+ conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
conf.setString("datalake.format", "lance");
try {
warehousePath =
@@ -210,11 +209,4 @@ public class FlinkLanceTieringTestBase {
DataLakeFormat.LANCE.toString())
.build();
}
-
- protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
- for (int i = 0; i < bucketNum; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
- }
- }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index e98596e9e..90ab136fd 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -93,9 +93,8 @@ public abstract class FlinkPaimonTieringTestBase {
protected static Configuration initConfig() {
Configuration conf = new Configuration();
- conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- // not to clean snapshots for test purpose
- .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+ // not to clean snapshots for test purpose
+ conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
conf.setString("datalake.format", "paimon");
conf.setString("datalake.paimon.metastore", "filesystem");
try {
@@ -163,11 +162,12 @@ public abstract class FlinkPaimonTieringTestBase {
return admin.getTableInfo(tablePath).get().getTableId();
}
- protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
+ protected void triggerAndWaitSnapshot(long tableId, int bucketNum) {
+ List<TableBucket> allBuckets = new ArrayList<>();
for (int i = 0; i < bucketNum; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- getFlussClusterExtension().waitUntilSnapshotFinished(tableBucket,
snapshotId);
+ allBuckets.add(new TableBucket(tableId, i));
}
+ getFlussClusterExtension().triggerAndWaitSnapshots(allBuckets);
}
protected void writeRows(TablePath tablePath, List<InternalRow> rows,
boolean append)
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 19cbeb558..bf92cd5f5 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -104,7 +104,7 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
// write records
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
writeRows(t1, rows, false);
- waitUntilSnapshot(t1Id, 1, 0);
+ triggerAndWaitSnapshot(t1Id, 1);
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
@@ -339,7 +339,7 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
// write records
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
writeRows(t1, rows, false);
- waitUntilSnapshot(t1Id, 1, 0);
+ triggerAndWaitSnapshot(t1Id, 1);
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
@@ -445,7 +445,7 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
// write records
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
writeRows(t1, rows, false);
- waitUntilSnapshot(t1Id, 1, 0);
+ triggerAndWaitSnapshot(t1Id, 1);
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index 6520770c3..76248aab9 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -66,7 +66,7 @@ class ReCreateSameTableAfterTieringTest extends
FlinkPaimonTieringTestBase {
// write records
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
writeRows(t1, rows, false);
- waitUntilSnapshot(t1Id, 1, 0);
+ triggerAndWaitSnapshot(t1Id, 1);
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
@@ -84,7 +84,7 @@ class ReCreateSameTableAfterTieringTest extends
FlinkPaimonTieringTestBase {
List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"));
writeRows(t1, newRows, false);
- // new table, so the snapshot id should be 0
+ // new table: trigger a fresh snapshot and wait until it finishes
- waitUntilSnapshot(t2Id, 1, 0);
+ triggerAndWaitSnapshot(t2Id, 1);
// check the status of replica after synced
assertReplicaStatus(t2Bucket, 2);
// check data in paimon
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
index a5d3ff025..fa058f6bc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
@@ -20,6 +20,9 @@ package org.apache.fluss.server;
/** A counter for generating unique sequence IDs. */
public interface SequenceIDCounter {
+ /** Gets the current sequence ID without incrementing it. */
+ long getCurrent() throws Exception;
+
/**
* Atomically increments the sequence ID.
*
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index c84a26b70..e61a4999a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.snapshot;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
@@ -155,6 +156,16 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
this.snapshotsCleaner = new SnapshotsCleaner();
}
+ @Override
+ public long currentSnapshotId() {
+ try {
+ return snapshotIdCounter.getCurrent();
+ } catch (Exception e) {
+ throw new FlussRuntimeException(
+ "Failed to get current snapshot ID for TableBucket " +
tableBucket + ".", e);
+ }
+ }
+
@Override
public Optional<PeriodicSnapshotManager.SnapshotRunnable> initSnapshot()
throws Exception {
long logOffset = logOffsetSupplier.get();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index 0c4314469..caa1a531c 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -168,6 +168,11 @@ public class PeriodicSnapshotManager implements Closeable {
}
}
+ @VisibleForTesting
+ public long currentSnapshotId() {
+ return target.currentSnapshotId();
+ }
+
public void triggerSnapshot() {
// todo: consider shrink the scope
// of using guardedExecutor
@@ -229,8 +234,9 @@ public class PeriodicSnapshotManager implements Closeable {
snapshotLocation,
snapshotResult);
LOG.info(
- "TableBucket {} snapshot finished
successfully, cost {} ms.",
+ "TableBucket {} snapshot {}
finished successfully, cost {} ms.",
tableBucket,
+ snapshotId,
System.currentTimeMillis() -
triggerTime);
} catch (Throwable t) {
LOG.warn(
@@ -312,6 +318,10 @@ public class PeriodicSnapshotManager implements Closeable {
/** {@link SnapshotRunnable} provider and consumer. */
@NotThreadSafe
public interface SnapshotTarget {
+
+ /** Gets the current snapshot id, i.e. the id the next snapshot will be assigned. */
+ long currentSnapshotId();
+
/**
* Initialize kv snapshot.
*
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e0f3d3933..7a2003786 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1932,4 +1932,10 @@ public final class Replica {
public SchemaGetter getSchemaGetter() {
return schemaGetter;
}
+
+ @VisibleForTesting
+ @Nullable
+ public PeriodicSnapshotManager getKvSnapshotManager() {
+ return kvSnapshotManager;
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
index 7781340bc..ceec056da 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
@@ -47,6 +47,11 @@ public class ZkSequenceIDCounter implements
SequenceIDCounter {
BASE_SLEEP_MS, MAX_SLEEP_MS, RETRY_TIMES));
}
+ @Override
+ public long getCurrent() throws Exception {
+ return sequenceIdCounter.get().postValue();
+ }
+
/**
* Atomically increments the current sequence ID.
*
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
index 72f10f5d4..7ec78165f 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
@@ -39,6 +39,11 @@ public class TestingSequenceIDCounter implements
SequenceIDCounter {
this.failedTrigger = failedTrigger;
}
+ @Override
+ public long getCurrent() {
+ return idGenerator.get();
+ }
+
@Override
public long getAndIncrement() {
return idGenerator.getAndIncrement();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9f26bc772..4077e0e65 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -655,6 +655,11 @@ class KvTabletSnapshotTargetTest {
private class TestingSnapshotIDCounter implements SequenceIDCounter {
+ @Override
+ public long getCurrent() {
+ return snapshotIdGenerator.get();
+ }
+
@Override
public long getAndIncrement() {
return snapshotIdGenerator.getAndIncrement();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
index 774232ed9..31e53490d 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
@@ -157,6 +157,11 @@ class PeriodicSnapshotManagerTest {
private static class NopSnapshotTarget implements
PeriodicSnapshotManager.SnapshotTarget {
private static final NopSnapshotTarget INSTANCE = new
NopSnapshotTarget();
+ @Override
+ public long currentSnapshotId() {
+ return 0;
+ }
+
@Override
public Optional<PeriodicSnapshotManager.SnapshotRunnable>
initSnapshot() {
return Optional.empty();
@@ -205,6 +210,11 @@ class PeriodicSnapshotManagerTest {
}
}
+ @Override
+ public long currentSnapshotId() {
+ return 0;
+ }
+
@Override
public Optional<PeriodicSnapshotManager.SnapshotRunnable>
initSnapshot() {
RunnableFuture<SnapshotResult> runnableFuture =
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index b63138ea8..43b12943d 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -51,6 +51,7 @@ import
org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
+import org.apache.fluss.server.kv.snapshot.PeriodicSnapshotManager;
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.replica.Replica;
@@ -65,6 +66,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;
@@ -103,6 +105,7 @@ import static
org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.apache.fluss.utils.function.FunctionUtils.uncheckedFunction;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/**
* A Junit {@link Extension} which starts a Fluss Cluster.
@@ -681,7 +684,90 @@ public final class FlussClusterExtension
});
}
- public CompletedSnapshot waitUntilSnapshotFinished(TableBucket
tableBucket, long snapshotId) {
+ public void triggerAndWaitSnapshot(TablePath tablePath) throws Exception {
+ Optional<TableRegistration> table =
zooKeeperClient.getTable(tablePath);
+ //noinspection SimplifyOptionalCallChains (Java 8 compatibility)
+ if (!table.isPresent()) {
+ throw new IllegalStateException("Table not found for table path "
+ tablePath);
+ }
+
+ TableRegistration tableRegistration = table.get();
+ long tableId = tableRegistration.tableId;
+ int bucketCount = tableRegistration.bucketCount;
+
+ List<TableBucket> tableBuckets = new ArrayList<>();
+ if (!tableRegistration.isPartitioned()) {
+ for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+ tableBuckets.add(new TableBucket(tableId, null, bucketId));
+ }
+ } else {
+ Map<String, Long> partitions =
zooKeeperClient.getPartitionNameAndIds(tablePath);
+ for (Long partitionId : partitions.values()) {
+ for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+ tableBuckets.add(new TableBucket(tableId, partitionId,
bucketId));
+ }
+ }
+ }
+
+ // trigger snapshots for all buckets and wait until they have finished
+ triggerAndWaitSnapshots(tableBuckets);
+ }
+
+ public void triggerAndWaitSnapshots(Collection<TableBucket> tableBuckets) {
+ Map<TableBucket, Long> snapshotMap = new HashMap<>();
+ for (TableBucket tableBucket : tableBuckets) {
+ Long snapshotId = triggerSnapshot(tableBucket);
+ if (snapshotId != null) {
+ snapshotMap.put(tableBucket, snapshotId);
+ }
+ }
+ // wait until all triggered snapshots have finished
+ for (Map.Entry<TableBucket, Long> entry : snapshotMap.entrySet()) {
+ waitUntilSnapshotFinished(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
+ Long snapshotId = triggerSnapshot(tableBucket);
+ if (snapshotId != null) {
+ return waitUntilSnapshotFinished(tableBucket, snapshotId);
+ } else {
+ fail("No new snapshot triggered for table bucket " + tableBucket);
+ return null;
+ }
+ }
+
+ private Long triggerSnapshot(TableBucket tableBucket) {
+ Long snapshotId = null;
+ Long nextSnapshotId = null;
+ for (TabletServer ts : tabletServers.values()) {
+ ReplicaManager.HostedReplica replica =
ts.getReplicaManager().getReplica(tableBucket);
+ if (replica instanceof ReplicaManager.OnlineReplica) {
+ Replica r = ((ReplicaManager.OnlineReplica)
replica).getReplica();
+ PeriodicSnapshotManager kvSnapshotManager =
r.getKvSnapshotManager();
+ if (r.isLeader() && kvSnapshotManager != null) {
+ snapshotId = kvSnapshotManager.currentSnapshotId();
+ kvSnapshotManager.triggerSnapshot();
+ nextSnapshotId = kvSnapshotManager.currentSnapshotId();
+ break;
+ }
+ }
+ }
+
+ if (snapshotId != null) {
+ if (nextSnapshotId > snapshotId) {
+ // return the snapshot id only if a new snapshot was actually triggered
+ return snapshotId;
+ } else {
+ return null;
+ }
+ } else {
+ fail("No KV snapshot manager found for table bucket " +
tableBucket);
+ return null;
+ }
+ }
+
+ private CompletedSnapshot waitUntilSnapshotFinished(TableBucket
tableBucket, long snapshotId) {
ZooKeeperClient zkClient = getZooKeeperClient();
return waitValue(
() -> {
@@ -693,7 +779,7 @@ public final class FlussClusterExtension
uncheckedFunction(
CompletedSnapshotHandle::retrieveCompleteSnapshot));
},
- Duration.ofMinutes(2),
+ Duration.ofSeconds(30),
String.format(
"Fail to wait bucket %s snapshot %d finished",
tableBucket, snapshotId));
}