[ 
https://issues.apache.org/jira/browse/FLINK-39792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084039#comment-18084039
 ] 

david radley edited comment on FLINK-39792 at 5/28/26 9:07 AM:
---------------------------------------------------------------

[~arvid] this sounds like a good idea. I was looking around and it looks like 
the header value is always a string. Can we deprecate the existing map, create 
an amended version of it, and add 2 new ones using STRING instead of BYTES? 
This seems more accurate. Or is there a reason why we need BYTES here? 


was (Author: JIRAUSER300523):
[~arvid] this sounds like a good idea. I was looking around and it looks like 
the header value is always a string. Can we deprecate the amend the existing 
map and add 2 new ones using STRING instead of BYTE? This seems more accurate. 
Or is there a reason why we need BYTE here? 

> Add multi-headers metadata key to Kafka connector
> -------------------------------------------------
>
>                 Key: FLINK-39792
>                 URL: https://issues.apache.org/jira/browse/FLINK-39792
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> Problem:
> The Kafka wire format models headers as an ordered, duplicate-key-allowed 
> iterable ({{Headers extends Iterable<Header>}}), but the existing headers 
> metadata key uses {{MAP<STRING, BYTES>}}, which cannot faithfully 
> represent this: when multiple headers share a key, iteration into a 
> {{HashMap}} silently discards all but the last value, losing both duplicates 
> and insertion order. This makes the current metadata key unsuitable for use 
> cases that rely on repeated header keys or order-sensitive header processing.
> Solution:
> Add a new metadata key {{multi-headers}} with type {{ARRAY<ROW<name STRING, 
> value BYTES>>}} to {{KafkaDynamicSource}} and {{KafkaDynamicSink}}. On 
> the source side, {{record.headers()}} is iterated in wire order into the 
> array, preserving duplicates and position.
> On the sink side, the array is iterated and each entry is appended via 
> {{record.headers().add(name, value)}}. The existing {{headers}} key 
> ({{MAP<STRING, BYTES>}}) is kept unchanged and soft-deprecated, so there is 
> no breaking change; users opt in to the new key at their own pace.
> If both the new and old header keys are used, we throw an error.
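
To make the difference between the two shapes concrete, here is a minimal sketch in plain Java. It does not use the Kafka client or the Flink connector: {{HeaderEntry}}, {{sampleHeaders}}, {{toMap}} and {{toArray}} are hypothetical names introduced only for illustration, and the real connector code may look quite different.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: HeaderEntry is a hypothetical stand-in for a Kafka header,
// not a class from the Kafka client or the Flink Kafka connector.
public class MultiHeadersSketch {

    public static final class HeaderEntry {
        public final String name;
        public final byte[] value;

        public HeaderEntry(String name, byte[] value) {
            this.name = name;
            this.value = value;
        }
    }

    // Headers in wire order; the key "trace" appears twice.
    public static List<HeaderEntry> sampleHeaders() {
        List<HeaderEntry> wire = new ArrayList<>();
        wire.add(new HeaderEntry("trace", bytes("a")));
        wire.add(new HeaderEntry("origin", bytes("x")));
        wire.add(new HeaderEntry("trace", bytes("b")));
        return wire;
    }

    // MAP<STRING, BYTES> shape: a later put() for the same key overwrites
    // the earlier value, and HashMap does not keep insertion order.
    public static Map<String, byte[]> toMap(List<HeaderEntry> wire) {
        Map<String, byte[]> map = new HashMap<>();
        for (HeaderEntry header : wire) {
            map.put(header.name, header.value);
        }
        return map;
    }

    // ARRAY<ROW<name STRING, value BYTES>> shape: every entry is kept,
    // in the order it was read.
    public static List<HeaderEntry> toArray(List<HeaderEntry> wire) {
        return new ArrayList<>(wire);
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<HeaderEntry> wire = sampleHeaders();
        System.out.println("map entries:   " + toMap(wire).size());   // 2, first "trace" is lost
        System.out.println("array entries: " + toArray(wire).size()); // 3, duplicates and order kept
    }
}
```

With three headers on the wire, the map shape ends up with two entries and only the last {{trace}} value, while the array shape keeps all three in order.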



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
