BewareMyPower commented on code in PR #24961:
URL: https://github.com/apache/pulsar/pull/24961#discussion_r2509378558


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2318,6 +2319,170 @@ public void asyncReadEntry(Position position, 
ReadEntryCallback callback, Object
 
     }
 
+    @Override
+    public CompletableFuture<List<Entry>> asyncReadEntries(Position start, 
long numberOfEntriesToRead) {
+        CompletableFuture<List<Entry>> f = new CompletableFuture<>();
+        asyncReadEntries(start, numberOfEntriesToRead, new 
ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                f.complete(entries);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f.completeExceptionally(exception);
+            }
+        }, null);
+        return f;
+    }
+
+    @Override
+    public void asyncReadEntries(Position start, long numberOfEntriesToRead, 
ReadEntriesCallback callback, Object ctx) {
+        if (!ledgers.containsKey(start.getLedgerId())) {
+            callback.readEntriesFailed(new ManagedLedgerException("Ledger not 
found"), ctx);
+            return;
+        }
+        if (start.getLedgerId() < 0) {
+            start = PositionFactory.create(start.getLedgerId(), 0L);
+        }
+        if (!isValidPosition(start)) {
+            callback.readEntriesFailed(new ManagedLedgerException("Invalid 
position"), ctx);
+            return;
+        }
+
+        Map<Long, Pair<Long, Long>> readerPositions = 
getReaderPositions(start, numberOfEntriesToRead);
+        if (readerPositions.isEmpty()) {
+            callback.readEntriesComplete(Collections.emptyList(), ctx);
+            return;
+        }
+
+        int actualReadEntries = getNumberOfEntries(readerPositions);
+        ReadEntriesCallback callback0 = new 
InternalReadEntriesCallback(actualReadEntries, callback);
+
+        // Parallel read
+        for (Map.Entry<Long, Pair<Long, Long>> entry : 
readerPositions.entrySet()) {
+            long ledgerId = entry.getKey();
+            Pair<Long, Long> position = entry.getValue();
+            long firstEntry = position.getLeft();
+            long lastEntry = position.getRight();
+            Position finalStart = start;
+            getLedgerHandle(ledgerId)
+                    .thenAccept(ledger -> asyncReadEntry(ledger, firstEntry, 
lastEntry, callback0, ctx))
+                    .exceptionally(ex -> {
+                        log.error("[{}] Error opening ledger for reading at 
position {} - {}, entries {}", name,
+                                finalStart, numberOfEntriesToRead, 
ex.getMessage());
+                        
callback0.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
+                                ctx);
+                        return null;
+                    });
+        }
+    }
+
+    static class InternalReadEntriesCallback implements ReadEntriesCallback {
+        private final List<Entry> readEntries;
+        private final AtomicInteger remaining;
+        private final AtomicBoolean completed;
+        private final ReadEntriesCallback callback;
+
+        public InternalReadEntriesCallback(int actualReadEntries, 
ReadEntriesCallback callback) {
+            this.callback = callback;
+            this.completed = new AtomicBoolean(false);
+            readEntries = new ArrayList<>(actualReadEntries);
+            remaining = new AtomicInteger(actualReadEntries);
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> entries, Object ctx) {
+            if (completed.get()) {
+                releaseEntries(entries);
+                return;
+            }
+            this.readEntries.addAll(entries);
+            if (remaining.addAndGet(-entries.size()) == 0) {
+                completed.set(true);
+                sortEntries();
+                callback.readEntriesComplete(readEntries, ctx);
+            }
+        }
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            if (completed.compareAndSet(false, true)) {
+                releaseEntries(readEntries);
+                callback.readEntriesFailed(exception, ctx);
+            }
+        }
+
+        private void sortEntries() {
+            readEntries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
+        }
+
+        private void releaseEntries(List<Entry> entries) {
+            for (Entry entry : entries) {
+                entry.release();
+            }
+        }
+    }
+
+    /**
+     * Get the number of entries to read from a given set of reader positions.
+     *
+     * @param readPositions
+     * @return
+     */
+    @VisibleForTesting
+    public static int getNumberOfEntries(Map<Long, Pair<Long, Long>> 
readPositions) {
+        int numberOfEntries = 0;
+        for (Map.Entry<Long, Pair<Long, Long>> entry : 
readPositions.entrySet()) {
+            Pair<Long, Long> range = entry.getValue();
+            numberOfEntries += (int) (range.getRight() - range.getLeft() + 1);
+        }
+        return numberOfEntries;
+    }
+
+    /**
+     * Get the reader positions for a given start position and number of 
entries.
+     *
+     * @param start
+     * @param numberOfEntries
+     * @return
+     */
+    @VisibleForTesting
+    public Map<Long, Pair<Long, Long>> getReaderPositions(Position start, long 
numberOfEntries) {

Review Comment:
   If this method is exposed as a public method, is there still a reason to add 
`asyncReadEntries`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to