This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4234e6ae5ac [FLINK-37640][table-planner] Fix timestamp_ltz is converted timestamp in scan reuse (#26430)
4234e6ae5ac is described below
commit 4234e6ae5ac695304a19e8b8bab49e8086e4c08b
Author: XuShuai <[email protected]>
AuthorDate: Tue May 13 09:52:00 2025 +0800
[FLINK-37640][table-planner] Fix timestamp_ltz is converted timestamp in scan reuse (#26430)
---
.../table/planner/plan/reuse/ScanReuserUtils.java | 16 ++++++++++-
.../table/planner/plan/optimize/ScanReuseTest.java | 32 ++++++++++++++++++++++
.../table/planner/plan/optimize/ScanReuseTest.xml | 28 +++++++++++++++++++
3 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
index b034a3ffee2..6b2518d2d22 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalT
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
@@ -203,7 +204,20 @@ public class ScanReuserUtils {
String name = newFieldNames.get(i);
LogicalType type = newSourceType.getTypeAt(i);
if (name.equals(rowtimeColumn)) {
- type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+ if (type instanceof LocalZonedTimestampType) {
+ type =
+ new LocalZonedTimestampType(
+ type.isNullable(), TimestampKind.ROWTIME, 3);
+ } else if (type instanceof TimestampType) {
+ type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+ } else {
+ throw new TableException(
+ String.format(
+ "Watermark only supports LocalZonedTimestampType and TimestampType "
+ + "while current type %s is not supported by watermark. "
+ + "This should not happen.",
+ type.asSummaryString()));
+ }
}
fields.add(new RowType.RowField(name, type));
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
index aa9bb818b3e..9a4a777f448 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
@@ -444,4 +444,36 @@ class ScanReuseTest extends TableTestBase {
stmt.addInsertSql("INSERT INTO snk2 select `id` from src");
util.verifyExecPlan(stmt);
}
+
+ @TestTemplate
+ void testWatermarkPushDownWithTimestampChanged() {
+ assumeThat(isStreaming).isTrue();
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE MyTableWatermark (\n"
+ + " a bigint,\n"
+ + " b int,\n"
+ + " ts_ltz timestamp_ltz(3),\n"
+ + " WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' MINUTE"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'enable-watermark-push-down' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE MySinkTs (\n"
+ + "a bigint, "
+ + "ts timestamp_ltz(3)"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')");
+
+ StatementSet stmt = util.tableEnv().createStatementSet();
+ stmt.addInsertSql("INSERT INTO MySinkTs SELECT a, ts_ltz FROM MyTableWatermark");
+ stmt.addInsertSql("INSERT INTO MySinkTs SELECT b, ts_ltz FROM MyTableWatermark");
+ util.verifyExecPlan(stmt);
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index c93963b7ddb..16138fc4fcc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -1379,4 +1379,32 @@ Sink(table=[default_catalog.default_database.snk2], fields=[id])
]]>
</Resource>
</TestCase>
+ <TestCase name="testWatermarkPushDownWithTimestampChanged[isStreaming: true]">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts_ltz])
++- LogicalProject(a=[$0], ts_ltz=[$2])
+ +- LogicalWatermarkAssigner(rowtime=[ts_ltz], watermark=[-($2, 600000:INTERVAL MINUTE)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWatermark]])
+
+LogicalSink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts])
++- LogicalProject(a=[CAST($1):BIGINT], ts=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
+ +- LogicalWatermarkAssigner(rowtime=[ts_ltz], watermark=[-($2, 600000:INTERVAL MINUTE)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWatermark]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTableWatermark, project=[a, b, ts_ltz], metadata=[], watermark=[-(ts_ltz, 600000:INTERVAL MINUTE)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, ts_ltz])(reuse_id=[1])
+
+Sink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts_ltz])
++- Calc(select=[a, ts_ltz])
+ +- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.MySinkTs], fields=[a, ts])
++- Calc(select=[CAST(b AS BIGINT) AS a, CAST(ts_ltz AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
</Root>