This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 845364f7a [flink] Introduce watermark alignment options (#673)
845364f7a is described below
commit 845364f7a2463dc83033a7205680588ba14e397c
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 21 20:10:14 2023 +0800
[flink] Introduce watermark alignment options (#673)
---
docs/content/how-to/querying-tables.md | 72 ++++++++++++++++++-
.../shortcodes/generated/core_configuration.html | 6 ++
.../generated/flink_connector_configuration.html | 30 ++++++++
.../org/apache/paimon/flink/WatermarkITCase.java | 81 ++++++++++++++++++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 67 ++++++++++++++++++
.../paimon/flink/source/DataTableSource.java | 35 ++++++++++
.../flink/source/OnEventWatermarkStrategy.java | 57 +++++++++++++++
.../paimon/flink/source/WatermarkAlignUtils.java | 43 ++++++++++++
.../org/apache/paimon/flink/WatermarkITCase.java | 76 ++++++++++++++++++++
9 files changed, 465 insertions(+), 2 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 5f85b7a4c..ca3108ea5 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -87,9 +87,77 @@ Users can also adjust `changelog-producer` table property to
specify the pattern
{{< img src="/img/scan-mode.png">}}
-{{< hint info >}}
+## Streaming Source
+
+Streaming source behavior is currently only supported by the Flink engine.
+
+### Watermark Definition
+
+You can define a watermark for reading Paimon tables:
+
+```sql
+CREATE TABLE T (
+    `user` BIGINT,
+    product STRING,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+);
+
+-- launch a streaming job that counts orders in 10-minute tumbling windows of T
+SELECT window_start, window_end, COUNT(*) FROM TABLE(
+    TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES))
+GROUP BY window_start, window_end;
+```
+
+You can also enable [Flink Watermark alignment](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_),
+which ensures that no sources/splits/shards/partitions advance their watermarks too far ahead of the rest:
+
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>scan.watermark.alignment.group</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A group of sources to align watermarks.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.watermark.alignment.max-drift</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Maximal drift to align watermarks, before we pause consuming
from the source/task/partition.</td>
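+ </tr>
+ <tr>
+ <td><h5>scan.watermark.alignment.update-interval</h5></td>
+ <td style="word-wrap: break-word;">1 s</td>
+ <td>Duration</td>
+ <td>How often tasks should notify coordinator about the current watermark and how often the coordinator should announce the maximal aligned watermark.</td>
+ </tr>
+ </tbody>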
+</table>
+
+### Bounded Stream
+
A streaming source can also be bounded: you can specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode, and stream reading ends once a snapshot with a larger watermark is encountered.
-{{< /hint >}}
+
+The watermark in a snapshot is generated by the writer. For example, you can specify a Kafka source and declare a watermark definition on it.
+When this Kafka source is used to write to a Paimon table, the snapshots of the Paimon table will carry the corresponding watermark,
+so that you can use the bounded watermark feature when streaming-reading this Paimon table.
+
+```sql
+CREATE TABLE kafka_table (
+    `user` BIGINT,
+    product STRING,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+) WITH ('connector' = 'kafka'...);
+
+-- launch a streaming insert job
+INSERT INTO paimon_table SELECT * FROM kafka_table;
+
+-- launch a bounded streaming job to read paimon_table
+SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
+```
## System Tables
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9e38a144b..874f02da7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -254,6 +254,12 @@
<td>String</td>
<td>Define primary key by table options, cannot define primary key
on DDL and table options at the same time.</td>
</tr>
+ <tr>
+ <td><h5>read.batch-size</h5></td>
+ <td style="word-wrap: break-word;">1024</td>
+ <td>Integer</td>
+ <td>Read batch size for orc and parquet.</td>
+ </tr>
<tr>
<td><h5>scan.bounded.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index a5740c4b2..91486fd13 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,36 @@
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration.</td>
</tr>
+ <tr>
+ <td><h5>scan.watermark.alignment.group</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A group of sources to align watermarks.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.watermark.alignment.max-drift</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Maximal drift to align watermarks, before we pause consuming
from the source/task/partition.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.watermark.alignment.update-interval</h5></td>
+ <td style="word-wrap: break-word;">1 s</td>
+ <td>Duration</td>
+ <td>How often tasks should notify coordinator about the current
watermark and how often the coordinator should announce the maximal aligned
watermark.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.watermark.emit.strategy</h5></td>
+ <td style="word-wrap: break-word;">on-event</td>
+ <td><p>Enum</p></td>
+ <td>Emit strategy for watermark generation.<br /><br />Possible
values:<ul><li>"on-periodic": Emit watermark periodically, interval is
controlled by Flink 'pipeline.auto-watermark-interval'.</li><li>"on-event":
Emit watermark per record.</li></ul></td>
+ </tr>
+ <tr>
+ <td><h5>scan.watermark.idle-timeout</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>If no records flow in a partition of a stream for that amount
of time, then that partition is considered "idle" and will not hold back the
progress of watermarks in downstream operators.</td>
+ </tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
new file mode 100644
index 000000000..8247d6dd2
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() {
+        // Watermark alignment options must be rejected on Flink 1.14. The expected text mirrors,
+        // verbatim, the message thrown by DataTableSource when NoSuchMethodError is caught.
+        assertThatThrownBy(
+                        () ->
+                                innerTestWatermark(
+                                        "'scan.watermark.idle-timeout'='1s'",
+                                        "'scan.watermark.alignment.group'='group'",
+                                        "'scan.watermark.alignment.update-interval'='2s'",
+                                        "'scan.watermark.alignment.max-drift'='1s'"))
+                .hasMessageContaining(
+                        "Flink 1.14 dose not support watermark alignment, "
+                                + "please check your Flink version");
+    }
+
+    /**
+     * Creates append-only table {@code T} with a watermark on {@code ts} and runs a 10-minute
+     * tumbling window aggregation over it. Two records are inserted; the second one (12:10:01)
+     * moves the watermark past the end of the first window, which must then fire exactly once.
+     *
+     * @param options extra table options, each a complete {@code 'key'='value'} pair with no
+     *     trailing comma; they are combined with {@code 'write-mode'='append-only'}
+     */
+    private void innerTestWatermark(String... options) throws Exception {
+        String tableOptions = "'write-mode'='append-only'";
+        if (options.length > 0) {
+            // join here so callers never have to append a separator to their last option
+            tableOptions = String.join(",", options) + "," + tableOptions;
+        }
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
+                        + tableOptions
+                        + ")");
+
+        BlockingIterator<Row, Row> select =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT window_start, window_end, SUM(f0) FROM TABLE("
+                                        + "TUMBLE(TABLE T, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n"
+                                        + " GROUP BY window_start, window_end"));
+        try {
+            sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+            sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')");
+
+            assertThat(select.collect(1))
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    LocalDateTime.parse("2023-02-02T12:00"),
+                                    LocalDateTime.parse("2023-02-02T12:10"),
+                                    1));
+        } finally {
+            // release the streaming job even when the assertion fails
+            select.close();
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ec459d3d8..357c7a2ad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -21,7 +21,9 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
+import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
import java.lang.reflect.Field;
@@ -30,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.options.description.TextElement.text;
/** Options for flink connector. */
public class FlinkConnectorOptions {
@@ -121,6 +124,43 @@ public class FlinkConnectorOptions {
+
CoreOptions.ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog
generation by lookup.");
+ /** When the scan source emits watermarks; defaults to {@link WatermarkEmitStrategy#ON_EVENT}. */
+ public static final ConfigOption<WatermarkEmitStrategy>
SCAN_WATERMARK_EMIT_STRATEGY =
+ key("scan.watermark.emit.strategy")
+ .enumType(WatermarkEmitStrategy.class)
+ .defaultValue(WatermarkEmitStrategy.ON_EVENT)
+ .withDescription("Emit strategy for watermark
generation.");
+
+ /**
+ * Name of the watermark alignment group. Watermark alignment is only applied when this option
+ * is set; it has no default.
+ */
+ public static final ConfigOption<String> SCAN_WATERMARK_ALIGNMENT_GROUP =
+ key("scan.watermark.alignment.group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("A group of sources to align
watermarks.");
+
+ /** Optional idleness timeout applied to the source watermark strategy; unset by default. */
+ public static final ConfigOption<Duration> SCAN_WATERMARK_IDLE_TIMEOUT =
+ key("scan.watermark.idle-timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "If no records flow in a partition of a stream for
that amount of time, then"
+ + " that partition is considered \"idle\"
and will not hold back the progress of"
+ + " watermarks in downstream operators.");
+
+ /**
+ * Maximal watermark drift allowed inside an alignment group. It has no default and must be set
+ * whenever {@code scan.watermark.alignment.group} is configured.
+ */
+ public static final ConfigOption<Duration>
SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT =
+ key("scan.watermark.alignment.max-drift")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximal drift to align watermarks, "
+ + "before we pause consuming from the
source/task/partition.");
+
+ /** Update interval used for watermark alignment; defaults to 1 second. */
+ public static final ConfigOption<Duration>
SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL =
+ key("scan.watermark.alignment.update-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription(
+ "How often tasks should notify coordinator about
the current watermark "
+ + "and how often the coordinator should
announce the maximal aligned watermark.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -135,4 +175,31 @@ public class FlinkConnectorOptions {
}
return list;
}
+
+    /**
+     * Decides at which moments the scan source emits watermarks. The string form of each constant is
+     * the value users write for {@code scan.watermark.emit.strategy}.
+     */
+    public enum WatermarkEmitStrategy implements DescribedEnum {
+        /** Leave emission to Flink's periodic timer. */
+        ON_PERIODIC(
+                "on-periodic",
+                "Emit watermark periodically, interval is controlled by Flink 'pipeline.auto-watermark-interval'."),
+
+        /** Emit after every record. */
+        ON_EVENT("on-event", "Emit watermark per record.");
+
+        /** Option value shown to and parsed from users. */
+        private final String optionValue;
+
+        /** Human readable explanation rendered into the generated docs. */
+        private final String descriptionText;
+
+        WatermarkEmitStrategy(String optionValue, String descriptionText) {
+            this.optionValue = optionValue;
+            this.descriptionText = descriptionText;
+        }
+
+        @Override
+        public String toString() {
+            return optionValue;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(descriptionText);
+        }
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index b7a533f8d..65030d91e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
@@ -46,12 +47,18 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
/**
* Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
@@ -156,6 +163,34 @@ public class DataTableSource extends FlinkTableSource
logStoreTableFactory.createSourceProvider(context,
scanContext, projectFields);
}
+ WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
+ Options options = table.options().toConfiguration();
+ if (watermarkStrategy != null) {
+ WatermarkEmitStrategy emitStrategy =
options.get(SCAN_WATERMARK_EMIT_STRATEGY);
+ if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
+ watermarkStrategy = new
OnEventWatermarkStrategy(watermarkStrategy);
+ }
+ Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
+ if (idleTimeout != null) {
+ watermarkStrategy =
watermarkStrategy.withIdleness(idleTimeout);
+ }
+ String watermarkAlignGroup =
options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
+ if (watermarkAlignGroup != null) {
+ try {
+ watermarkStrategy =
+ WatermarkAlignUtils.withWatermarkAlignment(
+ watermarkStrategy,
+ watermarkAlignGroup,
+
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
+
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
+ } catch (NoSuchMethodError error) {
+ throw new RuntimeException(
+ "Flink 1.14 dose not support watermark alignment,
please check your Flink version.",
+ error);
+ }
+ }
+ }
+
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(tableIdentifier, table)
.withContinuousMode(streaming)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
new file mode 100644
index 000000000..6ad2a3920
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Paimon {@link WatermarkStrategy} decorator that emits a watermark on every event.
+ *
+ * <p>Both the watermark generator and the timestamp assigner come from the wrapped strategy. After
+ * each {@link WatermarkGenerator#onEvent} call, the wrapped generator's {@link
+ * WatermarkGenerator#onPeriodicEmit} is invoked immediately. Watermarks therefore advance per
+ * record instead of only on the periodic emission timer.
+ */
+public class OnEventWatermarkStrategy implements WatermarkStrategy<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final WatermarkStrategy<RowData> strategy;
+
+    /** @param strategy the strategy whose generator and timestamp assigner are decorated */
+    public OnEventWatermarkStrategy(WatermarkStrategy<RowData> strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public TimestampAssigner<RowData> createTimestampAssigner(
+            TimestampAssignerSupplier.Context context) {
+        // Without this override the WatermarkStrategy interface default would be used, silently
+        // discarding any timestamp assigner configured on the wrapped strategy.
+        return strategy.createTimestampAssigner(context);
+    }
+
+    @Override
+    public WatermarkGenerator<RowData> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        WatermarkGenerator<RowData> generator = strategy.createWatermarkGenerator(context);
+        return new WatermarkGenerator<RowData>() {
+
+            @Override
+            public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
+                generator.onEvent(event, eventTimestamp, output);
+                // emit right away rather than waiting for the next periodic call
+                generator.onPeriodicEmit(output);
+            }
+
+            @Override
+            public void onPeriodicEmit(WatermarkOutput output) {
+                // for idle watermark
+                generator.onPeriodicEmit(output);
+            }
+        };
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
new file mode 100644
index 000000000..84639aa76
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.data.RowData;
+
+import java.time.Duration;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Helpers for watermark alignment, which is supported since Flink 1.15.
+ *
+ * <p>On an older Flink runtime, {@code WatermarkStrategy#withWatermarkAlignment} is absent.
+ * Calling {@link #withWatermarkAlignment} there fails with a {@link NoSuchMethodError}, which
+ * callers such as {@code DataTableSource} catch and translate into a user-facing error.
+ */
+public class WatermarkAlignUtils {
+
+    private WatermarkAlignUtils() {
+        // static utility holder, not meant to be instantiated
+    }
+
+    /**
+     * Returns {@code strategy} with watermark alignment enabled for the given group.
+     *
+     * <p>Fails through {@code checkArgument} when {@code drift} is null, because a group without a
+     * maximal drift cannot be aligned.
+     *
+     * @param strategy the watermark strategy to decorate
+     * @param group name of the alignment group
+     * @param drift maximal allowed watermark drift within the group; must not be null
+     * @param updateInterval interval passed through as the alignment update interval
+     * @return the strategy with watermark alignment applied
+     */
+    public static WatermarkStrategy<RowData> withWatermarkAlignment(
+            WatermarkStrategy<RowData> strategy,
+            String group,
+            Duration drift,
+            Duration updateInterval) {
+        checkArgument(
+                drift != null,
+                String.format(
+                        "Watermark alignment max drift can not be null when group (%s) configured.",
+                        group));
+        return strategy.withWatermarkAlignment(group, drift, updateInterval);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
new file mode 100644
index 000000000..51cf834fe
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() throws Exception {
+        innerTestWatermark(
+                "'scan.watermark.idle-timeout'='1s'",
+                "'scan.watermark.alignment.group'='group'",
+                "'scan.watermark.alignment.update-interval'='2s'",
+                "'scan.watermark.alignment.max-drift'='1s'");
+    }
+
+    /**
+     * Creates append-only table {@code T} with a watermark on {@code ts} and runs a 10-minute
+     * tumbling window aggregation over it. Two records are inserted; the second one (12:10:01)
+     * moves the watermark past the end of the first window, which must then fire exactly once.
+     *
+     * @param options extra table options, each a complete {@code 'key'='value'} pair with no
+     *     trailing comma; they are combined with {@code 'write-mode'='append-only'}
+     */
+    private void innerTestWatermark(String... options) throws Exception {
+        String tableOptions = "'write-mode'='append-only'";
+        if (options.length > 0) {
+            // join here so callers never have to append a separator to their last option
+            tableOptions = String.join(",", options) + "," + tableOptions;
+        }
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
+                        + tableOptions
+                        + ")");
+
+        BlockingIterator<Row, Row> select =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT window_start, window_end, SUM(f0) FROM TABLE("
+                                        + "TUMBLE(TABLE T, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n"
+                                        + " GROUP BY window_start, window_end"));
+        try {
+            sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+            sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')");
+
+            assertThat(select.collect(1))
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    LocalDateTime.parse("2023-02-02T12:00"),
+                                    LocalDateTime.parse("2023-02-02T12:10"),
+                                    1));
+        } finally {
+            // release the streaming job even when the assertion fails
+            select.close();
+        }
+    }
+}