This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 845364f7a [flink] Introduce watermark alignment options (#673)
845364f7a is described below

commit 845364f7a2463dc83033a7205680588ba14e397c
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 21 20:10:14 2023 +0800

    [flink] Introduce watermark alignment options (#673)
---
 docs/content/how-to/querying-tables.md             | 72 ++++++++++++++++++-
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../generated/flink_connector_configuration.html   | 30 ++++++++
 .../org/apache/paimon/flink/WatermarkITCase.java   | 81 ++++++++++++++++++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java | 67 ++++++++++++++++++
 .../paimon/flink/source/DataTableSource.java       | 35 ++++++++++
 .../flink/source/OnEventWatermarkStrategy.java     | 57 +++++++++++++++
 .../paimon/flink/source/WatermarkAlignUtils.java   | 43 ++++++++++++
 .../org/apache/paimon/flink/WatermarkITCase.java   | 76 ++++++++++++++++++++
 9 files changed, 465 insertions(+), 2 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 5f85b7a4c..ca3108ea5 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -87,9 +87,77 @@ Users can also adjust `changelog-producer` table property to 
specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Source
+
+Streaming source behavior is currently supported only in the Flink engine.
+
+### Watermark Definition
+
+You can define a watermark for reading Paimon tables:
+
+```sql
+CREATE TABLE T (
+    `user` BIGINT,
+    product STRING,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+);
+
+-- launch a streaming job that reads T with a tumbling-window aggregation
+SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
+ TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY 
window_start, window_end;
+```
+
+You can also enable [Flink Watermark 
alignment](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_),
+which will make sure no sources/splits/shards/partitions increase their 
watermarks too far ahead of the rest:
+
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>scan.watermark.alignment.group</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A group of sources to align watermarks.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.max-drift</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Maximal drift to align watermarks, before we pause consuming 
from the source/task/partition.</td>
+        </tr>
+    </tbody>
+</table>
+
+### Bounded Stream
+
 Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' 
to define the end condition for bounded streaming mode, stream reading will end 
until a larger watermark snapshot is encountered.
-{{< /hint >}}
+
+The watermark in a snapshot is generated by the writer. For example, you can 
specify a Kafka source and declare a watermark definition on it.
+When this Kafka source is used to write to a Paimon table, the snapshots of 
the Paimon table will carry the corresponding watermark,
+so that you can use the bounded watermark feature when streaming reads from 
this Paimon table.
+
+```sql
+CREATE TABLE kafka_table (
+    `user` BIGINT,
+    product STRING,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+) WITH ('connector' = 'kafka'...);
+
+-- launch a streaming insert job
+INSERT INTO paimon_table SELECT * FROM kafka_table;
+
+-- launch a bounded streaming job to read paimon_table
+SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
+```
 
 ## System Tables
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9e38a144b..874f02da7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -254,6 +254,12 @@
             <td>String</td>
             <td>Define primary key by table options, cannot define primary key 
on DDL and table options at the same time.</td>
         </tr>
+        <tr>
+            <td><h5>read.batch-size</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>Read batch size for orc and parquet.</td>
+        </tr>
         <tr>
             <td><h5>scan.bounded.watermark</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index a5740c4b2..91486fd13 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,36 @@
             <td>Integer</td>
             <td>Define a custom parallelism for the scan source. By default, 
if this option is not defined, the planner will derive the parallelism for each 
statement individually by also considering the global configuration.</td>
         </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.group</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A group of sources to align watermarks.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.max-drift</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Maximal drift to align watermarks, before we pause consuming 
from the source/task/partition.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.update-interval</h5></td>
+            <td style="word-wrap: break-word;">1 s</td>
+            <td>Duration</td>
+            <td>How often tasks should notify coordinator about the current 
watermark and how often the coordinator should announce the maximal aligned 
watermark.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.emit.strategy</h5></td>
+            <td style="word-wrap: break-word;">on-event</td>
+            <td><p>Enum</p></td>
+            <td>Emit strategy for watermark generation.<br /><br />Possible 
values:<ul><li>"on-periodic": Emit watermark periodically, interval is 
controlled by Flink 'pipeline.auto-watermark-interval'.</li><li>"on-event": 
Emit watermark per record.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.idle-timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>If no records flow in a partition of a stream for that amount 
of time, then that partition is considered "idle" and will not hold back the 
progress of watermarks in downstream operators.</td>
+        </tr>
         <tr>
             <td><h5>sink.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
new file mode 100644
index 000000000..8247d6dd2
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() {
+        assertThatThrownBy(
+                        () ->
+                                innerTestWatermark(
+                                        "'scan.watermark.idle-timeout'='1s'",
+                                        
"'scan.watermark.alignment.group'='group'",
+                                        
"'scan.watermark.alignment.update-interval'='2s'",
+                                        
"'scan.watermark.alignment.max-drift'='1s',"))
+                .hasMessageContaining(
+                        "Flink 1.14 dose not support watermark alignment, 
please check your Flink version");
+    }
+
+    private void innerTestWatermark(String... options) throws Exception {
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS 
ts) WITH ("
+                        + String.join(",", options)
+                        + " 'write-mode'='append-only')");
+
+        BlockingIterator<Row, Row> select =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT window_start, window_end, SUM(f0) FROM 
TABLE("
+                                        + "TUMBLE(TABLE T, DESCRIPTOR(ts), 
INTERVAL '10' MINUTES))\n"
+                                        + "  GROUP BY window_start, 
window_end"));
+
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')");
+
+        assertThat(select.collect(1))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                LocalDateTime.parse("2023-02-02T12:00"),
+                                LocalDateTime.parse("2023-02-02T12:10"),
+                                1));
+        select.close();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ec459d3d8..357c7a2ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -21,7 +21,9 @@ package org.apache.paimon.flink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
+import org.apache.paimon.options.description.InlineElement;
 import org.apache.paimon.options.description.TextElement;
 
 import java.lang.reflect.Field;
