Yanquan Lv created FLINK-36688:
----------------------------------

             Summary: table.optimizer.reuse-source-enabled may cause disordered metadata columns when reading from Kafka.
                 Key: FLINK-36688
                 URL: https://issues.apache.org/jira/browse/FLINK-36688
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0-preview, 1.19.1, 1.20.0
            Reporter: Yanquan Lv

Metadata columns in Kafka need to maintain a fixed order: the format's metadata must come first, and the metadata of Kafka itself (partition, offset, and so on) must come last. The Kafka connector adds the format's fields first and Kafka's own fields afterwards.

However, a reused source does not maintain this order, which may cause a ClassCastException.

How to reproduce:
{code:java}
create temporary table `message_channel_task_record` (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
  ,`partition` INT METADATA VIRTUAL
  ,`offset` BIGINT METADATA VIRTUAL
  ,id BIGINT comment 'auto-increment ID'
  ,PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector'='kafka'
  xxx
)
;

create temporary table `sink` (
  origin_ts TIMESTAMP(3)
  ,`partition` INT
  ,`offset` BIGINT
  ,id BIGINT
) WITH (
  'connector'='print'
)
;

create temporary table `sr_sink` (
  id BIGINT comment 'auto-increment ID'
) WITH (
  'connector'='print'
)
;

-- EXPLAIN STATEMENT SET BEGIN
BEGIN STATEMENT SET;

INSERT INTO sink
SELECT origin_ts
  ,`partition`
  ,`offset`
  ,id
FROM message_channel_task_record
;

INSERT INTO `sr_sink`
SELECT id
FROM `message_channel_task_record`
;

END
;
{code}

Explained plan:
{code:java}
[558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], fields=[id, partition, origin_ts, offset])
:- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, id])
:  +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, offset, id])
+- [562]:Sink(table=[vvp.default.sr_sink], fields=[id])
{code}

The expected metadata column order is: value.ingestion-timestamp (format), partition, offset.
The actual metadata column order is: partition, value.ingestion-timestamp (format), offset.
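
To make the ordering problem concrete, here is a minimal, self-contained Java sketch. It is not Flink or Kafka connector code: the class `MetadataOrderSketch` and both helpers are hypothetical, and it assumes that format metadata keys can be recognised by the `value.` prefix, as with `value.ingestion-timestamp` in the plan above. It shows how positional decoding can fail with a ClassCastException when the declared order and the produced order disagree, and what a format-first reordering of the planned keys might look like.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetadataOrderSketch {

    // Hypothetical helper (not a Flink API): moves format metadata keys,
    // assumed to start with "value.", ahead of the connector's own keys
    // while keeping the relative order inside each group.
    static List<String> formatKeysFirst(List<String> keys) {
        List<String> ordered = new ArrayList<>();
        for (String key : keys) {
            if (key.startsWith("value.")) {
                ordered.add(key);
            }
        }
        for (String key : keys) {
            if (!key.startsWith("value.")) {
                ordered.add(key);
            }
        }
        return ordered;
    }

    // Simulates positional decoding: the reader assumes position 0 holds
    // the INT partition and casts accordingly.
    static boolean firstColumnCastFails(Object[] producedRow) {
        try {
            Integer partition = (Integer) producedRow[0];
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Metadata order as it appears in the explained plan.
        List<String> planOrder =
                Arrays.asList("partition", "value.ingestion-timestamp", "offset");
        // prints [value.ingestion-timestamp, partition, offset]
        System.out.println(formatKeysFirst(planOrder));

        // Values written format-first (timestamp as epoch millis, partition,
        // offset) but read as if the partition came first.
        Object[] row = {1700000000000L, 3, 42L};
        // prints true
        System.out.println(firstColumnCastFails(row));
    }
}
```

The sketch only mirrors the symptom described in this issue; where the reordering should actually happen in the planner is not something it tries to answer.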