merlimat closed pull request #1513: Managed ledger uses ReadHandle in read path
URL: https://github.com/apache/incubator-pulsar/pull/1513
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the complete diff is supplied below:

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index b38999b091..9aa5e1debf 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -24,7 +24,7 @@
 import com.google.common.base.Charsets;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.DigestType;
 
 /**
  * Configuration class for a ManagedLedger.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
index bb8f2a651f..0c99650cec 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
@@ -18,7 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.commons.lang3.tuple.Pair;
@@ -94,7 +94,7 @@
      * @param ctx
      *            the context object
      */
-    void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, 
boolean isSlowestReader,
+    void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, 
boolean isSlowestReader,
             ReadEntriesCallback callback, Object ctx);
 
     /**
@@ -111,7 +111,7 @@ void asyncReadEntry(LedgerHandle lh, long firstEntry, long 
lastEntry, boolean is
      * @param ctx
      *            the context object
      */
-    void asyncReadEntry(LedgerHandle lh, PositionImpl position, 
ReadEntryCallback callback, Object ctx);
+    void asyncReadEntry(ReadHandle lh, PositionImpl position, 
ReadEntryCallback callback, Object ctx);
 
     /**
      * Get the total size in bytes of all the entries stored in this cache.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 37ddc54629..ff80feccf6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -28,10 +28,11 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -159,7 +160,7 @@ public void invalidateAllEntries(long ledgerId) {
     }
 
     @Override
-    public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final 
ReadEntryCallback callback,
+    public void asyncReadEntry(ReadHandle lh, PositionImpl position, final 
ReadEntryCallback callback,
             final Object ctx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), 
lh.getId(), position.getEntryId());
@@ -171,37 +172,38 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl 
position, final ReadEnt
             manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
             callback.readEntryComplete(cachedEntry, ctx);
         } else {
-            lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), 
(rc, ledgerHandle, sequence, obj) -> {
-                if (rc != BKException.Code.OK) {
-                    ml.invalidateLedgerHandle(ledgerHandle, rc);
-                    callback.readEntryFailed(createManagedLedgerException(rc), 
obj);
-                    return;
-                }
-
-                if (sequence.hasMoreElements()) {
-                    LedgerEntry ledgerEntry = sequence.nextElement();
-                    EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
-
-                    // The EntryImpl is now the owner of the buffer, so we can 
release the original one
-                    ledgerEntry.getEntryBuffer().release();
-
-                    manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
-                    ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
-
-                    ml.getExecutor().executeOrdered(ml.getName(), safeRun(() 
-> {
-                        callback.readEntryComplete(returnEntry, obj);
-                    }));
-                } else {
-                    // got an empty sequence
-                    callback.readEntryFailed(new ManagedLedgerException("Could 
not read given position"), obj);
-                }
-            }, ctx);
+            lh.readAsync(position.getEntryId(), 
position.getEntryId()).whenCompleteAsync(
+                    (ledgerEntries, exception) -> {
+                        if (exception != null) {
+                            ml.invalidateLedgerHandle(lh, exception);
+                            
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
+                            return;
+                        }
+
+                        try {
+                            Iterator<LedgerEntry> iterator = 
ledgerEntries.iterator();
+                            if (iterator.hasNext()) {
+                                LedgerEntry ledgerEntry = iterator.next();
+                                EntryImpl returnEntry = 
EntryImpl.create(ledgerEntry);
+
+                                manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
+                                ml.mbean.addReadEntriesSample(1, 
returnEntry.getLength());
+                                callback.readEntryComplete(returnEntry, ctx);
+                            } else {
+                                // got an empty sequence
+                                callback.readEntryFailed(new 
ManagedLedgerException("Could not read given position"),
+                                                         ctx);
+                            }
+                        } finally {
+                            ledgerEntries.close();
+                        }
+                    }, ml.getExecutor().chooseThread(ml.getName()));
         }
     }
 
     @Override
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public void asyncReadEntry(LedgerHandle lh, long firstEntry, long 
lastEntry, boolean isSlowestReader,
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, 
boolean isSlowestReader,
             final ReadEntriesCallback callback, Object ctx) {
         final long ledgerId = lh.getId();
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -239,43 +241,43 @@ public void asyncReadEntry(LedgerHandle lh, long 
firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) 
-> {
-
-                if (rc != BKException.Code.OK) {
-                    if (rc == BKException.Code.TooManyRequestsException) {
-                        
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
-                    } else {
-                        ml.invalidateLedgerHandle(lh1, rc);
-                        ManagedLedgerException mlException = 
createManagedLedgerException(rc);
-                        callback.readEntriesFailed(mlException, ctx);
-                    }
-                    return;
-                }
-
-                checkNotNull(ml.getName());
-                checkNotNull(ml.getExecutor());
-                ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
-                    // We got the entries, we need to transform them to a 
List<> type
-                    long totalSize = 0;
-                    final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-                    while (sequence.hasMoreElements()) {
-                        // Insert the entries at the end of the list (they 
will be unsorted for now)
-                        LedgerEntry ledgerEntry = sequence.nextElement();
-                        EntryImpl entry = EntryImpl.create(ledgerEntry);
-                        ledgerEntry.getEntryBuffer().release();
-
-                        entriesToReturn.add(entry);
-
-                        totalSize += entry.getLength();
-
-                    }
-
-                    
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                    ml.getMBean().addReadEntriesSample(entriesToReturn.size(), 
totalSize);
-
-                    callback.readEntriesComplete((List) entriesToReturn, ctx);
-                }));
-            }, callback);
+            lh.readAsync(firstEntry, lastEntry).whenCompleteAsync(
+                    (ledgerEntries, exception) -> {
+                        if (exception != null) {
+                            if (exception instanceof BKException
+                                && ((BKException)exception).getCode() == 
BKException.Code.TooManyRequestsException) {
+                                
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                            } else {
+                                ml.invalidateLedgerHandle(lh, exception);
+                                ManagedLedgerException mlException = 
createManagedLedgerException(exception);
+                                callback.readEntriesFailed(mlException, ctx);
+                            }
+                            return;
+                        }
+
+                        checkNotNull(ml.getName());
+                        checkNotNull(ml.getExecutor());
+
+                        try {
+                            // We got the entries, we need to transform them 
to a List<> type
+                            long totalSize = 0;
+                            final List<EntryImpl> entriesToReturn
+                                = 
Lists.newArrayListWithExpectedSize(entriesToRead);
+                            for (LedgerEntry e : ledgerEntries) {
+                                EntryImpl entry = EntryImpl.create(e);
+
+                                entriesToReturn.add(entry);
+                                totalSize += entry.getLength();
+                            }
+
+                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                            
ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+                            callback.readEntriesComplete((List) 
entriesToReturn, ctx);
+                        } finally {
+                            ledgerEntries.close();
+                        }
+                    }, ml.getExecutor().chooseThread(ml.getName()));
         }
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9f6f837982..c551002b47 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -33,8 +33,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -190,37 +190,35 @@ public void clear() {
         }
 
         @Override
-        public void asyncReadEntry(LedgerHandle lh, long firstEntry, long 
lastEntry, boolean isSlowestReader,
+        public void asyncReadEntry(ReadHandle lh, long firstEntry, long 
lastEntry, boolean isSlowestReader,
                 final ReadEntriesCallback callback, Object ctx) {
-            lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() {
-                public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object bkctx) {
-                    if (rc != BKException.Code.OK) {
-                        
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
-                        return;
-                    }
-
-                    List<Entry> entries = Lists.newArrayList();
-                    long totalSize = 0;
-                    while (seq.hasMoreElements()) {
-                        // Insert the entries at the end of the list (they 
will be unsorted for now)
-                        LedgerEntry ledgerEntry = seq.nextElement();
-                        EntryImpl entry = EntryImpl.create(ledgerEntry);
-                        ledgerEntry.getEntryBuffer().release();
-
-                        entries.add(entry);
-                        totalSize += entry.getLength();
-                    }
-
-                    mlFactoryMBean.recordCacheMiss(entries.size(), totalSize);
-                    ml.mbean.addReadEntriesSample(entries.size(), totalSize);
-
-                    callback.readEntriesComplete(entries, null);
-                }
-            }, null);
+            lh.readAsync(firstEntry, lastEntry).whenComplete(
+                    (ledgerEntries, exception) -> {
+                        if (exception != null) {
+                            
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                            return;
+                        }
+                        List<Entry> entries = Lists.newArrayList();
+                        long totalSize = 0;
+                        try {
+                            for (LedgerEntry e : ledgerEntries) {
+                                // Insert the entries at the end of the list 
(they will be unsorted for now)
+                                EntryImpl entry = EntryImpl.create(e);
+                                entries.add(entry);
+                                totalSize += entry.getLength();
+                            }
+                        } finally {
+                            ledgerEntries.close();
+                        }
+                        mlFactoryMBean.recordCacheMiss(entries.size(), 
totalSize);
+                        ml.mbean.addReadEntriesSample(entries.size(), 
totalSize);
+
+                        callback.readEntriesComplete(entries, null);
+                    });
         }
 
         @Override
-        public void asyncReadEntry(LedgerHandle lh, PositionImpl position, 
AsyncCallbacks.ReadEntryCallback callback,
+        public void asyncReadEntry(ReadHandle lh, PositionImpl position, 
AsyncCallbacks.ReadEntryCallback callback,
                 Object ctx) {
         }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index d1c6defee9..bf3e925605 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -25,7 +25,7 @@
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCounted;
-import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.Entry;
 
 public final class EntryImpl extends AbstractReferenceCounted implements 
Entry, Comparable<EntryImpl>, ReferenceCounted {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 462428e1c2..acf17f12c3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -92,6 +92,7 @@
     protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
+    private final BookKeeper.DigestType digestType;
 
     protected volatile PositionImpl markDeletePosition;
     protected volatile PositionImpl readPosition;
@@ -179,6 +180,7 @@ public MarkDeleteEntry(PositionImpl newPosition, 
Map<String, Long> properties,
         this.config = config;
         this.ledger = ledger;
         this.name = cursorName;
+        this.digestType = 
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         STATE_UPDATER.set(this, State.Uninitialized);
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
         PENDING_READ_OPS_UPDATER.set(this, 0);
@@ -253,7 +255,7 @@ protected void recoverFromLedger(final ManagedCursorInfo 
info, final VoidCallbac
         // a new ledger and write the position into it
         ledger.mbean.startCursorLedgerOpenOp();
         long ledgerId = info.getCursorsLedgerId();
-        bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), 
config.getPassword(), (rc, lh, ctx) -> {
+        bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), 
(rc, lh, ctx) -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Opened ledger {} for consumer {}. rc={}", 
ledger.getName(), ledgerId, name, rc);
             }
@@ -1924,7 +1926,7 @@ void createNewMetadataLedger(final VoidCallback callback) 
{
         ledger.mbean.startCursorLedgerCreateOp();
 
         bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), 
config.getMetadataWriteQuorumSize(),
-                config.getMetadataAckQuorumSize(), config.getDigestType(), 
config.getPassword(), (rc, lh, ctx) -> {
+                config.getMetadataAckQuorumSize(), digestType, 
config.getPassword(), (rc, lh, ctx) -> {
                     ledger.getExecutor().execute(safeRun(() -> {
                         ledger.mbean.endCursorLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f6c7e3ffd9..77b1dff7b2 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -54,6 +54,8 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -104,11 +106,12 @@
 
     private final BookKeeper bookKeeper;
     private final String name;
+    private final BookKeeper.DigestType digestType;
 
     private ManagedLedgerConfig config;
     private final MetaStore store;
 
-    private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> 
ledgerCache = new ConcurrentLongHashMap<>();
+    private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> 
ledgerCache = new ConcurrentLongHashMap<>();
     private final NavigableMap<Long, LedgerInfo> ledgers = new 
ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
 
@@ -212,6 +215,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper
         this.config = config;
         this.store = store;
         this.name = name;
+        this.digestType = 
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         this.scheduledExecutor = scheduledExecutor;
         this.executor = orderedExecutor;
         TOTAL_SIZE_UPDATER.set(this, 0);
@@ -278,7 +282,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, 
Stat stat) {
                         log.debug("[{}] Opening legder {}", name, id);
                     }
                     mbean.startDataLedgerOpenOp();
-                    bookKeeper.asyncOpenLedger(id, config.getDigestType(), 
config.getPassword(), opencb, null);
+                    bookKeeper.asyncOpenLedger(id, digestType, 
config.getPassword(), opencb, null);
                 } else {
                     initializeBookKeeper(callback);
                 }
@@ -342,7 +346,7 @@ public void operationFailed(MetaStoreException e) {
         this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
         mbean.startDataLedgerCreateOp();
         bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
-                config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> 
{
+                digestType, config.getPassword(), (rc, lh, ctx) -> {
                     executor.executeOrdered(name, safeRun(() -> {
                         mbean.endDataLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
@@ -528,7 +532,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry 
addOperation) {
                 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
                 mbean.startDataLedgerCreateOp();
                 bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                        config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, null,
+                        config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
                         Collections.emptyMap());
             }
         } else {
@@ -1263,7 +1267,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
             this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
             mbean.startDataLedgerCreateOp();
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, null,
+                    config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
                     Collections.emptyMap());
         }
     }
@@ -1314,52 +1318,53 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
         }
     }
 
-    CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
-        CompletableFuture<LedgerHandle> ledgerHandle = 
ledgerCache.get(ledgerId);
+    CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
+        CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
             return ledgerHandle;
         }
 
         // If not present try again and create if necessary
         return ledgerCache.computeIfAbsent(ledgerId, lid -> {
-            // Open the ledger for reading if it was not already opened
-            CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Asynchronously opening ledger {} for read", 
name, ledgerId);
-            }
-            mbean.startDataLedgerOpenOp();
-            bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), 
config.getPassword(),
-                    (int rc, LedgerHandle lh, Object ctx) -> {
-                        executor.executeOrdered(name, safeRun(() -> {
+                // Open the ledger for reading if it was not already opened
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Asynchronously opening ledger {} for 
read", name, ledgerId);
+                }
+                mbean.startDataLedgerOpenOp();
+
+                CompletableFuture<ReadHandle> promise = new 
CompletableFuture<>();
+                bookKeeper.newOpenLedgerOp()
+                    .withRecovery(true)
+                    .withLedgerId(ledgerId)
+                    .withDigestType(config.getDigestType())
+                    .withPassword(config.getPassword()).execute()
+                    .whenCompleteAsync((res,ex) -> {
                             mbean.endDataLedgerOpenOp();
-                            if (rc != BKException.Code.OK) {
-                                // Remove the ledger future from cache to give 
chance to reopen it later
-                                ledgerCache.remove(ledgerId, future);
-                                
future.completeExceptionally(createManagedLedgerException(rc));
+                            if (ex != null) {
+                                ledgerCache.remove(ledgerId, promise);
+                                
promise.completeExceptionally(createManagedLedgerException(ex));
                             } else {
                                 if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Successfully opened ledger 
{} for reading", name, lh.getId());
+                                    log.debug("[{}] Successfully opened ledger 
{} for reading", name, ledgerId);
                                 }
-                                future.complete(lh);
+                                promise.complete(res);
                             }
-                        }));
-                    }, null);
-            return future;
-        });
+                        }, executor.chooseThread(name));
+                return promise;
+            });
     }
 
-    void invalidateLedgerHandle(LedgerHandle ledgerHandle, int rc) {
+    void invalidateLedgerHandle(ReadHandle ledgerHandle, Throwable t) {
         long ledgerId = ledgerHandle.getId();
         if (ledgerId != currentLedger.getId()) {
             // remove handle from ledger cache since we got a (read) error
             ledgerCache.remove(ledgerId);
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Removed ledger {} from cache (after read 
error: {})", name, ledgerId, rc);
+                log.debug("[{}] Removed ledger {} from cache (after read 
error)", name, ledgerId, t);
             }
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Ledger that encountered read error {} is 
current ledger", name, rc);
+                log.debug("[{}] Ledger that encountered read error is current 
ledger", name, t);
             }
         }
     }
@@ -1384,7 +1389,7 @@ void asyncReadEntry(PositionImpl position, 
ReadEntryCallback callback, Object ct
 
     }
 
-    private void internalReadFromLedger(LedgerHandle ledger, OpReadEntry 
opReadEntry) {
+    private void internalReadFromLedger(ReadHandle ledger, OpReadEntry 
opReadEntry) {
 
         // Perform the read
         long firstEntry = opReadEntry.readPosition.getEntryId();
@@ -2256,6 +2261,14 @@ public static ManagedLedgerException 
createManagedLedgerException(int bkErrorCod
         }
     }
 
+    public static ManagedLedgerException 
createManagedLedgerException(Throwable t) {
+        if (t instanceof org.apache.bookkeeper.client.api.BKException) {
+            return 
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)t).getCode());
+        } else {
+            return new ManagedLedgerException("Unknown exception");
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index 39405af512..06acb5b866 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -34,6 +34,7 @@
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.common.naming.TopicName;
@@ -52,9 +53,9 @@
     private boolean accurate = false;
     private String brokerName;
 
-    public ManagedLedgerOfflineBacklog(BookKeeper.DigestType digestType, 
byte[] password, String brokerName,
+    public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] password, 
String brokerName,
             boolean accurate) {
-        this.digestType = digestType;
+        this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType);
         this.password = password;
         this.accurate = accurate;
         this.brokerName = brokerName;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index 1be2692a6b..c48ea24d88 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -27,11 +27,13 @@
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Vector;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -59,7 +61,7 @@ public void setUp(Method method) throws Exception {
 
     @Test(timeOut = 5000)
     void testRead() throws Exception {
-        LedgerHandle lh = getLedgerHandle();
+        ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
@@ -86,12 +88,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         counter.await();
 
         // Verify no entries were read from bookkeeper
-        verify(lh, never()).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any());
+        verify(lh, never()).readAsync(anyLong(), anyLong());
     }
 
     @Test(timeOut = 5000)
     void testReadMissingBefore() throws Exception {
-        LedgerHandle lh = getLedgerHandle();
+        ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
@@ -119,7 +121,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
     @Test(timeOut = 5000)
     void testReadMissingAfter() throws Exception {
-        LedgerHandle lh = getLedgerHandle();
+        ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
@@ -147,7 +149,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
     @Test(timeOut = 5000)
     void testReadMissingMiddle() throws Exception {
-        LedgerHandle lh = getLedgerHandle();
+        ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
@@ -176,7 +178,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
     @Test(timeOut = 5000)
     void testReadMissingMultiple() throws Exception {
-        LedgerHandle lh = getLedgerHandle();
+        ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
@@ -205,18 +207,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
     @Test(timeOut = 5000)
     void testReadWithError() throws Exception {
-        final LedgerHandle lh = getLedgerHandle();
+        final ReadHandle lh = getLedgerHandle();
         when(lh.getId()).thenReturn((long) 0);
 
-        doAnswer(new Answer<Object>() {
-            public Object answer(InvocationOnMock invocation) {
-                Object[] args = invocation.getArguments();
-                ReadCallback callback = (ReadCallback) args[2];
-                Object ctx = args[3];
-                callback.readComplete(BKException.Code.NoSuchLedgerExistsException, lh, null, ctx);
-                return null;
-            }
-        }).when(lh).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any());
+        doAnswer((invocation) -> {
+                CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
+                future.completeExceptionally(new BKNoSuchLedgerExistsException());
+                return future;
+            }).when(lh).readAsync(anyLong(), anyLong());
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
         EntryCache entryCache = cacheManager.getEntryCache(ml);
@@ -238,29 +236,25 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         counter.await();
     }
 
-    private static LedgerHandle getLedgerHandle() {
-        final LedgerHandle lh = mock(LedgerHandle.class);
+    private static ReadHandle getLedgerHandle() {
+        final ReadHandle lh = mock(ReadHandle.class);
         final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS);
-        doReturn(new byte[10]).when(ledgerEntry).getEntry();
         doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer();
         doReturn((long) 10).when(ledgerEntry).getLength();
 
-        doAnswer(new Answer<Object>() {
-            public Object answer(InvocationOnMock invocation) {
+        doAnswer((invocation) -> {
                 Object[] args = invocation.getArguments();
                 long firstEntry = (Long) args[0];
                 long lastEntry = (Long) args[1];
-                ReadCallback callback = (ReadCallback) args[2];
-                Object ctx = args[3];
 
                 Vector<LedgerEntry> entries = new Vector<LedgerEntry>();
                 for (int i = 0; i <= (lastEntry - firstEntry); i++) {
                     entries.add(ledgerEntry);
                 }
-                callback.readComplete(0, lh, entries.elements(), ctx);
-                return null;
-            }
-        }).when(lh).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any());
+                LedgerEntries ledgerEntries = mock(LedgerEntries.class);
+                doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator();
+                return CompletableFuture.completedFuture(ledgerEntries);
+            }).when(lh).readAsync(anyLong(), anyLong());
 
         return lh;
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ad85a35a99..8cf5ee4775 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2473,7 +2473,8 @@ public void operationFailed(ManagedLedgerException exception) {
         latch2.await();
 
         try {
-            bkc.openLedgerNoRecovery(ledgerId, mlConfig.getDigestType(), mlConfig.getPassword());
+            bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()),
+                                     mlConfig.getPassword());
             fail("ledger should have deleted due to update-cursor failure");
         } catch (BKException e) {
             // ok
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 3035c286a5..a5b58d7df9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -33,8 +33,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -345,7 +346,7 @@ public void ledgerFencedByAutoReplication() throws Exception {
         PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
 
         // Trigger the closure of the data ledger
-        bkc.openLedger(p1.getLedgerId(), DigestType.CRC32C, new byte[] {});
+        bkc.openLedger(p1.getLedgerId(), BookKeeper.DigestType.CRC32C, new byte[] {});
 
         ledger.addEntry("entry-2".getBytes());
 
@@ -423,8 +424,8 @@ public void testOfflineTopicBacklog() throws Exception {
         entries.forEach(e -> e.release());
         ledger.close();
 
-        ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(DigestType.CRC32,
-                "".getBytes(Charsets.UTF_8), "", false);
+        ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(
+                DigestType.CRC32, "".getBytes(Charsets.UTF_8), "", false);
         PersistentOfflineTopicStats offlineTopicStats = offlineTopicBacklog.getEstimatedUnloadedTopicBacklog(
                 (ManagedLedgerFactoryImpl) factory, "property/cluster/namespace/my-ledger");
         factory.shutdown();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 4efeefb9c4..35eb986df5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.Entry;
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dd9cabdc36..100271487e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -24,7 +24,7 @@
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 038fb4dec3..d830c4ff05 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -32,7 +32,7 @@
 import java.io.PrintWriter;
 import java.util.Properties;
 
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.testng.annotations.Test;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 3730121098..c07217c9b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -384,7 +384,7 @@ private String getSchemaPath(String schemaId) {
             config.getManagedLedgerDefaultEnsembleSize(),
             config.getManagedLedgerDefaultWriteQuorum(),
             config.getManagedLedgerDefaultAckQuorum(),
-            config.getManagedLedgerDigestType(),
+            BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
@@ -402,7 +402,7 @@ private String getSchemaPath(String schemaId) {
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         bookKeeper.asyncOpenLedger(
             ledgerId,
-            config.getManagedLedgerDigestType(),
+            BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index e975b37586..0ffea79933 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -49,7 +49,7 @@
 
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.Recorder;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to