ivankelly commented on a change in pull request #1513: Managed ledger uses ReadHandle in read path URL: https://github.com/apache/incubator-pulsar/pull/1513#discussion_r180074490
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java ########## @@ -239,43 +243,44 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo } // Read all the entries from bookkeeper - lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) -> { - - if (rc != BKException.Code.OK) { - if (rc == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(createManagedLedgerException(rc), ctx); - } else { - ml.invalidateLedgerHandle(lh1, rc); - ManagedLedgerException mlException = createManagedLedgerException(rc); - callback.readEntriesFailed(mlException, ctx); - } - return; - } - - checkNotNull(ml.getName()); - checkNotNull(ml.getExecutor()); - ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { - // We got the entries, we need to transform them to a List<> type - long totalSize = 0; - final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); - while (sequence.hasMoreElements()) { - // Insert the entries at the end of the list (they will be unsorted for now) - LedgerEntry ledgerEntry = sequence.nextElement(); - EntryImpl entry = EntryImpl.create(ledgerEntry); - ledgerEntry.getEntryBuffer().release(); - - entriesToReturn.add(entry); - - totalSize += entry.getLength(); - - } - - manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); - ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); - - callback.readEntriesComplete((List) entriesToReturn, ctx); - })); - }, callback); + lh.readAsync(firstEntry, lastEntry).whenComplete( + (ledgerEntries, exception) -> { + if (exception != null) { + if (exception instanceof BKException + && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + } else { + ml.invalidateLedgerHandle(lh, exception); + ManagedLedgerException mlException = createManagedLedgerException(exception); + callback.readEntriesFailed(mlException, ctx); + } + return; + } + + checkNotNull(ml.getName()); + checkNotNull(ml.getExecutor()); + ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { Review comment: Didn't occur to me when I was first doing it, but done now. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services