This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8059503601f KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (3/N) (#21451)
8059503601f is described below

commit 8059503601f17cda2fd1b4dea1a06b12e8038e47
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Feb 19 04:41:16 2026 +0100

    KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (3/N) (#21451)
    
    This PR implements the metered layer of the
    TimestampedKeyValueStoreWithHeaders introduced in KIP-1271.
    
    Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../state/TimestampedKeyValueStoreWithHeaders.java |  27 ++
 .../state/internals/MeteredKeyValueStore.java      |   9 +-
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 117 +++++
 ...redTimestampedKeyValueStoreWithHeadersTest.java | 499 +++++++++++++++++++++
 4 files changed, 648 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java
new file mode 100644
index 00000000000..d44184cb4ea
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+/**
+ * A key-(value/timestamp/headers) store that supports put/get/delete.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface TimestampedKeyValueStoreWithHeaders<K, V>
+    extends KeyValueStore<K, ValueTimestampHeaders<V>> {
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index dbf6ad0b8cd..91e933e27d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -86,7 +87,7 @@ public class MeteredKeyValueStore<K, V>
     private final String metricsScope;
     protected final Time time;
     protected Sensor putSensor;
-    private Sensor putIfAbsentSensor;
+    protected Sensor putIfAbsentSensor;
     protected Sensor getSensor;
     protected Sensor deleteSensor;
     private Sensor putAllSensor;
@@ -323,7 +324,7 @@ public class MeteredKeyValueStore<K, V>
                     final V value) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
-            maybeMeasureLatency(() -> wrapped().put(keyBytes(key), 
serdes.rawValue(value)), time, putSensor);
+            maybeMeasureLatency(() -> wrapped().put(keyBytes(key), 
serdes.rawValue(value, new RecordHeaders())), time, putSensor);
             maybeRecordE2ELatency();
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
@@ -420,11 +421,11 @@ public class MeteredKeyValueStore<K, V>
     }
 
     protected V outerValue(final byte[] value) {
-        return value != null ? serdes.valueFrom(value) : null;
+        return value != null ? serdes.valueFrom(value, new RecordHeaders()) : 
null;
     }
 
     protected Bytes keyBytes(final K key) {
-        return Bytes.wrap(serdes.rawKey(key));
+        return Bytes.wrap(serdes.rawKey(key, new RecordHeaders()));
     }
 
     private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
V>> from) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
new file mode 100644
index 00000000000..4e955c98f1a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used 
for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own 
metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, 
byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, 
ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, 
byte[]> inner,
+                                               final String metricScope,
+                                               final Time time,
+                                               final Serde<K> keySerde,
+                                               final 
Serde<ValueTimestampHeaders<V>> valueSerde) {
+        super(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final 
Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                        final 
SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new ValueTimestampHeadersSerde<>((Serde<V>) 
getter.valueSerde());
+        } else {
+            return super.prepareValueSerdeForStore(valueSerde, getter);
+        }
+    }
+
+    @Override
+    public void put(final K key,
+                    final ValueTimestampHeaders<V> value) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
+            maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers)), time, putSensor);
+            maybeRecordE2ELatency();
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> putIfAbsent(final K key,
+                         final ValueTimestampHeaders<V> value) {
+        Objects.requireNonNull(key, "key cannot be null");
+        final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
+        final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
+            () -> outerValue(wrapped().putIfAbsent(keyBytes(key, headers), 
serdes.rawValue(value, headers))),
+            time,
+            putIfAbsentSensor
+        );
+        maybeRecordE2ELatency();
+        return currentValue;
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        throw new UnsupportedOperationException("Querying is not supported for 
" + getClass().getSimpleName());
+    }
+
+    @Override
+    public Position getPosition() {
+        throw new UnsupportedOperationException("Position is not supported for 
" + getClass().getSimpleName());
+    }
+
+    protected Bytes keyBytes(final K key, final Headers headers) {
+        return Bytes.wrap(serdes.rawKey(key, headers));
+    }
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
new file mode 100644
index 00000000000..ddaa24ce184
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.KeyValueIteratorStub;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class MeteredTimestampedKeyValueStoreWithHeadersTest {
+    private static final String APPLICATION_ID = "test-app";
+    private static final String STORE_NAME = "store-name";
+    private static final String STORE_TYPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String CHANGELOG_TOPIC = "changelog-topic-name";
+    private static final String THREAD_ID_TAG_KEY = "thread-id";
+    private static final String KEY = "key";
+    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
+    private static final RecordHeaders HEADERS = makeHeaders();
+    private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS 
=
+        ValueTimestampHeaders.make("value", 97L, HEADERS);
+    private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = 
serializeValueTimestampHeaders();
+    private final String threadId = Thread.currentThread().getName();
+    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
+    @Mock
+    private KeyValueStore<Bytes, byte[]> inner;
+    @Mock
+    private InternalProcessorContext<?, ?> context;
+    private MockTime mockTime;
+
+    private static final Map<String, Object> CONFIGS =
+        mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, 
APPLICATION_ID));
+
+    private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered;
+    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = 
KeyValue.pair(KEY_BYTES,
+        VALUE_TIMESTAMP_HEADERS_BYTES
+    );
+    private final Metrics metrics = new Metrics();
+    private Map<String, String> tags;
+
+    private void setUpWithoutContext() {
+        mockTime = new MockTime();
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            "scope",
+            mockTime,
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry(THREAD_ID_TAG_KEY, threadId),
+            mkEntry("task-id", taskId.toString()),
+            mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
+        );
+    }
+
+    private void setUp() {
+        setUpWithoutContext();
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.metrics())
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
+        when(context.taskId()).thenReturn(taskId);
+        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.appConfigs()).thenReturn(CONFIGS);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void setUpWithExpectSerdes() {
+        setUp();
+        when(context.keySerde()).thenReturn((Serde) Serdes.String());
+        when(context.valueSerde()).thenReturn((Serde) Serdes.Long());
+    }
+
+    private void init() {
+        metered.init(context, metered);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        setUp();
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer 
= new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        doNothing().when(inner).init(context, outer);
+        outer.init(context, outer);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        setUp();
+        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
+    }
+
+    @Test
+    public void 
shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        setUp();
+        final String defaultChangelogTopicName = 
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, 
taskId.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @Test
+    public void testMetrics() {
+        setUp();
+        init();
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new 
KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format(
+            "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
+            STORE_LEVEL_GROUP,
+            THREAD_ID_TAG_KEY,
+            threadId,
+            taskId,
+            STORE_TYPE,
+            STORE_NAME
+        )));
+    }
+
+    @Test
+    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
+        setUp();
+        doNothing().when(inner).put(any(Bytes.class), any(byte[].class));
+        init();
+
+        metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
+        setUp();
+        
when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY));
+
+        final KafkaMetric metric = metric("get-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
+        setUp();
+        when(inner.putIfAbsent(any(Bytes.class), 
any(byte[].class))).thenReturn(null);
+        init();
+
+        metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-if-absent-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
+        setUp();
+        doNothing().when(inner).putAll(any(List.class));
+        init();
+
+        metered.putAll(Collections.singletonList(KeyValue.pair(KEY, 
VALUE_TIMESTAMP_HEADERS)));
+
+        final KafkaMetric metric = metric("put-all-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
+        setUp();
+        
when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        metered.delete(KEY);
+
+        final KafkaMetric metric = metric("delete-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
+        setUp();
+        when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn(
+            new 
KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+        init();
+
+        try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
iterator = metered.range(KEY, KEY)) {
+            assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+            assertFalse(iterator.hasNext());
+        }
+
+        final KafkaMetric metric = metric("range-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
+        setUp();
+        when(inner.all())
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+        init();
+
+        try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
iterator = metered.all()) {
+            assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+            assertFalse(iterator.hasNext());
+        }
+
+        final KafkaMetric metric = metric(new MetricName("all-rate", 
STORE_LEVEL_GROUP, "", tags));
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldCommitInnerWhenCommitTimeRecords() {
+        setUp();
+        doNothing().when(inner).commit(Map.of());
+        init();
+
+        metered.commit(Map.of());
+
+        final KafkaMetric metric = metric("flush-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    private interface CachedKeyValueStore extends KeyValueStore<Bytes, 
byte[]>, CachedStateStore<byte[], byte[]> { }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSetFlushListenerOnWrappedCachingStore() {
+        setUpWithoutContext();
+        final CachedKeyValueStore cachedKeyValueStore = 
mock(CachedKeyValueStore.class);
+
+        
when(cachedKeyValueStore.setFlushListener(any(CacheFlushListener.class), 
eq(false))).thenReturn(true);
+
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            cachedKeyValueStore,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String()));
+        assertTrue(metered.setFlushListener(null, false));
+    }
+
+    @Test
+    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
+        setUpWithoutContext();
+        assertFalse(metered.setFlushListener(null, false));
+    }
+
+    @Test
+    public void 
shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
+        setUpWithExpectSerdes();
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = 
new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            null,
+            null
+        );
+        store.init(context, inner);
+
+        try {
+            store.put("key", ValueTimestampHeaders.make(42L, 60000, new 
RecordHeaders()));
+        } catch (final StreamsException exception) {
+            if (exception.getCause() instanceof ClassCastException) {
+                throw new AssertionError(
+                    "Serdes are not correctly set from processor context.", 
exception);
+            } else {
+                throw exception;
+            }
+        }
+    }
+
+    @Test
+    public void 
shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
+        setUp();
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = 
new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.Long())
+        );
+        store.init(context, inner);
+
+        try {
+            store.put("key", ValueTimestampHeaders.make(42L, 60000, new 
RecordHeaders()));
+        } catch (final StreamsException exception) {
+            if (exception.getCause() instanceof ClassCastException) {
+                fail("Serdes are not correctly set from constructor 
parameters.");
+            }
+            throw exception;
+        }
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void shouldTrackOpenIteratorsMetric() {
+        setUp();
+        when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+        init();
+
+        final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
+        assertNotNull(openIteratorsMetric);
+
+        assertEquals(0L, (Long) openIteratorsMetric.metricValue());
+
+        try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
unused = metered.all()) {
+            assertEquals(1L, (Long) openIteratorsMetric.metricValue());
+        }
+
+        assertEquals(0L, (Long) openIteratorsMetric.metricValue());
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void shouldTimeIteratorDuration() {
+        setUp();
+        when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+        init();
+
+        final KafkaMetric iteratorDurationAvgMetric = 
metric("iterator-duration-avg");
+        final KafkaMetric iteratorDurationMaxMetric = 
metric("iterator-duration-max");
+        assertNotNull(iteratorDurationAvgMetric);
+        assertNotNull(iteratorDurationMaxMetric);
+
+        assertEquals(Double.NaN, (Double) 
iteratorDurationAvgMetric.metricValue());
+        assertEquals(Double.NaN, (Double) 
iteratorDurationMaxMetric.metricValue());
+
+        try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
unused = metered.all()) {
+            // nothing to do, just close immediately
+            mockTime.sleep(2);
+        }
+
+        assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) 
iteratorDurationAvgMetric.metricValue());
+        assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) 
iteratorDurationMaxMetric.metricValue());
+
+        try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
iterator = metered.all()) {
+            // nothing to do, just close immediately
+            mockTime.sleep(3);
+        }
+
+        assertEquals(2.5 * TimeUnit.MILLISECONDS.toNanos(1), (double) 
iteratorDurationAvgMetric.metricValue());
+        assertEquals(3.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) 
iteratorDurationMaxMetric.metricValue());
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void shouldTrackOldestOpenIteratorTimestamp() {
+        setUp();
+        when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+        init();
+
+        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertNotNull(oldestIteratorTimestampMetric);
+
+        KeyValueIterator<String, ValueTimestampHeaders<String>> second = null;
+        final long secondTimestamp;
+        try {
+            try (final KeyValueIterator<String, ValueTimestampHeaders<String>> 
unused = metered.all()) {
+
+                final long oldestTimestamp = mockTime.milliseconds();
+                assertEquals(oldestTimestamp, (Long) 
oldestIteratorTimestampMetric.metricValue());
+                mockTime.sleep(100);
+
+                // open a second iterator before closing the first to test 
that we still produce the first iterator's timestamp
+                second = metered.all();
+                secondTimestamp = mockTime.milliseconds();
+                assertEquals(oldestTimestamp, (Long) 
oldestIteratorTimestampMetric.metricValue());
+                mockTime.sleep(100);
+            }
+
+            // now that the first iterator is closed, check that the timestamp 
has advanced to the still open second iterator
+            assertEquals(secondTimestamp, (Long) 
oldestIteratorTimestampMetric.metricValue());
+        } finally {
+            if (second != null) {
+                second.close();
+            }
+        }
+        // now that all iterators are closed, the metric should be zero
+        assertEquals(0L, (Long) oldestIteratorTimestampMetric.metricValue());
+    }
+
+    private static RecordHeaders makeHeaders() {
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("header-key", "header-value".getBytes());
+        return headers;
+    }
+
+    private static byte[] serializeValueTimestampHeaders() {
+        final ValueTimestampHeadersSerializer<String> serializer = new 
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+        return serializer.serialize("topic", VALUE_TIMESTAMP_HEADERS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String 
topic) {
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+        final Serializer<ValueTimestampHeaders<String>> valueSerializer = 
mock(Serializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        lenient().when(keySerializer.serialize(eq(topic), 
any(RecordHeaders.class), eq(KEY))).thenReturn(KEY.getBytes());
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+        lenient().when(valueDeserializer.deserialize(eq(topic), 
any(RecordHeaders.class), 
any(byte[].class))).thenReturn(VALUE_TIMESTAMP_HEADERS);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        lenient().when(valueSerializer.serialize(eq(topic), 
any(RecordHeaders.class), 
eq(VALUE_TIMESTAMP_HEADERS))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        
when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        metered.init(context, metered);
+
+        metered.get(KEY);
+        metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+    }
+
+    private KafkaMetric metric(final String name) {
+        return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", 
tags));
+    }
+
+    private KafkaMetric metric(final MetricName metricName) {
+        return this.metrics.metric(metricName);
+    }
+}

Reply via email to