This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a30deb4f6d2 [fix][io][kca] kafka headers silently dropped (#25325)
a30deb4f6d2 is described below

commit a30deb4f6d22a66ec153a64d8d0d2a97a674d36d
Author: Enrique Fernández <[email protected]>
AuthorDate: Mon Mar 16 09:11:38 2026 +0100

    [fix][io][kca] kafka headers silently dropped (#25325)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  |  5 ++--
 .../io/kafka/connect/KafkaConnectSource.java       | 15 +++++++++++
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 29 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index e048ddf7244..6ed9e7891b2 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -288,7 +288,6 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
 
     public abstract AbstractKafkaSourceRecord<T> 
processSourceRecord(SourceRecord srcRecord);
 
-    private static final Map<String, String> PROPERTIES = 
Collections.emptyMap();
     private static final Optional<Long> RECORD_SEQUENCE = Optional.empty();
 
     public abstract class AbstractKafkaSourceRecord<T> implements Record {
@@ -331,9 +330,11 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
             return RECORD_SEQUENCE;
         }
 
+        Map<String, String> properties = Collections.emptyMap();
+
         @Override
         public Map<String, String> getProperties() {
-            return PROPERTIES;
+            return properties;
         }
 
         public boolean isEmpty() {
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 3d5d76d4230..e8a77b6de45 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
 import io.confluent.connect.avro.AvroData;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -240,6 +241,20 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource<KeyValue<byte
                 .map(e -> e.getKey() + "=" + e.getValue())
                 .collect(Collectors.joining(",")));
             this.partitionIndex = 
Optional.ofNullable(srcRecord.kafkaPartition());
+
+            // Propagate Kafka Connect record headers as Pulsar message 
properties.
+            if (srcRecord.headers() != null && !srcRecord.headers().isEmpty()) 
{
+                Map<String, String> headerProperties = new HashMap<>();
+                for (var h : srcRecord.headers()) {
+                    Object val = h.value();
+                    if (val != null) {
+                        headerProperties.put(h.key(), val instanceof byte[]
+                                ? Base64.getEncoder().encodeToString((byte[]) 
val)
+                                : val.toString());
+                    }
+                }
+                this.properties = 
Collections.unmodifiableMap(headerProperties);
+            }
         }
 
         @Override
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 5d414b6373c..8b264695905 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -300,6 +300,35 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase {
         runTransformTest(config, false);
     }
 
+    @Test
+    void testHeadersPropagatedAsProperties() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, 
"org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, 
"default-tenant/default-ns");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        Map<String, Object> sourcePartition = new HashMap<>();
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourcePartition.put("test", "test");
+        sourceOffset.put("test", 0);
+        SourceRecord srcRecord = new SourceRecord(
+                sourcePartition, sourceOffset, topicName, null,
+                null, null, null, "value"
+        );
+        srcRecord.headers().addString("event-type", "order.created");
+        srcRecord.headers().addString("event-id", "abc-123");
+        srcRecord.headers().addLong("event-version", 42L);
+
+        KafkaSourceRecord record = 
kafkaConnectSource.processSourceRecord(srcRecord);
+
+        assertEquals("order.created", 
record.getProperties().get("event-type"));
+        assertEquals("abc-123", record.getProperties().get("event-id"));
+        assertEquals("42", record.getProperties().get("event-version"));
+        assertEquals(3, record.getProperties().size());
+    }
+
     @Test
     void testShortTopicNames() throws Exception {
         Map<String, Object> config = getConfig();

Reply via email to