codelipenghui commented on code in PR #25280:
URL: https://github.com/apache/pulsar/pull/25280#discussion_r2887551905


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2332,6 +2332,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
     )
     private boolean managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed = true;
 
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Enable batch read API when reading entries from bookkeeper. "
+                    + "Batch read allows reading multiple entries in a single RPC call, "
+                    + "reducing network overhead for sequential reads.")
+    private boolean managedLedgerBatchReadEnabled = true;

Review Comment:
   Should this default to `false`, as stated in the PR description?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java:
##########
@@ -49,6 +61,79 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
             return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
                     + lastConfirmedEntry + " when reading entry " + lastEntry));
         }
+
+        int numberOfEntries = (int) (lastEntry - firstEntry + 1);
+
+        // Use batch read for multiple entries when enabled
+        if (batchReadEnabled && numberOfEntries > 1) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}",
+                        handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize);
+            }
+            return batchReadWithAutoRefill(handle, firstEntry, lastEntry, numberOfEntries, batchReadMaxSize);
+        }
+
         return handle.readUnconfirmedAsync(firstEntry, lastEntry);
     }
+
+    private static CompletableFuture<LedgerEntries> batchReadWithAutoRefill(
+            ReadHandle handle, long firstEntry, long lastEntry,
+            int maxCount, long maxSize) {
+
+        return handle.batchReadAsync(firstEntry, maxCount, maxSize)
+                .exceptionallyCompose(ex -> {
+                    // Fallback to readUnconfirmedAsync if batch read fails
+                    log.warn("Batch read failed for ledger {} entries {}-{}, falling back to regular read: {}",
+                            handle.getId(), firstEntry, lastEntry, ex.getMessage());
+                    return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+                })
+                .thenCompose(entries -> {
+                    // Collect entries and find the last received entry id in a single pass
+                    List<LedgerEntry> receivedList = new ArrayList<>();
+                    long lastReceivedEntryId = -1;
+                    for (LedgerEntry e : entries) {
+                        receivedList.add(e);
+                        lastReceivedEntryId = e.getEntryId();
+                    }
+                    int receivedCount = receivedList.size();
+
+                    // All entries received, return as-is
+                    if (receivedCount >= maxCount) {
+                        return CompletableFuture.completedFuture(entries);
+                    }
+
+                    // Partial result: need to read remaining entries
+                    if (receivedCount == 0) {
+                        // Edge case: no entries returned, use regular read
+                        entries.close();
+                        log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to "
+                                + "regular read", handle.getId(), firstEntry, lastEntry);
+                        return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+                    }
+
+                    // Close the original entries since we've collected them into receivedList
+                    entries.close();
+
+                    long nextEntryId = lastReceivedEntryId + 1;
+                    int remainingCount = (int) (lastEntry - nextEntryId + 1);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Batch read partial result for ledger {}: received {}/{}, reading remaining {}-{}",
+                                handle.getId(), receivedCount, maxCount, nextEntryId, lastEntry);
+                    }
+
+                    // Recursively read remaining entries
+                    return batchReadWithAutoRefill(handle, nextEntryId, lastEntry, remainingCount, maxSize)
+                            .thenApply(remainingEntries -> {
+                                // Combine received and remaining entries
+                                List<LedgerEntry> combined = new ArrayList<>(receivedCount + remainingCount);
+                                combined.addAll(receivedList);

Review Comment:
   Won't the entries in `receivedList` already have been released (line 115) by the time they are added to `combined`?
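
   A minimal sketch of the concern, using hypothetical stand-in classes (`FakeEntry`, `FakeEntries`, and `collectThenClose` are illustrative names, not the BookKeeper API). It assumes that closing the container also releases every entry it holds, which is what I would expect `LedgerEntries.close()` to do. Under that assumption, copying the references into a list does not keep the entries alive:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReleaseSketch {
    // Hypothetical stand-in for a releasable LedgerEntry; not the BookKeeper type.
    static final class FakeEntry implements AutoCloseable {
        final long entryId;
        private boolean released;

        FakeEntry(long entryId) {
            this.entryId = entryId;
        }

        boolean isReleased() {
            return released;
        }

        @Override
        public void close() {
            released = true;
        }
    }

    // Hypothetical stand-in for LedgerEntries.
    // Assumption: closing the container closes every entry it holds.
    static final class FakeEntries implements AutoCloseable, Iterable<FakeEntry> {
        private final List<FakeEntry> entries;

        FakeEntries(List<FakeEntry> entries) {
            this.entries = entries;
        }

        @Override
        public Iterator<FakeEntry> iterator() {
            return entries.iterator();
        }

        @Override
        public void close() {
            entries.forEach(FakeEntry::close);
        }
    }

    // Mirrors the pattern in the diff: copy the references, close the
    // container, then keep using the copied references.
    static List<FakeEntry> collectThenClose(FakeEntries batch) {
        List<FakeEntry> receivedList = new ArrayList<>();
        for (FakeEntry e : batch) {
            receivedList.add(e);
        }
        batch.close();
        return receivedList;
    }

    public static void main(String[] args) {
        FakeEntries batch = new FakeEntries(List.of(new FakeEntry(0), new FakeEntry(1)));
        List<FakeEntry> received = collectThenClose(batch);
        for (FakeEntry e : received) {
            // Every entry in the copied list is already released here.
            System.out.println("entry " + e.entryId + " released=" + e.isReleased());
        }
    }
}
```

   If the real types behave like this, the combined result would hand already-released entries to the caller. One possible direction is to defer closing the first batch until the combined result itself is closed, or to retain or copy each entry before closing.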



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
