This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 448338cab5e KAFKA-20173: Propagate headers into serde 1/N (#21490)
448338cab5e is described below

commit 448338cab5ebd87614b022336efc190d5af0e9ec
Author: Uladzislau Blok <[email protected]>
AuthorDate: Wed Feb 18 12:06:09 2026 +0100

    KAFKA-20173: Propagate headers into serde 1/N (#21490)
    
    Propagate headers into serializes / deserializers
    
    This PR partially covers `org.apache.kafka.streams.kstream` package.
    
    Code:
    - Use empty headers for backward compatibility
    - Propagate headers when it's possible
    
    Tests:
    - Use empty headers for existing tests
    - Add new test if needed
    
    Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
     <[email protected]>, TengYao Chi <[email protected]>
    
    ---------
    
    Co-authored-by: Matthias J. Sax <[email protected]>
---
 .../kstream/SessionWindowedDeserializer.java       |  9 ++++++-
 .../streams/kstream/SessionWindowedSerializer.java |  9 ++++++-
 .../state/internals/MeteredSessionStore.java       |  2 +-
 .../state/internals/PrefixedSessionKeySchemas.java | 19 +++++++++-----
 .../streams/state/internals/SessionKeySchema.java  | 13 +++++++---
 .../kstream/SessionWindowedDeserializerTest.java   | 29 +++++++++++++++++++++
 .../kstream/SessionWindowedSerializerTest.java     | 27 +++++++++++++++++++
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  8 +++---
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  4 +--
 .../state/internals/SessionKeySchemaTest.java      | 30 ++++++++++++----------
 10 files changed, 117 insertions(+), 33 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
index 11795459c9c..8e3dc8663e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Utils;
@@ -87,6 +89,11 @@ public class SessionWindowedDeserializer<T> implements 
Deserializer<Windowed<T>>
 
     @Override
     public Windowed<T> deserialize(final String topic, final byte[] data) {
+        return deserialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public Windowed<T> deserialize(final String topic, final Headers headers, 
final byte[] data) {
         WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
 
         if (data == null || data.length == 0) {
@@ -94,7 +101,7 @@ public class SessionWindowedDeserializer<T> implements 
Deserializer<Windowed<T>>
         }
 
         // for either key or value, their schema is the same hence we will 
just use session key schema
-        return SessionKeySchema.from(data, inner, topic);
+        return SessionKeySchema.from(data, inner, headers, topic);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 60e2b81d497..50edd86f95d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -87,13 +89,18 @@ public class SessionWindowedSerializer<T> implements 
WindowedSerializer<T> {
 
     @Override
     public byte[] serialize(final String topic, final Windowed<T> data) {
+        return serialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Headers headers, final 
Windowed<T> data) {
         WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
 
         if (data == null) {
             return null;
         }
         // for either key or value, their schema is the same hence we will 
just use session key schema
-        return SessionKeySchema.toBinary(data, inner, topic);
+        return SessionKeySchema.toBinary(data, inner, headers, topic);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index f5281a04f6b..9b578ee4b26 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -159,7 +159,7 @@ public class MeteredSessionStore<K, V>
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) 
wrapped).setFlushListener(
                 record -> listener.apply(
-                    record.withKey(SessionKeySchema.from(record.key(), 
serdes.keyDeserializer(), serdes.topic()))
+                    record.withKey(SessionKeySchema.from(record.key(), 
serdes.keyDeserializer(), record.headers(), serdes.topic()))
                         .withValue(new Change<>(
                             record.value().newValue != null ? 
serdes.valueFrom(record.value().newValue) : null,
                             record.value().oldValue != null ? 
serdes.valueFrom(record.value().oldValue) : null,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index b54b83b08f0..6f3f8cc6603 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -153,8 +154,9 @@ public class PrefixedSessionKeySchemas {
 
         private static <K> K extractKey(final byte[] binaryKey,
                                         final Deserializer<K> deserializer,
+                                        final Headers headers,
                                         final String topic) {
-            return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+            return deserializer.deserialize(topic, headers, 
extractKeyBytes(binaryKey));
         }
 
         static byte[] extractKeyBytes(final byte[] binaryKey) {
@@ -178,16 +180,18 @@ public class PrefixedSessionKeySchemas {
 
         public static <K> Windowed<K> from(final byte[] binaryKey,
                                            final Deserializer<K> 
keyDeserializer,
+                                           final Headers headers,
                                            final String topic) {
-            final K key = extractKey(binaryKey, keyDeserializer, topic);
+            final K key = extractKey(binaryKey, keyDeserializer, headers, 
topic);
             final Window window = extractWindow(binaryKey);
             return new Windowed<>(key, window);
         }
 
         public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                                           final Serializer<K> serializer,
+                                          final Headers headers,
                                           final String topic) {
-            final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+            final byte[] bytes = serializer.serialize(topic, headers, 
sessionKey.key());
             return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), 
sessionKey.window().end()).get();
         }
 
@@ -323,14 +327,16 @@ public class PrefixedSessionKeySchemas {
 
         private static <K> K extractKey(final byte[] binaryKey,
                                         final Deserializer<K> deserializer,
+                                        final Headers headers,
                                         final String topic) {
-            return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+            return deserializer.deserialize(topic, headers, 
extractKeyBytes(binaryKey));
         }
 
         public static <K> Windowed<K> from(final byte[] binaryKey,
                                            final Deserializer<K> 
keyDeserializer,
+                                           final Headers headers,
                                            final String topic) {
-            final K key = extractKey(binaryKey, keyDeserializer, topic);
+            final K key = extractKey(binaryKey, keyDeserializer, headers, 
topic);
             final Window window = extractWindow(binaryKey);
             return new Windowed<>(key, window);
         }
@@ -349,8 +355,9 @@ public class PrefixedSessionKeySchemas {
 
         public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                                           final Serializer<K> serializer,
+                                          final Headers headers,
                                           final String topic) {
-            final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+            final byte[] bytes = serializer.serialize(topic, headers, 
sessionKey.key());
             return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), 
sessionKey.window().end()).get();
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 03d172e4b52..994cf8120ac 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -113,8 +114,9 @@ public class SessionKeySchema implements 
SegmentedBytesStore.KeySchema {
 
     private static <K> K extractKey(final byte[] binaryKey,
                                     final Deserializer<K> deserializer,
+                                    final Headers headers,
                                     final String topic) {
-        return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+        return deserializer.deserialize(topic, headers, 
extractKeyBytes(binaryKey));
     }
 
     static byte[] extractKeyBytes(final byte[] binaryKey) {
@@ -140,8 +142,9 @@ public class SessionKeySchema implements 
SegmentedBytesStore.KeySchema {
 
     public static <K> Windowed<K> from(final byte[] binaryKey,
                                        final Deserializer<K> keyDeserializer,
+                                       final Headers headers,
                                        final String topic) {
-        final K key = extractKey(binaryKey, keyDeserializer, topic);
+        final K key = extractKey(binaryKey, keyDeserializer, headers, topic);
         final Window window = extractWindow(binaryKey);
         return new Windowed<>(key, window);
     }
@@ -154,15 +157,17 @@ public class SessionKeySchema implements 
SegmentedBytesStore.KeySchema {
 
     public static <K> Windowed<K> from(final Windowed<Bytes> keyBytes,
                                        final Deserializer<K> keyDeserializer,
+                                       final Headers headers,
                                        final String topic) {
-        final K key = keyDeserializer.deserialize(topic, keyBytes.key().get());
+        final K key = keyDeserializer.deserialize(topic, headers, 
keyBytes.key().get());
         return new Windowed<>(key, keyBytes.window());
     }
 
     public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                                       final Serializer<K> serializer,
+                                      final Headers headers,
                                       final String topic) {
-        final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+        final byte[] bytes = serializer.serialize(topic, headers, 
sessionKey.key());
         return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), 
sessionKey.window().end()).get();
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 0315120ae1b..0635457783f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -17,11 +17,14 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -31,6 +34,13 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class SessionWindowedDeserializerTest {
     private final SessionWindowedDeserializer<?> sessionWindowedDeserializer = 
new SessionWindowedDeserializer<>(new StringDeserializer());
@@ -106,4 +116,23 @@ public class SessionWindowedDeserializerTest {
         
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
sessionWindowedDeserializer.configure(props, false));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
+
+        final String key = "test-key";
+        final Windowed<String> windowed = new Windowed<>(key, new 
TimeWindow(0, 1));
+        final byte[] data = new 
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
windowed);
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final SessionWindowedDeserializer<String> testDeserializer = new 
SessionWindowedDeserializer<>(mockDeserializer);
+
+        testDeserializer.deserialize("dummy", headers, data);
+
+        verify(mockDeserializer).deserialize(anyString(), eq(headers), 
any(byte[].class));
+        verify(mockDeserializer, never()).deserialize(anyString(), 
any(byte[].class));
+    }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 212b0c810e5..649450e14a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -17,11 +17,14 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -31,6 +34,13 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class SessionWindowedSerializerTest {
     private final SessionWindowedSerializer<?> sessionWindowedSerializer = new 
SessionWindowedSerializer<>(Serdes.String().serializer());
@@ -106,4 +116,21 @@ public class SessionWindowedSerializerTest {
         props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
sessionWindowedSerializer.configure(props, false));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
+
+        final String key = "test-key";
+        final Windowed<String> data = new Windowed<>(key, new TimeWindow(0, 
1));
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final SessionWindowedSerializer<String> testSerializer = new 
SessionWindowedSerializer<>(mockSerializer);
+
+        testSerializer.serialize("dummy", headers, data);
+
+        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 5f8d4920c80..35503edb9cc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1652,9 +1652,9 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
             return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq, 
stateSerdes);
         } else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) {
             if (changeLog) {
-                return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), "dummy"));
+                return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
             }
-            return Bytes.wrap(TimeFirstSessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), "dummy"));
+            return Bytes.wrap(TimeFirstSessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
         } else {
             throw new IllegalStateException("Unrecognized serde schema");
         }
@@ -1665,7 +1665,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         if (getIndexSchema() instanceof KeyFirstWindowKeySchema) {
             return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0, 
stateSerdes);
         } else if (getIndexSchema() instanceof KeyFirstSessionKeySchema) {
-            return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), "dummy"));
+            return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
         } else {
             throw new IllegalStateException("Unrecognized serde schema");
         }
