This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28e7803037f KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
28e7803037f is described below
commit 28e7803037fb48770cd4877c9bf12e42b7c52441
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Thu Oct 2 21:24:36 2025 -0700
KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
- Move restore time calculation to ChangelogMetadata.
- Introduced a new interface to propagate the calculated value to the stores to avoid modifications in the public interface.
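In rough terms, the new plumbing works as sketched below (a minimal outline assembled from the patch; the interface, call site, and sensor call are taken from the diff, the surrounding context is illustrative only):

    // New internal interface (MeteredStateStore.java, added by this patch):
    public interface MeteredStateStore {
        void recordRestoreTime(final long restoreTimeNs);
    }

    // In StoreChangelogReader, once a changelog transitions to COMPLETED,
    // the elapsed restore time is handed to the store if it is metered:
    if (storeMetadata.store() instanceof MeteredStateStore) {
        ((MeteredStateStore) storeMetadata.store())
            .recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds()));
    }

    // Each metered store (key-value, session, window) forwards the value to its restore sensor:
    @Override
    public void recordRestoreTime(final long restoreTimeNs) {
        restoreSensor.record(restoreTimeNs);
    }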
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/RestoreIntegrationTest.java | 51 +++++++++++++++++++-
.../integration/utils/IntegrationTestUtils.java | 9 ++++
.../processor/internals/StoreChangelogReader.java | 11 +++++
.../state/internals/MeteredKeyValueStore.java | 15 ++++--
.../state/internals/MeteredSessionStore.java | 14 ++++--
.../streams/state/internals/MeteredStateStore.java | 22 +++++++++
.../state/internals/MeteredWindowStore.java | 12 +++--
.../internals/StoreChangelogReaderTest.java | 55 ++++++++++++++++++++++
.../state/internals/MeteredKeyValueStoreTest.java | 10 ++--
.../state/internals/MeteredSessionStoreTest.java | 9 ++--
.../MeteredVersionedKeyValueStoreTest.java | 9 +---
.../state/internals/MeteredWindowStoreTest.java | 11 +++--
12 files changed, 196 insertions(+), 32 deletions(-)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index e85ac344157..7ffeb6ac035 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -112,6 +112,9 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -685,6 +688,52 @@ public class RestoreIntegrationTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exception {
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Properties props = props();
+
+ if (useNewProtocol) {
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+ }
+
+ props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+
+ createStateForRestoration(inputStream, 10000);
+
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+ builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
+ .toStream()
+ .foreach((key, value) -> {
+ if (numReceived.incrementAndGet() == numberOfKeys) {
+ shutdownLatch.countDown();
+ }
+ });
+
+ kafkaStreams = new KafkaStreams(builder.build(), props);
+
+ final AtomicLong restored = new AtomicLong(0);
+ final TrackingStateRestoreListener restoreListener = new TrackingStateRestoreListener(restored);
+ kafkaStreams.setGlobalStateRestoreListener(restoreListener);
+ kafkaStreams.start();
+
+ assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+ assertThat(numReceived.get(), equalTo(numberOfKeys));
+
+ final Map<String, Long> taskIdToMetricValue = kafkaStreams.metrics().entrySet().stream()
+ .filter(e -> e.getKey().name().equals("restore-latency-max"))
+ .collect(Collectors.toMap(e -> e.getKey().tags().get("task-id"), e -> ((Double) e.getValue().metricValue()).longValue()));
+
+ for (final Map.Entry<TopicPartition, Long> entry : restoreListener.changelogToRestoreTime().entrySet()) {
+ final long lowerBound = entry.getValue() - TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+ final long upperBound = entry.getValue() + TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+ assertThat(taskIdToMetricValue.get("0_" + entry.getKey().partition()), allOf(greaterThanOrEqualTo(lowerBound), lessThanOrEqualTo(upperBound)));
+ }
+ }
+
private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
final String outputTopic) throws Exception {
final Properties consumerProperties = new Properties();
@@ -971,4 +1020,4 @@ public class RestoreIntegrationTest {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a7b3f838f2a..44ea3c7fc89 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1337,6 +1337,8 @@ public class IntegrationTestUtils {
public final Map<TopicPartition, AtomicLong> changelogToStartOffset = new ConcurrentHashMap<>();
public final Map<TopicPartition, AtomicLong> changelogToEndOffset = new ConcurrentHashMap<>();
public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap<>();
+ private final Map<TopicPartition, AtomicLong> changelogToRestoreStartTime = new ConcurrentHashMap<>();
+ private final Map<TopicPartition, AtomicLong> changelogToRestoreEndTime = new ConcurrentHashMap<>();
private final AtomicLong restored;
public TrackingStateRestoreListener() {
@@ -1355,6 +1357,7 @@ public class IntegrationTestUtils {
changelogToStartOffset.put(topicPartition, new AtomicLong(startingOffset));
changelogToEndOffset.put(topicPartition, new AtomicLong(endingOffset));
changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L));
+ changelogToRestoreStartTime.put(topicPartition, new AtomicLong(System.nanoTime()));
}
@Override
@@ -1372,6 +1375,7 @@ public class IntegrationTestUtils {
if (restored != null) {
restored.addAndGet(totalRestored);
}
+ changelogToRestoreEndTime.put(topicPartition, new AtomicLong(System.nanoTime()));
}
public long totalNumRestored() {
@@ -1381,6 +1385,11 @@ public class IntegrationTestUtils {
}
return totalNumRestored;
}
+
+ public Map<TopicPartition, Long> changelogToRestoreTime() {
+ return changelogToRestoreStartTime.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> changelogToRestoreEndTime.get(e.getKey()).get() - e.getValue().get()));
+ }
}
public static class TrackingStandbyUpdateListener implements StandbyUpdateListener {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c91e9b38b99..5e09ceb62da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.state.internals.MeteredStateStore;
import org.slf4j.Logger;
@@ -138,6 +139,8 @@ public class StoreChangelogReader implements ChangelogReader {
// either due to limit offset (standby) or committed end offset (active)
private int bufferedLimitIndex;
+ private long restoreStartTimeNs;
+
private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) {
this.changelogState = ChangelogState.REGISTERED;
this.storeMetadata = storeMetadata;
@@ -188,6 +191,10 @@ public class StoreChangelogReader implements ChangelogReader {
int bufferedLimitIndex() {
return bufferedLimitIndex;
}
+
+ long calculateRestoreTime(final long restoreEndTimeNs) {
+ return restoreEndTimeNs - restoreStartTimeNs;
+ }
}
private static final long DEFAULT_OFFSET_UPDATE_MS = Duration.ofMinutes(5L).toMillis();
@@ -695,6 +702,9 @@ public class StoreChangelogReader implements ChangelogReader {
changelogMetadata.transitTo(ChangelogState.COMPLETED);
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
+ if (storeMetadata.store() instanceof MeteredStateStore) {
+ ((MeteredStateStore) storeMetadata.store()).recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds()));
+ }
try {
stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
@@ -1026,6 +1036,7 @@ public class StoreChangelogReader implements ChangelogReader {
// no records to restore; in this case we just initialize the sensor to zero
final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
task.recordRestoration(time, recordsToRestore, true);
+ changelogMetadata.restoreStartTimeNs = time.nanoseconds();
} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
try {
standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0962033b7ef..9c033d6bbd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -69,7 +69,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
- implements KeyValueStore<K, V> {
+ implements KeyValueStore<K, V>, MeteredStateStore {
final Serde<K> keySerde;
final Serde<V> valueSerde;
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V>
protected InternalProcessorContext<?, ?> internalContext;
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
+ private Sensor restoreSensor;
protected OpenIterators openIterators;
@@ -128,11 +129,10 @@ public class MeteredKeyValueStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
- final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
- // register and possibly restore the state from the logs
- maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
+ restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+
+ super.init(stateStoreContext, root);
}
private void registerMetrics() {
@@ -152,6 +152,11 @@ public class MeteredKeyValueStore<K, V>
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
}
+ @Override
+ public void recordRestoreTime(final long restoreTimeNs) {
+ restoreSensor.record(restoreTimeNs);
+ }
+
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 234ac1220f7..7794a6ebc51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -57,7 +57,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
public class MeteredSessionStore<K, V>
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
- implements SessionStore<K, V> {
+ implements SessionStore<K, V>, MeteredStateStore {
private final String metricsScope;
private final Serde<K> keySerde;
@@ -73,6 +73,7 @@ public class MeteredSessionStore<K, V>
private Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
+ private Sensor restoreSensor;
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -108,11 +109,9 @@ public class MeteredSessionStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
- final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+ restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
- // register and possibly restore the state from the logs
- maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
+ super.init(stateStoreContext, root);
}
private void registerMetrics() {
@@ -132,6 +131,11 @@ public class MeteredSessionStore<K, V>
);
}
+ @Override
+ public void recordRestoreTime(final long restoreTimeNs) {
+ restoreSensor.record(restoreTimeNs);
+ }
+
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java
new file mode 100644
index 00000000000..6b9f4eeb8f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public interface MeteredStateStore {
+
+ void recordRestoreTime(final long restoreTimeNs);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2da877453ce..1ba37da6dab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -60,7 +60,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
- implements WindowStore<K, V> {
+ implements WindowStore<K, V>, MeteredStateStore {
private final long windowSizeMs;
private final String metricsScope;
@@ -76,6 +76,7 @@ public class MeteredWindowStore<K, V>
private Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
+ private Sensor restoreSensor;
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -124,8 +125,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
- final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+
+ restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
@@ -150,6 +151,11 @@ public class MeteredWindowStore<K, V>
);
}
+ @Override
+ public void recordRestoreTime(final long restoreTimeNs) {
+ restoreSensor.record(restoreTimeNs);
+ }
+
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 69655b4d642..99a2b2519f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
@@ -89,7 +90,9 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -1364,6 +1367,58 @@ public class StoreChangelogReaderTest {
}
}
+ @Test
+ public void shouldCallRecordRestoreTimeAtTheEndOfRestore() {
+ setupActiveStateManager();
+
+ final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
+
+ when(storeMetadata.changelogPartition()).thenReturn(tp);
+ when(storeMetadata.store()).thenReturn(meteredStateStore);
+ when(meteredStateStore.name()).thenReturn(storeName);
+ final TaskId taskId = new TaskId(0, 0);
+
+ when(storeMetadata.offset()).thenReturn(0L);
+ when(activeStateManager.taskId()).thenReturn(taskId);
+
+ setupConsumer(2, tp);
+ consumer.updateEndOffsets(Collections.singletonMap(tp, 2L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 2L));
+
+ changelogReader.register(tp, activeStateManager);
+
+ changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
+ verify(meteredStateStore).recordRestoreTime(anyLong());
+ }
+
+ @Test
+ public void shouldNotCallRecordRestoreTimeIfRestoreDoesNotComplete() {
+ setupActiveStateManager();
+
+ final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
+
+ when(storeMetadata.changelogPartition()).thenReturn(tp);
+ when(storeMetadata.store()).thenReturn(meteredStateStore);
+ when(meteredStateStore.name()).thenReturn(storeName);
+ final TaskId taskId = new TaskId(0, 0);
+
+ when(storeMetadata.offset()).thenReturn(0L);
+ when(activeStateManager.taskId()).thenReturn(taskId);
+
+ setupConsumer(2, tp);
+ consumer.updateEndOffsets(Collections.singletonMap(tp, 3L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 3L));
+
+ changelogReader.register(tp, activeStateManager);
+
+ changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
+ verify(meteredStateStore, never()).recordRestoreTime(anyLong());
+ }
+
private void setupConsumer(final long messages, final TopicPartition topicPartition) {
assignPartition(messages, topicPartition);
addRecords(messages, topicPartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 294af3944f2..1ba655a75ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -58,7 +58,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -210,16 +209,19 @@ public class MeteredKeyValueStoreTest {
}
@Test
- public void shouldRecordRestoreLatencyOnInit() {
+ public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
setUp();
doNothing().when(inner).init(context, metered);
init();
+ final long restoreTimeNs = 1000L;
+ metered.recordRestoreTime(restoreTimeNs);
+
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
- final KafkaMetric metric = metric("restore-rate");
- assertThat((Double) metric.metricValue(), greaterThan(0.0));
+ final KafkaMetric metric = metric("restore-latency-max");
+ assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index ee1b686dade..f8b08a532d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -458,14 +458,17 @@ public class MeteredSessionStoreTest {
}
@Test
- public void shouldRecordRestoreTimeOnInit() {
+ public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
setUp();
init();
+ final long restoreTimeNs = 1000L;
+ store.recordRestoreTime(restoreTimeNs);
+
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
- final KafkaMetric metric = metric("restore-rate");
- assertTrue((Double) metric.metricValue() > 0);
+ final KafkaMetric metric = metric("restore-latency-max");
+ assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 8e8e02b2722..5c4509bc7a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -183,13 +183,6 @@ public class MeteredVersionedKeyValueStoreTest {
verify(valueSerializer).serialize(changelogTopicName, VALUE);
}
- @Test
- public void shouldRecordMetricsOnInit() {
- // init is called in setUp(). it suffices to verify one restore metric since all restore
- // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
- assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
- }
-
@Test
public void shouldDelegateAndRecordMetricsOnPut() {
when(inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP)).thenReturn(PUT_RETURN_CODE_VALID_TO_UNDEFINED);
@@ -473,4 +466,4 @@ public class MeteredVersionedKeyValueStoreTest {
.filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags))
.collect(Collectors.toList());
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1c8935d1e1c..2726ce26aa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -210,14 +210,19 @@ public class MeteredWindowStoreTest {
}
@Test
- public void shouldRecordRestoreLatencyOnInit() {
+ public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
+ setUp();
doNothing().when(innerStoreMock).init(context, store);
+
store.init(context, store);
+ final long restoreTimeNs = 1000L;
+ store.recordRestoreTime(restoreTimeNs);
+
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
- final KafkaMetric metric = metric("restore-rate");
- assertThat((Double) metric.metricValue(), greaterThan(0.0));
+ final KafkaMetric metric = metric("restore-latency-max");
+ assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test