This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7059ee555c5 [FLINK-37025] Fix generating watermarks in SQL on-periodic (#25921)
7059ee555c5 is described below

commit 7059ee555c5093e3b233876ac3d84ad693023df8
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Jan 9 11:00:07 2025 +0100

    [FLINK-37025] Fix generating watermarks in SQL on-periodic (#25921)
---
 .../runtime/stream/sql/WatermarkITCase.java        | 108 +++++++++++++++++++++
 .../GeneratedWatermarkGeneratorSupplier.java       |   6 +-
 2 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java
new file mode 100644
index 00000000000..4f7530b27f4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT tests verifying the watermarks generated for SQL table sources. */
+class WatermarkITCase extends StreamingTestBase {
+
+    @Test
+    void testWatermarkNotMovingBack() {
+        List<Row> data =
+                Arrays.asList(
+                        Row.of(1, LocalDateTime.parse("2024-01-01T00:00:00")),
+                        Row.of(3, LocalDateTime.parse("2024-01-03T00:00:00")),
+                        Row.of(2, LocalDateTime.parse("2024-01-02T00:00:00")));
+
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        final String ddl =
+                String.format(
+                        "CREATE Table VirtualTable (\n"
+                                + "  a INT,\n"
+                                + "  c TIMESTAMP(3),\n"
+                                + "  WATERMARK FOR c as c\n"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false',\n"
+                                + "  'scan.watermark.emit.strategy' = 'on-periodic',\n"
+                                + "  'enable-watermark-push-down' = 'true',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ")\n",
+                        dataId);
+
+        tEnv().executeSql(ddl);
+        tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
+        String query = "SELECT a, c, current_watermark(c) FROM VirtualTable order by c";
+
+        final List<Row> result = CollectionUtil.iteratorToList(tEnv().executeSql(query).collect());
+        final List<String> actualWatermarks =
+                TestValuesTableFactory.getWatermarkOutput("VirtualTable").stream()
+                        .map(
+                                x ->
+                                        TimestampData.fromEpochMillis(x.getTimestamp())
+                                                .toLocalDateTime()
+                                                .toString())
+                        .collect(Collectors.toList());
+
+        // Underneath, we use FromElementSourceFunctionWithWatermark, which is a SourceFunction.
+        // SourceFunction does not support the watermark moving back, and SourceStreamTask does
+        // not support WatermarkGenerator natively. The test implementation therefore calls
+        // WatermarkGenerator#onPeriodicEmit after each record, making the test deterministic.
+        // GeneratedWatermarkGeneratorSupplier also does not deduplicate already emitted
+        // watermarks. That is usually handled by the target WatermarkOutput, but this test uses
+        // TestValuesWatermarkOutput, which does not deduplicate them either.
+        // Since a watermark is emitted after every record and duplicates are kept, the expected
+        // watermarks contain "2024-01-03T00:00" twice: the out-of-order 2024-01-02 record does
+        // not move the watermark back (DefaultWatermarkGenerator ignores lower values), so the
+        // previous watermark is simply emitted again.
+        assertThat(actualWatermarks)
+                .containsExactly("2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00");
+        assertThat(result)
+                .containsExactly(
+                        Row.of(
+                                1,
+                                LocalDateTime.parse("2024-01-01T00:00"),
+                                LocalDateTime.parse("2024-01-01T00:00")),
+                        Row.of(
+                                2,
+                                LocalDateTime.parse("2024-01-02T00:00"),
+                                LocalDateTime.parse("2024-01-03T00:00")),
+                        Row.of(
+                                3,
+                                LocalDateTime.parse("2024-01-03T00:00"),
+                                LocalDateTime.parse("2024-01-03T00:00")));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
index acc60591b55..70bd8fa0240 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.watermark.WatermarkParams;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -84,7 +85,8 @@ public class GeneratedWatermarkGeneratorSupplier implements WatermarkGeneratorSu
 
     /** Wrapper of the code-generated {@link WatermarkGenerator}. */
     public static class DefaultWatermarkGenerator
-            implements 
org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> {
+            implements 
org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData>,
+                    Serializable {
         private static final long serialVersionUID = 1L;
 
         private final WatermarkGenerator innerWatermarkGenerator;
@@ -102,7 +104,7 @@ public class GeneratedWatermarkGeneratorSupplier implements WatermarkGeneratorSu
         public void onEvent(RowData event, long eventTimestamp, 
WatermarkOutput output) {
             try {
                 Long watermark = 
innerWatermarkGenerator.currentWatermark(event);
-                if (watermark != null) {
+                if (watermark != null && watermark > currentWatermark) {
                     currentWatermark = watermark;
                     if (watermarkEmitStrategy.isOnEvent()) {
                         output.emitWatermark(new Watermark(currentWatermark));

Reply via email to