This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 448338cab5e KAFKA-20173: Propagate headers into serde 1/N (#21490)
448338cab5e is described below
commit 448338cab5ebd87614b022336efc190d5af0e9ec
Author: Uladzislau Blok <[email protected]>
AuthorDate: Wed Feb 18 12:06:09 2026 +0100
KAFKA-20173: Propagate headers into serde 1/N (#21490)
Propagate headers into serializers / deserializers
This PR partially covers the `org.apache.kafka.streams.kstream` package.
Code:
- Use empty headers for backward compatibility
- Propagate headers wherever possible
Tests:
- Use empty headers for existing tests
- Add new tests where needed
Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
<[email protected]>, TengYao Chi <[email protected]>
---------
Co-authored-by: Matthias J. Sax <[email protected]>
---
.../kstream/SessionWindowedDeserializer.java | 9 ++++++-
.../streams/kstream/SessionWindowedSerializer.java | 9 ++++++-
.../state/internals/MeteredSessionStore.java | 2 +-
.../state/internals/PrefixedSessionKeySchemas.java | 19 +++++++++-----
.../streams/state/internals/SessionKeySchema.java | 13 +++++++---
.../kstream/SessionWindowedDeserializerTest.java | 29 +++++++++++++++++++++
.../kstream/SessionWindowedSerializerTest.java | 27 +++++++++++++++++++
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 8 +++---
.../AbstractRocksDBSegmentedBytesStoreTest.java | 4 +--
.../state/internals/SessionKeySchemaTest.java | 30 ++++++++++++----------
10 files changed, 117 insertions(+), 33 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
index 11795459c9c..8e3dc8663e0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Utils;
@@ -87,6 +89,11 @@ public class SessionWindowedDeserializer<T> implements
Deserializer<Windowed<T>>
@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
+ return deserialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public Windowed<T> deserialize(final String topic, final Headers headers,
final byte[] data) {
WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
if (data == null || data.length == 0) {
@@ -94,7 +101,7 @@ public class SessionWindowedDeserializer<T> implements
Deserializer<Windowed<T>>
}
// for either key or value, their schema is the same hence we will
just use session key schema
- return SessionKeySchema.from(data, inner, topic);
+ return SessionKeySchema.from(data, inner, headers, topic);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 60e2b81d497..50edd86f95d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
@@ -87,13 +89,18 @@ public class SessionWindowedSerializer<T> implements
WindowedSerializer<T> {
@Override
public byte[] serialize(final String topic, final Windowed<T> data) {
+ return serialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final
Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
if (data == null) {
return null;
}
// for either key or value, their schema is the same hence we will
just use session key schema
- return SessionKeySchema.toBinary(data, inner, topic);
+ return SessionKeySchema.toBinary(data, inner, headers, topic);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index f5281a04f6b..9b578ee4b26 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -159,7 +159,7 @@ public class MeteredSessionStore<K, V>
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>)
wrapped).setFlushListener(
record -> listener.apply(
- record.withKey(SessionKeySchema.from(record.key(),
serdes.keyDeserializer(), serdes.topic()))
+ record.withKey(SessionKeySchema.from(record.key(),
serdes.keyDeserializer(), record.headers(), serdes.topic()))
.withValue(new Change<>(
record.value().newValue != null ?
serdes.valueFrom(record.value().newValue) : null,
record.value().oldValue != null ?
serdes.valueFrom(record.value().oldValue) : null,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index b54b83b08f0..6f3f8cc6603 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -153,8 +154,9 @@ public class PrefixedSessionKeySchemas {
private static <K> K extractKey(final byte[] binaryKey,
final Deserializer<K> deserializer,
+ final Headers headers,
final String topic) {
- return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+ return deserializer.deserialize(topic, headers,
extractKeyBytes(binaryKey));
}
static byte[] extractKeyBytes(final byte[] binaryKey) {
@@ -178,16 +180,18 @@ public class PrefixedSessionKeySchemas {
public static <K> Windowed<K> from(final byte[] binaryKey,
final Deserializer<K>
keyDeserializer,
+ final Headers headers,
final String topic) {
- final K key = extractKey(binaryKey, keyDeserializer, topic);
+ final K key = extractKey(binaryKey, keyDeserializer, headers,
topic);
final Window window = extractWindow(binaryKey);
return new Windowed<>(key, window);
}
public static <K> byte[] toBinary(final Windowed<K> sessionKey,
final Serializer<K> serializer,
+ final Headers headers,
final String topic) {
- final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+ final byte[] bytes = serializer.serialize(topic, headers,
sessionKey.key());
return toBinary(Bytes.wrap(bytes), sessionKey.window().start(),
sessionKey.window().end()).get();
}
@@ -323,14 +327,16 @@ public class PrefixedSessionKeySchemas {
private static <K> K extractKey(final byte[] binaryKey,
final Deserializer<K> deserializer,
+ final Headers headers,
final String topic) {
- return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+ return deserializer.deserialize(topic, headers,
extractKeyBytes(binaryKey));
}
public static <K> Windowed<K> from(final byte[] binaryKey,
final Deserializer<K>
keyDeserializer,
+ final Headers headers,
final String topic) {
- final K key = extractKey(binaryKey, keyDeserializer, topic);
+ final K key = extractKey(binaryKey, keyDeserializer, headers,
topic);
final Window window = extractWindow(binaryKey);
return new Windowed<>(key, window);
}
@@ -349,8 +355,9 @@ public class PrefixedSessionKeySchemas {
public static <K> byte[] toBinary(final Windowed<K> sessionKey,
final Serializer<K> serializer,
+ final Headers headers,
final String topic) {
- final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+ final byte[] bytes = serializer.serialize(topic, headers,
sessionKey.key());
return toBinary(Bytes.wrap(bytes), sessionKey.window().start(),
sessionKey.window().end()).get();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 03d172e4b52..994cf8120ac 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -113,8 +114,9 @@ public class SessionKeySchema implements
SegmentedBytesStore.KeySchema {
private static <K> K extractKey(final byte[] binaryKey,
final Deserializer<K> deserializer,
+ final Headers headers,
final String topic) {
- return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
+ return deserializer.deserialize(topic, headers,
extractKeyBytes(binaryKey));
}
static byte[] extractKeyBytes(final byte[] binaryKey) {
@@ -140,8 +142,9 @@ public class SessionKeySchema implements
SegmentedBytesStore.KeySchema {
public static <K> Windowed<K> from(final byte[] binaryKey,
final Deserializer<K> keyDeserializer,
+ final Headers headers,
final String topic) {
- final K key = extractKey(binaryKey, keyDeserializer, topic);
+ final K key = extractKey(binaryKey, keyDeserializer, headers, topic);
final Window window = extractWindow(binaryKey);
return new Windowed<>(key, window);
}
@@ -154,15 +157,17 @@ public class SessionKeySchema implements
SegmentedBytesStore.KeySchema {
public static <K> Windowed<K> from(final Windowed<Bytes> keyBytes,
final Deserializer<K> keyDeserializer,
+ final Headers headers,
final String topic) {
- final K key = keyDeserializer.deserialize(topic, keyBytes.key().get());
+ final K key = keyDeserializer.deserialize(topic, headers,
keyBytes.key().get());
return new Windowed<>(key, keyBytes.window());
}
public static <K> byte[] toBinary(final Windowed<K> sessionKey,
final Serializer<K> serializer,
+ final Headers headers,
final String topic) {
- final byte[] bytes = serializer.serialize(topic, sessionKey.key());
+ final byte[] bytes = serializer.serialize(topic, headers,
sessionKey.key());
return toBinary(Bytes.wrap(bytes), sessionKey.window().start(),
sessionKey.window().end()).get();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 0315120ae1b..0635457783f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.jupiter.api.Test;
@@ -31,6 +34,13 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class SessionWindowedDeserializerTest {
private final SessionWindowedDeserializer<?> sessionWindowedDeserializer =
new SessionWindowedDeserializer<>(new StringDeserializer());
@@ -106,4 +116,23 @@ public class SessionWindowedDeserializerTest {
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
sessionWindowedDeserializer.configure(props, false));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
+
+ final String key = "test-key";
+ final Windowed<String> windowed = new Windowed<>(key, new
TimeWindow(0, 1));
+ final byte[] data = new
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
windowed);
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+
+ final SessionWindowedDeserializer<String> testDeserializer = new
SessionWindowedDeserializer<>(mockDeserializer);
+
+ testDeserializer.deserialize("dummy", headers, data);
+
+ verify(mockDeserializer).deserialize(anyString(), eq(headers),
any(byte[].class));
+ verify(mockDeserializer, never()).deserialize(anyString(),
any(byte[].class));
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 212b0c810e5..649450e14a9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.jupiter.api.Test;
@@ -31,6 +34,13 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class SessionWindowedSerializerTest {
private final SessionWindowedSerializer<?> sessionWindowedSerializer = new
SessionWindowedSerializer<>(Serdes.String().serializer());
@@ -106,4 +116,21 @@ public class SessionWindowedSerializerTest {
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
sessionWindowedSerializer.configure(props, false));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
+
+ final String key = "test-key";
+ final Windowed<String> data = new Windowed<>(key, new TimeWindow(0,
1));
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+
+ final SessionWindowedSerializer<String> testSerializer = new
SessionWindowedSerializer<>(mockSerializer);
+
+ testSerializer.serialize("dummy", headers, data);
+
+ verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+ verify(mockSerializer, never()).serialize(anyString(), eq(key));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 5f8d4920c80..35503edb9cc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1652,9 +1652,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq,
stateSerdes);
} else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) {
if (changeLog) {
- return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), "dummy"));
+ return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
}
- return Bytes.wrap(TimeFirstSessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), "dummy"));
+ return Bytes.wrap(TimeFirstSessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
} else {
throw new IllegalStateException("Unrecognized serde schema");
}
@@ -1665,7 +1665,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
if (getIndexSchema() instanceof KeyFirstWindowKeySchema) {
return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0,
stateSerdes);
} else if (getIndexSchema() instanceof KeyFirstSessionKeySchema) {
- return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), "dummy"));
+ return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
} else {
throw new IllegalStateException("Unrecognized serde schema");
}
@@ -1696,7 +1696,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
results.add(deserialized);
} else if (getBaseSchema() instanceof
TimeFirstSessionKeySchema) {
final KeyValue<Windowed<String>, Long> deserialized =
KeyValue.pair(
- TimeFirstSessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), "dummy"),
+ TimeFirstSessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
);
results.add(deserialized);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 880868eadcb..1e8e4ea0a42 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -870,7 +870,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
private Bytes serializeKey(final Windowed<String> key) {
final StateSerdes<String, Long> stateSerdes =
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
if (schema instanceof SessionKeySchema) {
- return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), "dummy"));
+ return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
} else if (schema instanceof WindowKeySchema) {
return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
} else {
@@ -903,7 +903,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
results.add(deserialized);
} else if (schema instanceof SessionKeySchema) {
final KeyValue<Windowed<String>, Long> deserialized =
KeyValue.pair(
- SessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), "dummy"),
+ SessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
);
results.add(deserialized);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index 00c9b6dc063..47e6436dde1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -76,17 +78,17 @@ public class SessionKeySchemaTest {
);
@FunctionalInterface
- interface TriFunction<A, B, C, R> {
- R apply(A a, B b, C c);
+ interface QuadFunction<A, B, C, D, R> {
+ R apply(A a, B b, C c, D d);
}
- private static final Map<SchemaType, TriFunction<Windowed<String>,
Serializer<String>, String, byte[]>> SERDE_TO_STORE_BINARY_MAP = mkMap(
+ private static final Map<SchemaType, QuadFunction<Windowed<String>,
Serializer<String>, Headers, String, byte[]>> SERDE_TO_STORE_BINARY_MAP = mkMap(
mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::toBinary),
mkEntry(SchemaType.PrefixedKeyFirstSchema,
KeyFirstSessionKeySchema::toBinary),
mkEntry(SchemaType.PrefixedTimeFirstSchema,
TimeFirstSessionKeySchema::toBinary)
);
- private static final Map<SchemaType, TriFunction<byte[],
Deserializer<String>, String, Windowed<String>>> SERDE_FROM_BYTES_MAP = mkMap(
+ private static final Map<SchemaType, QuadFunction<byte[],
Deserializer<String>, Headers, String, Windowed<String>>> SERDE_FROM_BYTES_MAP
= mkMap(
mkEntry(SchemaType.SessionKeySchema, SessionKeySchema::from),
mkEntry(SchemaType.PrefixedKeyFirstSchema,
KeyFirstSessionKeySchema::from),
mkEntry(SchemaType.PrefixedTimeFirstSchema,
TimeFirstSessionKeySchema::from)
@@ -124,8 +126,8 @@ public class SessionKeySchemaTest {
private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
private SchemaType schemaType;
private Function<Windowed<Bytes>, Bytes> toBinary;
- private TriFunction<Windowed<String>, Serializer<String>, String, byte[]>
serdeToBinary;
- private TriFunction<byte[], Deserializer<String>, String,
Windowed<String>> serdeFromBytes;
+ private QuadFunction<Windowed<String>, Serializer<String>, Headers,
String, byte[]> serdeToBinary;
+ private QuadFunction<byte[], Deserializer<String>, Headers, String,
Windowed<String>> serdeFromBytes;
private Function<Bytes, Windowed<Bytes>> fromBytes;
private Function<byte[], Long> extractStartTS;
private Function<byte[], Long> extractEndTS;
@@ -336,8 +338,8 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldConvertToBinaryAndBack(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
- final Windowed<String> result = serdeFromBytes.apply(serialized,
Serdes.String().deserializer(), "dummy");
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
+ final Windowed<String> result = serdeFromBytes.apply(serialized,
Serdes.String().deserializer(), new RecordHeaders(), "dummy");
assertEquals(windowedKey, result);
}
@@ -345,7 +347,7 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldExtractEndTimeFromBinary(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
assertEquals(endTime, (long) extractEndTS.apply(serialized));
}
@@ -353,7 +355,7 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldExtractStartTimeFromBinary(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
assertEquals(startTime, (long) extractStartTS.apply(serialized));
}
@@ -361,7 +363,7 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldExtractWindowFromBindary(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
assertEquals(window, extractWindow.apply(serialized));
}
@@ -369,7 +371,7 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldExtractKeyBytesFromBinary(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
assertArrayEquals(key.getBytes(), extractKeyBytes.apply(serialized));
}
@@ -377,8 +379,8 @@ public class SessionKeySchemaTest {
@EnumSource(SchemaType.class)
public void shouldExtractKeyFromBinary(final SchemaType type) {
setUp(type);
- final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), "dummy");
- assertEquals(windowedKey, serdeFromBytes.apply(serialized,
serde.deserializer(), "dummy"));
+ final byte[] serialized = serdeToBinary.apply(windowedKey,
serde.serializer(), new RecordHeaders(), "dummy");
+ assertEquals(windowedKey, serdeFromBytes.apply(serialized,
serde.deserializer(), new RecordHeaders(), "dummy"));
}
@ParameterizedTest