[
https://issues.apache.org/jira/browse/FLINK-29361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607561#comment-17607561
]
Chesnay Schepler edited comment on FLINK-29361 at 9/21/22 7:16 AM:
-------------------------------------------------------------------
You can do the same thing with the KafkaRecordSerializationSchema of the
KafkaSink: pass your implementation to setRecordSerializer on the builder.
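
As a rough sketch of what that could look like (not taken from the Flink
docs): the class name HeaderSerializationSchema, the buildSink helper, the
String element type, and the choice of header value are all hypothetical and
only illustrate the idea. It assumes flink-connector-kafka 1.15.x and
kafka-clients are on the classpath.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical schema: writes each String as the record value and adds an "id" header. */
public class HeaderSerializationSchema implements KafkaRecordSerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    private final String topic;

    public HeaderSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, value);
        // Assumption for illustration only: the header value is derived from the element.
        byte[] id = Integer.toString(element.hashCode()).getBytes(StandardCharsets.UTF_8);
        record.headers().add("id", id);
        return record;
    }

    /** Hypothetical helper that wires the schema into a KafkaSink via the builder. */
    public static KafkaSink<String> buildSink(String bootstrapServers, String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(new HeaderSerializationSchema(topic))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```

The sink returned by buildSink would then be attached with
stream.sinkTo(...). Since serialize returns a plain ProducerRecord, headers
are added the same way as with the old FlinkKafkaProducer.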
was (Author: zentol):
You can do the same thing using the KafkaRecordSerializationSchema of the
KafkaSink.
> How to set headers with the new Flink KafkaSink
> -----------------------------------------------
>
> Key: FLINK-29361
> URL: https://issues.apache.org/jira/browse/FLINK-29361
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Xin Hao
> Priority: Minor
>
> I'm using Flink 1.15.2. When I try to migrate to the new KafkaSink, it seems
> that it's not possible to add Kafka record headers.
> I think we should add this feature, or document it if it already exists.
>
> The code below shows what we can do with FlinkKafkaProducer and ProducerRecord:
> {code:java}
> public class SomeKafkaSerializationSchema<T extends SpecificRecordBase>
>         implements KafkaSerializationSchema<T> {
>     ...
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(T t, Long ts) {
>         ...
>         var record = new ProducerRecord<byte[], byte[]>(topic, some_bytes_a);
>         record.headers().add("id", some_bytes_b);
>         return record;
>     }
> }
> ...
> var producer = new FlinkKafkaProducer<>(
>     topic,
>     new SomeKafkaSerializationSchema<>(...),
>     producerProps,
>     FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
> );
> {code}
>
>