This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bc3ea21385e [fix][ml] Fix NoSuchElementException in EntryCountEstimator caused by a race condition (#25177)
bc3ea21385e is described below

commit bc3ea21385e9c21eea20f1479d32e3ac3b1a0371
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jan 23 05:51:49 2026 +0200

    [fix][ml] Fix NoSuchElementException in EntryCountEstimator caused by a race condition (#25177)
    
    (cherry picked from commit 9b70ba303d5aa65c497bcdc4684cbc9e9017bb42)
---
 .../mledger/impl/EntryCountEstimator.java          | 45 ++++++++++++++++------
 .../mledger/impl/EntryCountEstimatorTest.java      | 40 +++++++++++++++++++
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index bb5bfe7c5f4..ed0bedf8a73 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKK
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.NoSuchElementException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -81,17 +82,15 @@ class EntryCountEstimator {
             return maxEntries;
         }
 
-        // Adjust the read position to ensure it falls within the valid range of available ledgers.
-        // This handles special cases such as EARLIEST and LATEST positions by resetting them
-        // to the first available ledger or the last active ledger, respectively.
-        if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
-            readPosition = PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
-        } else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
-            Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
-            readPosition =
-                    PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
-        } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
-            readPosition = PositionImpl.get(ledgersInfo.firstKey(), 0);
+        if (ledgersInfo.isEmpty()) {
+            return 1;
+        }
+
+        try {
+            readPosition = adjustReadPosition(readPosition, ledgersInfo, lastLedgerId, lastLedgerTotalEntries);
+        } catch (NoSuchElementException e) {
+            // there was a race condition where ledgersInfo became empty just before adjustReadPosition was called
+            return 1;
         }
 
         long estimatedEntryCount = 0;
@@ -182,4 +181,28 @@ class EntryCountEstimator {
         // Ensure at least one entry is always returned as the result
         return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
     }
+
+    private static Position adjustReadPosition(Position readPosition,
+                                               NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
+                                                       ledgersInfo,
+                                               Long lastLedgerId, long lastLedgerTotalEntries) {
+        // Adjust the read position to ensure it falls within the valid range of available ledgers.
+        // This handles special cases such as EARLIEST and LATEST positions by resetting them
+        // to the first available ledger or the last active ledger, respectively.
+        if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
+            return PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
+        }
+        long lastKey = ledgersInfo.lastKey();
+        if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
+            Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
+            if (lastEntry != null && lastEntry.getKey() == lastKey) {
+                return PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
+            }
+        }
+        long firstKey = ledgersInfo.firstKey();
+        if (readPosition.getLedgerId() < firstKey) {
+            return PositionImpl.get(firstKey, 0);
+        }
+        return readPosition;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
index e6c77b73596..8477321552c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
@@ -19,9 +19,12 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.NavigableMap;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.bookkeeper.mledger.Position;
@@ -288,4 +291,41 @@ public class EntryCountEstimatorTest {
         int result = estimateEntryCountByBytesSize(Long.MAX_VALUE);
         assertEquals(result, maxEntries);
     }
+
+    @Test
+    public void testNoLedgers() {
+        readPosition = PositionImpl.EARLIEST;
+        // remove all ledgers from ledgersInfo
+        ledgersInfo.clear();
+        int result = estimateEntryCountByBytesSize(5_000_000);
+        // expect that result is 1 because the estimation couldn't be done
+        assertEquals(result, 1);
+    }
+
+    @Test
+    public void testNoLedgersRaceFirstKey() {
+        readPosition = PositionImpl.EARLIEST;
+        // remove all ledgers from ledgersInfo
+        ledgersInfo = mock(NavigableMap.class);
+        when(ledgersInfo.isEmpty()).thenReturn(false);
+        when(ledgersInfo.firstKey()).thenThrow(NoSuchElementException.class);
+        when(ledgersInfo.lastKey()).thenReturn(1L);
+        int result = estimateEntryCountByBytesSize(5_000_000);
+        // expect that result is 1 because the estimation couldn't be done
+        assertEquals(result, 1);
+    }
+
+    @Test
+    public void testNoLedgersRaceLastKey() {
+        readPosition = PositionImpl.EARLIEST;
+        // remove all ledgers from ledgersInfo
+        ledgersInfo = mock(NavigableMap.class);
+        lastLedgerId = null;
+        when(ledgersInfo.isEmpty()).thenReturn(false);
+        when(ledgersInfo.firstKey()).thenReturn(1L);
+        when(ledgersInfo.lastKey()).thenThrow(NoSuchElementException.class);
+        int result = estimateEntryCountByBytesSize(5_000_000);
+        // expect that result is 1 because the estimation couldn't be done
+        assertEquals(result, 1);
+    }
 }
\ No newline at end of file

Reply via email to