This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 8fffeb6  [FLINK-24488][connectors/kafka] Fix 
KafkaRecordSerializationSchemaBuilder does not forward timestamp
8fffeb6 is described below

commit 8fffeb653587913f487776531b857cc5b8f5b54f
Author: xmarker <[email protected]>
AuthorDate: Wed Oct 13 23:16:33 2021 +0800

    [FLINK-24488][connectors/kafka] Fix KafkaRecordSerializationSchemaBuilder 
does not forward timestamp
---
 .../KafkaRecordSerializationSchemaBuilder.java     |  6 ++++-
 .../KafkaRecordSerializationSchemaBuilderTest.java | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index e2d0d29..cea6ace 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -326,7 +326,11 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                             : OptionalInt.empty();
 
             return new ProducerRecord<>(
-                    targetTopic, partition.isPresent() ? partition.getAsInt() 
: null, key, value);
+                    targetTopic,
+                    partition.isPresent() ? partition.getAsInt() : null,
+                    timestamp == null || timestamp < 0L ? null : timestamp,
+                    key,
+                    value);
         }
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 80c92d3..eb822ea 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -197,6 +197,32 @@ public class KafkaRecordSerializationSchemaBuilderTest 
extends TestLogger {
         assertEquals("a", deserializer.deserialize(DEFAULT_TOPIC, 
record.value()));
     }
 
+    @Test
+    public void testSerializeRecordWithTimestamp() {
+        final SerializationSchema<String> serializationSchema = new 
SimpleStringSchema();
+        final KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopic(DEFAULT_TOPIC)
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(serializationSchema)
+                        .build();
+        final ProducerRecord<byte[], byte[]> recordWithTimestamp =
+                schema.serialize("a", null, 100L);
+        assertEquals(100L, (long) recordWithTimestamp.timestamp());
+
+        final ProducerRecord<byte[], byte[]> recordWithTimestampZero =
+                schema.serialize("a", null, 0L);
+        assertEquals(0L, (long) recordWithTimestampZero.timestamp());
+
+        final ProducerRecord<byte[], byte[]> recordWithoutTimestamp =
+                schema.serialize("a", null, null);
+        assertNull(recordWithoutTimestamp.timestamp());
+
+        final ProducerRecord<byte[], byte[]> recordWithInvalidTimestamp =
+                schema.serialize("a", null, -100L);
+        assertNull(recordWithInvalidTimestamp.timestamp());
+    }
+
     private static void assertOnlyOneSerializerAllowed(
             List<
                             Function<

Reply via email to