This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c766575  Recovery uses immutable metadata
c766575 is described below

commit c766575c7b36c2692fe403c996a3fd3516e5708a
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Sep 4 12:31:15 2018 -0700

    Recovery uses immutable metadata
    
    And MetadataUpdateLoop to update it.
    
    Master issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1621 from ivankelly/recovery-immutable
---
 .../org/apache/bookkeeper/client/BKException.java  |  13 +
 .../org/apache/bookkeeper/client/LedgerHandle.java | 112 +--------
 .../apache/bookkeeper/client/LedgerMetadata.java   |  10 +-
 .../bookkeeper/client/LedgerMetadataBuilder.java   |  21 +-
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |   2 +-
 .../apache/bookkeeper/client/LedgerRecoveryOp.java |  63 ++---
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    | 254 +++++++++++++++++--
 .../apache/bookkeeper/client/BKExceptionTest.java  |  67 +++++
 .../bookkeeper/client/LedgerRecovery2Test.java     | 278 +++++++++++++++++++++
 .../bookkeeper/client/LedgerRecoveryTest.java      |  23 +-
 .../bookkeeper/client/MetadataUpdateLoopTest.java  |   2 +-
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  21 +-
 .../bookkeeper/client/MockClientContext.java       |   7 +-
 .../client/ParallelLedgerRecoveryTest.java         |   6 +-
 .../discover/MockRegistrationClient.java           |   8 +-
 .../apache/bookkeeper/proto/MockBookieClient.java  | 166 ++++++------
 .../bookkeeper/client/BookKeeperAccessor.java      |   6 +-
 17 files changed, 764 insertions(+), 295 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index ddfc795..8ab563b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -426,4 +426,17 @@ public abstract class BKException extends 
org.apache.bookkeeper.client.api.BKExc
             super(Code.LedgerIdOverflowException);
         }
     }
