This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new f9cc9d4a550 KAFKA-17954: Error getting oldest-iterator-open-since-ms 
from JMX (#17713)
f9cc9d4a550 is described below

commit f9cc9d4a550d156669c82f116a8c010298b56ab8
Author: Nick Telford <[email protected]>
AuthorDate: Tue Nov 19 01:45:49 2024 +0000

    KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713)
    
    The thread that evaluates the gauge for the oldest-iterator-open-since-ms 
runs concurrently
    with threads that open/close iterators (stream threads and interactive 
query threads). This PR
    fixed a race condition between `openIterators.isEmpty()` and 
`openIterators.first()`, by catching
    a potential exception. Because we expect the race condition to be rare, we 
catch the
    exception rather than introducing a guard via locking.
    
    Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie 
Blee-Goldman <[email protected]>
---
 .../kafka/streams/state/internals/MeteredKeyValueStore.java      | 9 ++++++++-
 .../kafka/streams/state/internals/MeteredSessionStore.java       | 9 ++++++++-
 .../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 ++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index f828c502877..1d2933c528d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -53,6 +53,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -173,7 +174,13 @@ public class MeteredKeyValueStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 233c2c00b87..7440ad45f55 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,6 +47,7 @@ import 
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -142,7 +143,13 @@ public class MeteredSessionStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index bf6f749977f..fc83f20ef6d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -161,7 +162,13 @@ public class MeteredWindowStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : 
openIterators.first().startTimestamp();
+                    } catch (final NoSuchElementException ignored) {
+                        return null;
+                    }
+                }
         );
     }
 

Reply via email to