This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d66e17e1 [flink] support watermark in batch mode (#3426)
0d66e17e1 is described below

commit 0d66e17e13936130ec4d2cde41198ce19257fa2a
Author: wangwj <[email protected]>
AuthorDate: Tue Jun 4 10:23:39 2024 +0800

    [flink] support watermark in batch mode (#3426)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |   7 +
 .../flink/sink/CombinedTableCompactorSink.java     |   4 +-
 .../paimon/flink/sink/CommitterOperator.java       |  25 +++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   4 +-
 .../paimon/flink/FlinkEndInputWatermarkITCase.java | 188 +++++++++++++++++++++
 6 files changed, 232 insertions(+), 2 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6787cee3b..f45486f02 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Boolean</td>
             <td>When changelog-producer is set to LOOKUP, commit will wait for 
changelog generation by lookup.</td>
         </tr>
+        <tr>
+            <td><h5>end.input.watermark</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>Optional endInput watermark used in case of batch mode or 
bounded stream.</td>
+        </tr>
         <tr>
             <td><h5>lookup.async</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 0256a5500..ea48e8bf1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -424,6 +424,13 @@ public class FlinkConnectorOptions {
                             "Specifies the sample factor. Let S represent the 
total number of samples, F represent the sample factor, "
                                     + "and P represent the sink parallelism, 
then S=F×P. The minimum allowed sample factor is 20.");
 
+    public static final ConfigOption<Long> END_INPUT_WATERMARK =
+            key("end.input.watermark")
+                    .longType()
+                    .noDefaultValue() // unset: CommitterOperator#endInput keeps its current watermark
+                    .withDescription(
+                            "Optional endInput watermark used in case of batch 
mode or bounded stream.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 02351a037..d6128c249 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -40,6 +40,7 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
 
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
@@ -152,7 +153,8 @@ public class CombinedTableCompactorSink implements 
Serializable {
                                         
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
                                         commitUser,
                                         createCommitterFactory(),
-                                        createCommittableStateManager()))
+                                        createCommittableStateManager(),
+                                        options.get(END_INPUT_WATERMARK)))
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 3220512b0..1bb087aa9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -86,6 +86,8 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
 
     private transient String commitUser;
 
+    private final Long endInputWatermark; // null: endInput() does not override currentWatermark
+
     public CommitterOperator(
             boolean streamingCheckpointEnabled,
             boolean forceSingleParallelism,
@@ -93,12 +95,31 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
             String initialCommitUser,
             Committer.Factory<CommitT, GlobalCommitT> committerFactory,
             CommittableStateManager<GlobalCommitT> committableStateManager) {
+        this(
+                streamingCheckpointEnabled,
+                forceSingleParallelism,
+                chaining,
+                initialCommitUser,
+                committerFactory,
+                committableStateManager,
+                null);
+    }
+
+    public CommitterOperator(
+            boolean streamingCheckpointEnabled,
+            boolean forceSingleParallelism,
+            boolean chaining,
+            String initialCommitUser,
+            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
+            CommittableStateManager<GlobalCommitT> committableStateManager,
+            Long endInputWatermark) { // nullable; when set, endInput() overwrites currentWatermark
         this.streamingCheckpointEnabled = streamingCheckpointEnabled;
         this.forceSingleParallelism = forceSingleParallelism;
         this.initialCommitUser = initialCommitUser;
         this.committablesPerCheckpoint = new TreeMap<>();
         this.committerFactory = checkNotNull(committerFactory);
         this.committableStateManager = committableStateManager;
+        this.endInputWatermark = endInputWatermark;
         setChainingStrategy(chaining ? ChainingStrategy.ALWAYS : 
ChainingStrategy.NEVER);
     }
 
@@ -158,6 +179,10 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
     @Override
     public void endInput() throws Exception {
         endInput = true;
+        if (endInputWatermark != null) {
+            currentWatermark = endInputWatermark;
+        }
+
         if (streamingCheckpointEnabled) {
             return;
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index c3460f39c..2473b9a93 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -54,6 +54,7 @@ import java.util.UUID;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
@@ -234,7 +235,8 @@ public abstract class FlinkSink<T> implements Serializable {
                         options.get(SINK_COMMITTER_OPERATOR_CHAINING),
                         commitUser,
                         createCommitterFactory(),
-                        createCommittableStateManager());
+                        createCommittableStateManager(),
+                        options.get(END_INPUT_WATERMARK));
 
         if (options.get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
             committerOperator =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
new file mode 100644
index 000000000..3480a426e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.source.ContinuousFileStoreSource;
+import org.apache.paimon.flink.source.StaticFileStoreSource;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+
+/**
+ * ITCase checking that the committed snapshot carries {@code end.input.watermark} as watermark.
+ * See {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource}, {@link FixedBucketSink}.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
+
+    private static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("v", new IntType()),
+                            new RowType.RowField("p", new VarCharType(10)),
+                            // rename key
+                            new RowType.RowField("_k", new IntType())));
+
+    private static final List<RowData> SOURCE_DATA =
+            Arrays.asList(
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 
1)));
+
+    private static SerializableRowData wrap(RowData row) {
+        return new SerializableRowData(row, 
InternalSerializers.create(TABLE_TYPE));
+    }
+
+    private static final long END_INPUT_WATERMARK = 11111;
+
+    private final StreamExecutionEnvironment env;
+
+    public FlinkEndInputWatermarkITCase() {
+        this.env = 
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+    }
+
+    @Parameters(name = "isBatch-{0}") // NOTE(review): isBatch looks unused; env is always batchMode()
+    public static List<Boolean> getVarSeg() {
+        return Arrays.asList(true, false);
+    }
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
+    }
+
+    @TestTemplate
+    public void testEndInputWatermarkBySQL() throws Exception {
+        batchSql(
+                "INSERT INTO T /*+ OPTIONS('end.input.watermark'= '%s') */ 
VALUES (1, 11, 111), (2, 22, 222)",
+                String.valueOf(END_INPUT_WATERMARK));
+
+        FileStoreTable table = paimonTable("T");
+        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
+        Long waterMark = table.snapshotManager().latestSnapshot().watermark();
+        Assertions.assertNotNull(waterMark);
+        Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
+    }
+
+    @TestTemplate
+    public void testEndInputWatermark() throws Exception {
+        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
+
+        // write
+        DataStreamSource<RowData> source =
+                env.fromCollection(SOURCE_DATA, 
InternalTypeInfo.of(TABLE_TYPE));
+        DataStream<Row> input =
+                source.map(
+                                (MapFunction<RowData, Row>)
+                                        r ->
+                                                Row.of(
+                                                        r.getInt(0),
+                                                        
r.getString(1).toString(),
+                                                        r.getInt(2)))
+                        .setParallelism(source.getParallelism());
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("v", DataTypes.INT()),
+                        DataTypes.FIELD("p", DataTypes.STRING()),
+                        DataTypes.FIELD("_k", DataTypes.INT()));
+        new FlinkSinkBuilder(table).forRow(input, inputType).build();
+        env.execute();
+
+        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
+        Long waterMark = table.snapshotManager().latestSnapshot().watermark();
+        Assertions.assertNotNull(waterMark);
+        Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
+    }
+
+    private FileStoreTable buildFileStoreTable(int[] partitions, int[] 
primaryKey)
+            throws Exception {
+        Options options = new Options();
+        options.set(BUCKET, 3);
+        options.set(PATH, getTempDirPath());
+        options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+        options.set(
+                FlinkConnectorOptions.END_INPUT_WATERMARK.key(),
+                String.valueOf(END_INPUT_WATERMARK));
+
+        Path tablePath = new CoreOptions(options.toMap()).path();
+        if (primaryKey.length == 0) {
+            options.set(BUCKET_KEY, "_k");
+        }
+        Schema schema =
+                new Schema(
+                        toDataType(TABLE_TYPE).getFields(),
+                        Arrays.stream(partitions)
+                                .mapToObj(i -> 
TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        Arrays.stream(primaryKey)
+                                .mapToObj(i -> 
TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        options.toMap(),
+                        "");
+        return retryArtificialException(
+                () -> {
+                    new SchemaManager(LocalFileIO.create(), 
tablePath).createTable(schema);
+                    return FileStoreTableFactory.create(LocalFileIO.create(), 
options);
+                });
+    }
+}

Reply via email to