This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2ec768f3b [core] fix bug of watermark override with append only table (#3872)
2ec768f3b is described below
commit 2ec768f3b6b2c5239fe65a7db75a5cff1c7d5c42
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 21:26:14 2024 +0800
[core] fix bug of watermark override with append only table (#3872)
---
.../UnawareBucketCompactionTopoBuilder.java | 8 +++---
.../flink/source/BucketUnawareCompactSource.java | 10 +++++++-
.../flink/UnawareBucketAppendOnlyTableITCase.java | 30 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index d10c5c11c..ee44a72d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -71,14 +71,14 @@ public class UnawareBucketCompactionTopoBuilder {
public void build() {
// build source from UnawareSourceFunction
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
// from source, construct the full flink job
sinkFromSource(source);
}
public DataStream<Committable> fetchUncommitted(String commitUser) {
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
// rebalance input to default or assigned parallelism
DataStream<AppendOnlyCompactionTask> rebalanced =
rebalanceInput(source);
@@ -87,11 +87,11 @@ public class UnawareBucketCompactionTopoBuilder {
.doWrite(rebalanced, commitUser, rebalanced.getParallelism());
}
- private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+ private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean
emitMaxWatermark) {
long scanInterval =
table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
- table, isContinuous, scanInterval, partitionPredicate);
+ table, isContinuous, scanInterval, partitionPredicate,
emitMaxWatermark);
return BucketUnawareCompactSource.buildSource(env, source,
isContinuous, tableIdentifier);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 7926fa60a..936b2cd2c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
private transient AppendOnlyTableCompactionCoordinator
compactionCoordinator;
private transient SourceContext<AppendOnlyCompactionTask> ctx;
private volatile boolean isRunning = true;
+ private final boolean emitMaxWatermark;
public BucketUnawareCompactSource(
FileStoreTable table,
boolean isStreaming,
long scanInterval,
- @Nullable Predicate filter) {
+ @Nullable Predicate filter,
+ boolean emitMaxWatermark) {
this.table = table;
this.streaming = isStreaming;
this.scanInterval = scanInterval;
this.filter = filter;
+ this.emitMaxWatermark = emitMaxWatermark;
}
@Override
@@ -94,6 +98,10 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
List<AppendOnlyCompactionTask> tasks =
compactionCoordinator.run();
isEmpty = tasks.isEmpty();
tasks.forEach(ctx::collect);
+
+ if (emitMaxWatermark) {
+ ctx.emitWatermark(Watermark.MAX_WATERMARK);
+ }
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is
finished.");
return;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 85bee4bb5..55bd89b01 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -208,6 +209,35 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
assertThat(rows.size()).isEqualTo(10);
}
+ @Test
+ public void testCompactionInStreamingModeWithMaxWatermark() throws
Exception {
+ batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
+ batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE Orders_in (\n"
+ + " f0 INT,\n"
+ + " f1 STRING,\n"
+ + " ts TIMESTAMP(3),\n"
+ + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'number-of-rows' = '10'\n"
+ + ")");
+
+ assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM
Orders_in", 60000);
+ // ensure data gen finished
+ Thread.sleep(5000);
+
+ Snapshot snapshot = findLatestSnapshot("append_table");
+ Assertions.assertNotNull(snapshot);
+ Long watermark = snapshot.watermark();
+ Assertions.assertNotNull(watermark);
+ Assertions.assertTrue(watermark > Long.MIN_VALUE);
+ }
+
@Test
public void testRejectDelete() {
testRejectChanges(RowKind.DELETE);