xiechenling created FLINK-32725:
-----------------------------------

             Summary: Add option to control writing of timestamp to Kafka topic 
in KafkaRecordSerializationSchema.builder
                 Key: FLINK-32725
                 URL: https://issues.apache.org/jira/browse/FLINK-32725
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.14.0
         Environment: flink 1.16.2
            Reporter: xiechenling


With the older Kafka sink for Flink, `FlinkKafkaProducer`, users could 
configure whether the record timestamp was written to Kafka by calling 
`setWriteTimestampToKafka(true)` on the producer.

However, in the newer versions of Kafka sink, when using 
`KafkaRecordSerializationSchema.builder()`, the message timestamp is 
automatically written to the Kafka topic using the context's timestamp.

{code:scala}
KafkaSink
...
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
...
.build())
{code}


If a user wishes to exclude the timestamp from being written to Kafka, they 
currently have to supply a custom `KafkaRecordSerializationSchema` 
implementation whose `serialize` method ignores the timestamp argument.

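{code:scala}
import java.lang
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.kafka.clients.producer.ProducerRecord

KafkaSink.builder[(String, String)]()
  .setBootstrapServers(kafkaAddress)
  .setRecordSerializer((element: (String, String),
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: lang.Long) => {
    // Build the record without a timestamp; the argument is ignored on purpose.
    new ProducerRecord(sinkTopic, element._1.getBytes, element._2.getBytes)
  })
  .build()
{code}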
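
The workaround above replaces the builder entirely, so topic selection, key 
and value serializers, and partitioning all have to be re-implemented by hand. 
As a rough sketch of an alternative, a small wrapper could delegate to a 
schema produced by the builder and only strip the timestamp from the 
resulting record. The class name `WithoutTimestamp` is hypothetical and not 
part of Flink; the sketch assumes the `KafkaRecordSerializationSchema` 
interface as shipped with the 1.14+ Kafka connector.

{code:scala}
import java.lang
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical helper, not part of Flink: wraps any existing schema and
// re-emits its records with the timestamp field left unset (null).
class WithoutTimestamp[T](delegate: KafkaRecordSerializationSchema[T])
    extends KafkaRecordSerializationSchema[T] {

  // Forward initialization so the wrapped serializers are opened as usual.
  override def open(
      context: SerializationSchema.InitializationContext,
      sinkContext: KafkaSinkContext): Unit =
    delegate.open(context, sinkContext)

  override def serialize(
      element: T,
      context: KafkaSinkContext,
      timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val record = delegate.serialize(element, context, timestamp)
    if (record == null) {
      null // the wrapped schema chose to emit nothing for this element
    } else {
      // Copy topic, partition, key, value and headers; drop the timestamp.
      new ProducerRecord[Array[Byte], Array[Byte]](
        record.topic, record.partition, null,
        record.key, record.value, record.headers)
    }
  }
}
{code}

The wrapped schema would then be passed to `setRecordSerializer` in place of 
the builder's result, for example `new WithoutTimestamp(builtSchema)`, where 
`builtSchema` stands for whatever `KafkaRecordSerializationSchema.builder()` 
returned from `build()`. With a null record timestamp, the timestamp that ends 
up on the message is decided by the Kafka producer or broker rather than by 
Flink.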

I propose adding a new method, similar to `setWriteTimestampToKafka`, to 
`KafkaRecordSerializationSchema.builder()`, which allows users to control 
whether the timestamp should be included in the output to the Kafka topic. This 
would provide a more straightforward and consistent approach for users who do 
not want the timestamp to be written to Kafka.

Thank you for considering this enhancement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
