[ https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jing Ge reassigned FLINK-31049:
-------------------------------
Assignee: Alex Gout
> Add support for Kafka record headers to KafkaSink
> -------------------------------------------------
>
> Key: FLINK-31049
> URL: https://issues.apache.org/jira/browse/FLINK-31049
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Alex Gout
> Assignee: Alex Gout
> Priority: Minor
> Labels: KafkaSink
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> The default org.apache.flink.connector.kafka.sink.KafkaSink does not support
> adding Kafka record headers. In some implementations, downstream consumers
> might rely on Kafka record headers being set.
>
> One way to add headers is to write a custom
> KafkaRecordSerializationSchema and pass it to the KafkaSink.
> However, I assume the KafkaRecordSerializationSchemaBuilder was added for
> convenience, so that a KafkaSink can be created without
> having to deal with details like the ProducerRecord directly. This builder
> does not support adding record headers,
> and this is where I think support should be added.
> The code responsible for creating the Kafka record is in
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper,
> where the ProducerRecord is created.
> It is relatively simple to add support for record headers by adding a
> "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key
> and value serializers and using the appropriate ProducerRecord constructor.
>
> The issue was discussed
> [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro].
>
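
For illustration, the "HeaderProducer" idea from the description could look
roughly like the sketch below. It is not the Flink implementation: Header,
OutRecord, HeaderProducer and SchemaBuilder are hypothetical stand-ins written
for this example so that it compiles without Flink or Kafka on the classpath.
It only shows the shape of the proposal, namely a header producer configured
next to the key and value serializers, with its output attached to the record
when the record is created.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Illustrative only: all types below are hypothetical stand-ins,
// not classes from Flink or the Kafka client.
final class HeaderProducerSketch {

    /** Stand-in for a Kafka record header: a key plus raw bytes. */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the record handed to the producer. */
    static final class OutRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        final List<Header> headers;

        OutRecord(String topic, byte[] key, byte[] value, List<Header> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Hypothetical "HeaderProducer": derives headers from one element. */
    interface HeaderProducer<T> {
        List<Header> produceHeaders(T element);
    }

    /** Simplified builder: key/value serializers plus an optional header producer. */
    static final class SchemaBuilder<T> {
        private String topic;
        private Function<T, byte[]> keySerializer = element -> null; // no key by default
        private Function<T, byte[]> valueSerializer;
        private HeaderProducer<T> headerProducer = element -> Collections.emptyList();

        SchemaBuilder<T> setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        SchemaBuilder<T> setKeySerializer(Function<T, byte[]> keySerializer) {
            this.keySerializer = keySerializer;
            return this;
        }

        SchemaBuilder<T> setValueSerializer(Function<T, byte[]> valueSerializer) {
            this.valueSerializer = valueSerializer;
            return this;
        }

        SchemaBuilder<T> setHeaderProducer(HeaderProducer<T> headerProducer) {
            this.headerProducer = headerProducer;
            return this;
        }

        /** Returns a function that turns one element into a record with headers. */
        Function<T, OutRecord> build() {
            // Copy the settings so later builder changes do not affect the result.
            final String t = Objects.requireNonNull(topic, "topic");
            final Function<T, byte[]> ks = keySerializer;
            final Function<T, byte[]> vs =
                    Objects.requireNonNull(valueSerializer, "valueSerializer");
            final HeaderProducer<T> hp = headerProducer;
            return element -> new OutRecord(
                    t,
                    ks.apply(element),
                    vs.apply(element),
                    Collections.unmodifiableList(new ArrayList<>(hp.produceHeaders(element))));
        }
    }

    public static void main(String[] args) {
        Function<String, OutRecord> schema = new SchemaBuilder<String>()
                .setTopic("orders")
                .setValueSerializer(s -> s.getBytes(StandardCharsets.UTF_8))
                .setHeaderProducer(s -> Collections.singletonList(
                        new Header("source", "flink".getBytes(StandardCharsets.UTF_8))))
                .build();
        OutRecord out = schema.apply("hello");
        System.out.println(out.topic + " headers=" + out.headers.size());
    }
}
```

In this sketch the header producer defaults to an empty list, so existing
callers that never set one would keep producing records without headers.
Whether the real builder should behave that way is an assumption of the
example, not something stated in the issue.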