This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 8fffeb6 [FLINK-24488][connectors/kafka] Fix KafkaRecordSerializationSchemaBuilder does not forward timestamp
8fffeb6 is described below
commit 8fffeb653587913f487776531b857cc5b8f5b54f
Author: xmarker <[email protected]>
AuthorDate: Wed Oct 13 23:16:33 2021 +0800
[FLINK-24488][connectors/kafka] Fix KafkaRecordSerializationSchemaBuilder does not forward timestamp
---
.../KafkaRecordSerializationSchemaBuilder.java | 6 ++++-
.../KafkaRecordSerializationSchemaBuilderTest.java | 26 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index e2d0d29..cea6ace 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -326,7 +326,11 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
: OptionalInt.empty();
return new ProducerRecord<>(
- targetTopic, partition.isPresent() ? partition.getAsInt() : null, key, value);
+ targetTopic,
+ partition.isPresent() ? partition.getAsInt() : null,
+ timestamp == null || timestamp < 0L ? null : timestamp,
+ key,
+ value);
}
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 80c92d3..eb822ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -197,6 +197,32 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
assertEquals("a", deserializer.deserialize(DEFAULT_TOPIC, record.value()));
}
+ @Test
+ public void testSerializeRecordWithTimestamp() {
+ final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+ final KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(DEFAULT_TOPIC)
+ .setValueSerializationSchema(serializationSchema)
+ .setKeySerializationSchema(serializationSchema)
+ .build();
+ final ProducerRecord<byte[], byte[]> recordWithTimestamp =
+ schema.serialize("a", null, 100L);
+ assertEquals(100L, (long) recordWithTimestamp.timestamp());
+
+ final ProducerRecord<byte[], byte[]> recordWithTimestampZero =
+ schema.serialize("a", null, 0L);
+ assertEquals(0L, (long) recordWithTimestampZero.timestamp());
+
+ final ProducerRecord<byte[], byte[]> recordWithoutTimestamp =
+ schema.serialize("a", null, null);
+ assertNull(recordWithoutTimestamp.timestamp());
+
+ final ProducerRecord<byte[], byte[]> recordWithInvalidTimestamp =
+ schema.serialize("a", null, -100L);
+ assertNull(recordWithInvalidTimestamp.timestamp());
+ }
+
private static void assertOnlyOneSerializerAllowed(
List<
Function<