This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ad115a5 [flink] disable compaction when unaware-bucket table writes in batch mode (#1455)
c7ad115a5 is described below

commit c7ad115a554dd79e8891e55018527434842da2e1
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jun 29 17:39:23 2023 +0800

    [flink] disable compaction when unaware-bucket table writes in batch mode (#1455)
---
 .../paimon/flink/sink/UnawareBucketWriteSink.java  | 15 ++---
 .../org/apache/paimon/flink/CatalogITCaseBase.java |  8 +++
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 78 +++++++++++++++++-----
 3 files changed, 77 insertions(+), 24 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
index 84e1a4818..ad10f96ba 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
@@ -57,18 +57,17 @@ public class UnawareBucketWriteSink extends FileStoreSink {
         // do the actually writing action, no snapshot generated in this stage
         DataStream<Committable> written = doWrite(input, initialCommitUser, 
parallelism);
 
+        boolean isStreamingMode =
+                input.getExecutionEnvironment()
+                                .getConfiguration()
+                                .get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction) {
-            boolean isStreamingMode =
-                    input.getExecutionEnvironment()
-                                    .getConfiguration()
-                                    .get(ExecutionOptions.RUNTIME_MODE)
-                            == RuntimeExecutionMode.STREAMING;
-
+        if (enableCompaction && isStreamingMode) {
             UnawareBucketCompactionTopoBuilder builder =
                     new UnawareBucketCompactionTopoBuilder(
                             input.getExecutionEnvironment(), table.name(), 
table);
-            builder.withContinuousMode(isStreamingMode);
+            builder.withContinuousMode(true);
             written = 
written.union(builder.fetchUncommitted(initialCommitUser));
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index c15a6841a..d49afc681 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -173,6 +173,14 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
         return id == null ? null : snapshotManager.snapshot(id);
     }
 
+    @Nullable
+    protected Snapshot findSnapshot(String tableName, long snapshotId) {
+        SnapshotManager snapshotManager =
+                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory(tableName));
+        Long id = snapshotManager.latestSnapshotId();
+        return id == null ? null : id >= snapshotId ? 
snapshotManager.snapshot(snapshotId) : null;
+    }
+
     protected String toWarehouse(String path) {
         return path;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 915f66015..8e3840eca 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -36,6 +37,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -121,42 +123,42 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testAutoCompaction() {
+    public void testNoCompactionInBatchMode() {
         batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
         batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
 
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
                 1L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
                 2L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 
'CCC'), (4, 'DDD')",
                 3L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
                 4L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
-                6L,
-                Snapshot.CommitKind.COMPACT);
-        assertAutoCompaction(
+                5L,
+                Snapshot.CommitKind.APPEND);
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
-                7L,
+                6L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
-                8L,
+                7L,
                 Snapshot.CommitKind.APPEND);
-        assertAutoCompaction(
+        assertExecuteExpected(
                 "INSERT INTO append_table VALUES (13, 'NNN'), (14, 'OOO')",
-                10L,
-                Snapshot.CommitKind.COMPACT);
+                8L,
+                Snapshot.CommitKind.APPEND);
 
         List<Row> rows = batchSql("SELECT * FROM append_table");
         assertThat(rows.size()).isEqualTo(18);
@@ -182,6 +184,28 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
                         Row.of(14, "OOO"));
     }
 
+    @Test
+    public void testCompactionInStreamingMode() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
+        batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + "    f0        INT,\n"
+                        + "    f1        STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'rows-per-second' = '1',\n"
+                        + "    'number-of-rows' = '4'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM 
Orders_in", 60000);
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(4);
+    }
+
     @Test
     public void testRejectDelete() {
         testRejectChanges(RowKind.DELETE);
@@ -308,11 +332,33 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
                 .hasRootCauseMessage("Append only writer can not accept row 
with RowKind %s", kind);
     }
 
-    private void assertAutoCompaction(
+    private void assertExecuteExpected(
             String sql, long expectedSnapshotId, Snapshot.CommitKind 
expectedCommitKind) {
         batchSql(sql);
         Snapshot snapshot = findLatestSnapshot("append_table");
         assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
         assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
     }
+
+    private void assertStreamingHasCompact(String sql, long timeout) throws 
Exception {
+        long start = System.currentTimeMillis();
+        long currentId = 1;
+        sEnv.executeSql(sql);
+        Snapshot snapshot;
+        while (true) {
+            snapshot = findSnapshot("append_table", currentId);
+            if (snapshot != null) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    break;
+                }
+                currentId++;
+            }
+            long now = System.currentTimeMillis();
+            if (now - start > timeout) {
+                throw new RuntimeException(
+                        "Time up for streaming execute, don't get expected 
result.");
+            }
+            Thread.sleep(1000);
+        }
+    }
 }

Reply via email to