This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9834b5189c7 KAFKA-20134: Fix header-based key deserialization in
MeteredTimestampedWindowStoreWithHeaders iterator methods (#21705)
9834b5189c7 is described below
commit 9834b5189c742b8c508551a4a7a7382f792dbc0e
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Mar 11 20:32:35 2026 +0000
KAFKA-20134: Fix header-based key deserialization in
MeteredTimestampedWindowStoreWithHeaders iterator methods (#21705)
Fixes a bug where `MeteredTimestampedWindowStoreWithHeaders` iterator
methods fail when key deserializers require headers.
The class inherits iterator-returning methods from `MeteredWindowStore`:
- `fetchAll(long, long)` / `backwardFetchAll(long, long)` - `all()` /
`backwardAll()` - `fetch(K, K, long, long)` / `backwardFetch(K, K,
long, long)`
These methods use the deprecated `serdes::keyFrom(rawKey)` which
provides empty headers, causing deserialization failures when headers
are required.
Reviewers: Matthias J. Sax <[email protected]>
---
.../MeteredTimestampedWindowStoreWithHeaders.java | 127 ++++++++
...teredTimestampedWindowStoreWithHeadersTest.java | 322 +++++++++++++++++++++
2 files changed, 449 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index be47980ea4f..85a2dfe2e77 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -282,6 +283,132 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
return result;
}
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final
K keyFrom,
+ final
K keyTo,
+ final
long timeFrom,
+ final
long timeTo) {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().fetch(
+ keyBytes(keyFrom, new RecordHeaders()),
+ keyBytes(keyTo, new RecordHeaders()),
+ timeFrom,
+ timeTo)
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
backwardFetch(final K keyFrom,
+
final K keyTo,
+
final long timeFrom,
+
final long timeTo) {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().backwardFetch(
+ keyBytes(keyFrom, new RecordHeaders()),
+ keyBytes(keyTo, new RecordHeaders()),
+ timeFrom,
+ timeTo)
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
fetchAll(final long timeFrom, final long timeTo) {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().fetchAll(timeFrom, timeTo)
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
backwardFetchAll(final long timeFrom, final long timeTo) {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().backwardFetchAll(timeFrom, timeTo)
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> all() {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().all()
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
backwardAll() {
+ return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ wrapped().backwardAll()
+ );
+ }
+
+ private class MeteredTimestampedWindowStoreWithHeadersKeyValueIterator
+ implements KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>,
MeteredIterator {
+
+ private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
+ private final long startNs;
+ private final long startTimestampMs;
+ private KeyValue<Windowed<K>, ValueTimestampHeaders<V>> cachedNext;
+
+ private MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
+ this.iter = iter;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ numOpenIterators.increment();
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return this.startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cachedNext != null || iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
+ if (cachedNext != null) {
+ final KeyValue<Windowed<K>, ValueTimestampHeaders<V>> result =
cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
+ final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
+
+ if (next == null) {
+ return null;
+ }
+
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
serdes.valueFrom(next.value, new RecordHeaders());
+ final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
+ final K key = serdes.keyFrom(next.key.key().get(), headers);
+ final Windowed<K> windowedKey = new Windowed<>(key,
next.key.window());
+ return KeyValue.pair(windowedKey, valueTimestampHeaders);
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ final long duration = time.nanoseconds() - startNs;
+ fetchSensor.record(duration);
+ iteratorDurationSensor.record(duration);
+ numOpenIterators.decrement();
+ openIterators.remove(this);
+ }
+ }
+
+ @Override
+ public Windowed<K> peekNextKey() {
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext == null ? null : cachedNext.key;
+ }
+ }
+
private boolean isUnderlyingStoreTimestamped() {
Object store = wrapped();
do {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index daefc8367c7..5501b8bc48e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -28,14 +28,19 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.KeyValueIteratorStub;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
@@ -47,8 +52,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -276,4 +289,313 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
verify(innerStoreMock).fetch(KEY_BYTES, TIMESTAMP);
verify(innerStoreMock).put(KEY_BYTES, VALUE_TIMESTAMP_HEADERS_BYTES,
TIMESTAMP);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.fetchAll(0, 100))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.fetchAll(0, 100);
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.all())
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all();
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.fetch(any(Bytes.class), any(Bytes.class), eq(0L),
eq(100L)))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator =
+ store.fetch(KEY, KEY, 0, 100);
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
=
+ iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.backwardFetchAll(0, 100))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.backwardFetchAll(0, 100);
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.backwardAll())
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.backwardAll();
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(innerStoreMock.backwardFetch(any(Bytes.class), any(Bytes.class),
eq(0L), eq(100L)))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ innerStoreMock,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ store.init(context, store);
+
+ final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator =
+ store.backwardFetch(KEY, KEY, 0, 100);
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
=
+ iterator.next();
+
+ assertEquals(KEY, result.key.key());
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
}