coderzc commented on code in PR #25280:
URL: https://github.com/apache/pulsar/pull/25280#discussion_r2903415240


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java:
##########
@@ -49,6 +61,79 @@ static CompletableFuture<LedgerEntries> 
readAsync(ManagedLedger ml, ReadHandle h
             return CompletableFuture.failedFuture(new 
ManagedLedgerException("LastConfirmedEntry is "
                     + lastConfirmedEntry + " when reading entry " + 
lastEntry));
         }
+
+        int numberOfEntries = (int) (lastEntry - firstEntry + 1);
+
+        // Use batch read for multiple entries when enabled
+        if (batchReadEnabled && numberOfEntries > 1) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using batch read for ledger {} entries {}-{}, 
maxCount={}, maxSize={}",
+                        handle.getId(), firstEntry, lastEntry, 
numberOfEntries, batchReadMaxSize);
+            }
+            return batchReadWithAutoRefill(handle, firstEntry, lastEntry, 
numberOfEntries, batchReadMaxSize);
+        }
+
         return handle.readUnconfirmedAsync(firstEntry, lastEntry);
     }
+
+    private static CompletableFuture<LedgerEntries> batchReadWithAutoRefill(
+            ReadHandle handle, long firstEntry, long lastEntry,
+            int maxCount, long maxSize) {
+
+        return handle.batchReadAsync(firstEntry, maxCount, maxSize)
+                .exceptionallyCompose(ex -> {
+                    // Fallback to readUnconfirmedAsync if batch read fails
+                    log.warn("Batch read failed for ledger {} entries {}-{}, 
falling back to regular read: {}",
+                            handle.getId(), firstEntry, lastEntry, 
ex.getMessage());
+                    return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+                })
+                .thenCompose(entries -> {
+                    // Collect entries and find the last received entry id in 
a single pass
+                    List<LedgerEntry> receivedList = new ArrayList<>();
+                    long lastReceivedEntryId = -1;
+                    for (LedgerEntry e : entries) {
+                        receivedList.add(e);
+                        lastReceivedEntryId = e.getEntryId();
+                    }
+                    int receivedCount = receivedList.size();
+
+                    // All entries received, return as-is
+                    if (receivedCount >= maxCount) {
+                        return CompletableFuture.completedFuture(entries);
+                    }
+
+                    // Partial result: need to read remaining entries
+                    if (receivedCount == 0) {
+                        // Edge case: no entries returned, use regular read
+                        entries.close();
+                        log.warn("Batch read returned 0 entries for ledger {} 
entries {}-{}, falling back to "
+                                + "regular read", handle.getId(), firstEntry, 
lastEntry);
+                        return handle.readUnconfirmedAsync(firstEntry, 
lastEntry);
+                    }
+
+                    // Close the original entries since we've collected them 
into receivedList
+                    entries.close();

Review Comment:
   `receivedList` keeps the original `LedgerEntry` objects, but 
`entries.close()` is called before the merged `LedgerEntriesImpl` is returned. 
Since `LedgerEntriesImpl.close()` closes all contained entries, the combined 
result can end up holding entries whose buffers were already released. That can 
later fail with ref-count errors or invalid reads when the caller turns them 
into `EntryImpl`. Please retain/duplicate the entries before closing the 
original container, or delay closing until the merged result is consumed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to