Yanquan Lv created FLINK-36688:
----------------------------------

             Summary: table.optimizer.reuse-source-enabled may cause out-of-order metadata columns when reading from Kafka.
                 Key: FLINK-36688
                 URL: https://issues.apache.org/jira/browse/FLINK-36688
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0-preview, 1.19.1, 1.20.0
            Reporter: Yanquan Lv


Metadata columns in Kafka need to keep a fixed order: metadata provided by the
format must come first, and metadata of Kafka itself (partition, offset, and so
on) must come last. The Kafka connector adds the format's metadata fields first
and the Kafka metadata fields afterwards.
However, a reused source does not preserve this order, which may cause a
ClassCastException.
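To illustrate why the order matters, here is a minimal, hypothetical Java sketch. It is not Flink or connector code, and the class and method names (MetadataOrderSketch, isFormatKey, emittedOrder, orderMatches) are made up for illustration. It assumes that format metadata keys carry the "value." prefix, that the source appends format metadata before Kafka metadata as described above, and that downstream operators read the row purely by position. With the metadata order from the reused scan in the explained plan, position 0 is declared as partition (INT) but receives the ingestion timestamp, so the cast fails.

{code:java}
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained sketch; not Flink or Kafka connector code.
class MetadataOrderSketch {

    // Assumption: format metadata keys carry the "value." prefix; all other
    // keys (partition, offset, ...) belong to the Kafka connector itself.
    static boolean isFormatKey(String key) {
        return key.startsWith("value.");
    }

    // Order in which the source appends metadata to each row: all format keys
    // first, then all connector keys, each group keeping its requested order.
    static List<String> emittedOrder(List<String> requestedKeys) {
        List<String> result = new ArrayList<>();
        for (String key : requestedKeys) {
            if (isFormatKey(key)) {
                result.add(key);
            }
        }
        for (String key : requestedKeys) {
            if (!isFormatKey(key)) {
                result.add(key);
            }
        }
        return result;
    }

    // A declared row type is only valid if it lists metadata in emitted order.
    static boolean orderMatches(List<String> declaredKeys) {
        return declaredKeys.equals(emittedOrder(declaredKeys));
    }

    public static void main(String[] args) {
        // Metadata order taken from the reused TableSourceScan in the plan.
        List<String> declared = List.of("partition", "value.ingestion-timestamp", "offset");
        // Illustrative example values, not taken from the issue.
        Map<String, Object> values = Map.of(
                "value.ingestion-timestamp", LocalDateTime.of(2024, 1, 1, 0, 0),
                "partition", 3,
                "offset", 42L);

        // Build the row the way the source emits it.
        List<Object> row = new ArrayList<>();
        for (String key : emittedOrder(declared)) {
            row.add(values.get(key));
        }

        System.out.println("order matches: " + orderMatches(declared));
        try {
            // Position 0 is declared as `partition` (INT) but holds the timestamp.
            Integer partition = (Integer) row.get(0);
            System.out.println("partition = " + partition);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
{code}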

How to reproduce:

{code:sql}
create temporary table `message_channel_task_record`
(
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
,`partition` INT METADATA VIRTUAL
,`offset` BIGINT METADATA VIRTUAL
,id BIGINT comment 'Auto-increment ID'
,PRIMARY KEY (`id`) NOT ENFORCED
)
with (
'connector'='kafka'
xxx
)
;


create temporary table `sink`
(
origin_ts TIMESTAMP(3)
,`partition` INT
,`offset` BIGINT
,id BIGINT
)
WITH (
'connector'='print'
)
;


create temporary table `sr_sink`
(
id BIGINT comment 'Auto-increment ID'
)
WITH (
'connector'='print'
)
;


-- EXPLAIN STATEMENT SET BEGIN


BEGIN STATEMENT SET;


INSERT INTO sink
SELECT
origin_ts
,`partition`
,`offset`
,id
FROM message_channel_task_record
;


INSERT INTO `sr_sink`
SELECT
id
FROM `message_channel_task_record`
;


END
;
 {code}
Explained plan:

{code:java}
[558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], fields=[id, partition, origin_ts, offset])
:- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, id])
:  +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, offset, id])
+- [562]:Sink(table=[vvp.default.sr_sink], fields=[id])
{code}

The expected metadata column order is: value.ingestion-timestamp (format),
partition, offset.
The actual metadata column order is: partition,
value.ingestion-timestamp (format), offset.
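One possible direction for a fix, sketched in hypothetical Java (the names ReuseMetadataOrderFix and reorder are illustrative, and this is not the actual Flink patch): before a reused scan passes its merged metadata keys to the source, reorder them to follow the order in which the source advertises them, assumed here to be the iteration order of SupportsReadingMetadata#listReadableMetadata(). The advertised list used in main is an assumed, illustrative subset, not the connector's full list.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of one possible fix direction; not the actual Flink patch.
class ReuseMetadataOrderFix {

    // Reorders the merged metadata keys of a reused scan so they follow the
    // order in which the source advertises them (assumed to be the iteration
    // order of SupportsReadingMetadata#listReadableMetadata()).
    static List<String> reorder(List<String> mergedKeys, List<String> readableOrder) {
        for (String key : mergedKeys) {
            if (!readableOrder.contains(key)) {
                throw new IllegalArgumentException("Unknown metadata key: " + key);
            }
        }
        List<String> result = new ArrayList<>(mergedKeys);
        // List.sort is stable, so keys with equal rank keep their relative order.
        result.sort(Comparator.comparingInt(readableOrder::indexOf));
        return result;
    }

    public static void main(String[] args) {
        // Illustrative, assumed advertised order: format metadata, then Kafka metadata.
        List<String> readableOrder = List.of(
                "value.ingestion-timestamp", "topic", "partition", "offset", "timestamp");
        // Merged order seen in the reused TableSourceScan.
        List<String> merged = List.of("partition", "value.ingestion-timestamp", "offset");
        System.out.println(reorder(merged, readableOrder));
    }
}
{code}

Running main prints [value.ingestion-timestamp, partition, offset], which matches the expected order. A real fix would also have to keep the scan's produced row type and its fields=[...] list consistent with the reordered keys; the sketch only covers the key order.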



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
