[ 
https://issues.apache.org/jira/browse/FLINK-32725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888181#comment-17888181
 ] 

xiechenling commented on FLINK-32725:
-------------------------------------

[~renqs] Hello, could you take a look and check whether this can be done?

> Add option to control writing of timestamp to Kafka topic in 
> KafkaRecordSerializationSchema.builder
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32725
>                 URL: https://issues.apache.org/jira/browse/FLINK-32725
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>         Environment: flink 1.16.2
>            Reporter: xiechenling
>            Priority: Major
>
> In the older Kafka sink for Flink, it was possible to configure 
> whether the record timestamp is written to Kafka by calling 
> `FlinkKafkaProducer.setWriteTimestampToKafka(true)`.
> In the newer Kafka sink, however, a serializer created with 
> `KafkaRecordSerializationSchema.builder()` always writes the record 
> timestamp to the Kafka topic, using the context's timestamp.
> {code:scala}
> KafkaSink
> ...
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> ...
> .build()
> {code}
> If a user does not want this timestamp written to Kafka, they 
> currently have to provide a custom `KafkaRecordSerializationSchema` by 
> implementing the interface and its `serialize` method.
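> {code:scala}
> import java.lang
> import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
> import org.apache.kafka.clients.producer.ProducerRecord
>
> // No timestamp is passed to ProducerRecord, so Flink's timestamp is not written.
> KafkaSink.builder[(String, String)]()
>   .setBootstrapServers(kafkaAddress)
>   .setRecordSerializer((element: (String, String),
>       context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long) =>
>     new ProducerRecord[Array[Byte], Array[Byte]](sinkTopic, element._1.getBytes, element._2.getBytes))
>   .build()
> {code}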
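> As a variation on this workaround, a small wrapper could reuse a schema 
> created by `KafkaRecordSerializationSchema.builder()` and only drop the 
> timestamp. This is a minimal sketch, not connector code: the class name 
> `TimestampDroppingSchema` is hypothetical, and it assumes the 
> `open`/`serialize` signatures of the `KafkaSink`-based connector.
> {code:scala}
> import java.lang
> import org.apache.flink.api.common.serialization.SerializationSchema
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
> import org.apache.kafka.clients.producer.ProducerRecord
>
> // Hypothetical helper (not part of Flink): wraps any existing schema and
> // rebuilds each record without a timestamp.
> class TimestampDroppingSchema[T](delegate: KafkaRecordSerializationSchema[T])
>     extends KafkaRecordSerializationSchema[T] {
>
>   // Forward initialization so the wrapped schema is set up as usual.
>   override def open(
>       context: SerializationSchema.InitializationContext,
>       sinkContext: KafkaSinkContext): Unit =
>     delegate.open(context, sinkContext)
>
>   override def serialize(
>       element: T,
>       context: KafkaSinkContext,
>       timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>     val record = delegate.serialize(element, context, timestamp)
>     if (record == null) null
>     else
>       // Copy topic, partition, key, value and headers; pass null as timestamp.
>       new ProducerRecord[Array[Byte], Array[Byte]](
>         record.topic(), record.partition(), null,
>         record.key(), record.value(), record.headers())
>   }
> }
> {code}
> With no explicit timestamp on the `ProducerRecord`, the Kafka producer 
> stamps the record with its own current time instead.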
> I propose adding a new method, similar to `setWriteTimestampToKafka`, to 
> `KafkaRecordSerializationSchema.builder()`, which allows users to control 
> whether the timestamp should be included in the output to the Kafka topic. 
> This would provide a more straightforward and consistent approach for users 
> who do not want the timestamp to be written to Kafka.
> Thank you for considering this enhancement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
