[ 
https://issues.apache.org/jira/browse/FLINK-29361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607561#comment-17607561
 ] 

Chesnay Schepler edited comment on FLINK-29361 at 9/21/22 7:16 AM:
-------------------------------------------------------------------

You can do the same thing using the KafkaRecordSerializationSchema of the 
KafkaSink (setRecordSerializer on the builder).
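For example, a minimal sketch (not from this thread; the class name HeaderAddingSchema, the topic, and the header value are illustrative placeholders) that attaches a header on the ProducerRecord returned from KafkaRecordSerializationSchema#serialize and registers it on the builder via setRecordSerializer:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderAddingSchema implements KafkaRecordSerializationSchema<String> {

    private final String topic;

    public HeaderAddingSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
        // Build the ProducerRecord as before and attach headers directly on it.
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
        // "id" header with a placeholder value; derive it from the element in a real job.
        record.headers().add("id", "some-id".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

// Wiring on the builder side, e.g. in the job's main():
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(new HeaderAddingSchema("my-topic"))
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
{code}

The header is still set on the ProducerRecord exactly as with the old KafkaSerializationSchema; only the interface and the way it is hooked into the sink change.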


was (Author: zentol):
You can do the same thing using the KafkaRecordSerializationSchema of the 
KafkaSink.

> How to set headers with the new Flink KafkaSink
> -----------------------------------------------
>
>                 Key: FLINK-29361
>                 URL: https://issues.apache.org/jira/browse/FLINK-29361
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Xin Hao
>            Priority: Minor
>
> I'm using Flink 1.15.2. When I try to migrate to the new KafkaSink, it seems
> that it is not possible to add Kafka record headers.
> I think we should add this feature, or document it if it already exists.
>
> The code below shows what we can do with FlinkKafkaProducer and ProducerRecord:
> {code:java}
> public class SomeKafkaSerializationSchema<T extends SpecificRecordBase>
>     implements KafkaSerializationSchema<T> {
>   ...
>   @Override
>   public ProducerRecord<byte[], byte[]> serialize(T t, Long ts) {
>     ...
>     var record = new ProducerRecord<byte[], byte[]>(topic, some_bytes_a);
>     record.headers().add("id", some_bytes_b);
>     return record;
>   }
> }
> ...
> var producer = new FlinkKafkaProducer<>(
>     topic,
>     new SomeKafkaSerializationSchema<>(...),
>     producerProps,
>     FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
> );
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
