[ https://issues.apache.org/jira/browse/FLINK-30598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-30598:
---------------------------------
    Fix Version/s: 1.18.0
                       (was: 1.17.0)

> Wrong code generated for WatermarkGenerator because of inconsistent source type info when deserialized from exec plan
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30598
>                 URL: https://issues.apache.org/jira/browse/FLINK-30598
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 1.18.0
>
>
> When compiling from an existing exec plan that contains a watermark declaration referencing a metadata column, the generated code for WatermarkGenerator may be wrong.
> This happens because `DynamicTableSourceSpec.getTableSource` currently passes the user-defined schema to `SourceAbilitySpec` to perform optimizations such as projection/watermark pushdown, while the optimization path from SQL uses a fixed reordered form: "PHYSICAL COLUMNS + METADATA COLUMNS". When the declared column order differs from that form (for example, a metadata column declared before the physical columns), the source type info used for code generation becomes inconsistent.
> A repro case:
> {code}
> @Test
> public void testWatermarkPushDownWithMetadata() throws Exception {
>     // to verify FLINK-: the case declares the metadata field first; without the fix,
>     // WatermarkGeneratorCodeGenerator generates wrong code that references the
>     // incorrect varchar column as the watermark field.
>     createTestValuesSourceTable(
>             "MyTable",
>             JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
>             new String[] {
>                 "ts timestamp(3) metadata",
>                 "a int",
>                 "b bigint",
>                 "c varchar",
>                 "watermark for ts as ts - interval '5' second"
>             },
>             new HashMap<String, String>() {
>                 {
>                     put("enable-watermark-push-down", "true");
>                     put("readable-metadata", "ts:timestamp(3)");
>                 }
>             });
>     File sinkPath =
>             createTestCsvSinkTable(
>                     "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");
>     compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3")
>             .await();
>     assertResult(
>             Arrays.asList(
>                     "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
>                     "5,3,I am fine.," + toLocalDateTime(5000L),
>                     "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
>             sinkPath);
> }
> {code}
> The wrong generated code (the field at index 3 should be read as a `TimestampData`, not via `row.getString(3)`):
> {code}
> public Long currentWatermark(org.apache.flink.table.data.RowData row) throws Exception {
>
>     org.apache.flink.table.data.binary.BinaryStringData field$19;
>     boolean isNull$19;
>     org.apache.flink.table.data.binary.BinaryStringData field$21;
>     boolean isNull$22;
>     org.apache.flink.table.data.TimestampData result$23;
>     boolean isNull$24;
>     org.apache.flink.table.data.TimestampData result$25;
>
>     isNull$19 = row.isNullAt(3);
>     field$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>     if (!isNull$19) {
>         field$19 = ((org.apache.flink.table.data.binary.BinaryStringData) row.getString(3));
>     }
> {code}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
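The column-order mismatch described in the issue can be illustrated without Flink at all. The following is a minimal, Flink-free Java sketch under stated assumptions: `WatermarkSchemaOrderSketch`, `Column`, `declaredSchema`, `physicalThenMetadata` and `indexOf` are hypothetical names, not Flink APIs. It assumes, as the wrong generated code in the issue suggests, that the watermark expression addresses `ts` by its position in the "PHYSICAL COLUMNS + METADATA COLUMNS" row, while the field type is looked up in the user-declared schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, Flink-free model of the column-order mismatch reported in FLINK-30598.
 * None of these names exist in Flink; they only illustrate the ordering problem.
 */
public class WatermarkSchemaOrderSketch {

    /** A simplified column: name, SQL type string, and whether it is a metadata column. */
    static final class Column {
        final String name;
        final String type;
        final boolean metadata;

        Column(String name, String type, boolean metadata) {
            this.name = name;
            this.type = type;
            this.metadata = metadata;
        }
    }

    /** Columns in the order the user declared them in the repro DDL: ts, a, b, c. */
    static List<Column> declaredSchema() {
        return Arrays.asList(
                new Column("ts", "TIMESTAMP(3)", true),
                new Column("a", "INT", false),
                new Column("b", "BIGINT", false),
                new Column("c", "VARCHAR", false));
    }

    /** Reorders columns into the "PHYSICAL COLUMNS + METADATA COLUMNS" form. */
    static List<Column> physicalThenMetadata(List<Column> declared) {
        List<Column> result = new ArrayList<>();
        for (Column col : declared) {
            if (!col.metadata) {
                result.add(col);
            }
        }
        for (Column col : declared) {
            if (col.metadata) {
                result.add(col);
            }
        }
        return result;
    }

    /** Position of the named column in the given schema, or -1 if it is absent. */
    static int indexOf(List<Column> schema, String name) {
        for (int i = 0; i < schema.size(); i++) {
            if (schema.get(i).name.equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Column> declared = declaredSchema();
        List<Column> produced = physicalThenMetadata(declared); // a, b, c, ts

        // Assumption: the watermark field is addressed by its position in the produced row.
        int tsIndex = indexOf(produced, "ts");

        // Consistent: index and type both come from the reordered schema.
        String consistentType = produced.get(tsIndex).type;
        // Inconsistent: index from the reordered schema, type from the declared schema.
        String inconsistentType = declared.get(tsIndex).type;

        System.out.println("ts index in produced row: " + tsIndex);         // 3
        System.out.println("type from produced schema: " + consistentType); // TIMESTAMP(3)
        System.out.println("type from declared schema: " + inconsistentType); // VARCHAR
    }
}
```

In this model, `ts` lands at index 3 of the reordered row, and index 3 of the declared schema is `c varchar`. Resolving the type from the declared list therefore yields VARCHAR, which mirrors the `BinaryStringData` / `row.getString(3)` access in the wrong generated code; taking both the index and the type from the same schema avoids the mismatch.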