This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9834b5189c7 KAFKA-20134: Fix header-based key deserialization in 
MeteredTimestampedWindowStoreWithHeaders iterator methods (#21705)
9834b5189c7 is described below

commit 9834b5189c742b8c508551a4a7a7382f792dbc0e
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Mar 11 20:32:35 2026 +0000

    KAFKA-20134: Fix header-based key deserialization in 
MeteredTimestampedWindowStoreWithHeaders iterator methods (#21705)
    
    Fixes a bug where `MeteredTimestampedWindowStoreWithHeaders` iterator
    methods fail when key deserializers require headers.
    
    The class inherits iterator-returning methods from `MeteredWindowStore`:
    - `fetchAll(long, long)` / `backwardFetchAll(long, long)`
    - `all()` / `backwardAll()`
    - `fetch(K, K, long, long)` / `backwardFetch(K, K, long, long)`
    
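    These methods use the deprecated `serdes::keyFrom(rawKey)`, which
    provides empty headers, causing deserialization failures when headers
    are required. This commit overrides them to return an iterator that
    deserializes the value first and then passes the value's headers to
    `serdes.keyFrom(rawKey, headers)` when deserializing the key.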
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../MeteredTimestampedWindowStoreWithHeaders.java  | 127 ++++++++
 ...teredTimestampedWindowStoreWithHeadersTest.java | 322 +++++++++++++++++++++
 2 files changed, 449 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index be47980ea4f..85a2dfe2e77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -282,6 +283,132 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
         return result;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final 
K keyFrom,
+                                                                         final 
K keyTo,
+                                                                         final 
long timeFrom,
+                                                                         final 
long timeTo) {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().fetch(
+                keyBytes(keyFrom, new RecordHeaders()),
+                keyBytes(keyTo, new RecordHeaders()),
+                timeFrom,
+                timeTo)
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetch(final K keyFrom,
+                                                                               
  final K keyTo,
+                                                                               
  final long timeFrom,
+                                                                               
  final long timeTo) {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().backwardFetch(
+                keyBytes(keyFrom, new RecordHeaders()),
+                keyBytes(keyTo, new RecordHeaders()),
+                timeFrom,
+                timeTo)
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
fetchAll(final long timeFrom, final long timeTo) {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().fetchAll(timeFrom, timeTo)
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetchAll(final long timeFrom, final long timeTo) {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().backwardFetchAll(timeFrom, timeTo)
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> all() {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().all()
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardAll() {
+        return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            wrapped().backwardAll()
+        );
+    }
+
+    private class MeteredTimestampedWindowStoreWithHeadersKeyValueIterator
+        implements KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>, 
MeteredIterator {
+
+        private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
+        private final long startNs;
+        private final long startTimestampMs;
+        private KeyValue<Windowed<K>, ValueTimestampHeaders<V>> cachedNext;
+
+        private MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
+            final KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
+            this.iter = iter;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            numOpenIterators.increment();
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return this.startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cachedNext != null || iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
+            if (cachedNext != null) {
+                final KeyValue<Windowed<K>, ValueTimestampHeaders<V>> result = 
cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
+            final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
+
+            if (next == null) {
+                return null;
+            }
+
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(next.value, new RecordHeaders());
+            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            final K key = serdes.keyFrom(next.key.key().get(), headers);
+            final Windowed<K> windowedKey = new Windowed<>(key, 
next.key.window());
+            return KeyValue.pair(windowedKey, valueTimestampHeaders);
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                final long duration = time.nanoseconds() - startNs;
+                fetchSensor.record(duration);
+                iteratorDurationSensor.record(duration);
+                numOpenIterators.decrement();
+                openIterators.remove(this);
+            }
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext == null ? null : cachedNext.key;
+        }
+    }
+
     private boolean isUnderlyingStoreTimestamped() {
         Object store = wrapped();
         do {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index daefc8367c7..5501b8bc48e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -28,14 +28,19 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.KeyValueIteratorStub;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
@@ -47,8 +52,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -276,4 +289,313 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
         verify(innerStoreMock).fetch(KEY_BYTES, TIMESTAMP);
         verify(innerStoreMock).put(KEY_BYTES, VALUE_TIMESTAMP_HEADERS_BYTES, 
TIMESTAMP);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.fetchAll(0, 100))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.fetchAll(0, 100);
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.all())
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all();
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.fetch(any(Bytes.class), any(Bytes.class), eq(0L), 
eq(100L)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator =
+            store.fetch(KEY, KEY, 0, 100);
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
=
+            iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.backwardFetchAll(0, 100))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.backwardFetchAll(0, 100);
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.backwardAll())
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.backwardAll();
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(innerStoreMock.backwardFetch(any(Bytes.class), any(Bytes.class), 
eq(0L), eq(100L)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+            innerStoreMock,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        store.init(context, store);
+
+        final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator =
+            store.backwardFetch(KEY, KEY, 0, 100);
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
=
+            iterator.next();
+
+        assertEquals(KEY, result.key.key());
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
 }

Reply via email to