[
https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-31049:
-----------------------------------
Labels: KafkaSink pull-request-available stale-assigned (was: KafkaSink
pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue is assigned but has not
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a
comment updating the community on your progress. If this issue is waiting on
feedback, please consider this a reminder to the committer/reviewer. Flink is a
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone
else may work on it.
> Add support for Kafka record headers to KafkaSink
> -------------------------------------------------
>
> Key: FLINK-31049
> URL: https://issues.apache.org/jira/browse/FLINK-31049
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Alex Gout
> Assignee: Alex Gout
> Priority: Minor
> Labels: KafkaSink, pull-request-available, stale-assigned
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> The default org.apache.flink.connector.kafka.sink.KafkaSink does not support
> adding Kafka record headers. In some implementations, downstream consumers
> might rely on Kafka record headers being set.
>
> A way to add Headers would be to create a custom
> KafkaRecordSerializationSchema and inject that into the KafkaSink.
> However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for
> convenience and allows a more usable approach of creating a KafkaSink without
> having to deal with details like the RecordProducer directly. This builder
> does not support adding record headers.
> This is where I think it should be added.
> The code responsible for creating the Kafka record involves
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> where the RecordProducer is created.
> It is relatively simple to add support for record headers by adding a
> "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key
> and value serializers and using the appropriate RecordProducer constructor.
>
> The issue was discussed
> [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro].
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)