This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ec768f3b [core] fix bug of watermark override with append only table (#3872)
2ec768f3b is described below

commit 2ec768f3b6b2c5239fe65a7db75a5cff1c7d5c42
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 21:26:14 2024 +0800

    [core] fix bug of watermark override with append only table (#3872)
---
 .../UnawareBucketCompactionTopoBuilder.java        |  8 +++---
 .../flink/source/BucketUnawareCompactSource.java   | 10 +++++++-
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 30 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index d10c5c11c..ee44a72d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -71,14 +71,14 @@ public class UnawareBucketCompactionTopoBuilder {
 
     public void build() {
         // build source from UnawareSourceFunction
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
 
         // from source, construct the full flink job
         sinkFromSource(source);
     }
 
     public DataStream<Committable> fetchUncommitted(String commitUser) {
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
 
         // rebalance input to default or assigned parallelism
         DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
@@ -87,11 +87,11 @@ public class UnawareBucketCompactionTopoBuilder {
                 .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
     }
 
-    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, partitionPredicate);
+                        table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);
 
         return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 7926fa60a..936b2cd2c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
     private transient AppendOnlyTableCompactionCoordinator compactionCoordinator;
     private transient SourceContext<AppendOnlyCompactionTask> ctx;
     private volatile boolean isRunning = true;
+    private final boolean emitMaxWatermark;
 
     public BucketUnawareCompactSource(
             FileStoreTable table,
             boolean isStreaming,
             long scanInterval,
-            @Nullable Predicate filter) {
+            @Nullable Predicate filter,
+            boolean emitMaxWatermark) {
         this.table = table;
         this.streaming = isStreaming;
         this.scanInterval = scanInterval;
         this.filter = filter;
+        this.emitMaxWatermark = emitMaxWatermark;
     }
 
     @Override
@@ -94,6 +98,10 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
                     List<AppendOnlyCompactionTask> tasks = compactionCoordinator.run();
                     isEmpty = tasks.isEmpty();
                     tasks.forEach(ctx::collect);
+
+                    if (emitMaxWatermark) {
+                        ctx.emitWatermark(Watermark.MAX_WATERMARK);
+                    }
                 } catch (EndOfScanException esf) {
                     LOG.info("Catching EndOfStreamException, the stream is finished.");
                     return;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 85bee4bb5..55bd89b01 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -208,6 +209,35 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
         assertThat(rows.size()).isEqualTo(10);
     }
 
+    @Test
+    public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
+        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + "    f0        INT,\n"
+                        + "    f1        STRING,\n"
+                        + "    ts        TIMESTAMP(3),\n"
+                        + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'rows-per-second' = '1',\n"
+                        + "    'number-of-rows' = '10'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000);
+        // ensure data gen finished
+        Thread.sleep(5000);
+
+        Snapshot snapshot = findLatestSnapshot("append_table");
+        Assertions.assertNotNull(snapshot);
+        Long watermark = snapshot.watermark();
+        Assertions.assertNotNull(watermark);
+        Assertions.assertTrue(watermark > Long.MIN_VALUE);
+    }
+
     @Test
     public void testRejectDelete() {
         testRejectChanges(RowKind.DELETE);

Reply via email to