[
https://issues.apache.org/jira/browse/FLINK-39792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084039#comment-18084039
]
david radley edited comment on FLINK-39792 at 5/28/26 9:07 AM:
---------------------------------------------------------------
[~arvid] this sounds like a good idea. I was looking around and it looks like
the header value is always a string. Can we deprecate the existing map, then
create an amended version of it and add 2 new ones using STRING instead of
BYTES? This seems more accurate. Or is there a reason why we need BYTES here?
was (Author: JIRAUSER300523):
[~arvid] this sounds like a good idea. I was looking around and it looks like
the header value is always a string. Can we deprecate the amend the existing
map and add 2 new ones using STRING instead of BYTE? This seems more accurate.
Or is there a reason why we need BYTE here?
> Add multi-headers metadata key to Kafka connector
> -------------------------------------------------
>
> Key: FLINK-39792
> URL: https://issues.apache.org/jira/browse/FLINK-39792
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
>
> Problem:
> The Kafka wire format models headers as an ordered, duplicate-key-allowed
> iterable ({{Headers extends Iterable<Header>}}), but the existing headers
> metadata key uses {{MAP<STRING, BYTES>}}, which cannot faithfully
> represent this: when multiple headers share a key, iteration into a
> {{HashMap}} silently discards all but the last value, losing both duplicates
> and insertion order. This makes the current metadata key unsuitable for use
> cases that rely on repeated header keys or order-sensitive header processing.
> Solution:
> Add a new metadata key {{multi-headers}} with type {{ARRAY<ROW<name STRING,
> value BYTES>>}} to {{KafkaDynamicSource}} and {{KafkaDynamicSink}}. On
> the source side, {{record.headers()}} is iterated in wire order into the
> array, preserving duplicates and position.
> On the sink side, the array is iterated and each entry is appended via
> {{record.headers().add(name, value)}}. The existing {{headers}} key
> ({{MAP<STRING, BYTES>}}) is kept unchanged and soft-deprecated, so there is
> no breaking change; users opt in to the new key at their own pace.
> If both the new and the old header keys are used, we throw an error.
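
The difference between the two metadata shapes described above can be sketched in plain Java without any Kafka or Flink dependency. This is only an illustration under stated assumptions: {{HeaderEntry}}, {{toHeadersMap}} and {{toMultiHeaders}} are hypothetical names invented for this sketch and are not part of the connector. {{HeaderEntry}} stands in for a Kafka header (a name plus a raw byte value), and the sample header names are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultiHeadersSketch {

    /** Hypothetical stand-in for one Kafka header: a name plus a raw byte value. */
    static final class HeaderEntry {
        final String name;
        final byte[] value;

        HeaderEntry(String name, String utf8Value) {
            this.name = name;
            this.value = utf8Value.getBytes(StandardCharsets.UTF_8);
        }

        String valueAsString() {
            return new String(value, StandardCharsets.UTF_8);
        }
    }

    /** Shape of the existing key, MAP<STRING, BYTES>: a repeated name overwrites the earlier value. */
    static Map<String, byte[]> toHeadersMap(Iterable<HeaderEntry> wireHeaders) {
        Map<String, byte[]> map = new HashMap<>();
        for (HeaderEntry h : wireHeaders) {
            map.put(h.name, h.value);
        }
        return map;
    }

    /** Shape of the proposed key, ARRAY<ROW<name, value>>: every entry is kept, in wire order. */
    static List<HeaderEntry> toMultiHeaders(Iterable<HeaderEntry> wireHeaders) {
        List<HeaderEntry> rows = new ArrayList<>();
        for (HeaderEntry h : wireHeaders) {
            rows.add(h);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up headers in wire order; "trace" appears twice.
        List<HeaderEntry> wire = Arrays.asList(
                new HeaderEntry("trace", "a"),
                new HeaderEntry("retry", "1"),
                new HeaderEntry("trace", "b"));

        Map<String, byte[]> map = toHeadersMap(wire);
        System.out.println(map.size()); // 2: the first "trace" value is gone
        System.out.println(new String(map.get("trace"), StandardCharsets.UTF_8)); // b

        // The list form keeps all three entries in their original order.
        for (HeaderEntry row : toMultiHeaders(wire)) {
            System.out.println(row.name + "=" + row.valueAsString());
        }
    }
}
```

The map form also loses the position of "retry" relative to the two "trace" entries, because {{HashMap}} gives no ordering guarantee; the list form keeps both the duplicates and the order.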
--
This message was sent by Atlassian Jira
(v8.20.10#820010)