This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 4bacbce15a2 KAFKA-19959: Update session and window store with new
default when no open iterators (#21120)
4bacbce15a2 is described below
commit 4bacbce15a24d775f71a3080cb82c528bfaf83b0
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Dec 11 16:43:02 2025 -0500
KAFKA-19959: Update session and window store with new default when no open
iterators (#21120)
When applying the NPE fix for the `oldest-iterator-open-since-ms` metric,
we applied it only to `MeteredKeyValueStore`. The same fix (reporting `0`
instead of `null` when no iterators are open) also needs to be applied to
`MeteredSessionStore` and `MeteredWindowStore`.
Reviewers: Matthias Sax <[email protected]>
---
.../kafka/streams/state/internals/MeteredSessionStore.java | 7 ++++++-
.../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 +++++++--
.../kafka/streams/state/internals/MeteredKeyValueStoreTest.java | 6 ++++--
.../kafka/streams/state/internals/MeteredSessionStoreTest.java | 6 +++---
.../kafka/streams/state/internals/MeteredWindowStoreTest.java | 6 +++---
5 files changed, 23 insertions(+), 11 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 7794a6ebc51..37673366733 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,6 +47,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -125,9 +126,13 @@ public class MeteredSessionStore<K, V>
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> {
+ try {
final Iterator<MeteredIterator> openIteratorsIterator =
openIterators.iterator();
- return openIteratorsIterator.hasNext() ?
openIteratorsIterator.next().startTimestamp() : null;
+ return openIteratorsIterator.hasNext() ?
openIteratorsIterator.next().startTimestamp() : 0L;
+ } catch (final NoSuchElementException e) {
+ return 0L;
}
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 1ba37da6dab..de924fbbb63 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -50,6 +50,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -144,10 +145,14 @@ public class MeteredWindowStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> {
+ (config, now) -> {
+ try {
final Iterator<MeteredIterator> openIteratorsIterator =
openIterators.iterator();
- return openIteratorsIterator.hasNext() ?
openIteratorsIterator.next().startTimestamp() : null;
+ return openIteratorsIterator.hasNext() ?
openIteratorsIterator.next().startTimestamp() : 0L;
+ } catch (final NoSuchElementException e) {
+ return 0L;
}
+ }
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f92304f360b..0bfcfc2e2ae 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -530,6 +530,8 @@ public class MeteredKeyValueStoreTest {
final KafkaMetric oldestIteratorTimestampMetric =
metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
+
KeyValueIterator<String, String> second = null;
final long secondTimestamp;
try {
@@ -546,14 +548,14 @@ public class MeteredKeyValueStoreTest {
}
// now that the first iterator is closed, check that the timestamp
has advanced to the still open second iterator
- assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
+ assertThat(oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
// no open iterators left, timestamp should be reset to 0
- assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(0L));
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
private KafkaMetric metric(final MetricName metricName) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f22780636f2..e19211ed38a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -715,7 +715,7 @@ public class MeteredSessionStoreTest {
final KafkaMetric oldestIteratorTimestampMetric =
metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
- assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
KeyValueIterator<Windowed<String>, String> second = null;
final long secondTimestamp;
@@ -733,14 +733,14 @@ public class MeteredSessionStoreTest {
}
// now that the first iterator is closed, check that the timestamp
has advanced to the still open second iterator
- assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
+ assertThat(oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
- assertThat((Integer) oldestIteratorTimestampMetric.metricValue(),
nullValue());
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
private KafkaMetric metric(final String name) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 3cf17ff830e..e643f60ec3d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -499,7 +499,7 @@ public class MeteredWindowStoreTest {
final KafkaMetric oldestIteratorTimestampMetric =
metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
- assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
KeyValueIterator<Windowed<String>, String> second = null;
final long secondTimestamp;
@@ -517,14 +517,14 @@ public class MeteredWindowStoreTest {
}
// now that the first iterator is closed, check that the timestamp
has advanced to the still open second iterator
- assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
+ assertThat(oldestIteratorTimestampMetric.metricValue(),
equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
- assertThat((Integer) oldestIteratorTimestampMetric.metricValue(),
nullValue());
+ assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
private KafkaMetric metric(final String name) {