@@ -30,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.options.description.TextElement.text;
 
 /** Options for flink connector. */
 public class FlinkConnectorOptions {
@@ -121,6 +124,43 @@ public class FlinkConnectorOptions {
                                     + 
CoreOptions.ChangelogProducer.LOOKUP.name()
                                     + ", commit will wait for changelog 
generation by lookup.");
 
+    public static final ConfigOption<WatermarkEmitStrategy> 
SCAN_WATERMARK_EMIT_STRATEGY =
+            key("scan.watermark.emit.strategy")
+                    .enumType(WatermarkEmitStrategy.class)
+                    .defaultValue(WatermarkEmitStrategy.ON_EVENT)
+                    .withDescription("Emit strategy for watermark 
generation.");
+
+    public static final ConfigOption<String> SCAN_WATERMARK_ALIGNMENT_GROUP =
+            key("scan.watermark.alignment.group")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("A group of sources to align 
watermarks.");
+
+    public static final ConfigOption<Duration> SCAN_WATERMARK_IDLE_TIMEOUT =
+            key("scan.watermark.idle-timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "If no records flow in a partition of a stream for 
that amount of time, then"
+                                    + " that partition is considered \"idle\" 
and will not hold back the progress of"
+                                    + " watermarks in downstream operators.");
+
+    public static final ConfigOption<Duration> 
SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT =
+            key("scan.watermark.alignment.max-drift")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximal drift to align watermarks, "
+                                    + "before we pause consuming from the 
source/task/partition.");
+
+    public static final ConfigOption<Duration> 
SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL =
+            key("scan.watermark.alignment.update-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription(
+                            "How often tasks should notify coordinator about 
the current watermark "
+                                    + "and how often the coordinator should 
announce the maximal aligned watermark.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -135,4 +175,31 @@ public class FlinkConnectorOptions {
         }
         return list;
     }
+
+    /** Watermark emit strategy for scan. */
+    public enum WatermarkEmitStrategy implements DescribedEnum {
+        ON_PERIODIC(
+                "on-periodic",
+                "Emit watermark periodically, interval is controlled by Flink 
'pipeline.auto-watermark-interval'."),
+
+        ON_EVENT("on-event", "Emit watermark per record.");
+
+        private final String value;
+        private final String description;
+
+        WatermarkEmitStrategy(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index b7a533f8d..65030d91e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
 import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
@@ -46,12 +47,18 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
 
 /**
  * Table source to create {@link StaticFileStoreSource} or {@link 
ContinuousFileStoreSource} under
@@ -156,6 +163,34 @@ public class DataTableSource extends FlinkTableSource
                     logStoreTableFactory.createSourceProvider(context, 
scanContext, projectFields);
         }
 
+        WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
+        Options options = table.options().toConfiguration();
+        if (watermarkStrategy != null) {
+            WatermarkEmitStrategy emitStrategy = 
options.get(SCAN_WATERMARK_EMIT_STRATEGY);
+            if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
+                watermarkStrategy = new 
OnEventWatermarkStrategy(watermarkStrategy);
+            }
+            Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
+            if (idleTimeout != null) {
+                watermarkStrategy = 
watermarkStrategy.withIdleness(idleTimeout);
+            }
+            String watermarkAlignGroup = 
options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
+            if (watermarkAlignGroup != null) {
+                try {
+                    watermarkStrategy =
+                            WatermarkAlignUtils.withWatermarkAlignment(
+                                    watermarkStrategy,
+                                    watermarkAlignGroup,
+                                    
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
+                                    
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
+                } catch (NoSuchMethodError error) {
+                    throw new RuntimeException(
+                            "Flink 1.14 dose not support watermark alignment, 
please check your Flink version.",
+                            error);
+                }
+            }
+        }
+
         FlinkSourceBuilder sourceBuilder =
                 new FlinkSourceBuilder(tableIdentifier, table)
                         .withContinuousMode(streaming)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
new file mode 100644
index 000000000..6ad2a3920
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/OnEventWatermarkStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.data.RowData;
+
+/** Paimon {@link WatermarkStrategy} to emit watermark on event. */
+public class OnEventWatermarkStrategy implements WatermarkStrategy<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final WatermarkStrategy<RowData> strategy;
+
+    public OnEventWatermarkStrategy(WatermarkStrategy<RowData> strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public WatermarkGenerator<RowData> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        WatermarkGenerator<RowData> generator = 
strategy.createWatermarkGenerator(context);
+        return new WatermarkGenerator<RowData>() {
+
+            @Override
+            public void onEvent(RowData event, long eventTimestamp, 
WatermarkOutput output) {
+                generator.onEvent(event, eventTimestamp, output);
+                generator.onPeriodicEmit(output);
+            }
+
+            @Override
+            public void onPeriodicEmit(WatermarkOutput output) {
+                // for idle watermark
+                generator.onPeriodicEmit(output);
+            }
+        };
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
new file mode 100644
index 000000000..84639aa76
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/WatermarkAlignUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.data.RowData;
+
+import java.time.Duration;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Utils for watermark alignment, which is supported since Flink 1.15. */
+public class WatermarkAlignUtils {
+
+    public static WatermarkStrategy<RowData> withWatermarkAlignment(
+            WatermarkStrategy<RowData> strategy,
+            String group,
+            Duration drift,
+            Duration updateInterval) {
+        checkArgument(
+                drift != null,
+                String.format(
+                        "Watermark alignment max drift can not be null when 
group (%s) configured.",
+                        group));
+        return strategy.withWatermarkAlignment(group, drift, updateInterval);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
new file mode 100644
index 000000000..51cf834fe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() throws Exception {
+        innerTestWatermark(
+                "'scan.watermark.idle-timeout'='1s'",
+                "'scan.watermark.alignment.group'='group'",
+                "'scan.watermark.alignment.update-interval'='2s'",
+                "'scan.watermark.alignment.max-drift'='1s',");
+    }
+
+    private void innerTestWatermark(String... options) throws Exception {
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS 
ts) WITH ("
+                        + String.join(",", options)
+                        + " 'write-mode'='append-only')");
+
+        BlockingIterator<Row, Row> select =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT window_start, window_end, SUM(f0) FROM 
TABLE("
+                                        + "TUMBLE(TABLE T, DESCRIPTOR(ts), 
INTERVAL '10' MINUTES))\n"
+                                        + "  GROUP BY window_start, 
window_end;"));
+
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')");
+
+        assertThat(select.collect(1))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                LocalDateTime.parse("2023-02-02T12:00"),
+                                LocalDateTime.parse("2023-02-02T12:10"),
+                                1));
+        select.close();
+    }
+}

Reply via email to