This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0d66e17e1 [flink] support watermark in batch mode (#3426)
0d66e17e1 is described below
commit 0d66e17e13936130ec4d2cde41198ce19257fa2a
Author: wangwj <[email protected]>
AuthorDate: Tue Jun 4 10:23:39 2024 +0800
[flink] support watermark in batch mode (#3426)
---
.../generated/flink_connector_configuration.html | 6 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +
.../flink/sink/CombinedTableCompactorSink.java | 4 +-
.../paimon/flink/sink/CommitterOperator.java | 25 +++
.../org/apache/paimon/flink/sink/FlinkSink.java | 4 +-
.../paimon/flink/FlinkEndInputWatermarkITCase.java | 188 +++++++++++++++++++++
6 files changed, 232 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6787cee3b..f45486f02 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
+ <tr>
+ <td><h5>end.input.watermark</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional endInput watermark used in case of batch mode or bounded stream.</td>
+ </tr>
<tr>
<td><h5>lookup.async</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 0256a5500..ea48e8bf1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -424,6 +424,13 @@ public class FlinkConnectorOptions {
"Specifies the sample factor. Let S represent the
total number of samples, F represent the sample factor, "
+ "and P represent the sink parallelism,
then S=F×P. The minimum allowed sample factor is 20.");
+    public static final ConfigOption<Long> END_INPUT_WATERMARK =
+            key("end.input.watermark")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional endInput watermark used in case of batch mode or bounded stream.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 02351a037..d6128c249 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -40,6 +40,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
@@ -152,7 +153,8 @@ public class CombinedTableCompactorSink implements Serializable {
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(),
- createCommittableStateManager()))
+ createCommittableStateManager(),
+ options.get(END_INPUT_WATERMARK)))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 3220512b0..1bb087aa9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -86,6 +86,8 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
private transient String commitUser;
+ private final Long endInputWatermark;
+
public CommitterOperator(
boolean streamingCheckpointEnabled,
boolean forceSingleParallelism,
@@ -93,12 +95,31 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
String initialCommitUser,
Committer.Factory<CommitT, GlobalCommitT> committerFactory,
CommittableStateManager<GlobalCommitT> committableStateManager) {
+ this(
+ streamingCheckpointEnabled,
+ forceSingleParallelism,
+ chaining,
+ initialCommitUser,
+ committerFactory,
+ committableStateManager,
+ null);
+ }
+
+ public CommitterOperator(
+ boolean streamingCheckpointEnabled,
+ boolean forceSingleParallelism,
+ boolean chaining,
+ String initialCommitUser,
+ Committer.Factory<CommitT, GlobalCommitT> committerFactory,
+ CommittableStateManager<GlobalCommitT> committableStateManager,
+ Long endInputWatermark) {
this.streamingCheckpointEnabled = streamingCheckpointEnabled;
this.forceSingleParallelism = forceSingleParallelism;
this.initialCommitUser = initialCommitUser;
this.committablesPerCheckpoint = new TreeMap<>();
this.committerFactory = checkNotNull(committerFactory);
this.committableStateManager = committableStateManager;
+ this.endInputWatermark = endInputWatermark; // null: endInput() keeps currentWatermark
setChainingStrategy(chaining ? ChainingStrategy.ALWAYS :
ChainingStrategy.NEVER);
}
@@ -158,6 +179,10 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
@Override
public void endInput() throws Exception {
endInput = true;
+ if (endInputWatermark != null) {
+ currentWatermark = endInputWatermark;
+ }
+
if (streamingCheckpointEnabled) {
return;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index c3460f39c..2473b9a93 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -54,6 +54,7 @@ import java.util.UUID;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
@@ -234,7 +235,8 @@ public abstract class FlinkSink<T> implements Serializable {
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(),
- createCommittableStateManager());
+ createCommittableStateManager(),
+ options.get(END_INPUT_WATERMARK));
if (options.get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
committerOperator =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
new file mode 100644
index 000000000..3480a426e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+
+/**
+ * ITCase for {@link FlinkConnectorOptions#END_INPUT_WATERMARK}: when the option is set, the
+ * snapshot committed at the end of a batch job must carry the configured watermark.
+ */
+public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
+
+    private static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("v", new IntType()),
+                            new RowType.RowField("p", new VarCharType(10)),
+                            // rename key
+                            new RowType.RowField("_k", new IntType())));
+
+    private static final List<RowData> SOURCE_DATA =
+            Arrays.asList(
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 1)));
+
+    private static SerializableRowData wrap(RowData row) {
+        return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE));
+    }
+
+    /** Value given to {@code end.input.watermark}; the committed snapshot must carry it. */
+    private static final long END_INPUT_WATERMARK = 11111;
+
+    private final StreamExecutionEnvironment env;
+
+    public FlinkEndInputWatermarkITCase() {
+        this.env = streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+    }
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
+    }
+
+    /** Sets the option through a dynamic table hint on a batch SQL INSERT. */
+    @Test
+    public void testEndInputWatermarkBySQL() throws Exception {
+        batchSql(
+                "INSERT INTO T /*+ OPTIONS('end.input.watermark'= '%s') */ VALUES (1, 11, 111), (2, 22, 222)",
+                String.valueOf(END_INPUT_WATERMARK));
+
+        FileStoreTable table = paimonTable("T");
+        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
+        Long watermark = table.snapshotManager().latestSnapshot().watermark();
+        Assertions.assertNotNull(watermark);
+        Assertions.assertEquals(END_INPUT_WATERMARK, watermark);
+    }
+
+    /** Sets the option in the table options and writes through a DataStream sink. */
+    @Test
+    public void testEndInputWatermark() throws Exception {
+        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
+
+        // write
+        DataStreamSource<RowData> source =
+                env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE));
+        DataStream<Row> input =
+                source.map(
+                                (MapFunction<RowData, Row>)
+                                        r ->
+                                                Row.of(
+                                                        r.getInt(0),
+                                                        r.getString(1).toString(),
+                                                        r.getInt(2)))
+                        .setParallelism(source.getParallelism());
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("v", DataTypes.INT()),
+                        DataTypes.FIELD("p", DataTypes.STRING()),
+                        DataTypes.FIELD("_k", DataTypes.INT()));
+        new FlinkSinkBuilder(table).forRow(input, inputType).build();
+        env.execute();
+
+        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
+        Long watermark = table.snapshotManager().latestSnapshot().watermark();
+        Assertions.assertNotNull(watermark);
+        Assertions.assertEquals(END_INPUT_WATERMARK, watermark);
+    }
+
+    /**
+     * Creates a table at {@code getTempDirPath()} with {@code end.input.watermark} stored in its
+     * options, so that a sink built on the table can pick the option up.
+     *
+     * @param partitions indexes into {@link #TABLE_TYPE} of the partition fields
+     * @param primaryKey indexes into {@link #TABLE_TYPE} of the primary-key fields; when empty,
+     *     {@code _k} is set as the bucket key instead
+     */
+    private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
+            throws Exception {
+        Options options = new Options();
+        options.set(BUCKET, 3);
+        options.set(PATH, getTempDirPath());
+        options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+        // Stored in the table options, not passed to the sink builder directly.
+        options.set(
+                FlinkConnectorOptions.END_INPUT_WATERMARK.key(),
+                String.valueOf(END_INPUT_WATERMARK));
+
+        Path tablePath = new CoreOptions(options.toMap()).path();
+        if (primaryKey.length == 0) {
+            options.set(BUCKET_KEY, "_k");
+        }
+        Schema schema =
+                new Schema(
+                        toDataType(TABLE_TYPE).getFields(),
+                        Arrays.stream(partitions)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        Arrays.stream(primaryKey)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        options.toMap(),
+                        "");
+        return retryArtificialException(
+                () -> {
+                    new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
+                    return FileStoreTableFactory.create(LocalFileIO.create(), options);
+                });
+    }
+}