[ 
https://issues.apache.org/jira/browse/FLINK-39037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39037:
-----------------------------------
    Labels: pull-request-available  (was: )

> support query kafka cluster id as metadata in records of 
> DynamicKafkaTableSource
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-39037
>                 URL: https://issues.apache.org/jira/browse/FLINK-39037
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-4.0.1
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kafka-4.1.0
>
>
> Problem:
> The static Kafka source table supports querying metadata such as topic and 
> partition to identify the source of a record: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata]
> In a dynamic Kafka source, where multiple clusters are involved, identifying 
> the source of a record also requires the Kafka cluster id, in addition to 
> topic, partition, etc.
> Goal:
> Expose the Kafka cluster id as metadata on records of DynamicKafkaSource.
> How:
> Unlike topic/partition/offset, which are already available on the Kafka 
> ConsumerRecord via the Kafka client APIs, the cluster id is not part of 
> ConsumerRecord. We can’t add it there: ConsumerRecord is a Kafka client 
> class we don’t control, and it has no extension points.
> Alternatives and trade‑offs
> 1/ Wrap ConsumerRecord in a custom type
>     Pros: clean access to cluster id everywhere.
>     Cons: huge refactor; all deserializers and downstream APIs expect 
> ConsumerRecord<byte[],byte[]>. You’d break compatibility with 
> KafkaSourceReader and Flink internals.
> 2/ Inject cluster id into headers
>     Pros: no new interface.
>     Cons: pollutes the record's Kafka headers with a value that was never 
> on the wire, breaks header semantics, and could collide with real headers.
> 3/ Side‑channel
>     Pros: smaller change than the two approaches above, opt‑in, no API 
> breakage; only the dynamic source uses it.
>     Cons: adds an internal interface and cloning logic. The interface exists 
> only because cluster id is not part of the Kafka record, while 
> topic/partition/offset already are.
>  
> Conclusion
> Weighing the pros and cons, we prefer option 3, the side channel. We have 
> developed a solution and battle-tested it, and would like to contribute it 
> back.
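
For readers unfamiliar with the side-channel idea in option 3, the following is a minimal, dependency-free sketch of how it might look. It is not the code from the pull request. Every name in it (`ClusterIdAware`, `FakeRecord`, `MetadataDeserializer`, `readCluster`) is hypothetical, `FakeRecord` merely stands in for Kafka's `ConsumerRecord<byte[],byte[]>`, and the `Supplier` is one assumed way to model the "cloning logic" so that each per-cluster reader gets its own deserializer copy.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class ClusterIdSideChannelSketch {

    /** Stand-in for Kafka's ConsumerRecord: it carries no cluster id. */
    static final class FakeRecord {
        final String topic; final int partition; final long offset; final byte[] value;
        FakeRecord(String topic, int partition, long offset, byte[] value) {
            this.topic = topic; this.partition = partition;
            this.offset = offset; this.value = value;
        }
    }

    /** Hypothetical opt-in side channel; deserializers that ignore it are unaffected. */
    interface ClusterIdAware {
        void setClusterId(String clusterId);
    }

    /** Stand-in for the deserializer contract, which only ever sees the record. */
    interface Deserializer<T> {
        T deserialize(FakeRecord record);
    }

    /** Output row exposing the cluster id next to the usual record metadata. */
    static final class Row {
        final String clusterId; final String topic; final int partition;
        final long offset; final String value;
        Row(String clusterId, String topic, int partition, long offset, String value) {
            this.clusterId = clusterId; this.topic = topic; this.partition = partition;
            this.offset = offset; this.value = value;
        }
    }

    /** Opts in to the side channel and copies the cluster id into each row. */
    static final class MetadataDeserializer implements Deserializer<Row>, ClusterIdAware {
        private String clusterId;

        @Override public void setClusterId(String clusterId) { this.clusterId = clusterId; }

        @Override public Row deserialize(FakeRecord r) {
            return new Row(clusterId, r.topic, r.partition, r.offset,
                    new String(r.value, StandardCharsets.UTF_8));
        }
    }

    /** Models one per-cluster reader: take a fresh copy, pass the id, then deserialize. */
    static <T> List<T> readCluster(String clusterId,
                                   Supplier<Deserializer<T>> copies,
                                   List<FakeRecord> records) {
        Deserializer<T> deserializer = copies.get(); // one copy per cluster, no shared state
        if (deserializer instanceof ClusterIdAware) {
            ((ClusterIdAware) deserializer).setClusterId(clusterId);
        }
        List<T> out = new ArrayList<>();
        for (FakeRecord r : records) {
            out.add(deserializer.deserialize(r));
        }
        return out;
    }

    /** Two clusters emit records with identical topic/partition/offset. */
    static List<Row> demo() {
        byte[] payload = "v".getBytes(StandardCharsets.UTF_8);
        List<Row> rows = new ArrayList<>();
        rows.addAll(readCluster("cluster-a", MetadataDeserializer::new,
                Arrays.asList(new FakeRecord("orders", 0, 42L, payload))));
        rows.addAll(readCluster("cluster-b", MetadataDeserializer::new,
                Arrays.asList(new FakeRecord("orders", 0, 42L, payload))));
        return rows;
    }
}
```

In `demo()` both records share the same topic, partition, and offset, so only the side-channelled cluster id tells them apart. The record type itself is never wrapped or modified, which is the property that distinguishes option 3 from options 1 and 2.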



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
