xiechenling created FLINK-32725:
-----------------------------------

             Summary: Add option to control writing of timestamp to Kafka topic in KafkaRecordSerializationSchema.builder
                 Key: FLINK-32725
                 URL: https://issues.apache.org/jira/browse/FLINK-32725
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.14.0
         Environment: flink 1.16.2
            Reporter: xiechenling

In older versions of the Flink Kafka sink, it was possible to configure whether the message timestamp is written to Kafka, using the method `FlinkKafkaProducer.setWriteTimestampToKafka(true)`.

In the newer Kafka sink, however, a serializer built with `KafkaRecordSerializationSchema.builder()` always writes the message timestamp to the Kafka topic, taking it from the context's timestamp.

{code:scala}
KafkaSink
  ...
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    ...
    .build())
{code}

A user who wants to keep the timestamp from being written to Kafka currently has to provide a custom `KafkaRecordSerializationSchema` by implementing the interface and supplying their own `serialize` method.

{code:scala}
KafkaSink.builder[(String, String)]()
  .setBootstrapServers(kafkaAddress)
  .setRecordSerializer((element: (String, String), context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long) => {
    // The timestamp parameter is ignored, so the Flink record timestamp is not set on the ProducerRecord.
    new ProducerRecord(sinkTopic, element._1.getBytes, element._2.getBytes)
  })
{code}

I propose adding a new method, similar to `setWriteTimestampToKafka`, to `KafkaRecordSerializationSchema.builder()` that lets users control whether the timestamp is included in the records written to the Kafka topic. This would give users who do not want the timestamp written to Kafka a simpler approach, consistent with the older API.

Thank you for considering this enhancement.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)