This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a30deb4f6d2 [fix][io][kca] kafka headers silently dropped (#25325)
a30deb4f6d2 is described below
commit a30deb4f6d22a66ec153a64d8d0d2a97a674d36d
Author: Enrique Fernández <[email protected]>
AuthorDate: Mon Mar 16 09:11:38 2026 +0100
[fix][io][kca] kafka headers silently dropped (#25325)
---
.../kafka/connect/AbstractKafkaConnectSource.java | 5 ++--
.../io/kafka/connect/KafkaConnectSource.java | 15 +++++++++++
.../io/kafka/connect/KafkaConnectSourceTest.java | 29 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 2 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index e048ddf7244..6ed9e7891b2 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -288,7 +288,6 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
public abstract AbstractKafkaSourceRecord<T>
processSourceRecord(SourceRecord srcRecord);
- private static final Map<String, String> PROPERTIES =
Collections.emptyMap();
private static final Optional<Long> RECORD_SEQUENCE = Optional.empty();
public abstract class AbstractKafkaSourceRecord<T> implements Record {
@@ -331,9 +330,11 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
return RECORD_SEQUENCE;
}
+ Map<String, String> properties = Collections.emptyMap();
+
@Override
public Map<String, String> getProperties() {
- return PROPERTIES;
+ return properties;
}
public boolean isEmpty() {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 3d5d76d4230..e8a77b6de45 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
import io.confluent.connect.avro.AvroData;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -240,6 +241,20 @@ public class KafkaConnectSource extends
AbstractKafkaConnectSource<KeyValue<byte
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(",")));
this.partitionIndex =
Optional.ofNullable(srcRecord.kafkaPartition());
+
+ // Propagate Kafka Connect record headers as Pulsar message
properties.
+ if (srcRecord.headers() != null && !srcRecord.headers().isEmpty())
{
+ Map<String, String> headerProperties = new HashMap<>();
+ for (var h : srcRecord.headers()) {
+ Object val = h.value();
+ if (val != null) {
+ headerProperties.put(h.key(), val instanceof byte[]
+ ? Base64.getEncoder().encodeToString((byte[])
val)
+ : val.toString());
+ }
+ }
+ this.properties =
Collections.unmodifiableMap(headerProperties);
+ }
}
@Override
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 5d414b6373c..8b264695905 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -300,6 +300,35 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
runTransformTest(config, false);
}
+ @Test
+ void testHeadersPropagatedAsProperties() throws Exception {
+ Map<String, Object> config = getConfig();
+ config.put(TaskConfig.TASK_CLASS_CONFIG,
"org.apache.kafka.connect.file.FileStreamSourceTask");
+ config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG,
"default-tenant/default-ns");
+
+ kafkaConnectSource = new KafkaConnectSource();
+ kafkaConnectSource.open(config, context);
+
+ Map<String, Object> sourcePartition = new HashMap<>();
+ Map<String, Object> sourceOffset = new HashMap<>();
+ sourcePartition.put("test", "test");
+ sourceOffset.put("test", 0);
+ SourceRecord srcRecord = new SourceRecord(
+ sourcePartition, sourceOffset, topicName, null,
+ null, null, null, "value"
+ );
+ srcRecord.headers().addString("event-type", "order.created");
+ srcRecord.headers().addString("event-id", "abc-123");
+ srcRecord.headers().addLong("event-version", 42L);
+
+ KafkaSourceRecord record =
kafkaConnectSource.processSourceRecord(srcRecord);
+
+ assertEquals("order.created",
record.getProperties().get("event-type"));
+ assertEquals("abc-123", record.getProperties().get("event-id"));
+ assertEquals("42", record.getProperties().get("event-version"));
+ assertEquals(3, record.getProperties().size());
+ }
+
@Test
void testShortTopicNames() throws Exception {
Map<String, Object> config = getConfig();