+
+    /**
+     * Extract an exception code from a BKException, or use a default if it's 
another type.
+     */
+    public static int getExceptionCode(Throwable t, int defaultCode) {
+        if (t instanceof BKException) {
+            return ((BKException) t).getCode();
+        } else if (t.getCause() != null) {
+            return getExceptionCode(t.getCause(), defaultCode);
+        } else {
+            return defaultCode;
+        }
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 741610d..1b859a3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -76,9 +76,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
@@ -329,6 +327,10 @@ public class LedgerHandle implements WriteHandle {
             // ensure that we only update the metadata if it is the object we 
expect it to be
             if (metadata == expected) {
                 metadata = newMetadata;
+                if (metadata.isClosed()) {
+                    lastAddConfirmed = lastAddPushed = 
metadata.getLastEntryId();
+                    length = metadata.getLength();
+                }
                 return true;
             } else {
                 return false;
@@ -2224,112 +2226,6 @@ public class LedgerHandle implements WriteHandle {
         }
     }
 
-
-    void recover(GenericCallback<Void> finalCb) {
-        recover(finalCb, null, false);
-    }
-
-    /**
-     * Recover the ledger.
-     *
-     * @param finalCb
-     *          callback after recovery is done.
-     * @param listener
-     *          read entry listener on recovery reads.
-     * @param forceRecovery
-     *          force the recovery procedure even the ledger metadata shows 
the ledger is closed.
-     */
-    void recover(GenericCallback<Void> finalCb,
-                 final @VisibleForTesting 
BookkeeperInternalCallbacks.ReadEntryListener listener,
-                 final boolean forceRecovery) {
-        final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
-            finalCb,
-            BKException.Code.OK,
-            clientCtx.getClientStats().getRecoverOpLogger());
-        boolean wasClosed = false;
-        boolean wasInRecovery = false;
-
-        LedgerMetadata metadata = getLedgerMetadata();
-        synchronized (this) {
-            if (metadata.isClosed()) {
-                if (forceRecovery) {
-                    wasClosed = false;
-                    // mark the ledger back to in recovery state, so it would 
proceed ledger recovery again.
-                    wasInRecovery = false;
-                    metadata.markLedgerInRecovery();
-                } else {
-                    lastAddConfirmed = lastAddPushed = 
metadata.getLastEntryId();
-                    length = metadata.getLength();
-                    wasClosed = true;
-                }
-            } else {
-                wasClosed = false;
-                if (metadata.isInRecovery()) {
-                    wasInRecovery = true;
-                } else {
-                    wasInRecovery = false;
-                    metadata.markLedgerInRecovery();
-                }
-            }
-        }
-
-        if (wasClosed) {
-            // We are already closed, nothing to do
-            cb.operationComplete(BKException.Code.OK, null);
-            return;
-        }
-
-        if (wasInRecovery) {
-            // if metadata is already in recover, dont try to write again,
-            // just do the recovery from the starting point
-            new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
-                    .setEntryListener(listener)
-                    .initiate();
-            return;
-        }
-
-        writeLedgerConfig(new 
OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(), ledgerId) 
{
-            @Override
-            public void safeOperationComplete(final int rc, LedgerMetadata 
writtenMetadata) {
-                if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new 
OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(),
-                                                                              
ledgerId) {
-                        @Override
-                        public void safeOperationComplete(int rc, 
LedgerMetadata newMeta) {
-                            if (rc != BKException.Code.OK) {
-                                cb.operationComplete(rc, null);
-                            } else {
-                                LedgerHandle.this.metadata = newMeta;
-                                recover(cb, listener, forceRecovery);
-                            }
-                        }
-
-                        @Override
-                        public String toString() {
-                            return 
String.format("ReReadMetadataForRecover(%d)", ledgerId);
-                        }
-                    });
-                } else if (rc == BKException.Code.OK) {
-                    // we only could issue recovery operation after we 
successfully update the ledger state to
-                    // in recovery otherwise, it couldn't prevent us advancing 
last confirmed while the other writer is
-                    // closing the ledger, which will cause inconsistent last 
add confirmed on bookies & zookeeper
-                    // metadata.
-                    new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
-                        .setEntryListener(listener)
-                        .initiate();
-                } else {
-                    LOG.error("Error writing ledger {} config: {}", ledgerId, 
BKException.codeLogger(rc));
-                    cb.operationComplete(rc, null);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("WriteLedgerConfigForRecover(%d)", 
ledgerId);
-            }
-        });
-    }
-
     static class NoopCloseCallback implements CloseCallback {
         static NoopCloseCallback instance = new NoopCloseCallback();
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 4f58aa8..ffb0d6a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -135,6 +135,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
                    int ackQuorumSize,
                    LedgerMetadataFormat.State state,
                    java.util.Optional<Long> lastEntryId,
+                   java.util.Optional<Long> length,
                    Map<Long, List<BookieSocketAddress>> ensembles,
                    DigestType digestType,
                    java.util.Optional<byte[]> password,
@@ -148,7 +149,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         this.ackQuorumSize = ackQuorumSize;
         this.state = state;
         lastEntryId.ifPresent((eid) -> this.lastEntryId = eid);
-
+        length.ifPresent((l) -> this.length = l);
         setEnsembles(ensembles);
         if (state != LedgerMetadataFormat.State.CLOSED) {
             currentEnsemble = this.ensembles.lastEntry().getValue();
@@ -787,4 +788,11 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         return bookies;
     }
 
+    java.util.Optional<Long> getLastEnsembleKey() {
+        if (ensembles.size() > 0) {
+            return java.util.Optional.of(ensembles.lastKey());
+        } else {
+            return java.util.Optional.empty();
+        }
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 8c37bd5..a9d83b0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ImmutableMap;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ class LedgerMetadataBuilder {
 
     private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
     private Optional<Long> lastEntryId = Optional.empty();
+    private Optional<Long> length = Optional.empty();
 
     private TreeMap<Long, List<BookieSocketAddress>> ensembles = new 
TreeMap<>();
 
@@ -67,6 +69,10 @@ class LedgerMetadataBuilder {
         if (lastEntryId != LedgerHandle.INVALID_ENTRY_ID) {
             builder.lastEntryId = Optional.of(lastEntryId);
         }
+        long length = other.getLength();
+        if (length > 0) {
+            builder.length = Optional.of(length);
+        }
 
         builder.ensembles.putAll(other.getEnsembles());
 
@@ -85,6 +91,11 @@ class LedgerMetadataBuilder {
         return builder;
     }
 
+    LedgerMetadataBuilder withPassword(byte[] password) {
+        this.password = Optional.of(Arrays.copyOf(password, password.length));
+        return this;
+    }
+
     LedgerMetadataBuilder withEnsembleSize(int ensembleSize) {
         checkState(ensembles.size() == 0, "Can only set ensemble size before 
adding ensembles to the builder");
         this.ensembleSize = ensembleSize;
@@ -109,17 +120,21 @@ class LedgerMetadataBuilder {
         return this;
     }
 
+    LedgerMetadataBuilder withInRecoveryState() {
+        this.state = LedgerMetadataFormat.State.IN_RECOVERY;
+        return this;
+    }
 
-
-    LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
+    LedgerMetadataBuilder closingAt(long lastEntryId, long length) {
         this.lastEntryId = Optional.of(lastEntryId);
+        this.length = Optional.of(length);
         this.state = LedgerMetadataFormat.State.CLOSED;
         return this;
     }
 
     LedgerMetadata build() {
         return new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize,
-                                  state, lastEntryId, ensembles,
+                                  state, lastEntryId, length, ensembles,
                                   digestType, password, ctime, customMetadata,
                                   version);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index cf5f3a7..da82531 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -53,7 +53,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> 
{
     final long ledgerId;
     final OpenCallback cb;
     final Object ctx;
-    LedgerHandle lh;
+    ReadOnlyLedgerHandle lh;
     final byte[] passwd;
     boolean doRecovery = true;
     boolean administrativeOpen = false;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 6ab25d6..923c4df 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -18,11 +18,10 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
 import org.slf4j.Logger;
@@ -32,9 +31,7 @@ import org.slf4j.LoggerFactory;
  * This class encapsulated the ledger recovery operation. It first does a read
  * with entry-id of -1 (BookieProtocol.LAST_ADD_CONFIRMED) to all bookies. Then
  * starting from the last confirmed entry (from hints in the ledger entries),
- * it reads forward until it is not able to find a particular entry. It closes
- * the ledger at that entry.
- *
+ * it reads forward until it is not able to find a particular entry.
  */
 class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
 
@@ -42,13 +39,13 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
 
     final LedgerHandle lh;
     final ClientContext clientCtx;
+    final CompletableFuture<LedgerHandle> promise;
 
     final AtomicLong readCount, writeCount;
     volatile boolean readDone;
-    final AtomicBoolean callbackDone;
     volatile long startEntryToRead;
     volatile long endEntryToRead;
-    final GenericCallback<Void> cb;
+
     // keep a copy of metadata for recovery.
     LedgerMetadata metadataForRecovery;
 
@@ -72,13 +69,11 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
 
     }
 
-    public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx,
-                            GenericCallback<Void> cb) {
+    public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx) {
         readCount = new AtomicLong(0);
         writeCount = new AtomicLong(0);
         readDone = false;
-        callbackDone = new AtomicBoolean(false);
-        this.cb = cb;
+        this.promise = new CompletableFuture<>();
         this.lh = lh;
         this.clientCtx = clientCtx;
     }
@@ -96,7 +91,7 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
         return this;
     }
 
-    public void initiate() {
+    public CompletableFuture<LedgerHandle> initiate() {
         ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, 
clientCtx.getBookieClient(), lh.getCurrentEnsemble(),
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     public void readLastConfirmedDataComplete(int rc, 
RecoveryData data) {
@@ -125,24 +120,27 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
          * from writing to it.
          */
         rlcop.initiateWithFencing();
+
+        return promise;
     }
 
     private void submitCallback(int rc) {
         if (BKException.Code.OK == rc) {
             
clientCtx.getClientStats().getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
             
clientCtx.getClientStats().getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
+            promise.complete(lh);
         } else {
             
clientCtx.getClientStats().getRecoverAddCountLogger().registerFailedValue(writeCount.get());
             
clientCtx.getClientStats().getRecoverReadCountLogger().registerFailedValue(readCount.get());
+            promise.completeExceptionally(BKException.create(rc));
         }
-        cb.operationComplete(rc, null);
     }
 
     /**
      * Try to read past the last confirmed.
      */
     private void doRecoveryRead() {
-        if (!callbackDone.get()) {
+        if (!promise.isDone()) {
             startEntryToRead = endEntryToRead + 1;
             endEntryToRead = endEntryToRead + 
clientCtx.getConf().recoveryReadBatchSize;
             new RecoveryReadOp(lh, clientCtx, startEntryToRead, 
endEntryToRead, this, null)
@@ -150,26 +148,6 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
         }
     }
 
-    private void closeAndCallback() {
-        if (callbackDone.compareAndSet(false, true)) {
-            lh.asyncCloseInternal(new CloseCallback() {
-                @Override
-                public void closeComplete(int rc, LedgerHandle lh, Object ctx) 
{
-                    if (rc != BKException.Code.OK) {
-                        LOG.warn("Close ledger {} failed during recovery: ",
-                            LedgerRecoveryOp.this.lh.getId(), 
BKException.getMessage(rc));
-                        submitCallback(rc);
-                    } else {
-                        submitCallback(BKException.Code.OK);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("After closing length is: {}", 
lh.getLength());
-                        }
-                    }
-                }
-            }, null, BKException.Code.LedgerClosedException);
-        }
-    }
-
     @Override
     public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, 
Object ctx) {
         // notify entry listener on individual entries being read during 
ledger recovery.
@@ -179,7 +157,7 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
         }
 
         // we only trigger recovery add an entry when readDone == false && 
callbackDone == false
-        if (!callbackDone.get() && !readDone && rc == BKException.Code.OK) {
+        if (!promise.isDone() && !readDone && rc == BKException.Code.OK) {
             readCount.incrementAndGet();
             byte[] data = entry.getEntry();
 
@@ -211,15 +189,15 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
         if (rc == BKException.Code.NoSuchEntryException || rc == 
BKException.Code.NoSuchLedgerExistsException) {
             readDone = true;
             if (readCount.get() == writeCount.get()) {
-                closeAndCallback();
+                submitCallback(BKException.Code.OK);
             }
             return;
         }
 
         // otherwise, some other error, we can't handle
-        if (BKException.Code.OK != rc && callbackDone.compareAndSet(false, 
true)) {
+        if (BKException.Code.OK != rc && !promise.isDone()) {
             LOG.error("Failure {} while reading entries: ({} - {}), ledger: {} 
while recovering ledger",
-                    BKException.getMessage(rc), startEntryToRead, 
endEntryToRead, lh.getId());
+                      BKException.getMessage(rc), startEntryToRead, 
endEntryToRead, lh.getId());
             submitCallback(rc);
         } else if (BKException.Code.OK == rc) {
             // we are here is because we successfully read an entry but 
readDone was already set to true.
@@ -235,15 +213,12 @@ class LedgerRecoveryOp implements ReadEntryListener, 
AddCallback {
         if (rc != BKException.Code.OK) {
             LOG.error("Failure {} while writing entry: {} while recovering 
ledger: {}",
                     BKException.codeLogger(rc), entryId + 1, lh.ledgerId);
-            if (callbackDone.compareAndSet(false, true)) {
-                // Give up, we can't recover from this error
-                submitCallback(rc);
-            }
+            submitCallback(rc);
             return;
         }
         long numAdd = writeCount.incrementAndGet();
         if (readDone && readCount.get() == numAdd) {
-            closeAndCallback();
+            submitCallback(rc);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index aa0290b..e2c9a44 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -20,8 +20,21 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -30,10 +43,16 @@ import 
org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Version;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Read only ledger handle. This ledger handle allows you to
  * read from a ledger but not to write to it. It overrides all
@@ -41,6 +60,10 @@ import org.apache.bookkeeper.versioning.Version;
  * It should be returned for BookKeeper#openLedger operations.
  */
 class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListener {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadOnlyLedgerHandle.class);
+
+    private Object metadataLock = new Object();
+    private final NavigableMap<Long, List<BookieSocketAddress>> 
newEnsemblesFromRecovery = new TreeMap<>();
 
     class MetadataUpdater extends SafeRunnable {
 
@@ -128,29 +151,6 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
     }
 
     @Override
-    void handleBookieFailure(final Map<Integer, BookieSocketAddress> 
failedBookies) {
-        blockAddCompletions.incrementAndGet();
-        synchronized (getLedgerMetadata()) {
-            try {
-                EnsembleInfo ensembleInfo = 
replaceBookieInMetadata(failedBookies,
-                        numEnsembleChanges.incrementAndGet());
-                if (ensembleInfo.replacedBookies.isEmpty()) {
-                    blockAddCompletions.decrementAndGet();
-                    return;
-                }
-                blockAddCompletions.decrementAndGet();
-                // the failed bookie has been replaced
-                unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, 
ensembleInfo.replacedBookies);
-            } catch (BKException.BKNotEnoughBookiesException e) {
-                LOG.error("Could not get additional bookie to "
-                          + "remake ensemble, closing ledger: " + ledgerId);
-                handleUnrecoverableErrorDuringAdd(e.getCode());
-                return;
-            }
-        }
-    }
-
-    @Override
     public void onChanged(long lid, LedgerMetadata newMetadata) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received ledger metadata update on {} : {}", lid, 
newMetadata);
@@ -208,4 +208,212 @@ class ReadOnlyLedgerHandle extends LedgerHandle 
implements LedgerMetadataListene
             }
         }, ctx);
     }
+
+    List<BookieSocketAddress> replaceBookiesInEnsemble(LedgerMetadata metadata,
+                                                       
List<BookieSocketAddress> oldEnsemble,
+                                                       Map<Integer, 
BookieSocketAddress> failedBookies)
+            throws BKException.BKNotEnoughBookiesException {
+        List<BookieSocketAddress> newEnsemble = new ArrayList<>(oldEnsemble);
+
+        int ensembleSize = metadata.getEnsembleSize();
+        int writeQ = metadata.getWriteQuorumSize();
+        int ackQ = metadata.getAckQuorumSize();
+        Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+
+        Set<BookieSocketAddress> exclude = new 
HashSet<>(failedBookies.values());
+
+        int replaced = 0;
+        for (Map.Entry<Integer, BookieSocketAddress> entry : 
failedBookies.entrySet()) {
+            int idx = entry.getKey();
+            BookieSocketAddress addr = entry.getValue();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[EnsembleChange-L{}] replacing bookie: {} index: 
{}", getId(), addr, idx);
+            }
+
+            if (!newEnsemble.get(idx).equals(addr)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}] Not changing failed bookie 
{} at index {}, already changed to {}",
+                              getId(), addr, idx, newEnsemble.get(idx));
+                }
+                continue;
+            }
+            try {
+                BookieSocketAddress newBookie = 
clientCtx.getBookieWatcher().replaceBookie(
+                        ensembleSize, writeQ, ackQ, customMetadata, 
newEnsemble, idx, exclude);
+                newEnsemble.set(idx, newBookie);
+
+                replaced++;
+            } catch (BKException.BKNotEnoughBookiesException e) {
+                // if no bookie has been replaced so far, rethrow the not enough bookies 
exception; otherwise stop and keep the partial replacement
+                if (replaced <= 0) {
+                    throw e;
+                } else {
+                    break;
+                }
+            }
+        }
+        return newEnsemble;
+    }
+
+    private static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
+                                             List<BookieSocketAddress> e2) {
+        checkArgument(e1.size() == e2.size(), "Ensembles must be of same 
size");
+        Set<Integer> diff = new HashSet<>();
+        for (int i = 0; i < e1.size(); i++) {
+            if (!e1.get(i).equals(e2.get(i))) {
+                diff.add(i);
+            }
+        }
+        return diff;
+    }
+
+    /**
+     * For a read only ledger handle, this method will only ever be called 
during recovery,
+     * when we are reading forward from LAC and writing back those entries. As 
such,
+     * unlike with LedgerHandle, we do not want to persist changes to the 
metadata as they occur,
+     * but rather, we want to defer the persistence until recovery has 
completed, and do it all
+     * on the close.
+     */
+    @Override
+    void handleBookieFailure(final Map<Integer, BookieSocketAddress> 
failedBookies) {
+        blockAddCompletions.incrementAndGet();
+
+        // handleBookieFailure should always run in the ordered executor 
thread for this
+        // ledger, so this synchronized should be unnecessary, but putting it 
here now
+        // just in case (can be removed when we validate threads)
+        synchronized (metadataLock) {
+            long lac = getLastAddConfirmed();
+            LedgerMetadata metadata = getLedgerMetadata();
+            List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
+            try {
+                List<BookieSocketAddress> newEnsemble = 
replaceBookiesInEnsemble(metadata, currentEnsemble,
+                                                                               
  failedBookies);
+
+                Set<Integer> replaced = diffEnsemble(currentEnsemble, 
newEnsemble);
+                blockAddCompletions.decrementAndGet();
+                if (!replaced.isEmpty()) {
+                    newEnsemblesFromRecovery.put(lac + 1, newEnsemble);
+                    unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
+                }
+            } catch (BKException.BKNotEnoughBookiesException e) {
+                LOG.error("Could not get additional bookie to remake ensemble, 
closing ledger: {}", ledgerId);
+
+                handleUnrecoverableErrorDuringAdd(e.getCode());
+                return;
+            }
+        }
+    }
+
+    @Override
+    void handleUnrecoverableErrorDuringAdd(int rc) {
+        errorOutPendingAdds(rc);
+    }
+
+    void recover(GenericCallback<Void> finalCb) {
+        recover(finalCb, null, false);
+    }
+
+    /**
+     * Recover the ledger.
+     *
+     * @param finalCb
+     *          callback after recovery is done.
+     * @param listener
+     *          read entry listener on recovery reads.
+     * @param forceRecovery
+     *          force the recovery procedure even if the ledger metadata shows 
the ledger is closed.
+     */
+    void recover(GenericCallback<Void> finalCb,
+                 final @VisibleForTesting ReadEntryListener listener,
+                 final boolean forceRecovery) {
+        final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
+            finalCb,
+            BKException.Code.OK,
+            clientCtx.getClientStats().getRecoverOpLogger());
+
+        MetadataUpdateLoop.NeedsUpdatePredicate needsUpdate =
+            (metadata) -> !(metadata.isClosed() || metadata.isInRecovery());
+        if (forceRecovery) {
+            // in the force recovery case, we want to update the metadata
+            // to IN_RECOVERY, even if the ledger is already closed
+            needsUpdate = (metadata) -> !metadata.isInRecovery();
+        }
+        new MetadataUpdateLoop(
+                clientCtx.getLedgerManager(), getId(),
+                this::getLedgerMetadata,
+                needsUpdate,
+                (metadata) -> 
LedgerMetadataBuilder.from(metadata).withInRecoveryState().build(),
+                this::setLedgerMetadata)
+            .run()
+            .thenCompose((metadata) -> {
+                    if (metadata.isClosed()) {
+                        return 
CompletableFuture.completedFuture(ReadOnlyLedgerHandle.this);
+                    } else {
+                        return new LedgerRecoveryOp(ReadOnlyLedgerHandle.this, 
clientCtx)
+                            .setEntryListener(listener)
+                            .initiate();
+                    }
+            })
+            .thenCompose((ignore) -> closeRecovered())
+            .whenComplete((ignore, ex) -> {
+                    if (ex != null) {
+                        cb.operationComplete(
+                                BKException.getExceptionCode(ex, 
BKException.Code.UnexpectedConditionException), null);
+                    } else {
+                        cb.operationComplete(BKException.Code.OK, null);
+                    }
+            });
+    }
+
+    CompletableFuture<LedgerMetadata> closeRecovered() {
+        long lac, len;
+        synchronized (this) {
+            lac = lastAddConfirmed;
+            len = length;
+        }
+        LOG.info("Closing recovered ledger {} at entry {}", getId(), lac);
+        CompletableFuture<LedgerMetadata> f = new MetadataUpdateLoop(
+                clientCtx.getLedgerManager(), getId(),
+                this::getLedgerMetadata,
+                (metadata) -> metadata.isInRecovery(),
+                (metadata) -> {
+                    LedgerMetadataBuilder builder = 
LedgerMetadataBuilder.from(metadata);
+                    Optional<Long> lastEnsembleKey = 
metadata.getLastEnsembleKey();
+                    checkState(lastEnsembleKey.isPresent(),
+                               "Metadata shouldn't have been created without 
at least one ensemble");
+                    synchronized (metadataLock) {
+                        newEnsemblesFromRecovery.entrySet().forEach(
+                                (e) -> {
+                                    checkState(e.getKey() >= 
lastEnsembleKey.get(),
+                                               "Once a ledger is in recovery, 
noone can add ensembles without closing");
+                                    // Occurs when a bookie needs to be 
replaced at the very start of recovery
+                                    if 
(lastEnsembleKey.get().equals(e.getKey())) {
+                                        
builder.replaceEnsembleEntry(e.getKey(), e.getValue());
+                                    } else {
+                                        builder.newEnsembleEntry(e.getKey(), 
e.getValue());
+                                    }
+                                });
+                    }
+                    return builder.closingAt(lac, len).build();
+                },
+                this::setLedgerMetadata).run();
+        f.thenRun(() -> {
+                synchronized (metadataLock) {
+                    newEnsemblesFromRecovery.clear();
+                }
+            });
+        return f;
+    }
+
+    @Override
+    List<BookieSocketAddress> getCurrentEnsemble() {
+        synchronized (metadataLock) {
+            if (!newEnsemblesFromRecovery.isEmpty()) {
+                return newEnsemblesFromRecovery.lastEntry().getValue();
+            } else {
+                return super.getCurrentEnsemble();
+            }
+        }
+    }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java
new file mode 100644
index 0000000..2692df8
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for extracting codes from BKException.
+ */
+public class BKExceptionTest {
+    @Test
+    public void testBKExceptionCode() {
+        Assert.assertEquals(BKException.Code.WriteException,
+                            BKException.getExceptionCode(new 
BKException.BKWriteException(),
+                                                         
BKException.Code.ReadException));
+    }
+
+    @Test
+    public void testNonBKExceptionCode() {
+        Assert.assertEquals(BKException.Code.ReadException,
+                            BKException.getExceptionCode(new Exception(),
+                                                         
BKException.Code.ReadException));
+
+    }
+
+    @Test
+    public void testNestedBKExceptionCode() {
+        Assert.assertEquals(BKException.Code.WriteException,
+                            BKException.getExceptionCode(
+                                    new ExecutionException("test", new 
BKException.BKWriteException()),
+                                    BKException.Code.ReadException));
+    }
+
+    @Test
+    public void testDoubleNestedBKExceptionCode() {
+        Assert.assertEquals(BKException.Code.WriteException,
+                            BKException.getExceptionCode(
+                                    new ExecutionException("test",
+                                            new CompletionException("blah",
+                                                    new 
BKException.BKWriteException())),
+                                    BKException.Code.ReadException));
+
+    }
+}
+
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
new file mode 100644
index 0000000..9a7cc7f
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ledger recovery tests using mocks rather than a real cluster.
+ */
+public class LedgerRecovery2Test {
+    private static final Logger log = 
LoggerFactory.getLogger(LedgerRecovery2Test.class);
+
+    private static final byte[] PASSWD = "foobar".getBytes();
+    private static final BookieSocketAddress b1 = new 
BookieSocketAddress("b1", 3181);
+    private static final BookieSocketAddress b2 = new 
BookieSocketAddress("b2", 3181);
+    private static final BookieSocketAddress b3 = new 
BookieSocketAddress("b3", 3181);
+    private static final BookieSocketAddress b4 = new 
BookieSocketAddress("b4", 3181);
+    private static final BookieSocketAddress b5 = new 
BookieSocketAddress("b5", 3181);
+
+    private static LedgerMetadata setupLedger(ClientContext clientCtx, long 
ledgerId,
+                                              List<BookieSocketAddress> 
bookies) throws Exception {
+        LedgerMetadata md = LedgerMetadataBuilder.create()
+            .withPassword(PASSWD)
+            .newEnsembleEntry(0, bookies).build();
+        GenericCallbackFuture<LedgerMetadata> mdPromise = new 
GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().createLedgerMetadata(1L, md, mdPromise);
+        return mdPromise.get();
+    }
+
+    @Test
+    public void testCantRecoverAllDown() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1L, Lists.newArrayList(b1, 
b2, b3));
+
+        clientCtx.getMockBookieClient().errorBookies(b1, b2, b3);
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+        try {
+            GenericCallbackFuture<Void> promise = new 
GenericCallbackFuture<>();
+            lh.recover(promise, null, false);
+            promise.get();
+            Assert.fail("Recovery shouldn't have been able to complete");
+        } catch (ExecutionException ee) {
+            Assert.assertEquals(BKException.BKReadException.class, 
ee.getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testCanReadLacButCantWrite() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> FutureUtils.exception(new 
BKException.BKWriteException()));
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+        try {
+            GenericCallbackFuture<Void> promise = new 
GenericCallbackFuture<>();
+            lh.recover(promise, null, false);
+            promise.get();
+            Assert.fail("Recovery shouldn't have been able to complete");
+        } catch (ExecutionException ee) {
+            Assert.assertEquals(BKException.BKNotEnoughBookiesException.class, 
ee.getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testMetadataClosedDuringRecovery() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+
+        CompletableFuture<Void> writingBack = new CompletableFuture<>();
+        CompletableFuture<Void> blocker = new CompletableFuture<>();
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        // will block recovery at the writeback phase
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    writingBack.complete(null);
+                    return blocker;
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+
+        writingBack.get(10, TimeUnit.SECONDS);
+
+        GenericCallbackFuture<LedgerMetadata> readPromise = new 
GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().readLedgerMetadata(1L, readPromise);
+        LedgerMetadataBuilder builder = 
LedgerMetadataBuilder.from(readPromise.get());
+        GenericCallbackFuture<LedgerMetadata> writePromise = new 
GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().writeLedgerMetadata(1L, 
builder.closingAt(-1, 0).build(), writePromise);
+        writePromise.get();
+
+        // allow recovery to continue
+        blocker.complete(null);
+
+        recoveryPromise.get();
+
+        Assert.assertEquals(lh.getLastAddConfirmed(), -1);
+        Assert.assertEquals(lh.getLength(), 0);
+    }
+
+    @Test
+    public void testNewEnsembleAddedDuringRecovery() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+
+        CompletableFuture<Void> writingBack = new CompletableFuture<>();
+        CompletableFuture<Void> blocker = new CompletableFuture<>();
+        CompletableFuture<Void> failing = new CompletableFuture<>();
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        // will block recovery at the writeback phase
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    writingBack.complete(null);
+                    if (bookie.equals(b3)) {
+                        return failing;
+                    } else {
+                        return blocker;
+                    }
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+
+        writingBack.get(10, TimeUnit.SECONDS);
+
+        GenericCallbackFuture<LedgerMetadata> readPromise = new 
GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().readLedgerMetadata(1L, readPromise);
+        LedgerMetadata newMeta = LedgerMetadataBuilder.from(readPromise.get())
+            .newEnsembleEntry(1L, Lists.newArrayList(b1, b2, b4)).build();
+        GenericCallbackFuture<LedgerMetadata> writePromise = new 
GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().writeLedgerMetadata(1L, newMeta, 
writePromise);
+        writePromise.get();
+
+        // allow recovery to continue
+        failing.completeExceptionally(new BKException.BKWriteException());
+        blocker.complete(null);
+
+        try {
+            recoveryPromise.get();
+            Assert.fail("Should fail on the update");
+        } catch (ExecutionException ee) {
+            
Assert.assertEquals(BKException.BKUnexpectedConditionException.class, 
ee.getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testRecoveryBookieFailedAtStart() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+
+        CompletableFuture<Void> writingBack = new CompletableFuture<>();
+        CompletableFuture<Void> blocker = new CompletableFuture<>();
+        CompletableFuture<Void> failing = new CompletableFuture<>();
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().errorBookies(b2);
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().size(), 1);
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().get(0L),
+                            Lists.newArrayList(b1, b4, b3));
+    }
+
+    @Test
+    public void testRecoveryOneBookieFailsDuring() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().seedEntries(b3, 1L, 1L, -1L);
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b2) && entryId == 1L) {
+                        return FutureUtils.exception(new 
BKException.BKWriteException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().size(), 2);
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().get(0L),
+                            Lists.newArrayList(b1, b2, b3));
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().get(1L),
+                            Lists.newArrayList(b1, b4, b3));
+        Assert.assertEquals(lh.getLastAddConfirmed(), 1L);
+    }
+
+    @Test
+    public void testRecoveryTwoBookiesFailOnSameEntry() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4, b5).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, 
b2, b3));
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b1) || bookie.equals(b2)) {
+                        return FutureUtils.exception(new 
BKException.BKWriteException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        Assert.assertEquals(lh.getLedgerMetadata().getEnsembles().size(), 1);
+        
Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b3));
+        
Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b4));
+        
Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b5));
+        Assert.assertEquals(lh.getLastAddConfirmed(), 0L);
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index e126001..6a21da2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -29,9 +29,9 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
@@ -357,7 +356,6 @@ public class LedgerRecoveryTest extends 
BookKeeperClusterTestCase {
 
         // start a new good server
         startNewBookie();
-
         LedgerHandle lhafter = bkc.openLedger(lhbefore.getId(), digestType,
                 "".getBytes());
         assertEquals("Fenced ledger should have correct lastAddConfirmed",
@@ -479,24 +477,13 @@ public class LedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), 
digestType, "".getBytes());
         assertEquals(BookieProtocol.INVALID_ENTRY_ID, 
recoverLh.getLastAddConfirmed());
 
