This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new c9f8aa96 [hotfix] [flink-connector-kafka] Simplify ProducerRecord instantiation w.r.t. headers
c9f8aa96 is described below

commit c9f8aa960aae4aa157152a9f69a828f7ac7690dd
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Apr 12 07:51:21 2023 -0700

    [hotfix] [flink-connector-kafka] Simplify ProducerRecord instantiation w.r.t. headers
---
 .../KafkaRecordSerializationSchemaBuilder.java     | 23 +++++++---------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 1cc92201..34cf6ef0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -347,22 +347,13 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                                             context.getPartitionsForTopic(targetTopic)))
                             : OptionalInt.empty();
 
-            if (headerProvider != null) {
-                return new ProducerRecord<>(
-                        targetTopic,
-                        partition.isPresent() ? partition.getAsInt() : null,
-                        timestamp == null || timestamp < 0L ? null : timestamp,
-                        key,
-                        value,
-                        headerProvider.getHeaders(element));
-            } else {
-                return new ProducerRecord<>(
-                        targetTopic,
-                        partition.isPresent() ? partition.getAsInt() : null,
-                        timestamp == null || timestamp < 0L ? null : timestamp,
-                        key,
-                        value);
-            }
+            return new ProducerRecord<>(
+                    targetTopic,
+                    partition.isPresent() ? partition.getAsInt() : null,
+                    timestamp == null || timestamp < 0L ? null : timestamp,
+                    key,
+                    value,
+                    headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
     }
 }

Reply via email to