This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new c9f8aa96 [hotfix] [flink-connector-kafka] Simplify ProducerRecord
instantiation w.r.t. headers
c9f8aa96 is described below
commit c9f8aa960aae4aa157152a9f69a828f7ac7690dd
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Apr 12 07:51:21 2023 -0700
[hotfix] [flink-connector-kafka] Simplify ProducerRecord instantiation
w.r.t. headers
---
.../KafkaRecordSerializationSchemaBuilder.java | 23 +++++++---------------
1 file changed, 7 insertions(+), 16 deletions(-)
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 1cc92201..34cf6ef0 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -347,22 +347,13 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
context.getPartitionsForTopic(targetTopic)))
: OptionalInt.empty();
- if (headerProvider != null) {
- return new ProducerRecord<>(
- targetTopic,
- partition.isPresent() ? partition.getAsInt() : null,
- timestamp == null || timestamp < 0L ? null : timestamp,
- key,
- value,
- headerProvider.getHeaders(element));
- } else {
- return new ProducerRecord<>(
- targetTopic,
- partition.isPresent() ? partition.getAsInt() : null,
- timestamp == null || timestamp < 0L ? null : timestamp,
- key,
- value);
- }
+ return new ProducerRecord<>(
+ targetTopic,
+ partition.isPresent() ? partition.getAsInt() : null,
+ timestamp == null || timestamp < 0L ? null : timestamp,
+ key,
+ value,
+ headerProvider != null ?
headerProvider.getHeaders(element) : null);
}
}
}