-        final CountDownLatch recoverLatch = new CountDownLatch(1);
-        final AtomicBoolean success = new AtomicBoolean(false);
-
         MockClientContext parallelReadCtx = 
MockClientContext.copyOf(bkc.getClientCtx())
             
.setConf(ClientInternalConf.fromConfig(newConf.setEnableParallelRecoveryRead(true)));
 
-        LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(
-                recoverLh, parallelReadCtx,
-                new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-                    @Override
-                    public void operationComplete(int rc, Void result) {
-                        success.set(BKException.Code.OK == rc);
-                        recoverLatch.countDown();
-                    }
-                });
-        recoveryOp.initiate();
-        recoverLatch.await(10, TimeUnit.SECONDS);
-        assertTrue(success.get());
+        LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh, 
parallelReadCtx);
+        CompletableFuture<LedgerHandle> f = recoveryOp.initiate();
+        f.get(10, TimeUnit.SECONDS);
+
         assertEquals(numEntries, recoveryOp.readCount.get());
         assertEquals(numEntries, recoveryOp.writeCount.get());
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index 5ed75ce..e70a7d3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -361,7 +361,7 @@ public class MetadataUpdateLoopTest {
                     ledgerId,
                     reference::get,
                     (currentMetadata) -> !currentMetadata.isClosed(),
-                    (currentMetadata) -> 
LedgerMetadataBuilder.from(currentMetadata).closingAtEntry(10L).build(),
+                    (currentMetadata) -> 
LedgerMetadataBuilder.from(currentMetadata).closingAt(10L, 100L).build(),
                     reference::compareAndSet).run();
             CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
                     lm,
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 783e435..c290a7e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -430,16 +430,17 @@ public abstract class MockBookKeeperTestCase {
     @SuppressWarnings("unchecked")
     private void setupWriteLedgerMetadata() {
         doAnswer(invocation -> {
-            Object[] args = invocation.getArguments();
-            Long ledgerId = (Long) args[0];
-            LedgerMetadata metadata = (LedgerMetadata) args[1];
-            BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
-            executor.executeOrdered(ledgerId, () -> {
-                mockLedgerMetadataRegistry.put(ledgerId, new 
LedgerMetadata(metadata));
-                cb.operationComplete(BKException.Code.OK, null);
-            });
-            return null;
-        }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any());
+                Object[] args = invocation.getArguments();
+                Long ledgerId = (Long) args[0];
+                LedgerMetadata metadata = (LedgerMetadata) args[1];
+                BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+                executor.executeOrdered(ledgerId, () -> {
+                        LedgerMetadata newMetadata = 
LedgerMetadataBuilder.from(metadata).build();
+                        mockLedgerMetadataRegistry.put(ledgerId, newMetadata);
+                        cb.operationComplete(BKException.Code.OK, newMetadata);
+                    });
+                return null;
+            }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), 
any());
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index 2f5b2dc..f36c008 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -51,16 +51,19 @@ public class MockClientContext implements ClientContext {
     private BooleanSupplier isClientClosed;
     private MockRegistrationClient regClient;
 
-    static MockClientContext create() {
+    static MockClientContext create() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         OrderedScheduler scheduler = 
OrderedScheduler.newSchedulerBuilder().name("mock-executor").numThreads(1).build();
         MockRegistrationClient regClient = new MockRegistrationClient();
         EnsemblePlacementPolicy placementPolicy = new 
DefaultEnsemblePlacementPolicy();
+        BookieWatcherImpl bookieWatcherImpl = new BookieWatcherImpl(conf, 
placementPolicy,
+                                                                    regClient, 
NullStatsLogger.INSTANCE);
+        bookieWatcherImpl.initialBlockingBookieRead();
 
         return new MockClientContext()
             .setConf(ClientInternalConf.fromConfig(conf))
             .setLedgerManager(new MockLedgerManager())
-            .setBookieWatcher(new BookieWatcherImpl(conf, placementPolicy, 
regClient, NullStatsLogger.INSTANCE))
+            .setBookieWatcher(bookieWatcherImpl)
             .setPlacementPolicy(placementPolicy)
             .setRegistrationClient(regClient)
             .setBookieClient(new MockBookieClient(scheduler))
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 8b5f5a6..9bdbfac 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -342,7 +342,7 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
 
         final CountDownLatch recoverLatch = new CountDownLatch(1);
         final AtomicBoolean success = new AtomicBoolean(false);
-        recoverLh.recover(new GenericCallback<Void>() {
+        ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>() 
{
             @Override
             public void operationComplete(int rc, Void result) {
                 LOG.info("Recovering ledger {} completed : {}.", lh.getId(), 
rc);
@@ -459,7 +459,7 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         final AtomicBoolean isMetadataClosed = new AtomicBoolean(false);
         final AtomicInteger numSuccessCalls = new AtomicInteger(0);
         final AtomicInteger numFailureCalls = new AtomicInteger(0);
-        recoverLh.recover(new GenericCallback<Void>() {
+        ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>() 
{
             @Override
             public void operationComplete(int rc, Void result) {
                 if (BKException.Code.OK == rc) {
@@ -639,7 +639,7 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         tlm1.setLatch(metadataLatch);
         final CountDownLatch recoverLatch = new CountDownLatch(1);
         final AtomicBoolean recoverSuccess = new AtomicBoolean(false);
-        lh1.recover(new GenericCallback<Void>() {
+        ((ReadOnlyLedgerHandle) lh1).recover(new GenericCallback<Void>() {
             @Override
             public void operationComplete(int rc, Void result) {
                 LOG.info("Recovering ledger {} completed : {}", lh1.getId(), 
rc);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
index 40178b9..8ce3686 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
@@ -55,7 +55,7 @@ public class MockRegistrationClient implements 
RegistrationClient {
         return new Versioned<>(Collections.unmodifiableSet(bookies), new 
LongVersion(version));
     }
 
-    CompletableFuture<Void> addBookies(BookieSocketAddress... bookies) {
+    public CompletableFuture<Void> addBookies(BookieSocketAddress... bookies) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         executor.submit(() -> {
                 currentVersion++;
@@ -68,7 +68,7 @@ public class MockRegistrationClient implements 
RegistrationClient {
         return promise;
     }
 
-    CompletableFuture<Void> removeBookies(BookieSocketAddress... bookies) {
+    public CompletableFuture<Void> removeBookies(BookieSocketAddress... 
bookies) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         executor.submit(() -> {
                 currentVersion++;
@@ -81,7 +81,7 @@ public class MockRegistrationClient implements RegistrationClient {
         return promise;
     }
 
-    CompletableFuture<Void> addReadOnlyBookies(BookieSocketAddress... bookies) {
+    public CompletableFuture<Void> addReadOnlyBookies(BookieSocketAddress... bookies) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         executor.submit(() -> {
                 currentVersion++;
@@ -94,7 +94,7 @@ public class MockRegistrationClient implements RegistrationClient {
         return promise;
     }
 
-    CompletableFuture<Void> removeReadOnlyBookies(BookieSocketAddress... bookies) {
+    public CompletableFuture<Void> removeReadOnlyBookies(BookieSocketAddress... bookies) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         executor.submit(() -> {
                 currentVersion++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 79cc5ba..04119d5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -25,19 +25,19 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
@@ -46,8 +46,9 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.SafeRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,24 +64,36 @@ public class MockBookieClient implements BookieClient {
     final Set<BookieSocketAddress> errorBookies =
         Collections.newSetFromMap(new ConcurrentHashMap<BookieSocketAddress, Boolean>());
 
-    final Map<BookieSocketAddress, Boolean> stalledBookies = new HashMap<>();
-    final Map<BookieSocketAddress, List<Consumer<Integer>>> stalledRequests = new HashMap<>();
+    /**
+     * Runs before or after an operation. Can stall the operation or error it.
+     */
+    public interface Hook {
+        CompletableFuture<Void> runHook(BookieSocketAddress bookie, long ledgerId, long entryId);
+    }
+
+    private Hook preReadHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null);
+    private Hook postReadHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null);
+    private Hook preWriteHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null);
+    private Hook postWriteHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null);
 
     public MockBookieClient(OrderedExecutor executor) {
         this.executor = executor;
     }
 
-    public void stallBookie(BookieSocketAddress bookie) {
-        synchronized (this) {
-            stalledBookies.put(bookie, true);
-        }
+    public void setPreReadHook(Hook hook) {
+        this.preReadHook = hook;
     }
 
-    public void releaseStalledBookie(BookieSocketAddress bookie, int rc) {
-        synchronized (this) {
-            stalledBookies.remove(bookie);
-            stalledRequests.remove(bookie).forEach((r) -> r.accept(rc));
-        }
+    public void setPostReadHook(Hook hook) {
+        this.postReadHook = hook;
+    }
+
+    public void setPreWriteHook(Hook hook) {
+        this.preWriteHook = hook;
+    }
+
+    public void setPostWriteHook(Hook hook) {
+        this.postWriteHook = hook;
     }
 
     public void errorBookies(BookieSocketAddress... bookies) {
@@ -95,6 +108,15 @@ public class MockBookieClient implements BookieClient {
         }
     }
 
+    public void seedEntries(BookieSocketAddress bookie, long ledgerId, long entryId, long lac) throws Exception {
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C);
+        ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
+                                                     entryId, lac, 0, Unpooled.buffer(10)));
+
+        LedgerData ledger = getBookieData(bookie).computeIfAbsent(ledgerId, LedgerData::new);
+        ledger.addEntry(entryId, entry);
+    }
+
     @Override
     public List<BookieSocketAddress> getFaultyBookies() {
         return Collections.emptyList();
@@ -134,41 +156,29 @@ public class MockBookieClient implements BookieClient {
     public void addEntry(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
                          long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
                          int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
-        SafeRunnable write = safeRun(() -> {
-                LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
-                if (errorBookies.contains(addr)) {
-                    LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
-                    cb.writeComplete(BKException.Code.WriteException, ledgerId, entryId, addr, ctx);
-                    return;
-                }
-                LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
-                ledger.addEntry(entryId, copyData(toSend));
-                cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
-                toSend.release();
-            });
-
         toSend.retain();
-        synchronized (this) {
-            if (stalledBookies.getOrDefault(addr, false)) {
-                LOG.info("[{};{};{}] Stalling write {}", addr, ledgerId, System.identityHashCode(write), entryId);
-                stalledRequests.computeIfAbsent(addr, (key) -> new ArrayList<>())
-                    .add((rc) -> {
-                            LOG.info("[{};{};{}] Unstalled write {}",
-                                     addr, ledgerId, System.identityHashCode(write), entryId);
-                            if (rc == BKException.Code.OK) {
-                                executor.executeOrdered(ledgerId, write);
-                            } else {
-                                executor.executeOrdered(
-                                        ledgerId, safeRun(() -> {
-                                            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                                            toSend.release();
-                                        }));
-                            }
-                        });
-            } else {
-                executor.executeOrdered(ledgerId, write);
-            }
-        }
+        preWriteHook.runHook(addr, ledgerId, entryId)
+            .thenComposeAsync(
+                (ignore) -> {
+                    LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                    if (errorBookies.contains(addr)) {
+                        LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                        return FutureUtils.exception(new BKException.BKWriteException());
+                    }
+                    LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
+                    ledger.addEntry(entryId, copyData(toSend));
+                    toSend.release();
+                    return FutureUtils.value(null);
+                }, executor.chooseThread(ledgerId))
+            .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
+            .whenCompleteAsync((res, ex) -> {
+                    if (ex != null) {
+                        cb.writeComplete(BKException.getExceptionCode(ex, BKException.Code.WriteException),
+                                         ledgerId, entryId, addr, ctx);
+                    } else {
+                        cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
+                    }
+                }, executor.chooseThread(ledgerId));
     }
 
     @Override
@@ -184,34 +194,38 @@ public class MockBookieClient implements BookieClient {
     public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
                           ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
                           boolean allowFastFail) {
-        executor.executeOrdered(ledgerId,
-                safeRun(() -> {
-                        LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
-                        if (errorBookies.contains(addr)) {
-                            LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
-                            cb.readEntryComplete(BKException.Code.ReadException, ledgerId, entryId, null, ctx);
-                            return;
-                        }
-
-                        LedgerData ledger = getBookieData(addr).get(ledgerId);
-                        if (ledger == null) {
-                            LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
-                            cb.readEntryComplete(BKException.Code.NoSuchLedgerExistsException,
-                                                 ledgerId, entryId, null, ctx);
-                            return;
-                        }
-
-                        ByteBuf entry = ledger.getEntry(entryId);
-                        if (entry == null) {
-                            LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
-                            cb.readEntryComplete(BKException.Code.NoSuchEntryException,
-                                                 ledgerId, entryId, null, ctx);
-                            return;
-                        }
-
+        preReadHook.runHook(addr, ledgerId, entryId)
+            .thenComposeAsync((res) -> {
+                    LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
+                    if (errorBookies.contains(addr)) {
+                        LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
+                        return FutureUtils.exception(new BKException.BKReadException());
+                    }
+
+                    LedgerData ledger = getBookieData(addr).get(ledgerId);
+                    if (ledger == null) {
+                        LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
+                        return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                    }
+
+                    ByteBuf entry = ledger.getEntry(entryId);
+                    if (entry == null) {
+                        LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
+                        return FutureUtils.exception(new BKException.BKNoSuchEntryException());
+                    }
+
+                    return FutureUtils.value(entry);
+                }, executor.chooseThread(ledgerId))
+            .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf))
+            .whenCompleteAsync((res, ex) -> {
+                    if (ex != null) {
+                        cb.readEntryComplete(BKException.getExceptionCode(ex, BKException.Code.ReadException),
+                                             ledgerId, entryId, null, ctx);
+                    } else {
                         cb.readEntryComplete(BKException.Code.OK,
-                                             ledgerId, entryId, entry.slice(), ctx);
-                    }));
+                                             ledgerId, entryId, res.slice(), ctx);
+                    }
+                }, executor.chooseThread(ledgerId));
     }
 
     @Override
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
index 8978ff3..3152a74 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 
@@ -31,7 +33,9 @@ public class BookKeeperAccessor {
 
     public static void forceRecoverLedger(LedgerHandle lh,
                                           BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
-        lh.recover(cb, null, true);
+        checkArgument(lh instanceof ReadOnlyLedgerHandle,
+                      "Recovery can only run on ReadOnlyLedgerHandle");
+        ((ReadOnlyLedgerHandle) lh).recover(cb, null, true);
     }
 
     public static LedgerMetadata getLedgerMetadata(LedgerHandle lh) {

Reply via email to