This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a426a42101 [core] add more tests for clustering bucketed append table
(#6981)
a426a42101 is described below
commit a426a421011fb285a4a79cc461f0fe240cf5ca8c
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jan 8 16:12:33 2026 +0800
[core] add more tests for clustering bucketed append table (#6981)
---
.../apache/paimon/flink/AppendOnlyTableITCase.java | 31 +++++++++
.../action/IncrementalClusterActionITCase.java | 77 ++++++++++++++++++++--
2 files changed, 103 insertions(+), 5 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 4a433f4e2e..6dc41981bf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -472,6 +472,37 @@ public class AppendOnlyTableITCase extends
CatalogITCaseBase {
Row.of(12, "MMM"));
}
+ @Test
+ public void testAutoCluster() {
+ batchSql("ALTER TABLE append_table SET
('num-sorted-run.compaction-trigger' = '3')");
+ batchSql("ALTER TABLE append_table SET ('num-levels' = '6')");
+ batchSql("ALTER TABLE append_table SET ('bucket-append-ordered' =
'false')");
+ batchSql("ALTER TABLE append_table SET ('clustering.columns' =
'data')");
+ batchSql("ALTER TABLE append_table SET ('clustering.strategy' =
'order')");
+ batchSql("ALTER TABLE append_table SET ('clustering.incremental' =
'true')");
+
+ assertAutoCompaction(
+ "INSERT INTO append_table VALUES (1, '9')", 1L,
Snapshot.CommitKind.APPEND);
+ assertAutoCompaction(
+ "INSERT INTO append_table VALUES (2, '8')", 2L,
Snapshot.CommitKind.APPEND);
+ assertAutoCompaction(
+ "INSERT INTO append_table VALUES (3, '7')", 4L,
Snapshot.CommitKind.COMPACT);
+ assertAutoCompaction(
+ "INSERT INTO append_table VALUES (4, '6')", 5L,
Snapshot.CommitKind.APPEND);
+ assertAutoCompaction(
+ "INSERT INTO append_table VALUES (5, '5')", 7L,
Snapshot.CommitKind.COMPACT);
+
+ List<Row> rows = batchSql("SELECT * FROM append_table");
+ assertThat(rows.size()).isEqualTo(5);
+ assertThat(rows)
+ .containsExactly(
+ Row.of(5, "5"),
+ Row.of(4, "6"),
+ Row.of(3, "7"),
+ Row.of(2, "8"),
+ Row.of(1, "9"));
+ }
+
@Test
public void testRejectDelete() {
testRejectChanges(RowKind.DELETE);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 1538f8c083..86800b590f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -40,10 +40,12 @@ import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -56,6 +58,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -686,6 +689,7 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterWithBucket() throws Exception {
Map<String, String> dynamicOptions = commonOptions();
+ dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
dynamicOptions.put(CoreOptions.BUCKET.key(), "2");
dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
@@ -848,6 +852,35 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
assertThat(result5).containsExactlyElementsOf(expected5);
}
+ @Test
+ public void testStreamingClusterWithBucket() throws Exception {
+ Map<String, String> dynamicOptions = commonOptions();
+ dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ dynamicOptions.put(CoreOptions.BUCKET.key(), "1");
+ dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
+ dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+ dynamicOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(),
"1s");
+ FileStoreTable table = createTable(null, dynamicOptions);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+
+ // base records
+ writeData(GenericRow.of(2, 2, BinaryString.fromString("test"), 0));
+ writeData(GenericRow.of(2, 1, BinaryString.fromString("test"), 0));
+ writeData(GenericRow.of(2, 0, BinaryString.fromString("test"), 0));
+
+ checkSnapshot(table, Snapshot.CommitKind.APPEND);
+ runAction(true, Collections.emptyList());
+ checkSnapshot(table, 4, Snapshot.CommitKind.COMPACT, 60_000);
+
+ // incremental records
+ writeData(GenericRow.of(1, 2, BinaryString.fromString("test"), 0));
+ writeData(GenericRow.of(1, 1, BinaryString.fromString("test"), 0));
+ checkSnapshot(table, 7, Snapshot.CommitKind.COMPACT, 60_000);
+ }
+
protected FileStoreTable createTable(String partitionKeys) throws
Exception {
return createTable(partitionKeys, commonOptions());
}
@@ -923,8 +956,28 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
}
private void checkSnapshot(FileStoreTable table) {
- assertThat(table.latestSnapshot().get().commitKind())
- .isEqualTo(Snapshot.CommitKind.COMPACT);
+ checkSnapshot(table, Snapshot.CommitKind.COMPACT);
+ }
+
+ private void checkSnapshot(FileStoreTable table, Snapshot.CommitKind
commitKind) {
+
assertThat(table.latestSnapshot().get().commitKind()).isEqualTo(commitKind);
+ }
+
+ protected void checkSnapshot(
+ FileStoreTable table, long snapshotId, Snapshot.CommitKind
commitKind, long timeout)
+ throws Exception {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ long start = System.currentTimeMillis();
+ while (!Objects.equals(snapshotManager.latestSnapshotId(),
snapshotId)) {
+ Thread.sleep(500);
+ if (System.currentTimeMillis() - start > timeout) {
+ throw new RuntimeException("can't wait for a compaction.");
+ }
+ }
+
+ Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(snapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(commitKind);
}
private List<CommitMessage> produceDvIndexMessages(
@@ -954,7 +1007,16 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
}
private void runAction(List<String> extra) throws Exception {
- StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().batchMode().build();
+ runAction(false, extra);
+ }
+
+ private void runAction(boolean isStreaming, List<String> extra) throws
Exception {
+ StreamExecutionEnvironment env;
+ if (isStreaming) {
+ env = streamExecutionEnvironmentBuilder().streamingMode().build();
+ } else {
+ env = streamExecutionEnvironmentBuilder().batchMode().build();
+ }
ArrayList<String> baseArgs =
Lists.newArrayList("compact", "--database", database,
"--table", tableName);
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -966,7 +1028,12 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
baseArgs.addAll(extra);
CompactAction action = createAction(CompactAction.class,
baseArgs.toArray(new String[0]));
- action.withStreamExecutionEnvironment(env);
- action.run();
+ action.withStreamExecutionEnvironment(env).build();
+ if (isStreaming) {
+ env.executeAsync();
+ } else {
+ env.execute();
+ }
+ // action.run();
}
}