[ https://issues.apache.org/jira/browse/FLINK-30598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-30598:
---------------------------------
Fix Version/s: 1.18.0
(was: 1.17.0)
> Wrong code generated for WatermarkGenerator because of inconsistent source type info when deserialized from exec plan
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30598
> URL: https://issues.apache.org/jira/browse/FLINK-30598
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Fix For: 1.18.0
>
>
> When compiling from an existing exec plan that contains a watermark declaration
> referencing a metadata column, the generated code for the WatermarkGenerator
> may be wrong. Currently, `DynamicTableSourceSpec#getTableSource` passes the
> user-defined schema (in declaration order) to each `SourceAbilitySpec` to
> perform optimizations such as projection and watermark push-down, while the
> optimization path from SQL uses a fixed reordered form: "PHYSICAL COLUMNS +
> METADATA COLUMNS". When a metadata column is declared before a physical
> column, the two orders disagree, which causes this problem.
> A repro case:
> {code}
> @Test
> public void testWatermarkPushDownWithMetadata() throws Exception {
>     // To verify FLINK-: this case declares the metadata field first. Without the fix,
>     // WatermarkGeneratorCodeGenerator generates wrong code that references the
>     // incorrect varchar column as the watermark field.
>     createTestValuesSourceTable(
>             "MyTable",
>             JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
>             new String[] {
>                 "ts timestamp(3) metadata",
>                 "a int",
>                 "b bigint",
>                 "c varchar",
>                 "watermark for ts as ts - interval '5' second"
>             },
>             new HashMap<String, String>() {
>                 {
>                     put("enable-watermark-push-down", "true");
>                     put("readable-metadata", "ts:timestamp(3)");
>                 }
>             });
>     File sinkPath =
>             createTestCsvSinkTable(
>                     "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");
>     compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3")
>             .await();
>     assertResult(
>             Arrays.asList(
>                     "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
>                     "5,3,I am fine.," + toLocalDateTime(5000L),
>                     "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
>             sinkPath);
> }
> {code}
> The wrong code snippet (the field read via `row.getString(3)` should be a `TimestampData`):
> {code}
> public Long currentWatermark(org.apache.flink.table.data.RowData row) throws Exception {
>
>     org.apache.flink.table.data.binary.BinaryStringData field$19;
>     boolean isNull$19;
>     org.apache.flink.table.data.binary.BinaryStringData field$21;
>     boolean isNull$22;
>     org.apache.flink.table.data.TimestampData result$23;
>     boolean isNull$24;
>     org.apache.flink.table.data.TimestampData result$25;
>
>     isNull$19 = row.isNullAt(3);
>     field$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>     if (!isNull$19) {
>         field$19 = ((org.apache.flink.table.data.binary.BinaryStringData) row.getString(3));
>     }
> {code}
>
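The column-order mismatch described in the issue can be modeled in plain Java. This is a minimal sketch, not planner code: `Column`, `physicalThenMetadata` and `indexOf` are hypothetical names, and the columns come from the repro DDL. It assumes, consistent with the generated snippet, that the row layout follows the reordered "PHYSICAL COLUMNS + METADATA COLUMNS" form while the type lookup uses the user-defined order.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the column-order mismatch; Column and these helpers are
 * illustrative only and are not Flink planner APIs.
 */
public class SchemaOrderDemo {

    /** A DDL column: name, SQL type, and whether it is declared as METADATA. */
    static final class Column {
        final String name;
        final String type;
        final boolean metadata;

        Column(String name, String type, boolean metadata) {
            this.name = name;
            this.type = type;
            this.metadata = metadata;
        }
    }

    /** User-defined order from the repro DDL: the metadata column ts is declared first. */
    static final List<Column> DECLARED =
            List.of(
                    new Column("ts", "TIMESTAMP(3)", true),
                    new Column("a", "INT", false),
                    new Column("b", "BIGINT", false),
                    new Column("c", "VARCHAR", false));

    /** Reorders columns into the fixed "PHYSICAL COLUMNS + METADATA COLUMNS" form. */
    static List<Column> physicalThenMetadata(List<Column> columns) {
        List<Column> result = new ArrayList<>();
        for (Column c : columns) {
            if (!c.metadata) {
                result.add(c);
            }
        }
        for (Column c : columns) {
            if (c.metadata) {
                result.add(c);
            }
        }
        return result;
    }

    /** Position of the named column in the given order, or -1 if it is absent. */
    static int indexOf(List<Column> columns, String name) {
        for (int i = 0; i < columns.size(); i++) {
            if (columns.get(i).name.equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Column> reordered = physicalThenMetadata(DECLARED);
        // In the reordered row layout, the watermark column ts sits at position 3.
        int tsPos = indexOf(reordered, "ts");
        // Resolving that position against the user-defined order yields c VARCHAR instead.
        Column misread = DECLARED.get(tsPos);
        System.out.println("ts position (physical + metadata): " + tsPos);
        System.out.println("declared column at that position: " + misread.name + " " + misread.type);
    }
}
```

Running `main` reports position 3 for `ts` and `c VARCHAR` as the declared column at that position, which matches the `row.getString(3)` read in the generated snippet.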
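The wrong generated code comes down to which logical type the generator believes sits at position 3. The sketch below is hypothetical: `accessorFor` is made up for illustration and is not Flink's `WatermarkGeneratorCodeGenerator`. Only the `RowData` accessor names it emits (`getInt`, `getLong`, `getString`, `getTimestamp(pos, precision)`) correspond to the real interface. It shows how the two schema orders lead to different reads of the same position.

```java
/**
 * Hypothetical sketch of how a code generator maps a column's logical type to a
 * RowData accessor expression. It is NOT Flink's WatermarkGeneratorCodeGenerator;
 * it only models the accessor choice visible in the generated snippet.
 */
public class AccessorDemo {

    /** Returns the accessor expression emitted for a column of the given SQL type at pos. */
    static String accessorFor(String sqlType, int pos) {
        switch (sqlType) {
            case "INT":
                return "row.getInt(" + pos + ")";
            case "BIGINT":
                return "row.getLong(" + pos + ")";
            case "VARCHAR":
                return "row.getString(" + pos + ")";
            default:
                if (sqlType.startsWith("TIMESTAMP(") && sqlType.endsWith(")")) {
                    // TIMESTAMP(p): RowData#getTimestamp takes the position and the precision p.
                    String precision =
                            sqlType.substring("TIMESTAMP(".length(), sqlType.length() - 1);
                    return "row.getTimestamp(" + pos + ", " + precision + ")";
                }
                throw new IllegalArgumentException("Unsupported type in sketch: " + sqlType);
        }
    }

    public static void main(String[] args) {
        // Type taken from the user-defined schema at position 3 (c VARCHAR): the buggy read.
        System.out.println(accessorFor("VARCHAR", 3));
        // Type of the column actually stored at position 3 (ts TIMESTAMP(3)): the expected read.
        System.out.println(accessorFor("TIMESTAMP(3)", 3));
    }
}
```

This prints `row.getString(3)` followed by `row.getTimestamp(3, 3)`. The real generated code additionally wraps the read in an `isNullAt` check and a cast, as the snippet shows.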
--
This message was sent by Atlassian Jira
(v8.20.10#820010)