This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28e7803037f KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
28e7803037f is described below

commit 28e7803037fb48770cd4877c9bf12e42b7c52441
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Thu Oct 2 21:24:36 2025 -0700

    KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
    
    - Move restore time calculation to ChangelogMetadata.
    - Introduce a new internal interface (MeteredStateStore) to propagate the
    calculated value to the stores without modifying the public interface.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../integration/RestoreIntegrationTest.java        | 51 +++++++++++++++++++-
 .../integration/utils/IntegrationTestUtils.java    |  9 ++++
 .../processor/internals/StoreChangelogReader.java  | 11 +++++
 .../state/internals/MeteredKeyValueStore.java      | 15 ++++--
 .../state/internals/MeteredSessionStore.java       | 14 ++++--
 .../streams/state/internals/MeteredStateStore.java | 22 +++++++++
 .../state/internals/MeteredWindowStore.java        | 12 +++--
 .../internals/StoreChangelogReaderTest.java        | 55 ++++++++++++++++++++++
 .../state/internals/MeteredKeyValueStoreTest.java  | 10 ++--
 .../state/internals/MeteredSessionStoreTest.java   |  9 ++--
 .../MeteredVersionedKeyValueStoreTest.java         |  9 +---
 .../state/internals/MeteredWindowStoreTest.java    | 11 +++--
 12 files changed, 196 insertions(+), 32 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index e85ac344157..7ffeb6ac035 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -112,6 +112,9 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -685,6 +688,52 @@ public class RestoreIntegrationTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldRecordRestoreMetrics(final boolean useNewProtocol) 
throws Exception {
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Properties props = props();
+
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
+        }
+
+        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+
+        createStateForRestoration(inputStream, 10000);
+
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        builder.table(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.Integer()), Materialized.as("store"))
+                .toStream()
+                .foreach((key, value) -> {
+                    if (numReceived.incrementAndGet() == numberOfKeys) {
+                        shutdownLatch.countDown();
+                    }
+                });
+
+        kafkaStreams = new KafkaStreams(builder.build(), props);
+
+        final AtomicLong restored = new AtomicLong(0);
+        final TrackingStateRestoreListener restoreListener = new 
TrackingStateRestoreListener(restored);
+        kafkaStreams.setGlobalStateRestoreListener(restoreListener);
+        kafkaStreams.start();
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(numberOfKeys));
+
+        final Map<String, Long> taskIdToMetricValue = 
kafkaStreams.metrics().entrySet().stream()
+                .filter(e -> e.getKey().name().equals("restore-latency-max"))
+                .collect(Collectors.toMap(e -> 
e.getKey().tags().get("task-id"), e -> ((Double) 
e.getValue().metricValue()).longValue()));
+
+        for (final Map.Entry<TopicPartition, Long> entry : 
restoreListener.changelogToRestoreTime().entrySet()) {
+            final long lowerBound = entry.getValue() - 
TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+            final long upperBound = entry.getValue() + 
TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+            assertThat(taskIdToMetricValue.get("0_" + 
entry.getKey().partition()), allOf(greaterThanOrEqualTo(lowerBound), 
lessThanOrEqualTo(upperBound)));
+        }
+    }
+
     private void validateReceivedMessages(final List<KeyValue<Integer, 
Integer>> expectedRecords,
                                           final String outputTopic) throws 
