This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8059503601f KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders
(3/N) (#21451)
8059503601f is described below
commit 8059503601f17cda2fd1b4dea1a06b12e8038e47
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Feb 19 04:41:16 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (3/N) (#21451)
This PR implements the metered layer of the
TimestampedKeyValueStoreWithHeaders introduced in KIP-1271.
Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../state/TimestampedKeyValueStoreWithHeaders.java | 27 ++
.../state/internals/MeteredKeyValueStore.java | 9 +-
...MeteredTimestampedKeyValueStoreWithHeaders.java | 117 +++++
...redTimestampedKeyValueStoreWithHeadersTest.java | 499 +++++++++++++++++++++
4 files changed, 648 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java
new file mode 100644
index 00000000000..d44184cb4ea
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kafka.streams.state;

/**
 * A key-(value/timestamp/headers) store that supports put/get/delete.
 *
 * <p>Each stored value is a {@link ValueTimestampHeaders}, i.e. the plain value together with
 * its record timestamp and record headers. This is a marker interface: all operations are
 * inherited from {@link KeyValueStore} with the value type fixed to
 * {@code ValueTimestampHeaders<V>}.
 *
 * @param <K> The key type
 * @param <V> The value type
 */
public interface TimestampedKeyValueStoreWithHeaders<K, V>
    extends KeyValueStore<K, ValueTimestampHeaders<V>> {
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index dbf6ad0b8cd..91e933e27d2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -86,7 +87,7 @@ public class MeteredKeyValueStore<K, V>
private final String metricsScope;
protected final Time time;
protected Sensor putSensor;
- private Sensor putIfAbsentSensor;
+ protected Sensor putIfAbsentSensor;
protected Sensor getSensor;
protected Sensor deleteSensor;
private Sensor putAllSensor;
@@ -323,7 +324,7 @@ public class MeteredKeyValueStore<K, V>
final V value) {
Objects.requireNonNull(key, "key cannot be null");
try {
- maybeMeasureLatency(() -> wrapped().put(keyBytes(key),
serdes.rawValue(value)), time, putSensor);
+ maybeMeasureLatency(() -> wrapped().put(keyBytes(key),
serdes.rawValue(value, new RecordHeaders())), time, putSensor);
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
@@ -420,11 +421,11 @@ public class MeteredKeyValueStore<K, V>
}
protected V outerValue(final byte[] value) {
- return value != null ? serdes.valueFrom(value) : null;
+ return value != null ? serdes.valueFrom(value, new RecordHeaders()) :
null;
}
protected Bytes keyBytes(final K key) {
- return Bytes.wrap(serdes.rawKey(key));
+ return Bytes.wrap(serdes.rawKey(key, new RecordHeaders()));
}
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
V>> from) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
new file mode 100644
index 00000000000..4e955c98f1a
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.Objects;
+
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used
for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own
metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type <Bytes,
byte[]>,
+ * hence we use {@link Serde}s to convert from <K,
ValueTimestampHeaders<V>> to <Bytes, byte[]>.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+ extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes,
byte[]> inner,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final
Serde<ValueTimestampHeaders<V>> valueSerde) {
+ super(inner, metricScope, time, keySerde, valueSerde);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final
Serde<ValueTimestampHeaders<V>> valueSerde,
+ final
SerdeGetter getter) {
+ if (valueSerde == null) {
+ return new ValueTimestampHeadersSerde<>((Serde<V>)
getter.valueSerde());
+ } else {
+ return super.prepareValueSerdeForStore(valueSerde, getter);
+ }
+ }
+
+ @Override
+ public void put(final K key,
+ final ValueTimestampHeaders<V> value) {
+ Objects.requireNonNull(key, "key cannot be null");
+ try {
+ final Headers headers = value != null ? value.headers() : new
RecordHeaders();
+ maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers)), time, putSensor);
+ maybeRecordE2ELatency();
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key, value);
+ throw new ProcessorStateException(message, e);
+ }
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> putIfAbsent(final K key,
+ final ValueTimestampHeaders<V> value) {
+ Objects.requireNonNull(key, "key cannot be null");
+ final Headers headers = value != null ? value.headers() : new
RecordHeaders();
+ final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
+ () -> outerValue(wrapped().putIfAbsent(keyBytes(key, headers),
serdes.rawValue(value, headers))),
+ time,
+ putIfAbsentSensor
+ );
+ maybeRecordE2ELatency();
+ return currentValue;
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ throw new UnsupportedOperationException("Querying is not supported for
" + getClass().getSimpleName());
+ }
+
+ @Override
+ public Position getPosition() {
+ throw new UnsupportedOperationException("Position is not supported for
" + getClass().getSimpleName());
+ }
+
+ protected Bytes keyBytes(final K key, final Headers headers) {
+ return Bytes.wrap(serdes.rawKey(key, headers));
+ }
+
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
new file mode 100644
index 00000000000..ddaa24ce184
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.KeyValueIteratorStub;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class MeteredTimestampedKeyValueStoreWithHeadersTest {
+ private static final String APPLICATION_ID = "test-app";
+ private static final String STORE_NAME = "store-name";
+ private static final String STORE_TYPE = "scope";
+ private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+ private static final String CHANGELOG_TOPIC = "changelog-topic-name";
+ private static final String THREAD_ID_TAG_KEY = "thread-id";
+ private static final String KEY = "key";
+ private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
+ private static final RecordHeaders HEADERS = makeHeaders();
+ private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS
=
+ ValueTimestampHeaders.make("value", 97L, HEADERS);
+ private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES =
serializeValueTimestampHeaders();
+ private final String threadId = Thread.currentThread().getName();
+ private final TaskId taskId = new TaskId(0, 0, "My-Topology");
+ @Mock
+ private KeyValueStore<Bytes, byte[]> inner;
+ @Mock
+ private InternalProcessorContext<?, ?> context;
+ private MockTime mockTime;
+
+ private static final Map<String, Object> CONFIGS =
+ mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
APPLICATION_ID));
+
+ private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered;
+ private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair =
KeyValue.pair(KEY_BYTES,
+ VALUE_TIMESTAMP_HEADERS_BYTES
+ );
+ private final Metrics metrics = new Metrics();
+ private Map<String, String> tags;
+
+ private void setUpWithoutContext() {
+ mockTime = new MockTime();
+ metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ "scope",
+ mockTime,
+ Serdes.String(),
+ new ValueTimestampHeadersSerde<>(Serdes.String())
+ );
+ metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+ tags = mkMap(
+ mkEntry(THREAD_ID_TAG_KEY, threadId),
+ mkEntry("task-id", taskId.toString()),
+ mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
+ );
+ }
+
+ private void setUp() {
+ setUpWithoutContext();
+ when(context.applicationId()).thenReturn(APPLICATION_ID);
+ when(context.metrics())
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
+ when(context.taskId()).thenReturn(taskId);
+ when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+ when(inner.name()).thenReturn(STORE_NAME);
+ when(context.appConfigs()).thenReturn(CONFIGS);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void setUpWithExpectSerdes() {
+ setUp();
+ when(context.keySerde()).thenReturn((Serde) Serdes.String());
+ when(context.valueSerde()).thenReturn((Serde) Serdes.Long());
+ }
+
+ private void init() {
+ metered.init(context, metered);
+ }
+
+ @Test
+ public void shouldDelegateInit() {
+ setUp();
+ final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer
= new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ Serdes.String(),
+ new ValueTimestampHeadersSerde<>(Serdes.String())
+ );
+ doNothing().when(inner).init(context, outer);
+ outer.init(context, outer);
+ }
+
+ @Test
+ public void shouldPassChangelogTopicNameToStateStoreSerde() {
+ setUp();
+ doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
+ }
+
+ @Test
+ public void
shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+ setUp();
+ final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME,
taskId.topologyName());
+ when(context.changelogFor(STORE_NAME)).thenReturn(null);
+
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+ }
+
+ @Test
+ public void testMetrics() {
+ setUp();
+ init();
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new
KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
+ metrics.addReporter(reporter);
+ assertTrue(reporter.containsMbean(String.format(
+ "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
+ STORE_LEVEL_GROUP,
+ THREAD_ID_TAG_KEY,
+ threadId,
+ taskId,
+ STORE_TYPE,
+ STORE_NAME
+ )));
+ }
+
+ @Test
+ public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
+ setUp();
+ doNothing().when(inner).put(any(Bytes.class), any(byte[].class));
+ init();
+
+ metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+
+ final KafkaMetric metric = metric("put-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
+ setUp();
+
when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+ init();
+
+ assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY));
+
+ final KafkaMetric metric = metric("get-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
+ setUp();
+ when(inner.putIfAbsent(any(Bytes.class),
any(byte[].class))).thenReturn(null);
+ init();
+
+ metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS);
+
+ final KafkaMetric metric = metric("put-if-absent-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
+ setUp();
+ doNothing().when(inner).putAll(any(List.class));
+ init();
+
+ metered.putAll(Collections.singletonList(KeyValue.pair(KEY,
VALUE_TIMESTAMP_HEADERS)));
+
+ final KafkaMetric metric = metric("put-all-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
+ setUp();
+
when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+ init();
+
+ metered.delete(KEY);
+
+ final KafkaMetric metric = metric("delete-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
+ setUp();
+ when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn(
+ new
KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+ init();
+
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
iterator = metered.range(KEY, KEY)) {
+ assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+ assertFalse(iterator.hasNext());
+ }
+
+ final KafkaMetric metric = metric("range-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
+ setUp();
+ when(inner.all())
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+ init();
+
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
iterator = metered.all()) {
+ assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+ assertFalse(iterator.hasNext());
+ }
+
+ final KafkaMetric metric = metric(new MetricName("all-rate",
STORE_LEVEL_GROUP, "", tags));
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ @Test
+ public void shouldCommitInnerWhenCommitTimeRecords() {
+ setUp();
+ doNothing().when(inner).commit(Map.of());
+ init();
+
+ metered.commit(Map.of());
+
+ final KafkaMetric metric = metric("flush-rate");
+ assertTrue((Double) metric.metricValue() > 0);
+ }
+
+ private interface CachedKeyValueStore extends KeyValueStore<Bytes,
byte[]>, CachedStateStore<byte[], byte[]> { }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldSetFlushListenerOnWrappedCachingStore() {
+ setUpWithoutContext();
+ final CachedKeyValueStore cachedKeyValueStore =
mock(CachedKeyValueStore.class);
+
+
when(cachedKeyValueStore.setFlushListener(any(CacheFlushListener.class),
eq(false))).thenReturn(true);
+
+ metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ cachedKeyValueStore,
+ STORE_TYPE,
+ new MockTime(),
+ Serdes.String(),
+ new ValueTimestampHeadersSerde<>(Serdes.String()));
+ assertTrue(metered.setFlushListener(null, false));
+ }
+
+ @Test
+ public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
+ setUpWithoutContext();
+ assertFalse(metered.setFlushListener(null, false));
+ }
+
+ @Test
+ public void
shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
+ setUpWithExpectSerdes();
+ final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store =
new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ null,
+ null
+ );
+ store.init(context, inner);
+
+ try {
+ store.put("key", ValueTimestampHeaders.make(42L, 60000, new
RecordHeaders()));
+ } catch (final StreamsException exception) {
+ if (exception.getCause() instanceof ClassCastException) {
+ throw new AssertionError(
+ "Serdes are not correctly set from processor context.",
exception);
+ } else {
+ throw exception;
+ }
+ }
+ }
+
+ @Test
+ public void
shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
+ setUp();
+ final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store =
new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ Serdes.String(),
+ new ValueTimestampHeadersSerde<>(Serdes.Long())
+ );
+ store.init(context, inner);
+
+ try {
+ store.put("key", ValueTimestampHeaders.make(42L, 60000, new
RecordHeaders()));
+ } catch (final StreamsException exception) {
+ if (exception.getCause() instanceof ClassCastException) {
+ fail("Serdes are not correctly set from constructor
parameters.");
+ }
+ throw exception;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @Test
+ public void shouldTrackOpenIteratorsMetric() {
+ setUp();
+ when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+ init();
+
+ final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
+ assertNotNull(openIteratorsMetric);
+
+ assertEquals(0L, (Long) openIteratorsMetric.metricValue());
+
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
unused = metered.all()) {
+ assertEquals(1L, (Long) openIteratorsMetric.metricValue());
+ }
+
+ assertEquals(0L, (Long) openIteratorsMetric.metricValue());
+ }
+
+ @SuppressWarnings("unused")
+ @Test
+ public void shouldTimeIteratorDuration() {
+ setUp();
+ when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+ init();
+
+ final KafkaMetric iteratorDurationAvgMetric =
metric("iterator-duration-avg");
+ final KafkaMetric iteratorDurationMaxMetric =
metric("iterator-duration-max");
+ assertNotNull(iteratorDurationAvgMetric);
+ assertNotNull(iteratorDurationMaxMetric);
+
+ assertEquals(Double.NaN, (Double)
iteratorDurationAvgMetric.metricValue());
+ assertEquals(Double.NaN, (Double)
iteratorDurationMaxMetric.metricValue());
+
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
unused = metered.all()) {
+ // nothing to do, just close immediately
+ mockTime.sleep(2);
+ }
+
+ assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double)
iteratorDurationAvgMetric.metricValue());
+ assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double)
iteratorDurationMaxMetric.metricValue());
+
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
iterator = metered.all()) {
+ // nothing to do, just close immediately
+ mockTime.sleep(3);
+ }
+
+ assertEquals(2.5 * TimeUnit.MILLISECONDS.toNanos(1), (double)
iteratorDurationAvgMetric.metricValue());
+ assertEquals(3.0 * TimeUnit.MILLISECONDS.toNanos(1), (double)
iteratorDurationMaxMetric.metricValue());
+ }
+
+ @SuppressWarnings("unused")
+ @Test
+ public void shouldTrackOldestOpenIteratorTimestamp() {
+ setUp();
+ when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+ init();
+
+ final KafkaMetric oldestIteratorTimestampMetric =
metric("oldest-iterator-open-since-ms");
+ assertNotNull(oldestIteratorTimestampMetric);
+
+ KeyValueIterator<String, ValueTimestampHeaders<String>> second = null;
+ final long secondTimestamp;
+ try {
+ try (final KeyValueIterator<String, ValueTimestampHeaders<String>>
unused = metered.all()) {
+
+ final long oldestTimestamp = mockTime.milliseconds();
+ assertEquals(oldestTimestamp, (Long)
oldestIteratorTimestampMetric.metricValue());
+ mockTime.sleep(100);
+
+ // open a second iterator before closing the first to test
that we still produce the first iterator's timestamp
+ second = metered.all();
+ secondTimestamp = mockTime.milliseconds();
+ assertEquals(oldestTimestamp, (Long)
oldestIteratorTimestampMetric.metricValue());
+ mockTime.sleep(100);
+ }
+
+ // now that the first iterator is closed, check that the timestamp
has advanced to the still open second iterator
+ assertEquals(secondTimestamp, (Long)
oldestIteratorTimestampMetric.metricValue());
+ } finally {
+ if (second != null) {
+ second.close();
+ }
+ }
+ // now that all iterators are closed, the metric should be zero
+ assertEquals(0L, (Long) oldestIteratorTimestampMetric.metricValue());
+ }
+
+ private static RecordHeaders makeHeaders() {
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("header-key", "header-value".getBytes());
+ return headers;
+ }
+
+ private static byte[] serializeValueTimestampHeaders() {
+ final ValueTimestampHeadersSerializer<String> serializer = new
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+ return serializer.serialize("topic", VALUE_TIMESTAMP_HEADERS);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String
topic) {
+ final Serde<String> keySerde = mock(Serde.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+ final Serializer<ValueTimestampHeaders<String>> valueSerializer =
mock(Serializer.class);
+ when(keySerde.serializer()).thenReturn(keySerializer);
+ lenient().when(keySerializer.serialize(eq(topic),
any(RecordHeaders.class), eq(KEY))).thenReturn(KEY.getBytes());
+ when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+ lenient().when(valueDeserializer.deserialize(eq(topic),
any(RecordHeaders.class),
any(byte[].class))).thenReturn(VALUE_TIMESTAMP_HEADERS);
+ when(valueSerde.serializer()).thenReturn(valueSerializer);
+ lenient().when(valueSerializer.serialize(eq(topic),
any(RecordHeaders.class),
eq(VALUE_TIMESTAMP_HEADERS))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+
when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+ metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ metered.init(context, metered);
+
+ metered.get(KEY);
+ metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+ }
+
+ private KafkaMetric metric(final String name) {
+ return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "",
tags));
+ }
+
+ private KafkaMetric metric(final MetricName metricName) {
+ return this.metrics.metric(metricName);
+ }
+}