[
https://issues.apache.org/jira/browse/FLINK-39792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39792:
-----------------------------------
Labels: pull-request-available (was: )
> Add multi-headers metadata key to Kafka connector
> -------------------------------------------------
>
> Key: FLINK-39792
> URL: https://issues.apache.org/jira/browse/FLINK-39792
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
> Labels: pull-request-available
>
> Problem:
> The Kafka wire format models headers as an ordered, duplicate-key-allowed
> iterable ({{Headers extends Iterable<Header>}}), but the existing {{headers}}
> metadata key uses {{MAP<STRING, BYTES>}}, which cannot faithfully
> represent this: when multiple headers share a key, iteration into a
> {{HashMap}} silently discards all but the last value, losing both duplicates
> and insertion order. This makes the current metadata key unsuitable for use
> cases that rely on repeated header keys or order-sensitive header processing.
> Solution:
> Add a new metadata key {{multi-headers}} with type {{ARRAY<ROW<name STRING,
> value BYTES>>}} to {{KafkaDynamicSource}} and {{KafkaDynamicSink}}. On
> the source side, {{record.headers()}} is iterated in wire order into the
> array, preserving duplicates and position.
> On the sink side, the array is iterated and each entry is appended via
> {{record.headers().add(name, value)}}. The existing {{headers}} key
> ({{MAP<STRING, BYTES>}}) is kept unchanged and soft-deprecated, so there is
> no breaking change; users opt in to the new key at their own pace.
> If both the old and the new header keys are used, an error is thrown.
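
The behaviour described in the issue can be illustrated with a small stdlib-only Java sketch. It is not the connector's code: the {{Header}} record stands in for Kafka's header type, and {{toMap}}, {{toMultiHeaders}}, {{writeHeaders}} and {{validateMetadataKeys}} are hypothetical names chosen for illustration. Where the real connector performs the "both keys" check, and what message it reports, is an assumption here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; no Kafka or Flink classes are used. */
final class MultiHeadersSketch {

    /** Hypothetical stand-in for one header, shaped like ROW<name STRING, value BYTES>. */
    record Header(String name, byte[] value) {}

    /** Models the existing MAP<STRING, BYTES> key: a later duplicate overwrites the earlier one. */
    static Map<String, byte[]> toMap(Iterable<Header> wireHeaders) {
        Map<String, byte[]> map = new HashMap<>();
        for (Header h : wireHeaders) {
            map.put(h.name(), h.value());
        }
        return map;
    }

    /** Models the proposed ARRAY<ROW<...>> key: every entry is kept, in wire order. */
    static List<Header> toMultiHeaders(Iterable<Header> wireHeaders) {
        List<Header> rows = new ArrayList<>();
        for (Header h : wireHeaders) {
            rows.add(h);
        }
        return rows;
    }

    /** Models the sink side: each row is appended to the outgoing headers in array order. */
    static List<Header> writeHeaders(List<Header> rows) {
        List<Header> outgoing = new ArrayList<>();
        for (Header row : rows) {
            outgoing.add(new Header(row.name(), row.value()));
        }
        return outgoing;
    }

    /** Assumed form of the conflict check: declaring both keys is rejected. */
    static void validateMetadataKeys(List<String> keys) {
        if (keys.contains("headers") && keys.contains("multi-headers")) {
            throw new IllegalArgumentException(
                    "Metadata keys 'headers' and 'multi-headers' cannot be used together.");
        }
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<Header> wire = List.of(
                new Header("trace", utf8("a")),
                new Header("tenant", utf8("x")),
                new Header("trace", utf8("b")));

        System.out.println(toMap(wire).size());          // 2: the first "trace" value is lost
        System.out.println(toMultiHeaders(wire).size()); // 3: duplicates and order survive
    }
}
```

The map variant keeps only the last value per name and gives no ordering guarantee, while the list variant round-trips the headers unchanged through {{writeHeaders}}.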
--
This message was sent by Atlassian Jira
(v8.20.10#820010)