Exception {
         final Properties consumerProperties = new Properties();
@@ -971,4 +1020,4 @@ public class RestoreIntegrationTest {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a7b3f838f2a..44ea3c7fc89 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1337,6 +1337,8 @@ public class IntegrationTestUtils {
         public final Map<TopicPartition, AtomicLong> changelogToStartOffset = 
new ConcurrentHashMap<>();
         public final Map<TopicPartition, AtomicLong> changelogToEndOffset = 
new ConcurrentHashMap<>();
         public final Map<TopicPartition, AtomicLong> 
changelogToTotalNumRestored = new ConcurrentHashMap<>();
+        private final Map<TopicPartition, AtomicLong> 
changelogToRestoreStartTime = new ConcurrentHashMap<>();
+        private final Map<TopicPartition, AtomicLong> 
changelogToRestoreEndTime = new ConcurrentHashMap<>();
         private final AtomicLong restored;
 
         public TrackingStateRestoreListener() {
@@ -1355,6 +1357,7 @@ public class IntegrationTestUtils {
             changelogToStartOffset.put(topicPartition, new 
AtomicLong(startingOffset));
             changelogToEndOffset.put(topicPartition, new 
AtomicLong(endingOffset));
             changelogToTotalNumRestored.put(topicPartition, new 
AtomicLong(0L));
+            changelogToRestoreStartTime.put(topicPartition, new 
AtomicLong(System.nanoTime()));
         }
 
         @Override
@@ -1372,6 +1375,7 @@ public class IntegrationTestUtils {
             if (restored != null) {
                 restored.addAndGet(totalRestored);
             }
+            changelogToRestoreEndTime.put(topicPartition, new 
AtomicLong(System.nanoTime()));
         }
 
         public long totalNumRestored() {
@@ -1381,6 +1385,11 @@ public class IntegrationTestUtils {
             }
             return totalNumRestored;
         }
+
+        public Map<TopicPartition, Long> changelogToRestoreTime() {
+            return changelogToRestoreStartTime.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
changelogToRestoreEndTime.get(e.getKey()).get() - e.getValue().get()));
+        }
     }
 
     public static class TrackingStandbyUpdateListener implements 
StandbyUpdateListener {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c91e9b38b99..5e09ceb62da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.state.internals.MeteredStateStore;
 
 import org.slf4j.Logger;
 
@@ -138,6 +139,8 @@ public class StoreChangelogReader implements 
ChangelogReader {
         // either due to limit offset (standby) or committed end offset 
(active)
         private int bufferedLimitIndex;
 
+        private long restoreStartTimeNs;
+
         private ChangelogMetadata(final StateStoreMetadata storeMetadata, 
final ProcessorStateManager stateManager) {
             this.changelogState = ChangelogState.REGISTERED;
             this.storeMetadata = storeMetadata;
@@ -188,6 +191,10 @@ public class StoreChangelogReader implements 
ChangelogReader {
         int bufferedLimitIndex() {
             return bufferedLimitIndex;
         }
+
+        long calculateRestoreTime(final long restoreEndTimeNs) {
+            return restoreEndTimeNs - restoreStartTimeNs;
+        }
     }
 
     private static final long DEFAULT_OFFSET_UPDATE_MS = 
Duration.ofMinutes(5L).toMillis();
@@ -695,6 +702,9 @@ public class StoreChangelogReader implements 
ChangelogReader {
 
             changelogMetadata.transitTo(ChangelogState.COMPLETED);
             
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
+            if (storeMetadata.store() instanceof MeteredStateStore) {
+                ((MeteredStateStore) 
storeMetadata.store()).recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds()));
+            }
 
             try {
                 stateRestoreListener.onRestoreEnd(partition, storeName, 
changelogMetadata.totalRestored);
@@ -1026,6 +1036,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
                 // no records to restore; in this case we just initialize the 
sensor to zero
                 final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                 task.recordRestoration(time, recordsToRestore, true);
+                changelogMetadata.restoreStartTimeNs = time.nanoseconds();
             }  else if (changelogMetadata.stateManager.taskType() == 
TaskType.STANDBY) {
                 try {
                     standbyUpdateListener.onUpdateStart(partition, storeName, 
startOffset);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0962033b7ef..9c033d6bbd5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -69,7 +69,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
  */
 public class MeteredKeyValueStore<K, V>
     extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
-    implements KeyValueStore<K, V> {
+    implements KeyValueStore<K, V>, MeteredStateStore {
 
     final Serde<K> keySerde;
     final Serde<V> valueSerde;
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V>
     protected InternalProcessorContext<?, ?> internalContext;
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
+    private Sensor restoreSensor;
 
     protected OpenIterators openIterators;
 
@@ -128,11 +129,10 @@ public class MeteredKeyValueStore<K, V>
         streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
 
         registerMetrics();
-        final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
 
-        // register and possibly restore the state from the logs
-        maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, 
restoreSensor);
+        restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
+
+        super.init(stateStoreContext, root);
     }
 
     private void registerMetrics() {
@@ -152,6 +152,11 @@ public class MeteredKeyValueStore<K, V>
         openIterators = new OpenIterators(taskId, metricsScope, name(), 
streamsMetrics);
     }
 
+    @Override
+    public void recordRestoreTime(final long restoreTimeNs) {
+        restoreSensor.record(restoreTimeNs);
+    }
+
     protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, 
final SerdeGetter getter) {
         return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 234ac1220f7..7794a6ebc51 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -57,7 +57,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 
 public class MeteredSessionStore<K, V>
     extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
-    implements SessionStore<K, V> {
+    implements SessionStore<K, V>, MeteredStateStore {
 
     private final String metricsScope;
     private final Serde<K> keySerde;
@@ -73,6 +73,7 @@ public class MeteredSessionStore<K, V>
     private Sensor iteratorDurationSensor;
     private InternalProcessorContext<?, ?> internalContext;
     private TaskId taskId;
+    private Sensor restoreSensor;
 
     private final LongAdder numOpenIterators = new LongAdder();
     private final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -108,11 +109,9 @@ public class MeteredSessionStore<K, V>
         streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
 
         registerMetrics();
-        final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
+        restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
 
-        // register and possibly restore the state from the logs
-        maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, 
restoreSensor);
+        super.init(stateStoreContext, root);
     }
 
     private void registerMetrics() {
@@ -132,6 +131,11 @@ public class MeteredSessionStore<K, V>
         );
     }
 
+    @Override
+    public void recordRestoreTime(final long restoreTimeNs) {
+        restoreSensor.record(restoreTimeNs);
+    }
+
     private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java
new file mode 100644
index 00000000000..6b9f4eeb8f2
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public interface MeteredStateStore {
+
+    void recordRestoreTime(final long restoreTimeNs);
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2da877453ce..1ba37da6dab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -60,7 +60,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 
 public class MeteredWindowStore<K, V>
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
-    implements WindowStore<K, V> {
+    implements WindowStore<K, V>, MeteredStateStore {
 
     private final long windowSizeMs;
     private final String metricsScope;
@@ -76,6 +76,7 @@ public class MeteredWindowStore<K, V>
     private Sensor iteratorDurationSensor;
     private InternalProcessorContext<?, ?> internalContext;
     private TaskId taskId;
+    private Sensor restoreSensor;
 
     private final LongAdder numOpenIterators = new LongAdder();
     private final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -124,8 +125,8 @@ public class MeteredWindowStore<K, V>
         streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
 
         registerMetrics();
-        final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
+
+        restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, 
restoreSensor);
@@ -150,6 +151,11 @@ public class MeteredWindowStore<K, V>
         );
     }
 
+    @Override
+    public void recordRestoreTime(final long restoreTimeNs) {
+        restoreSensor.record(restoreTimeNs);
+    }
+
     private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 69655b4d642..99a2b2519f9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.test.MockStandbyUpdateListener;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -89,7 +90,9 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -1364,6 +1367,58 @@ public class StoreChangelogReaderTest {
         }
     }
 
+    @Test
+    public void shouldCallRecordRestoreTimeAtTheEndOfRestore() {
+        setupActiveStateManager();
+
+        final MeteredKeyValueStore<?, ?> meteredStateStore = 
mock(MeteredKeyValueStore.class);
+
+        when(storeMetadata.changelogPartition()).thenReturn(tp);
+        when(storeMetadata.store()).thenReturn(meteredStateStore);
+        when(meteredStateStore.name()).thenReturn(storeName);
+        final TaskId taskId = new TaskId(0, 0);
+
+        when(storeMetadata.offset()).thenReturn(0L);
+        when(activeStateManager.taskId()).thenReturn(taskId);
+
+        setupConsumer(2, tp);
+        consumer.updateEndOffsets(Collections.singletonMap(tp, 2L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 2L));
+
+        changelogReader.register(tp, activeStateManager);
+
+        changelogReader.restore(Collections.singletonMap(taskId, 
mock(Task.class)));
+
+        assertEquals(1L, 
changelogReader.changelogMetadata(tp).totalRestored());
+        verify(meteredStateStore).recordRestoreTime(anyLong());
+    }
+
+    @Test
+    public void shouldNotCallRecordRestoreTimeIfRestoreDoesNotComplete() {
+        setupActiveStateManager();
+
+        final MeteredKeyValueStore<?, ?> meteredStateStore = 
mock(MeteredKeyValueStore.class);
+
+        when(storeMetadata.changelogPartition()).thenReturn(tp);
+        when(storeMetadata.store()).thenReturn(meteredStateStore);
+        when(meteredStateStore.name()).thenReturn(storeName);
+        final TaskId taskId = new TaskId(0, 0);
+
+        when(storeMetadata.offset()).thenReturn(0L);
+        when(activeStateManager.taskId()).thenReturn(taskId);
+
+        setupConsumer(2, tp);
+        consumer.updateEndOffsets(Collections.singletonMap(tp, 3L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 3L));
+
+        changelogReader.register(tp, activeStateManager);
+
+        changelogReader.restore(Collections.singletonMap(taskId, 
mock(Task.class)));
+
+        assertEquals(1L, 
changelogReader.changelogMetadata(tp).totalRestored());
+        verify(meteredStateStore, never()).recordRestoreTime(anyLong());
+    }
+
     private void setupConsumer(final long messages, final TopicPartition 
topicPartition) {
         assignPartition(messages, topicPartition);
         addRecords(messages, topicPartition);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 294af3944f2..1ba655a75ce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -58,7 +58,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -210,16 +209,19 @@ public class MeteredKeyValueStoreTest {
     }
 
     @Test
-    public void shouldRecordRestoreLatencyOnInit() {
+    public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
         setUp();
         doNothing().when(inner).init(context, metered);
 
         init();
 
+        final long restoreTimeNs = 1000L;
+        metered.recordRestoreTime(restoreTimeNs);
+
         // it suffices to verify one restore metric since all restore metrics 
are recorded by the same sensor
         // and the sensor is tested elsewhere
-        final KafkaMetric metric = metric("restore-rate");
-        assertThat((Double) metric.metricValue(), greaterThan(0.0));
+        final KafkaMetric metric = metric("restore-latency-max");
+        assertThat((Double) metric.metricValue(), equalTo((double) 
restoreTimeNs));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index ee1b686dade..f8b08a532d1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -458,14 +458,17 @@ public class MeteredSessionStoreTest {
     }
 
     @Test
-    public void shouldRecordRestoreTimeOnInit() {
+    public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
         setUp();
         init();
 
+        final long restoreTimeNs = 1000L;
+        store.recordRestoreTime(restoreTimeNs);
+
         // it suffices to verify one restore metric since all restore metrics 
are recorded by the same sensor
         // and the sensor is tested elsewhere
-        final KafkaMetric metric = metric("restore-rate");
-        assertTrue((Double) metric.metricValue() > 0);
+        final KafkaMetric metric = metric("restore-latency-max");
+        assertThat((Double) metric.metricValue(), equalTo((double) 
restoreTimeNs));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 8e8e02b2722..5c4509bc7a3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -183,13 +183,6 @@ public class MeteredVersionedKeyValueStoreTest {
         verify(valueSerializer).serialize(changelogTopicName, VALUE);
     }
 
-    @Test
-    public void shouldRecordMetricsOnInit() {
-        // init is called in setUp(). it suffices to verify one restore metric 
since all restore
-        // metrics are recorded by the same sensor, and the sensor is tested 
elsewhere.
-        assertThat((Double) getMetric("restore-rate").metricValue(), 
greaterThan(0.0));
-    }
-
     @Test
     public void shouldDelegateAndRecordMetricsOnPut() {
         when(inner.put(RAW_KEY, RAW_VALUE, 
TIMESTAMP)).thenReturn(PUT_RETURN_CODE_VALID_TO_UNDEFINED);
@@ -473,4 +466,4 @@ public class MeteredVersionedKeyValueStoreTest {
             .filter(name -> name.group().equals(STORE_LEVEL_GROUP) && 
name.tags().equals(tags))
             .collect(Collectors.toList());
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1c8935d1e1c..2726ce26aa7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -210,14 +210,19 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
-    public void shouldRecordRestoreLatencyOnInit() {
+    public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
+        setUp();
         doNothing().when(innerStoreMock).init(context, store);
+
         store.init(context, store);
 
+        final long restoreTimeNs = 1000L;
+        store.recordRestoreTime(restoreTimeNs);
+
         // it suffices to verify one restore metric since all restore metrics 
are recorded by the same sensor
         // and the sensor is tested elsewhere
-        final KafkaMetric metric = metric("restore-rate");
-        assertThat((Double) metric.metricValue(), greaterThan(0.0));
+        final KafkaMetric metric = metric("restore-latency-max");
+        assertThat((Double) metric.metricValue(), equalTo((double) 
restoreTimeNs));
     }
 
     @Test

Reply via email to