This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a426a42101 [core] add more tests for clustering bucketed append table (#6981)
a426a42101 is described below

commit a426a421011fb285a4a79cc461f0fe240cf5ca8c
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jan 8 16:12:33 2026 +0800

    [core] add more tests for clustering bucketed append table (#6981)
---
 .../apache/paimon/flink/AppendOnlyTableITCase.java | 31 +++++++++
 .../action/IncrementalClusterActionITCase.java     | 77 ++++++++++++++++++++--
 2 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 4a433f4e2e..6dc41981bf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -472,6 +472,37 @@ public class AppendOnlyTableITCase extends CatalogITCaseBase {
                         Row.of(12, "MMM"));
     }
 
+    @Test
+    public void testAutoCluster() {
+        batchSql("ALTER TABLE append_table SET ('num-sorted-run.compaction-trigger' = '3')");
+        batchSql("ALTER TABLE append_table SET ('num-levels' = '6')");
+        batchSql("ALTER TABLE append_table SET ('bucket-append-ordered' = 'false')");
+        batchSql("ALTER TABLE append_table SET ('clustering.columns' = 'data')");
+        batchSql("ALTER TABLE append_table SET ('clustering.strategy' = 'order')");
+        batchSql("ALTER TABLE append_table SET ('clustering.incremental' = 'true')");
+
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (1, '9')", 1L, Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (2, '8')", 2L, Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (3, '7')", 4L, Snapshot.CommitKind.COMPACT);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (4, '6')", 5L, Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (5, '5')", 7L, Snapshot.CommitKind.COMPACT);
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(5);
+        assertThat(rows)
+                .containsExactly(
+                        Row.of(5, "5"),
+                        Row.of(4, "6"),
+                        Row.of(3, "7"),
+                        Row.of(2, "8"),
+                        Row.of(1, "9"));
+    }
+
     @Test
     public void testRejectDelete() {
         testRejectChanges(RowKind.DELETE);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 1538f8c083..86800b590f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -40,10 +40,12 @@ import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -56,6 +58,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -686,6 +689,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
     @Test
     public void testClusterWithBucket() throws Exception {
         Map<String, String> dynamicOptions = commonOptions();
+        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
         dynamicOptions.put(CoreOptions.BUCKET.key(), "2");
         dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
         dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
@@ -848,6 +852,35 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThat(result5).containsExactlyElementsOf(expected5);
     }
 
+    @Test
+    public void testStreamingClusterWithBucket() throws Exception {
+        Map<String, String> dynamicOptions = commonOptions();
+        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        dynamicOptions.put(CoreOptions.BUCKET.key(), "1");
+        dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
+        dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+        dynamicOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        FileStoreTable table = createTable(null, dynamicOptions);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        // base records
+        writeData(GenericRow.of(2, 2, BinaryString.fromString("test"), 0));
+        writeData(GenericRow.of(2, 1, BinaryString.fromString("test"), 0));
+        writeData(GenericRow.of(2, 0, BinaryString.fromString("test"), 0));
+
+        checkSnapshot(table, Snapshot.CommitKind.APPEND);
+        runAction(true, Collections.emptyList());
+        checkSnapshot(table, 4, Snapshot.CommitKind.COMPACT, 60_000);
+
+        // incremental records
+        writeData(GenericRow.of(1, 2, BinaryString.fromString("test"), 0));
+        writeData(GenericRow.of(1, 1, BinaryString.fromString("test"), 0));
+        checkSnapshot(table, 7, Snapshot.CommitKind.COMPACT, 60_000);
+    }
+
     protected FileStoreTable createTable(String partitionKeys) throws Exception {
         return createTable(partitionKeys, commonOptions());
     }
@@ -923,8 +956,28 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
     }
 
     private void checkSnapshot(FileStoreTable table) {
-        assertThat(table.latestSnapshot().get().commitKind())
-                .isEqualTo(Snapshot.CommitKind.COMPACT);
+        checkSnapshot(table, Snapshot.CommitKind.COMPACT);
+    }
+
+    private void checkSnapshot(FileStoreTable table, Snapshot.CommitKind commitKind) {
+        assertThat(table.latestSnapshot().get().commitKind()).isEqualTo(commitKind);
+    }
+
+    protected void checkSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind, long timeout)
+            throws Exception {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        long start = System.currentTimeMillis();
+        while (!Objects.equals(snapshotManager.latestSnapshotId(), snapshotId)) {
+            Thread.sleep(500);
+            if (System.currentTimeMillis() - start > timeout) {
+                throw new RuntimeException("can't wait for a compaction.");
+            }
+        }
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
     }
 
     private List<CommitMessage> produceDvIndexMessages(
@@ -954,7 +1007,16 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
     }
 
     private void runAction(List<String> extra) throws Exception {
-        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        runAction(false, extra);
+    }
+
+    private void runAction(boolean isStreaming, List<String> extra) throws Exception {
+        StreamExecutionEnvironment env;
+        if (isStreaming) {
+            env = streamExecutionEnvironmentBuilder().streamingMode().build();
+        } else {
+            env = streamExecutionEnvironmentBuilder().batchMode().build();
+        }
         ArrayList<String> baseArgs =
                 Lists.newArrayList("compact", "--database", database, "--table", tableName);
         ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -966,7 +1028,12 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         baseArgs.addAll(extra);
 
         CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
-        action.withStreamExecutionEnvironment(env);
-        action.run();
+        action.withStreamExecutionEnvironment(env).build();
+        if (isStreaming) {
+            env.executeAsync();
+        } else {
+            env.execute();
+        }
+        //        action.run();
     }
 }
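
For context on the new timeout-based checkSnapshot(table, snapshotId, commitKind, timeout) helper in this commit: the pattern is to poll the snapshot manager until the expected snapshot id appears, then fail the test if a deadline passes first. Below is a minimal standalone sketch of that same polling idea; the WaitUtil/waitUntil names are illustrative only and are not part of the commit.

    // Hypothetical helper sketching the wait-with-timeout pattern used by checkSnapshot above.
    import java.util.function.BooleanSupplier;

    public class WaitUtil {
        // Polls the condition every 500 ms (same interval as the helper in this commit)
        // until it holds or the timeout elapses.
        public static void waitUntil(BooleanSupplier condition, long timeoutMillis)
                throws InterruptedException {
            long start = System.currentTimeMillis();
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() - start > timeoutMillis) {
                    throw new RuntimeException("condition not met within " + timeoutMillis + " ms");
                }
                Thread.sleep(500);
            }
        }
    }

In the streaming cluster test this would correspond to something like waitUntil(() -> Objects.equals(snapshotManager.latestSnapshotId(), snapshotId), 60_000) before asserting the snapshot's commit kind.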
