[ 
https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-31049:
-----------------------------------
    Labels: KafkaSink pull-request-available stale-assigned  (was: KafkaSink 
pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Add support for Kafka record headers to KafkaSink
> -------------------------------------------------
>
>                 Key: FLINK-31049
>                 URL: https://issues.apache.org/jira/browse/FLINK-31049
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Alex Gout
>            Assignee: Alex Gout
>            Priority: Minor
>              Labels: KafkaSink, pull-request-available, stale-assigned
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The default org.apache.flink.connector.kafka.sink.KafkaSink does not support 
> adding Kafka record headers. In some implementations, downstream consumers 
> might rely on Kafka record headers being set.
>  
> One way to add headers is to implement a custom 
> KafkaRecordSerializationSchema and pass it to the KafkaSink.
> However, I assume the KafkaRecordSerializationSchemaBuilder was added for 
> convenience, so that a KafkaSink can be created without having to deal with 
> details like the ProducerRecord directly. This builder does not support 
> adding record headers, and this is where I think support should be added.
> The code responsible for creating the Kafka record involves 
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper, 
> where the ProducerRecord is created. 
> It is relatively simple to add support for record headers by adding a 
> "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key 
> and value serializers and using the appropriate ProducerRecord constructor.
>  
> The issue was discussed 
> [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro].
>  
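
The "HeaderProducer" idea described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Flink implementation: SimpleRecord and SchemaBuilder are stand-ins for Kafka's ProducerRecord and the KafkaRecordSerializationSchemaBuilder, and the HeaderProducer interface, its produceHeaders method, and setHeaderProducer are hypothetical names chosen here. The sketch only shows a header hook sitting next to the key and value serializers and being applied when each record is built.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class HeaderProducerSketch {

    /** Hypothetical hook: derives header key/value pairs from an element. */
    @FunctionalInterface
    interface HeaderProducer<T> {
        Map<String, byte[]> produceHeaders(T element);
    }

    /** Stand-in for Kafka's ProducerRecord, reduced to the fields used here. */
    static final class SimpleRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        final Map<String, byte[]> headers;

        SimpleRecord(String topic, byte[] key, byte[] value, Map<String, byte[]> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Stand-in for the schema builder, with the proposed header hook added. */
    static final class SchemaBuilder<T> {
        private String topic;
        private Function<T, byte[]> keySerializer = element -> null; // key is optional
        private Function<T, byte[]> valueSerializer;
        // Default: no headers, so callers that never set a producer see no change.
        private HeaderProducer<T> headerProducer = element -> Collections.emptyMap();

        SchemaBuilder<T> setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        SchemaBuilder<T> setKeySerializer(Function<T, byte[]> keySerializer) {
            this.keySerializer = keySerializer;
            return this;
        }

        SchemaBuilder<T> setValueSerializer(Function<T, byte[]> valueSerializer) {
            this.valueSerializer = valueSerializer;
            return this;
        }

        SchemaBuilder<T> setHeaderProducer(HeaderProducer<T> headerProducer) {
            this.headerProducer = headerProducer;
            return this;
        }

        /** Returns a function that turns one element into one record. */
        Function<T, SimpleRecord> build() {
            final String t = Objects.requireNonNull(topic, "topic");
            final Function<T, byte[]> k = keySerializer;
            final Function<T, byte[]> v =
                    Objects.requireNonNull(valueSerializer, "valueSerializer");
            final HeaderProducer<T> h = headerProducer;
            // Headers are produced per element, alongside the key and value.
            return element -> new SimpleRecord(
                    t, k.apply(element), v.apply(element), h.produceHeaders(element));
        }
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Builds a schema with a header producer and serializes one element. */
    static SimpleRecord demo() {
        Function<String, SimpleRecord> schema = new SchemaBuilder<String>()
                .setTopic("orders")
                .setValueSerializer(HeaderProducerSketch::utf8)
                .setHeaderProducer(e -> Collections.singletonMap("source", utf8("checkout")))
                .build();
        return schema.apply("order-42");
    }

    public static void main(String[] args) {
        SimpleRecord record = demo();
        System.out.println(record.topic + " headers=" + record.headers.keySet());
    }
}
```

In the real connector, the equivalent step would presumably map the produced headers onto the ProducerRecord constructor that accepts headers. The topic name, header key, and values used in demo() are made up for illustration.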



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