@@ -1696,7 +1696,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                     results.add(deserialized);
                 } else if (getBaseSchema() instanceof 
TimeFirstSessionKeySchema) {
                     final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
-                        TimeFirstSessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), "dummy"),
+                        TimeFirstSessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
                         stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
                     );
                     results.add(deserialized);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 880868eadcb..1e8e4ea0a42 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -870,7 +870,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     private Bytes serializeKey(final Windowed<String> key) {
         final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
         if (schema instanceof SessionKeySchema) {
-            return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), "dummy"));
+            return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
         } else if (schema instanceof WindowKeySchema) {
             return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
         } else {
@@ -903,7 +903,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                     results.add(deserialized);
                 } else if (schema instanceof SessionKeySchema) {
                     final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
-                        SessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), "dummy"),
+                        SessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
                         stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
                     );
                     results.add(deserialized);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index 00c9b6dc063..47e6436dde1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -76,17 +78,17 @@ public class SessionKeySchemaTest {
     );
 
     @FunctionalInterface
-    interface TriFunction<A, B, C, R> {
-        R apply(A a, B b, C c);
+    interface QuadFunction<A, B, C, D, R> {
+        R apply(A a, B b, C c, D d);
     }
 
-    private static final Map<SchemaType, TriFunction<Windowed<String>, 
Serializer<String>, String, byte[]>> SERDE_TO_STORE_BINARY_MAP = mkMap(
+    private static final Map<SchemaType, QuadFunction<Windowed<String>, 
Serializer<String>, Headers, String, byte[]>> SERDE_TO_STORE_BINARY_MAP = mkMap(
         mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::toBinary),
         mkEntry(SchemaType.PrefixedKeyFirstSchema, 
KeyFirstSessionKeySchema::toBinary),
         mkEntry(SchemaType.PrefixedTimeFirstSchema, 
TimeFirstSessionKeySchema::toBinary)
     );
 
-    private static final Map<SchemaType, TriFunction<byte[], 
Deserializer<String>, String, Windowed<String>>> SERDE_FROM_BYTES_MAP = mkMap(
+    private static final Map<SchemaType, QuadFunction<byte[], 
Deserializer<String>, Headers, String, Windowed<String>>> SERDE_FROM_BYTES_MAP 
= mkMap(
         mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::from),
         mkEntry(SchemaType.PrefixedKeyFirstSchema, 
KeyFirstSessionKeySchema::from),
         mkEntry(SchemaType.PrefixedTimeFirstSchema, 
TimeFirstSessionKeySchema::from)
@@ -124,8 +126,8 @@ public class SessionKeySchemaTest {
     private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
     private SchemaType schemaType;
     private Function<Windowed<Bytes>, Bytes> toBinary;
-    private TriFunction<Windowed<String>, Serializer<String>, String, byte[]> 
serdeToBinary;
-    private TriFunction<byte[], Deserializer<String>, String, 
Windowed<String>> serdeFromBytes;
+    private QuadFunction<Windowed<String>, Serializer<String>, Headers, 
String, byte[]> serdeToBinary;
+    private QuadFunction<byte[], Deserializer<String>, Headers, String, 
Windowed<String>> serdeFromBytes;
     private Function<Bytes, Windowed<Bytes>> fromBytes;
     private Function<byte[], Long> extractStartTS;
     private Function<byte[], Long> extractEndTS;
@@ -336,8 +338,8 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldConvertToBinaryAndBack(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
-        final Windowed<String> result = serdeFromBytes.apply(serialized, 
Serdes.String().deserializer(), "dummy");
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
+        final Windowed<String> result = serdeFromBytes.apply(serialized, 
Serdes.String().deserializer(), new RecordHeaders(), "dummy");
         assertEquals(windowedKey, result);
     }
 
@@ -345,7 +347,7 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldExtractEndTimeFromBinary(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
         assertEquals(endTime, (long) extractEndTS.apply(serialized));
     }
 
@@ -353,7 +355,7 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldExtractStartTimeFromBinary(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
         assertEquals(startTime, (long) extractStartTS.apply(serialized));
     }
 
@@ -361,7 +363,7 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldExtractWindowFromBindary(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
         assertEquals(window, extractWindow.apply(serialized));
     }
 
@@ -369,7 +371,7 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldExtractKeyBytesFromBinary(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
         assertArrayEquals(key.getBytes(), extractKeyBytes.apply(serialized));
     }
 
@@ -377,8 +379,8 @@ public class SessionKeySchemaTest {
     @EnumSource(SchemaType.class)
     public void shouldExtractKeyFromBinary(final SchemaType type) {
         setUp(type);
-        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), "dummy");
-        assertEquals(windowedKey, serdeFromBytes.apply(serialized, 
serde.deserializer(), "dummy"));
+        final byte[] serialized = serdeToBinary.apply(windowedKey, 
serde.serializer(), new RecordHeaders(), "dummy");
+        assertEquals(windowedKey, serdeFromBytes.apply(serialized, 
serde.deserializer(), new RecordHeaders(), "dummy"));
     }
 
     @ParameterizedTest

Reply via email to