[
https://issues.apache.org/jira/browse/FLINK-39037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bowen Li updated FLINK-39037:
-----------------------------
Description:
Problem:
A static Kafka source table supports querying metadata such as topic, partition, etc. to identify the source of a record:
[https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata]
To identify the source of a record in a dynamic Kafka source, where multiple
clusters are involved, the Kafka cluster id must be queryable in addition to
topic, partition, etc.
Goal:
Expose the Kafka cluster id in records of DynamicKafkaSource.
How:
Unlike topic/partition/offset, which are already available inside the Kafka
ConsumerRecord via the Kafka client APIs, the cluster id is not part of
ConsumerRecord. We also can't make it part of ConsumerRecord: that is a Kafka
client class we don't control, and it has no extension points.
Alternatives and trade-offs:
1/ Wrap ConsumerRecord in a custom type
Pros: clean access to the cluster id everywhere.
Cons: a huge refactor; all deserializers and downstream APIs expect
ConsumerRecord<byte[], byte[]>, so this would break compatibility with
KafkaSourceReader and Flink internals.
2/ Inject the cluster id into record headers
Pros: no new interface.
Cons: pollutes the actual Kafka headers with a value that is not present on
the wire, breaks header semantics, and could collide with real headers.
3/ Side channel
Pros: smaller changes than the two approaches above; opt-in; no API
breakage; only the dynamic source uses it.
Cons: adds an internal interface and cloning logic. The interface exists
only because the cluster id is not part of the Kafka record, while
topic/partition/offset already are.
We have developed a solution via option 3 (side channel) and battle-tested
it. This ticket is to contribute it back.
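The side-channel idea (option 3) can be sketched as follows. All names here are illustrative, not the actual connector API: a deserializer opts in via an internal interface, and the reader, which knows which cluster the current split belongs to, pushes the cluster id through that interface before deserializing, so the cluster id never has to live inside the record itself:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the side channel: the reader holds the cluster id
// out-of-band and hands it to deserializers that opt in via an internal
// interface. Plain strings stand in for real Kafka records.

/** Opt-in hook, implemented only by deserializers that need the cluster id. */
interface ClusterMetadataAware {
    void setKafkaClusterId(String clusterId);
}

/** A deserializer that exposes the cluster id alongside per-record metadata. */
class MetadataCollectingDeserializer implements ClusterMetadataAware {
    private String clusterId;

    @Override
    public void setKafkaClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    /** Simulates producing a row of metadata for a (topic, partition) record. */
    Map<String, String> deserialize(String topic, int partition) {
        Map<String, String> row = new HashMap<>();
        row.put("topic", topic);                       // from the record
        row.put("partition", Integer.toString(partition)); // from the record
        row.put("cluster_id", clusterId);              // from the side channel
        return row;
    }
}

public class SideChannelSketch {
    public static void main(String[] args) {
        MetadataCollectingDeserializer deser = new MetadataCollectingDeserializer();
        // The dynamic reader knows the cluster of the current split and pushes
        // it through the opt-in interface before deserializing any records.
        if (deser instanceof ClusterMetadataAware) {
            ((ClusterMetadataAware) deser).setKafkaClusterId("cluster-a");
        }
        Map<String, String> row = deser.deserialize("orders", 3);
        System.out.println(row.get("cluster_id") + " "
                + row.get("topic") + " " + row.get("partition"));
    }
}
```

Because only deserializers that implement the opt-in interface receive the cluster id, existing deserializers and the static Kafka source are untouched, which matches the "opt-in, no API breakage" trade-off described above.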
> support query kafka cluster id as metadata in records of
> DynamicKafkaTableSource
> --------------------------------------------------------------------------------
>
> Key: FLINK-39037
> URL: https://issues.apache.org/jira/browse/FLINK-39037
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: kafka-4.0.1
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: kafka-4.1.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)