[ 
https://issues.apache.org/jira/browse/FLINK-39037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-39037:
-----------------------------
    Description: 
Problem:

The static Kafka source table supports querying metadata such as topic, partition, etc., to identify the source of a record:
[https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata]

To identify the source of a record in a dynamic Kafka source, where multiple clusters are involved, the Kafka cluster id must be queryable in addition to topic, partition, etc.
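For illustration, the static Kafka SQL connector exposes metadata as virtual columns roughly as below (the existing columns follow the linked docs; the `cluster-id` column is hypothetical and only sketches what this issue would add):

```sql
CREATE TABLE kafka_events (
  -- Metadata already supported by the static Kafka connector:
  `topic`      STRING NOT NULL METADATA VIRTUAL,
  `partition`  BIGINT NOT NULL METADATA VIRTUAL,
  `offset`     BIGINT NOT NULL METADATA VIRTUAL,
  -- Hypothetical new metadata column this issue proposes for DynamicKafkaTableSource:
  `cluster-id` STRING METADATA VIRTUAL,
  payload      STRING
) WITH (
  'connector' = 'kafka'
  -- ... connection options elided ...
);
```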

Goal:

Expose the Kafka cluster id in the records of DynamicKafkaSource.

How:

Unlike topic/partition/offset, which are already carried inside the Kafka 
ConsumerRecord via the Kafka client APIs, the cluster id is not part of 
ConsumerRecord. We also can't make it part of ConsumerRecord: that is a Kafka 
client class we don't control, and it has no extension points.

Alternatives and trade‑offs

1/ Wrap ConsumerRecord in a custom type

    Pros: clean access to the cluster id everywhere.
    Cons: a huge refactor; all deserializers and downstream APIs expect 
ConsumerRecord<byte[],byte[]>, so this would break compatibility with 
KafkaSourceReader and Flink internals.

2/ Inject cluster id into headers

    Pros: no new interface.
    Cons: pollutes the actual Kafka headers with a value that is not present on 
the wire, breaks header semantics, and could collide with real headers.

3/ Side‑channel

    Pros: smaller changes than the two approaches above, opt‑in, no API 
breakage; only the dynamic source uses it.
    Cons: adds an internal interface and cloning logic. The interface exists 
only because the cluster id is not part of the Kafka record, while 
topic/partition/offset already are.
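A rough sketch of how such a side channel could look (all names here are illustrative stand-ins, not the actual connector code; a real implementation would plug into the connector's deserialization path, where the dynamic source's reader knows which cluster a split belongs to):

```java
// Hypothetical side-channel sketch. None of these types are the real
// flink-connector-kafka API; they only model the pattern.

/** Internal opt-in interface: receives the cluster id out-of-band,
 *  since it cannot be read from the ConsumerRecord itself. */
interface KafkaClusterIdAware {
    void setKafkaClusterId(String clusterId);
}

/** Stand-in for a record deserializer (the real code would implement
 *  something like KafkaRecordDeserializationSchema<T>). */
class ClusterTaggingDeserializer implements KafkaClusterIdAware {
    private String clusterId = "unknown";

    @Override
    public void setKafkaClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    /** Tags a decoded value with the cluster id supplied via the side channel. */
    String deserialize(String value) {
        return clusterId + "/" + value;
    }
}

public class SideChannelSketch {
    public static void main(String[] args) {
        ClusterTaggingDeserializer deser = new ClusterTaggingDeserializer();
        // The reader pushes the id in before handing the deserializer any
        // records from that cluster; the Kafka record itself is untouched.
        deser.setKafkaClusterId("cluster-a");
        System.out.println(deser.deserialize("payload-1"));
    }
}
```

Because only the dynamic source's reader calls setKafkaClusterId, static sources never see the interface, which is what makes the approach opt-in.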

 

Conclusion

Weighing the pros and cons, we prefer option 3, the side channel. We have 
developed this solution and battle-tested it; this issue is to contribute it 
back.



> support query kafka cluster id as metadata in records of 
> DynamicKafkaTableSource
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-39037
>                 URL: https://issues.apache.org/jira/browse/FLINK-39037
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-4.0.1
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: kafka-4.1.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
