This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 6bffa8bf663 KAFKA-17954: Error getting oldest-iterator-open-since-ms
from JMX (#17713)
6bffa8bf663 is described below
commit 6bffa8bf663d1adf3bfd6d3d1ca5ec87e0448d71
Author: Nick Telford <[email protected]>
AuthorDate: Tue Nov 19 01:45:49 2024 +0000
KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713)
The thread that evaluates the gauge for the oldest-iterator-open-since-ms
runs concurrently
with threads that open/close iterators (stream threads and interactive
query threads). This PR
fixed a race condition between `openIterators.isEmpty()` and
`openIterators.first()`, by catching
a potential exception. Because we expect the race condition to be rare, we
catch the
exception rather than introducing a guard via locking.
Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie
Blee-Goldman <[email protected]>
---
.../kafka/streams/state/internals/MeteredKeyValueStore.java | 9 ++++++++-
.../kafka/streams/state/internals/MeteredSessionStore.java | 9 ++++++++-
.../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 ++++++++-
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index f828c502877..1d2933c528d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -53,6 +53,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -173,7 +174,13 @@ public class MeteredKeyValueStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 233c2c00b87..7440ad45f55 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,6 +47,7 @@ import
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -142,7 +143,13 @@ public class MeteredSessionStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index bf6f749977f..fc83f20ef6d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -161,7 +162,13 @@ public class MeteredWindowStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}