This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 57299cfbb1f KAFKA-17954: Error getting oldest-iterator-open-since-ms 
from JMX (#17713)
57299cfbb1f is described below

commit 57299cfbb1f33b9cf603473c96b2c77e8a5b6bf0
Author: Nick Telford <[email protected]>
AuthorDate: Tue Nov 19 01:45:49 2024 +0000

    KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713)
    
    The thread that evaluates the gauge for the oldest-iterator-open-since-ms 
runs concurrently
    with threads that open/close iterators (stream threads and interactive 
query threads). This PR
    fixes a race condition between `openIterators.isEmpty()` and 
`openIterators.first()`, by catching
    a potential exception. Because we expect the race condition to be rare, we 
would rather catch the
    exception than introduce a guard via locking.
    
    Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie 
Blee-Goldman <[email protected]>
---
 .../kafka/streams/state/internals/MeteredKeyValueStore.java      | 9 ++++++++-
 .../kafka/streams/state/internals/MeteredSessionStore.java       | 9 ++++++++-
 .../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 ++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index e8e613acdbf..0825de0af17 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -52,6 +52,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -154,7 +155,13 @@ public class MeteredKeyValueStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index a6546c1edb5..266e8a7e4bb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -46,6 +46,7 @@ import 
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -124,7 +125,13 @@ public class MeteredSessionStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e59665fb2eb..2ed95ac9b3b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -49,6 +49,7 @@ import 
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -142,7 +143,13 @@ public class MeteredWindowStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 

Reply via email to