This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 57299cfbb1f KAFKA-17954: Error getting oldest-iterator-open-since-ms
from JMX (#17713)
57299cfbb1f is described below
commit 57299cfbb1f33b9cf603473c96b2c77e8a5b6bf0
Author: Nick Telford <[email protected]>
AuthorDate: Tue Nov 19 01:45:49 2024 +0000
KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713)
The thread that evaluates the gauge for the oldest-iterator-open-since-ms
runs concurrently
with threads that open/close iterators (stream threads and interactive
query threads). This PR
fixed a race condition between `openIterators.isEmpty()` and
`openIterators.first()`, by catching
a potential exception. Because we except the race condition to be rare, we
rather catch the
exception in favor of introducing a guard via locking.
Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie
Blee-Goldman <[email protected]>
---
.../kafka/streams/state/internals/MeteredKeyValueStore.java | 9 ++++++++-
.../kafka/streams/state/internals/MeteredSessionStore.java | 9 ++++++++-
.../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 ++++++++-
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index e8e613acdbf..0825de0af17 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -52,6 +52,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -154,7 +155,13 @@ public class MeteredKeyValueStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index a6546c1edb5..266e8a7e4bb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -46,6 +46,7 @@ import
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -124,7 +125,13 @@ public class MeteredSessionStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e59665fb2eb..2ed95ac9b3b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -49,6 +49,7 @@ import
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
@@ -142,7 +143,13 @@ public class MeteredWindowStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> openIterators.isEmpty() ? null :
openIterators.first().startTimestamp()
+ (config, now) -> {
+ try {
+ return openIterators.isEmpty() ? null :
openIterators.first().startTimestamp();
+ } catch (final NoSuchElementException ignored) {
+ return null;
+ }
+ }
);
}