This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4234e6ae5ac [FLINK-37640][table-planner] Fix timestamp_ltz is converted timestamp in scan reuse (#26430)
4234e6ae5ac is described below

commit 4234e6ae5ac695304a19e8b8bab49e8086e4c08b
Author: XuShuai <[email protected]>
AuthorDate: Tue May 13 09:52:00 2025 +0800

    [FLINK-37640][table-planner] Fix timestamp_ltz is converted timestamp in scan reuse (#26430)
---
 .../table/planner/plan/reuse/ScanReuserUtils.java  | 16 ++++++++++-
 .../table/planner/plan/optimize/ScanReuseTest.java | 32 ++++++++++++++++++++++
 .../table/planner/plan/optimize/ScanReuseTest.xml  | 28 +++++++++++++++++++
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
index b034a3ffee2..6b2518d2d22 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalT
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampKind;
@@ -203,7 +204,20 @@ public class ScanReuserUtils {
                 String name = newFieldNames.get(i);
                 LogicalType type = newSourceType.getTypeAt(i);
                 if (name.equals(rowtimeColumn)) {
-                    type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                    if (type instanceof LocalZonedTimestampType) {
+                        type =
+                                new LocalZonedTimestampType(
+                                        type.isNullable(), TimestampKind.ROWTIME, 3);
+                    } else if (type instanceof TimestampType) {
+                        type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                    } else {
+                        throw new TableException(
+                                String.format(
+                                        "Watermark only supports LocalZonedTimestampType and TimestampType "
+                                                + "while current type %s is not supported by watermark. "
+                                                + "This should not happen.",
+                                        type.asSummaryString()));
+                    }
                 }
                 fields.add(new RowType.RowField(name, type));
             }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
index aa9bb818b3e..9a4a777f448 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
@@ -444,4 +444,36 @@ class ScanReuseTest extends TableTestBase {
         stmt.addInsertSql("INSERT INTO snk2 select `id` from src");
         util.verifyExecPlan(stmt);
     }
+
+    @TestTemplate
+    void testWatermarkPushDownWithTimestampChanged() {
+        assumeThat(isStreaming).isTrue();
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE MyTableWatermark (\n"
+                                + " a bigint,\n"
+                                + " b int,\n"
+                                + " ts_ltz timestamp_ltz(3),\n"
+                                + " WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' MINUTE"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'enable-watermark-push-down' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE MySinkTs (\n"
+                                + "a bigint, "
+                                + "ts timestamp_ltz(3)"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'table-sink-class' = 'DEFAULT')");
+
+        StatementSet stmt = util.tableEnv().createStatementSet();
+        stmt.addInsertSql("INSERT INTO MySinkTs SELECT a, ts_ltz FROM MyTableWatermark");
+        stmt.addInsertSql("INSERT INTO MySinkTs SELECT b, ts_ltz FROM MyTableWatermark");
+        util.verifyExecPlan(stmt);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index c93963b7ddb..16138fc4fcc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -1379,4 +1379,32 @@ Sink(table=[default_catalog.default_database.snk2], fields=[id])
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testWatermarkPushDownWithTimestampChanged[isStreaming: true]">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts_ltz])
++- LogicalProject(a=[$0], ts_ltz=[$2])
+   +- LogicalWatermarkAssigner(rowtime=[ts_ltz], watermark=[-($2, 600000:INTERVAL MINUTE)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWatermark]])
+
+LogicalSink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts])
++- LogicalProject(a=[CAST($1):BIGINT], ts=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
+   +- LogicalWatermarkAssigner(rowtime=[ts_ltz], watermark=[-($2, 600000:INTERVAL MINUTE)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWatermark]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTableWatermark, project=[a, b, ts_ltz], metadata=[], watermark=[-(ts_ltz, 600000:INTERVAL MINUTE)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, ts_ltz])(reuse_id=[1])
+
+Sink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts_ltz])
++- Calc(select=[a, ts_ltz])
+   +- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts])
++- Calc(select=[CAST(b AS BIGINT) AS a, CAST(ts_ltz AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
 </Root>

Reply via email to