This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7bd979bb4ff KAFKA-20173: Propagate headers into serde 3/N (#21681)
7bd979bb4ff is described below

commit 7bd979bb4ff842b90ecde4ae8eb6580544f303a9
Author: Uladzislau Blok <[email protected]>
AuthorDate: Mon Mar 9 04:35:04 2026 +0100

    KAFKA-20173: Propagate headers into serde 3/N (#21681)
    
    Follow-up for:
    - https://github.com/apache/kafka/pull/21490
    - https://github.com/apache/kafka/pull/21536
    
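    This PR fixes header propagation in `ListSerde`: `ListSerializer` and
    `ListDeserializer` now forward the record headers to the inner
    serializer/deserializer.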
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 checkstyle/import-control.xml                      |  1 +
 .../common/serialization/ListDeserializer.java     |  9 ++++++-
 .../kafka/common/serialization/ListSerializer.java |  9 ++++++-
 .../common/serialization/ListDeserializerTest.java | 28 ++++++++++++++++++++++
 .../common/serialization/ListSerializerTest.java   | 27 +++++++++++++++++++++
 5 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 86659a1856d..a160d34c04a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -196,6 +196,7 @@
     <subpackage name="serialization">
       <allow pkg="org.apache.kafka.clients" />
       <allow class="org.apache.kafka.common.header.Headers" />
+      <allow class="org.apache.kafka.common.header.internals.RecordHeaders" />
     </subpackage>
 
     <subpackage name="utils">
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
index 4fe1313bda7..77ff3e68419 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes.ListSerde;
 import org.apache.kafka.common.utils.Utils;
 
@@ -160,6 +162,11 @@ public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
 
     @Override
     public List<Inner> deserialize(String topic, byte[] data) {
+        return deserialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public List<Inner> deserialize(String topic, Headers headers, byte[] data) 
{
         if (data == null) {
             return null;
         }
@@ -184,7 +191,7 @@ public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
                     log.trace("Deserialized list so far: {}", 
deserializedList); // avoid logging actual data above TRACE level since it may 
contain sensitive information
                     throw new SerializationException("End of the stream was 
reached prematurely");
                 }
-                deserializedList.add(inner.deserialize(topic, payload));
+                deserializedList.add(inner.deserialize(topic, headers, 
payload));
             }
             return deserializedList;
         } catch (IOException e) {
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
index 99c6ef5ce85..d56d9414d72 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Utils;
 
 import org.slf4j.Logger;
@@ -107,6 +109,11 @@ public class ListSerializer<Inner> implements Serializer<List<Inner>> {
 
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
+        return serialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serialize(String topic, Headers headers, List<Inner> data) {
         if (data == null) {
             return null;
         }
@@ -125,7 +132,7 @@ public class ListSerializer<Inner> implements Serializer<List<Inner>> {
                         out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE);
                     }
                 } else {
-                    final byte[] bytes = inner.serialize(topic, entry);
+                    final byte[] bytes = inner.serialize(topic, headers, 
entry);
                     if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
                         out.writeInt(bytes.length);
                     }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
index 2bfb7a86334..a3f4bec36e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
@@ -19,17 +19,27 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @SuppressWarnings("unchecked")
 public class ListDeserializerTest {
@@ -249,4 +259,22 @@ public class ListDeserializerTest {
         assertEquals("List deserializer was already initialized using a 
non-default constructor", exception.getMessage());
     }
 
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
+
+        final String topic = "topic";
+        final List<String> data = List.of("test-value");
+        final byte[] serializedData = new 
ListSerializer<>(Serdes.String().serializer()).serialize(topic, data);
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, mockDeserializer);
+
+        testDeserializer.deserialize(topic, headers, serializedData);
+
+        verify(mockDeserializer).deserialize(eq(topic), eq(headers), 
any(byte[].class));
+        verify(mockDeserializer, never()).deserialize(anyString(), 
any(byte[].class));
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
index 4ebaafbecea..e9984a8a7a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
@@ -19,16 +19,26 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class ListSerializerTest {
@@ -151,4 +161,21 @@ public class ListSerializerTest {
         assertEquals("List serializer was already initialized using a 
non-default constructor", exception.getMessage());
     }
 
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
+
+        final String topic = "topic";
+        final List<String> data = List.of("test-key");
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final ListSerializer<String> testSerializer = new 
ListSerializer<>(mockSerializer);
+
+        testSerializer.serialize(topic, headers, data);
+
+        verify(mockSerializer).serialize(eq(topic), eq(headers), 
eq("test-key"));
+        verify(mockSerializer, never()).serialize(anyString(), anyString());
+    }
+
 }

Reply via email to