This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c766575 Recovery uses immutable metadata
c766575 is described below
commit c766575c7b36c2692fe403c996a3fd3516e5708a
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Sep 4 12:31:15 2018 -0700
Recovery uses immutable metadata
And MetadataUpdateLoop to update it.
Master issue: #281
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
This closes #1621 from ivankelly/recovery-immutable
---
.../org/apache/bookkeeper/client/BKException.java | 13 +
.../org/apache/bookkeeper/client/LedgerHandle.java | 112 +--------
.../apache/bookkeeper/client/LedgerMetadata.java | 10 +-
.../bookkeeper/client/LedgerMetadataBuilder.java | 21 +-
.../org/apache/bookkeeper/client/LedgerOpenOp.java | 2 +-
.../apache/bookkeeper/client/LedgerRecoveryOp.java | 63 ++---
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 254 +++++++++++++++++--
.../apache/bookkeeper/client/BKExceptionTest.java | 67 +++++
.../bookkeeper/client/LedgerRecovery2Test.java | 278 +++++++++++++++++++++
.../bookkeeper/client/LedgerRecoveryTest.java | 23 +-
.../bookkeeper/client/MetadataUpdateLoopTest.java | 2 +-
.../bookkeeper/client/MockBookKeeperTestCase.java | 21 +-
.../bookkeeper/client/MockClientContext.java | 7 +-
.../client/ParallelLedgerRecoveryTest.java | 6 +-
.../discover/MockRegistrationClient.java | 8 +-
.../apache/bookkeeper/proto/MockBookieClient.java | 166 ++++++------
.../bookkeeper/client/BookKeeperAccessor.java | 6 +-
17 files changed, 764 insertions(+), 295 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index ddfc795..8ab563b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -426,4 +426,17 @@ public abstract class BKException extends org.apache.bookkeeper.client.api.BKExc
super(Code.LedgerIdOverflowException);
}
}
+
+ /**
+ * Extract an exception code from an BKException, or use a default if it's another type.
+ */
+ public static int getExceptionCode(Throwable t, int defaultCode) {
+ if (t instanceof BKException) {
+ return ((BKException) t).getCode();
+ } else if (t.getCause() != null) {
+ return getExceptionCode(t.getCause(), defaultCode);
+ } else {
+ return defaultCode;
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 741610d..1b859a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -76,9 +76,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
@@ -329,6 +327,10 @@ public class LedgerHandle implements WriteHandle {
// ensure that we only update the metadata if it is the object we expect it to be
if (metadata == expected) {
metadata = newMetadata;
+ if (metadata.isClosed()) {
+ lastAddConfirmed = lastAddPushed =
metadata.getLastEntryId();
+ length = metadata.getLength();
+ }
return true;
} else {
return false;
@@ -2224,112 +2226,6 @@ public class LedgerHandle implements WriteHandle {
}
}
-
- void recover(GenericCallback<Void> finalCb) {
- recover(finalCb, null, false);
- }
-
- /**
- * Recover the ledger.
- *
- * @param finalCb
- * callback after recovery is done.
- * @param listener
- * read entry listener on recovery reads.
- * @param forceRecovery
- * force the recovery procedure even the ledger metadata shows the ledger is closed.
- */
- void recover(GenericCallback<Void> finalCb,
- final @VisibleForTesting
BookkeeperInternalCallbacks.ReadEntryListener listener,
- final boolean forceRecovery) {
- final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
- finalCb,
- BKException.Code.OK,
- clientCtx.getClientStats().getRecoverOpLogger());
- boolean wasClosed = false;
- boolean wasInRecovery = false;
-
- LedgerMetadata metadata = getLedgerMetadata();
- synchronized (this) {
- if (metadata.isClosed()) {
- if (forceRecovery) {
- wasClosed = false;
- // mark the ledger back to in recovery state, so it would proceed ledger recovery again.
- wasInRecovery = false;
- metadata.markLedgerInRecovery();
- } else {
- lastAddConfirmed = lastAddPushed =
metadata.getLastEntryId();
- length = metadata.getLength();
- wasClosed = true;
- }
- } else {
- wasClosed = false;
- if (metadata.isInRecovery()) {
- wasInRecovery = true;
- } else {
- wasInRecovery = false;
- metadata.markLedgerInRecovery();
- }
- }
- }
-
- if (wasClosed) {
- // We are already closed, nothing to do
- cb.operationComplete(BKException.Code.OK, null);
- return;
- }
-
- if (wasInRecovery) {
- // if metadata is already in recover, dont try to write again,
- // just do the recovery from the starting point
- new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
- .setEntryListener(listener)
- .initiate();
- return;
- }
-
- writeLedgerConfig(new
OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(), ledgerId)
{
- @Override
- public void safeOperationComplete(final int rc, LedgerMetadata
writtenMetadata) {
- if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new
OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(),
-
ledgerId) {
- @Override
- public void safeOperationComplete(int rc,
LedgerMetadata newMeta) {
- if (rc != BKException.Code.OK) {
- cb.operationComplete(rc, null);
- } else {
- LedgerHandle.this.metadata = newMeta;
- recover(cb, listener, forceRecovery);
- }
- }
-
- @Override
- public String toString() {
- return
String.format("ReReadMetadataForRecover(%d)", ledgerId);
- }
- });
- } else if (rc == BKException.Code.OK) {
- // we only could issue recovery operation after we successfully update the ledger state to
- // in recovery otherwise, it couldn't prevent us advancing last confirmed while the other writer is
- // closing the ledger, which will cause inconsistent last add confirmed on bookies & zookeeper
- // metadata.
- new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
- .setEntryListener(listener)
- .initiate();
- } else {
- LOG.error("Error writing ledger {} config: {}", ledgerId,
BKException.codeLogger(rc));
- cb.operationComplete(rc, null);
- }
- }
-
- @Override
- public String toString() {
- return String.format("WriteLedgerConfigForRecover(%d)",
ledgerId);
- }
- });
- }
-
static class NoopCloseCallback implements CloseCallback {
static NoopCloseCallback instance = new NoopCloseCallback();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 4f58aa8..ffb0d6a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -135,6 +135,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
int ackQuorumSize,
LedgerMetadataFormat.State state,
java.util.Optional<Long> lastEntryId,
+ java.util.Optional<Long> length,
Map<Long, List<BookieSocketAddress>> ensembles,
DigestType digestType,
java.util.Optional<byte[]> password,
@@ -148,7 +149,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
this.ackQuorumSize = ackQuorumSize;
this.state = state;
lastEntryId.ifPresent((eid) -> this.lastEntryId = eid);
-
+ length.ifPresent((l) -> this.length = l);
setEnsembles(ensembles);
if (state != LedgerMetadataFormat.State.CLOSED) {
currentEnsemble = this.ensembles.lastEntry().getValue();
@@ -787,4 +788,11 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
return bookies;
}
+ java.util.Optional<Long> getLastEnsembleKey() {
+ if (ensembles.size() > 0) {
+ return java.util.Optional.of(ensembles.lastKey());
+ } else {
+ return java.util.Optional.empty();
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 8c37bd5..a9d83b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,7 @@ class LedgerMetadataBuilder {
private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
private Optional<Long> lastEntryId = Optional.empty();
+ private Optional<Long> length = Optional.empty();
private TreeMap<Long, List<BookieSocketAddress>> ensembles = new
TreeMap<>();
@@ -67,6 +69,10 @@ class LedgerMetadataBuilder {
if (lastEntryId != LedgerHandle.INVALID_ENTRY_ID) {
builder.lastEntryId = Optional.of(lastEntryId);
}
+ long length = other.getLength();
+ if (length > 0) {
+ builder.length = Optional.of(length);
+ }
builder.ensembles.putAll(other.getEnsembles());
@@ -85,6 +91,11 @@ class LedgerMetadataBuilder {
return builder;
}
+ LedgerMetadataBuilder withPassword(byte[] password) {
+ this.password = Optional.of(Arrays.copyOf(password, password.length));
+ return this;
+ }
+
LedgerMetadataBuilder withEnsembleSize(int ensembleSize) {
checkState(ensembles.size() == 0, "Can only set ensemble size before
adding ensembles to the builder");
this.ensembleSize = ensembleSize;
@@ -109,17 +120,21 @@ class LedgerMetadataBuilder {
return this;
}
+ LedgerMetadataBuilder withInRecoveryState() {
+ this.state = LedgerMetadataFormat.State.IN_RECOVERY;
+ return this;
+ }
-
- LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
+ LedgerMetadataBuilder closingAt(long lastEntryId, long length) {
this.lastEntryId = Optional.of(lastEntryId);
+ this.length = Optional.of(length);
this.state = LedgerMetadataFormat.State.CLOSED;
return this;
}
LedgerMetadata build() {
return new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize,
- state, lastEntryId, ensembles,
+ state, lastEntryId, length, ensembles,
digestType, password, ctime, customMetadata,
version);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index cf5f3a7..da82531 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -53,7 +53,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
final long ledgerId;
final OpenCallback cb;
final Object ctx;
- LedgerHandle lh;
+ ReadOnlyLedgerHandle lh;
final byte[] passwd;
boolean doRecovery = true;
boolean administrativeOpen = false;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 6ab25d6..923c4df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -18,11 +18,10 @@
package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
import org.slf4j.Logger;
@@ -32,9 +31,7 @@ import org.slf4j.LoggerFactory;
* This class encapsulated the ledger recovery operation. It first does a read
* with entry-id of -1 (BookieProtocol.LAST_ADD_CONFIRMED) to all bookies. Then
* starting from the last confirmed entry (from hints in the ledger entries),
- * it reads forward until it is not able to find a particular entry. It closes
- * the ledger at that entry.
- *
+ * it reads forward until it is not able to find a particular entry.
*/
class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
@@ -42,13 +39,13 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
final LedgerHandle lh;
final ClientContext clientCtx;
+ final CompletableFuture<LedgerHandle> promise;
final AtomicLong readCount, writeCount;
volatile boolean readDone;
- final AtomicBoolean callbackDone;
volatile long startEntryToRead;
volatile long endEntryToRead;
- final GenericCallback<Void> cb;
+
// keep a copy of metadata for recovery.
LedgerMetadata metadataForRecovery;
@@ -72,13 +69,11 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
}
- public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx,
- GenericCallback<Void> cb) {
+ public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx) {
readCount = new AtomicLong(0);
writeCount = new AtomicLong(0);
readDone = false;
- callbackDone = new AtomicBoolean(false);
- this.cb = cb;
+ this.promise = new CompletableFuture<>();
this.lh = lh;
this.clientCtx = clientCtx;
}
@@ -96,7 +91,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
return this;
}
- public void initiate() {
+ public CompletableFuture<LedgerHandle> initiate() {
ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
clientCtx.getBookieClient(), lh.getCurrentEnsemble(),
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
public void readLastConfirmedDataComplete(int rc,
RecoveryData data) {
@@ -125,24 +120,27 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
* from writing to it.
*/
rlcop.initiateWithFencing();
+
+ return promise;
}
private void submitCallback(int rc) {
if (BKException.Code.OK == rc) {
clientCtx.getClientStats().getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
clientCtx.getClientStats().getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
+ promise.complete(lh);
} else {
clientCtx.getClientStats().getRecoverAddCountLogger().registerFailedValue(writeCount.get());
clientCtx.getClientStats().getRecoverReadCountLogger().registerFailedValue(readCount.get());
+ promise.completeExceptionally(BKException.create(rc));
}
- cb.operationComplete(rc, null);
}
/**
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
- if (!callbackDone.get()) {
+ if (!promise.isDone()) {
startEntryToRead = endEntryToRead + 1;
endEntryToRead = endEntryToRead +
clientCtx.getConf().recoveryReadBatchSize;
new RecoveryReadOp(lh, clientCtx, startEntryToRead,
endEntryToRead, this, null)
@@ -150,26 +148,6 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
}
}
- private void closeAndCallback() {
- if (callbackDone.compareAndSet(false, true)) {
- lh.asyncCloseInternal(new CloseCallback() {
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx)
{
- if (rc != BKException.Code.OK) {
- LOG.warn("Close ledger {} failed during recovery: ",
- LedgerRecoveryOp.this.lh.getId(),
BKException.getMessage(rc));
- submitCallback(rc);
- } else {
- submitCallback(BKException.Code.OK);
- if (LOG.isDebugEnabled()) {
- LOG.debug("After closing length is: {}",
lh.getLength());
- }
- }
- }
- }, null, BKException.Code.LedgerClosedException);
- }
- }
-
@Override
public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry,
Object ctx) {
// notify entry listener on individual entries being read during
ledger recovery.
@@ -179,7 +157,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
}
// we only trigger recovery add an entry when readDone == false && callbackDone == false
- if (!callbackDone.get() && !readDone && rc == BKException.Code.OK) {
+ if (!promise.isDone() && !readDone && rc == BKException.Code.OK) {
readCount.incrementAndGet();
byte[] data = entry.getEntry();
@@ -211,15 +189,15 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
if (rc == BKException.Code.NoSuchEntryException || rc ==
BKException.Code.NoSuchLedgerExistsException) {
readDone = true;
if (readCount.get() == writeCount.get()) {
- closeAndCallback();
+ submitCallback(BKException.Code.OK);
}
return;
}
// otherwise, some other error, we can't handle
- if (BKException.Code.OK != rc && callbackDone.compareAndSet(false,
true)) {
+ if (BKException.Code.OK != rc && !promise.isDone()) {
LOG.error("Failure {} while reading entries: ({} - {}), ledger: {}
while recovering ledger",
- BKException.getMessage(rc), startEntryToRead,
endEntryToRead, lh.getId());
+ BKException.getMessage(rc), startEntryToRead,
endEntryToRead, lh.getId());
submitCallback(rc);
} else if (BKException.Code.OK == rc) {
// we are here is because we successfully read an entry but readDone was already set to true.
@@ -235,15 +213,12 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
if (rc != BKException.Code.OK) {
LOG.error("Failure {} while writing entry: {} while recovering
ledger: {}",
BKException.codeLogger(rc), entryId + 1, lh.ledgerId);
- if (callbackDone.compareAndSet(false, true)) {
- // Give up, we can't recover from this error
- submitCallback(rc);
- }
+ submitCallback(rc);
return;
}
long numAdd = writeCount.incrementAndGet();
if (readDone && readCount.get() == numAdd) {
- closeAndCallback();
+ submitCallback(rc);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index aa0290b..e2c9a44 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -20,8 +20,21 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -30,10 +43,16 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Read only ledger handle. This ledger handle allows you to
* read from a ledger but not to write to it. It overrides all
@@ -41,6 +60,10 @@ import org.apache.bookkeeper.versioning.Version;
* It should be returned for BookKeeper#openLedger operations.
*/
class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListener {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadOnlyLedgerHandle.class);
+
+ private Object metadataLock = new Object();
+ private final NavigableMap<Long, List<BookieSocketAddress>>
newEnsemblesFromRecovery = new TreeMap<>();
class MetadataUpdater extends SafeRunnable {
@@ -128,29 +151,6 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
}
@Override
- void handleBookieFailure(final Map<Integer, BookieSocketAddress>
failedBookies) {
- blockAddCompletions.incrementAndGet();
- synchronized (getLedgerMetadata()) {
- try {
- EnsembleInfo ensembleInfo =
replaceBookieInMetadata(failedBookies,
- numEnsembleChanges.incrementAndGet());
- if (ensembleInfo.replacedBookies.isEmpty()) {
- blockAddCompletions.decrementAndGet();
- return;
- }
- blockAddCompletions.decrementAndGet();
- // the failed bookie has been replaced
- unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble,
ensembleInfo.replacedBookies);
- } catch (BKException.BKNotEnoughBookiesException e) {
- LOG.error("Could not get additional bookie to "
- + "remake ensemble, closing ledger: " + ledgerId);
- handleUnrecoverableErrorDuringAdd(e.getCode());
- return;
- }
- }
- }
-
- @Override
public void onChanged(long lid, LedgerMetadata newMetadata) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ledger metadata update on {} : {}", lid,
newMetadata);
@@ -208,4 +208,212 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
}
}, ctx);
}
+
+ List<BookieSocketAddress> replaceBookiesInEnsemble(LedgerMetadata metadata,
+
List<BookieSocketAddress> oldEnsemble,
+ Map<Integer,
BookieSocketAddress> failedBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ List<BookieSocketAddress> newEnsemble = new ArrayList<>(oldEnsemble);
+
+ int ensembleSize = metadata.getEnsembleSize();
+ int writeQ = metadata.getWriteQuorumSize();
+ int ackQ = metadata.getAckQuorumSize();
+ Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+
+ Set<BookieSocketAddress> exclude = new
HashSet<>(failedBookies.values());
+
+ int replaced = 0;
+ for (Map.Entry<Integer, BookieSocketAddress> entry :
failedBookies.entrySet()) {
+ int idx = entry.getKey();
+ BookieSocketAddress addr = entry.getValue();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[EnsembleChange-L{}] replacing bookie: {} index:
{}", getId(), addr, idx);
+ }
+
+ if (!newEnsemble.get(idx).equals(addr)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[EnsembleChange-L{}] Not changing failed bookie
{} at index {}, already changed to {}",
+ getId(), addr, idx, newEnsemble.get(idx));
+ }
+ continue;
+ }
+ try {
+ BookieSocketAddress newBookie =
clientCtx.getBookieWatcher().replaceBookie(
+ ensembleSize, writeQ, ackQ, customMetadata,
newEnsemble, idx, exclude);
+ newEnsemble.set(idx, newBookie);
+
+ replaced++;
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ // if there is no bookie replaced, we throw not enough bookie exception
+ if (replaced <= 0) {
+ throw e;
+ } else {
+ break;
+ }
+ }
+ }
+ return newEnsemble;
+ }
+
+ private static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
+ List<BookieSocketAddress> e2) {
+ checkArgument(e1.size() == e2.size(), "Ensembles must be of same
size");
+ Set<Integer> diff = new HashSet<>();
+ for (int i = 0; i < e1.size(); i++) {
+ if (!e1.get(i).equals(e2.get(i))) {
+ diff.add(i);
+ }
+ }
+ return diff;
+ }
+
+ /**
+ * For a read only ledger handle, this method will only ever be called during recovery,
+ * when we are reading forward from LAC and writing back those entries. As such,
+ * unlike with LedgerHandle, we do not want to persist changes to the metadata as they occur,
+ * but rather, we want to defer the persistence until recovery has completed, and do it all
+ * on the close.
+ */
+ @Override
+ void handleBookieFailure(final Map<Integer, BookieSocketAddress>
failedBookies) {
+ blockAddCompletions.incrementAndGet();
+
+ // handleBookieFailure should always run in the ordered executor thread for this
+ // ledger, so this synchronized should be unnecessary, but putting it here now
+ // just in case (can be removed when we validate threads)
+ synchronized (metadataLock) {
+ long lac = getLastAddConfirmed();
+ LedgerMetadata metadata = getLedgerMetadata();
+ List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
+ try {
+ List<BookieSocketAddress> newEnsemble =
replaceBookiesInEnsemble(metadata, currentEnsemble,
+
failedBookies);
+
+ Set<Integer> replaced = diffEnsemble(currentEnsemble,
newEnsemble);
+ blockAddCompletions.decrementAndGet();
+ if (!replaced.isEmpty()) {
+ newEnsemblesFromRecovery.put(lac + 1, newEnsemble);
+ unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
+ }
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ LOG.error("Could not get additional bookie to remake ensemble,
closing ledger: {}", ledgerId);
+
+ handleUnrecoverableErrorDuringAdd(e.getCode());
+ return;
+ }
+ }
+ }
+
+ @Override
+ void handleUnrecoverableErrorDuringAdd(int rc) {
+ errorOutPendingAdds(rc);
+ }
+
+ void recover(GenericCallback<Void> finalCb) {
+ recover(finalCb, null, false);
+ }
+
+ /**
+ * Recover the ledger.
+ *
+ * @param finalCb
+ * callback after recovery is done.
+ * @param listener
+ * read entry listener on recovery reads.
+ * @param forceRecovery
+ * force the recovery procedure even the ledger metadata shows the ledger is closed.
+ */
+ void recover(GenericCallback<Void> finalCb,
+ final @VisibleForTesting ReadEntryListener listener,
+ final boolean forceRecovery) {
+ final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
+ finalCb,
+ BKException.Code.OK,
+ clientCtx.getClientStats().getRecoverOpLogger());
+
+ MetadataUpdateLoop.NeedsUpdatePredicate needsUpdate =
+ (metadata) -> !(metadata.isClosed() || metadata.isInRecovery());
+ if (forceRecovery) {
+ // in the force recovery case, we want to update the metadata
+ // to IN_RECOVERY, even if the ledger is already closed
+ needsUpdate = (metadata) -> !metadata.isInRecovery();
+ }
+ new MetadataUpdateLoop(
+ clientCtx.getLedgerManager(), getId(),
+ this::getLedgerMetadata,
+ needsUpdate,
+ (metadata) ->
LedgerMetadataBuilder.from(metadata).withInRecoveryState().build(),
+ this::setLedgerMetadata)
+ .run()
+ .thenCompose((metadata) -> {
+ if (metadata.isClosed()) {
+ return
CompletableFuture.completedFuture(ReadOnlyLedgerHandle.this);
+ } else {
+ return new LedgerRecoveryOp(ReadOnlyLedgerHandle.this,
clientCtx)
+ .setEntryListener(listener)
+ .initiate();
+ }
+ })
+ .thenCompose((ignore) -> closeRecovered())
+ .whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ cb.operationComplete(
+ BKException.getExceptionCode(ex,
BKException.Code.UnexpectedConditionException), null);
+ } else {
+ cb.operationComplete(BKException.Code.OK, null);
+ }
+ });
+ }
+
+ CompletableFuture<LedgerMetadata> closeRecovered() {
+ long lac, len;
+ synchronized (this) {
+ lac = lastAddConfirmed;
+ len = length;
+ }
+ LOG.info("Closing recovered ledger {} at entry {}", getId(), lac);
+ CompletableFuture<LedgerMetadata> f = new MetadataUpdateLoop(
+ clientCtx.getLedgerManager(), getId(),
+ this::getLedgerMetadata,
+ (metadata) -> metadata.isInRecovery(),
+ (metadata) -> {
+ LedgerMetadataBuilder builder =
LedgerMetadataBuilder.from(metadata);
+ Optional<Long> lastEnsembleKey =
metadata.getLastEnsembleKey();
+ checkState(lastEnsembleKey.isPresent(),
+ "Metadata shouldn't have been created without
at least one ensemble");
+ synchronized (metadataLock) {
+ newEnsemblesFromRecovery.entrySet().forEach(
+ (e) -> {
+ checkState(e.getKey() >=
lastEnsembleKey.get(),
+ "Once a ledger is in recovery,
noone can add ensembles without closing");
+ // Occurs when a bookie need to be
replaced at very start of recovery
+ if
(lastEnsembleKey.get().equals(e.getKey())) {
+
builder.replaceEnsembleEntry(e.getKey(), e.getValue());
+ } else {
+ builder.newEnsembleEntry(e.getKey(),
e.getValue());
+ }
+ });
+ }
+ return builder.closingAt(lac, len).build();
+ },
+ this::setLedgerMetadata).run();
+ f.thenRun(() -> {
+ synchronized (metadataLock) {
+ newEnsemblesFromRecovery.clear();
+ }
+ });
+ return f;
+ }
+
+ @Override
+ List<BookieSocketAddress> getCurrentEnsemble() {
+ synchronized (metadataLock) {
+ if (!newEnsemblesFromRecovery.isEmpty()) {
+ return newEnsemblesFromRecovery.lastEntry().getValue();
+ } else {
+ return super.getCurrentEnsemble();
+ }
+ }
+ }
+
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java
new file mode 100644
index 0000000..2692df8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BKExceptionTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for extracting codes from BKException.
+ */
+public class BKExceptionTest {
+ @Test
+ public void testBKExceptionCode() {
+ Assert.assertEquals(BKException.Code.WriteException,
+ BKException.getExceptionCode(new
BKException.BKWriteException(),
+
BKException.Code.ReadException));
+ }
+
+ @Test
+ public void testNonBKExceptionCode() {
+ Assert.assertEquals(BKException.Code.ReadException,
+ BKException.getExceptionCode(new Exception(),
+
BKException.Code.ReadException));
+
+ }
+
+ @Test
+ public void testNestedBKExceptionCode() {
+ Assert.assertEquals(BKException.Code.WriteException,
+ BKException.getExceptionCode(
+ new ExecutionException("test", new
BKException.BKWriteException()),
+ BKException.Code.ReadException));
+ }
+
+ @Test
+ public void testDoubleNestedBKExceptionCode() {
+ Assert.assertEquals(BKException.Code.WriteException,
+ BKException.getExceptionCode(
+ new ExecutionException("test",
+ new CompletionException("blah",
+ new
BKException.BKWriteException())),
+ BKException.Code.ReadException));
+
+ }
+}
+
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
new file mode 100644
index 0000000..9a7cc7f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ledger recovery tests using mocks rather than a real cluster.
+ */
+public class LedgerRecovery2Test {
+    private static final Logger log = LoggerFactory.getLogger(LedgerRecovery2Test.class);
+
+    // Ledger password used by every test. Encoded with an explicit charset so the
+    // bytes never depend on the platform default encoding.
+    private static final byte[] PASSWD = "foobar".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    // Mock bookie addresses. The tests build their ensembles from b1-b3; b4 and b5
+    // are registered as additional bookies by the tests that need replacements.
+    private static final BookieSocketAddress b1 = new BookieSocketAddress("b1", 3181);
+    private static final BookieSocketAddress b2 = new BookieSocketAddress("b2", 3181);
+    private static final BookieSocketAddress b3 = new BookieSocketAddress("b3", 3181);
+    private static final BookieSocketAddress b4 = new BookieSocketAddress("b4", 3181);
+    private static final BookieSocketAddress b5 = new BookieSocketAddress("b5", 3181);
+
+    /**
+     * Build metadata for a ledger with a single ensemble starting at entry 0 and
+     * store it through the client context's ledger manager.
+     *
+     * @param clientCtx context whose ledger manager stores the metadata
+     * @param ledgerId id under which the metadata is created
+     * @param bookies ensemble used for the ledger's only fragment
+     * @return the metadata handed back by the ledger manager
+     * @throws Exception if creating the metadata fails
+     */
+    private static LedgerMetadata setupLedger(ClientContext clientCtx, long ledgerId,
+                                              List<BookieSocketAddress> bookies) throws Exception {
+        LedgerMetadata md = LedgerMetadataBuilder.create()
+            .withPassword(PASSWD)
+            .newEnsembleEntry(0, bookies).build();
+        GenericCallbackFuture<LedgerMetadata> mdPromise = new GenericCallbackFuture<>();
+        // Honour the ledgerId argument instead of a fixed id, so the helper
+        // works for any ledger a test asks for.
+        clientCtx.getLedgerManager().createLedgerMetadata(ledgerId, md, mdPromise);
+        return mdPromise.get();
+    }
+
+    /**
+     * With every bookie of the ensemble set to error, recovery must fail and
+     * surface a BKReadException as the cause.
+     */
+    @Test
+    public void testCantRecoverAllDown() throws Exception {
+        MockClientContext ctx = MockClientContext.create();
+        LedgerMetadata metadata = setupLedger(ctx, 1L, Lists.newArrayList(b1, b2, b3));
+        ctx.getMockBookieClient().errorBookies(b1, b2, b3);
+
+        ReadOnlyLedgerHandle handle = new ReadOnlyLedgerHandle(
+                ctx, 1L, metadata, BookKeeper.DigestType.CRC32C, PASSWD, false);
+        GenericCallbackFuture<Void> recovery = new GenericCallbackFuture<>();
+        try {
+            handle.recover(recovery, null, false);
+            recovery.get();
+            Assert.fail("Recovery shouldn't have been able to complete");
+        } catch (ExecutionException expected) {
+            Assert.assertEquals(BKException.BKReadException.class, expected.getCause().getClass());
+        }
+    }
+
+    /**
+     * An entry is seeded on b1 so recovery has something to read, but the
+     * pre-write hook fails every write. Recovery is expected to fail with
+     * BKNotEnoughBookiesException as the cause.
+     */
+    @Test
+    public void testCanReadLacButCantWrite() throws Exception {
+        MockClientContext ctx = MockClientContext.create();
+        LedgerMetadata metadata = setupLedger(ctx, 1, Lists.newArrayList(b1, b2, b3));
+
+        ctx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        ctx.getMockBookieClient().setPreWriteHook(
+                (addr, lid, eid) -> FutureUtils.exception(new BKException.BKWriteException()));
+
+        ReadOnlyLedgerHandle handle = new ReadOnlyLedgerHandle(
+                ctx, 1L, metadata, BookKeeper.DigestType.CRC32C, PASSWD, false);
+        GenericCallbackFuture<Void> recovery = new GenericCallbackFuture<>();
+        try {
+            handle.recover(recovery, null, false);
+            recovery.get();
+            Assert.fail("Recovery shouldn't have been able to complete");
+        } catch (ExecutionException expected) {
+            Assert.assertEquals(BKException.BKNotEnoughBookiesException.class,
+                                expected.getCause().getClass());
+        }
+    }
+
+    /**
+     * Closes the ledger in the metadata store (via {@code closingAt(-1, 0)}) while
+     * recovery is parked in its write-back phase, then lets recovery continue and
+     * checks that the handle reports last-add-confirmed -1 and length 0.
+     */
+    @Test
+    public void testMetadataClosedDuringRecovery() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> writingBack = new CompletableFuture<>();
+        CompletableFuture<Void> blocker = new CompletableFuture<>();
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        // will block recovery at the writeback phase
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    writingBack.complete(null);
+                    return blocker;
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+
+        // wait until recovery has reached the first write before touching the metadata
+        writingBack.get(10, TimeUnit.SECONDS);
+
+        GenericCallbackFuture<LedgerMetadata> readPromise = new GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().readLedgerMetadata(1L, readPromise);
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(readPromise.get());
+        GenericCallbackFuture<LedgerMetadata> writePromise = new GenericCallbackFuture<>();
+        clientCtx.getLedgerManager().writeLedgerMetadata(1L, builder.closingAt(-1, 0).build(), writePromise);
+        writePromise.get();
+
+        // allow recovery to continue
+        blocker.complete(null);
+
+        recoveryPromise.get();
+
+        // assertEquals takes (expected, actual); keeping that order makes a
+        // failure message report the right value as the expectation.
+        Assert.assertEquals(-1L, lh.getLastAddConfirmed());
+        Assert.assertEquals(0L, lh.getLength());
+    }
+
+    /**
+     * While recovery is parked in its write-back phase, the stored metadata is
+     * rewritten with an extra ensemble starting at entry 1. The write to b3 is
+     * then failed and the other writes released; recovery is expected to fail
+     * with BKUnexpectedConditionException as the cause.
+     */
+    @Test
+    public void testNewEnsembleAddedDuringRecovery() throws Exception {
+        MockClientContext ctx = MockClientContext.create();
+        ctx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata metadata = setupLedger(ctx, 1, Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> reachedWriteback = new CompletableFuture<>();
+        CompletableFuture<Void> heldWrites = new CompletableFuture<>();
+        CompletableFuture<Void> b3Write = new CompletableFuture<>();
+        ctx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        // every write signals that write-back started, then waits on its future
+        ctx.getMockBookieClient().setPreWriteHook(
+                (addr, lid, eid) -> {
+                    reachedWriteback.complete(null);
+                    return addr.equals(b3) ? b3Write : heldWrites;
+                });
+
+        ReadOnlyLedgerHandle handle = new ReadOnlyLedgerHandle(
+                ctx, 1L, metadata, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        GenericCallbackFuture<Void> recovery = new GenericCallbackFuture<>();
+        handle.recover(recovery, null, false);
+
+        reachedWriteback.get(10, TimeUnit.SECONDS);
+
+        // change the stored metadata underneath the running recovery
+        GenericCallbackFuture<LedgerMetadata> currentMeta = new GenericCallbackFuture<>();
+        ctx.getLedgerManager().readLedgerMetadata(1L, currentMeta);
+        LedgerMetadata withNewEnsemble = LedgerMetadataBuilder.from(currentMeta.get())
+            .newEnsembleEntry(1L, Lists.newArrayList(b1, b2, b4)).build();
+        GenericCallbackFuture<LedgerMetadata> stored = new GenericCallbackFuture<>();
+        ctx.getLedgerManager().writeLedgerMetadata(1L, withNewEnsemble, stored);
+        stored.get();
+
+        // let recovery proceed: b3's write fails, the rest go through
+        b3Write.completeExceptionally(new BKException.BKWriteException());
+        heldWrites.complete(null);
+
+        try {
+            recovery.get();
+            Assert.fail("Should fail on the update");
+        } catch (ExecutionException expected) {
+            Assert.assertEquals(BKException.BKUnexpectedConditionException.class,
+                                expected.getCause().getClass());
+        }
+    }
+
+    /**
+     * b2 is set to error before recovery starts and b4 is registered as an
+     * available bookie. Recovery is expected to succeed, leaving a single
+     * ensemble in which b4 has taken b2's position.
+     */
+    @Test
+    public void testRecoveryBookieFailedAtStart() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
+
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().errorBookies(b2);
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        // assertEquals takes (expected, actual)
+        Assert.assertEquals(1, lh.getLedgerMetadata().getEnsembles().size());
+        Assert.assertEquals(Lists.newArrayList(b1, b4, b3),
+                            lh.getLedgerMetadata().getEnsembles().get(0L));
+    }
+
+    /**
+     * Entries 0 and 1 are seeded (on b1 and b3) and the pre-write hook fails
+     * only the write of entry 1 to b2. Recovery is expected to succeed with a
+     * second ensemble starting at entry 1 in which b4 has taken b2's position,
+     * and with last-add-confirmed 1.
+     */
+    @Test
+    public void testRecoveryOneBookieFailsDuring() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().seedEntries(b3, 1L, 1L, -1L);
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b2) && entryId == 1L) {
+                        return FutureUtils.exception(new BKException.BKWriteException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        // assertEquals takes (expected, actual)
+        Assert.assertEquals(2, lh.getLedgerMetadata().getEnsembles().size());
+        Assert.assertEquals(Lists.newArrayList(b1, b2, b3),
+                            lh.getLedgerMetadata().getEnsembles().get(0L));
+        Assert.assertEquals(Lists.newArrayList(b1, b4, b3),
+                            lh.getLedgerMetadata().getEnsembles().get(1L));
+        Assert.assertEquals(1L, lh.getLastAddConfirmed());
+    }
+
+    /**
+     * The pre-write hook fails every write to b1 and b2, with b4 and b5
+     * registered as available bookies. Recovery is expected to succeed with a
+     * single ensemble containing b3, b4 and b5, and with last-add-confirmed 0.
+     */
+    @Test
+    public void testRecoveryTwoBookiesFailOnSameEntry() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4, b5).get();
+
+        LedgerMetadata md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
+        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b1) || bookie.equals(b2)) {
+                        return FutureUtils.exception(new BKException.BKWriteException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        // assertEquals takes (expected, actual)
+        Assert.assertEquals(1, lh.getLedgerMetadata().getEnsembles().size());
+        Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b3));
+        Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b4));
+        Assert.assertTrue(lh.getLedgerMetadata().getEnsembles().get(0L).contains(b5));
+        Assert.assertEquals(0L, lh.getLastAddConfirmed());
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index e126001..6a21da2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -29,9 +29,9 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
@@ -357,7 +356,6 @@ public class LedgerRecoveryTest extends
BookKeeperClusterTestCase {
// start a new good server
startNewBookie();
-
LedgerHandle lhafter = bkc.openLedger(lhbefore.getId(), digestType,
"".getBytes());
assertEquals("Fenced ledger should have correct lastAddConfirmed",
@@ -479,24 +477,13 @@ public class LedgerRecoveryTest extends
BookKeeperClusterTestCase {
LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(),
digestType, "".getBytes());
assertEquals(BookieProtocol.INVALID_ENTRY_ID,
recoverLh.getLastAddConfirmed());
- final CountDownLatch recoverLatch = new CountDownLatch(1);
- final AtomicBoolean success = new AtomicBoolean(false);
-
MockClientContext parallelReadCtx =
MockClientContext.copyOf(bkc.getClientCtx())
.setConf(ClientInternalConf.fromConfig(newConf.setEnableParallelRecoveryRead(true)));
- LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(
- recoverLh, parallelReadCtx,
- new BookkeeperInternalCallbacks.GenericCallback<Void>() {
- @Override
- public void operationComplete(int rc, Void result) {
- success.set(BKException.Code.OK == rc);
- recoverLatch.countDown();
- }
- });
- recoveryOp.initiate();
- recoverLatch.await(10, TimeUnit.SECONDS);
- assertTrue(success.get());
+ LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh,
parallelReadCtx);
+ CompletableFuture<LedgerHandle> f = recoveryOp.initiate();
+ f.get(10, TimeUnit.SECONDS);
+
assertEquals(numEntries, recoveryOp.readCount.get());
assertEquals(numEntries, recoveryOp.writeCount.get());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index 5ed75ce..e70a7d3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -361,7 +361,7 @@ public class MetadataUpdateLoopTest {
ledgerId,
reference::get,
(currentMetadata) -> !currentMetadata.isClosed(),
- (currentMetadata) ->
LedgerMetadataBuilder.from(currentMetadata).closingAtEntry(10L).build(),
+ (currentMetadata) ->
LedgerMetadataBuilder.from(currentMetadata).closingAt(10L, 100L).build(),
reference::compareAndSet).run();
CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
lm,
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 783e435..c290a7e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -430,16 +430,17 @@ public abstract class MockBookKeeperTestCase {
@SuppressWarnings("unchecked")
private void setupWriteLedgerMetadata() {
doAnswer(invocation -> {
- Object[] args = invocation.getArguments();
- Long ledgerId = (Long) args[0];
- LedgerMetadata metadata = (LedgerMetadata) args[1];
- BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
- executor.executeOrdered(ledgerId, () -> {
- mockLedgerMetadataRegistry.put(ledgerId, new
LedgerMetadata(metadata));
- cb.operationComplete(BKException.Code.OK, null);
- });
- return null;
- }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any());
+ Object[] args = invocation.getArguments();
+ Long ledgerId = (Long) args[0];
+ LedgerMetadata metadata = (LedgerMetadata) args[1];
+ BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+ executor.executeOrdered(ledgerId, () -> {
+ LedgerMetadata newMetadata =
LedgerMetadataBuilder.from(metadata).build();
+ mockLedgerMetadataRegistry.put(ledgerId, newMetadata);
+ cb.operationComplete(BKException.Code.OK, newMetadata);
+ });
+ return null;
+ }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(),
any());
}
@SuppressWarnings("unchecked")
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index 2f5b2dc..f36c008 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -51,16 +51,19 @@ public class MockClientContext implements ClientContext {
private BooleanSupplier isClientClosed;
private MockRegistrationClient regClient;
- static MockClientContext create() {
+ static MockClientContext create() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
OrderedScheduler scheduler =
OrderedScheduler.newSchedulerBuilder().name("mock-executor").numThreads(1).build();
MockRegistrationClient regClient = new MockRegistrationClient();
EnsemblePlacementPolicy placementPolicy = new
DefaultEnsemblePlacementPolicy();
+ BookieWatcherImpl bookieWatcherImpl = new BookieWatcherImpl(conf,
placementPolicy,
+ regClient,
NullStatsLogger.INSTANCE);
+ bookieWatcherImpl.initialBlockingBookieRead();
return new MockClientContext()
.setConf(ClientInternalConf.fromConfig(conf))
.setLedgerManager(new MockLedgerManager())
- .setBookieWatcher(new BookieWatcherImpl(conf, placementPolicy,
regClient, NullStatsLogger.INSTANCE))
+ .setBookieWatcher(bookieWatcherImpl)
.setPlacementPolicy(placementPolicy)
.setRegistrationClient(regClient)
.setBookieClient(new MockBookieClient(scheduler))
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 8b5f5a6..9bdbfac 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -342,7 +342,7 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
final CountDownLatch recoverLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean(false);
- recoverLh.recover(new GenericCallback<Void>() {
+ ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>()
{
@Override
public void operationComplete(int rc, Void result) {
LOG.info("Recovering ledger {} completed : {}.", lh.getId(),
rc);
@@ -459,7 +459,7 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
final AtomicBoolean isMetadataClosed = new AtomicBoolean(false);
final AtomicInteger numSuccessCalls = new AtomicInteger(0);
final AtomicInteger numFailureCalls = new AtomicInteger(0);
- recoverLh.recover(new GenericCallback<Void>() {
+ ((ReadOnlyLedgerHandle) recoverLh).recover(new GenericCallback<Void>()
{
@Override
public void operationComplete(int rc, Void result) {
if (BKException.Code.OK == rc) {
@@ -639,7 +639,7 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
tlm1.setLatch(metadataLatch);
final CountDownLatch recoverLatch = new CountDownLatch(1);
final AtomicBoolean recoverSuccess = new AtomicBoolean(false);
- lh1.recover(new GenericCallback<Void>() {
+ ((ReadOnlyLedgerHandle) lh1).recover(new GenericCallback<Void>() {
@Override
public void operationComplete(int rc, Void result) {
LOG.info("Recovering ledger {} completed : {}", lh1.getId(),
rc);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
index 40178b9..8ce3686 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
@@ -55,7 +55,7 @@ public class MockRegistrationClient implements
RegistrationClient {
return new Versioned<>(Collections.unmodifiableSet(bookies), new
LongVersion(version));
}
- CompletableFuture<Void> addBookies(BookieSocketAddress... bookies) {
+ public CompletableFuture<Void> addBookies(BookieSocketAddress... bookies) {
CompletableFuture<Void> promise = new CompletableFuture<>();
executor.submit(() -> {
currentVersion++;
@@ -68,7 +68,7 @@ public class MockRegistrationClient implements
RegistrationClient {
return promise;
}
- CompletableFuture<Void> removeBookies(BookieSocketAddress... bookies) {
+ public CompletableFuture<Void> removeBookies(BookieSocketAddress...
bookies) {
CompletableFuture<Void> promise = new CompletableFuture<>();
executor.submit(() -> {
currentVersion++;
@@ -81,7 +81,7 @@ public class MockRegistrationClient implements
RegistrationClient {
return promise;
}
- CompletableFuture<Void> addReadOnlyBookies(BookieSocketAddress... bookies)
{
+ public CompletableFuture<Void> addReadOnlyBookies(BookieSocketAddress...
bookies) {
CompletableFuture<Void> promise = new CompletableFuture<>();
executor.submit(() -> {
currentVersion++;
@@ -94,7 +94,7 @@ public class MockRegistrationClient implements
RegistrationClient {
return promise;
}
- CompletableFuture<Void> removeReadOnlyBookies(BookieSocketAddress...
bookies) {
+ public CompletableFuture<Void>
removeReadOnlyBookies(BookieSocketAddress... bookies) {
CompletableFuture<Void> promise = new CompletableFuture<>();
executor.submit(() -> {
currentVersion++;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 79cc5ba..04119d5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -25,19 +25,19 @@ import static
org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
@@ -46,8 +46,9 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,24 +64,36 @@ public class MockBookieClient implements BookieClient {
final Set<BookieSocketAddress> errorBookies =
Collections.newSetFromMap(new ConcurrentHashMap<BookieSocketAddress,
Boolean>());
- final Map<BookieSocketAddress, Boolean> stalledBookies = new HashMap<>();
- final Map<BookieSocketAddress, List<Consumer<Integer>>> stalledRequests =
new HashMap<>();
+ /**
+ * Runs before or after an operation. Can stall the operation or error it.
+ */
+ public interface Hook {
+ CompletableFuture<Void> runHook(BookieSocketAddress bookie, long
ledgerId, long entryId);
+ }
+
+ private Hook preReadHook = (bookie, ledgerId, entryId) ->
FutureUtils.value(null);
+ private Hook postReadHook = (bookie, ledgerId, entryId) ->
FutureUtils.value(null);
+ private Hook preWriteHook = (bookie, ledgerId, entryId) ->
FutureUtils.value(null);
+ private Hook postWriteHook = (bookie, ledgerId, entryId) ->
FutureUtils.value(null);
public MockBookieClient(OrderedExecutor executor) {
this.executor = executor;
}
- public void stallBookie(BookieSocketAddress bookie) {
- synchronized (this) {
- stalledBookies.put(bookie, true);
- }
+ public void setPreReadHook(Hook hook) {
+ this.preReadHook = hook;
}
- public void releaseStalledBookie(BookieSocketAddress bookie, int rc) {
- synchronized (this) {
- stalledBookies.remove(bookie);
- stalledRequests.remove(bookie).forEach((r) -> r.accept(rc));
- }
+ public void setPostReadHook(Hook hook) {
+ this.postReadHook = hook;
+ }
+
+ public void setPreWriteHook(Hook hook) {
+ this.preWriteHook = hook;
+ }
+
+ public void setPostWriteHook(Hook hook) {
+ this.postWriteHook = hook;
}
public void errorBookies(BookieSocketAddress... bookies) {
@@ -95,6 +108,15 @@ public class MockBookieClient implements BookieClient {
}
}
+ public void seedEntries(BookieSocketAddress bookie, long ledgerId, long
entryId, long lac) throws Exception {
+ DigestManager digestManager = DigestManager.instantiate(ledgerId, new
byte[0], DigestType.CRC32C);
+ ByteBuf entry =
ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
+ entryId, lac, 0,
Unpooled.buffer(10)));
+
+ LedgerData ledger = getBookieData(bookie).computeIfAbsent(ledgerId,
LedgerData::new);
+ ledger.addEntry(entryId, entry);
+ }
+
@Override
public List<BookieSocketAddress> getFaultyBookies() {
return Collections.emptyList();
@@ -134,41 +156,29 @@ public class MockBookieClient implements BookieClient {
public void addEntry(BookieSocketAddress addr, long ledgerId, byte[]
masterKey,
long entryId, ByteBufList toSend, WriteCallback cb,
Object ctx,
int options, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
- SafeRunnable write = safeRun(() -> {
- LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
- if (errorBookies.contains(addr)) {
- LOG.warn("[{};L{}] erroring write {}", addr, ledgerId,
entryId);
- cb.writeComplete(BKException.Code.WriteException,
ledgerId, entryId, addr, ctx);
- return;
- }
- LedgerData ledger =
getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
- ledger.addEntry(entryId, copyData(toSend));
- cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr,
ctx);
- toSend.release();
- });
-
toSend.retain();
- synchronized (this) {
- if (stalledBookies.getOrDefault(addr, false)) {
- LOG.info("[{};{};{}] Stalling write {}", addr, ledgerId,
System.identityHashCode(write), entryId);
- stalledRequests.computeIfAbsent(addr, (key) -> new
ArrayList<>())
- .add((rc) -> {
- LOG.info("[{};{};{}] Unstalled write {}",
- addr, ledgerId,
System.identityHashCode(write), entryId);
- if (rc == BKException.Code.OK) {
- executor.executeOrdered(ledgerId, write);
- } else {
- executor.executeOrdered(
- ledgerId, safeRun(() -> {
- cb.writeComplete(rc, ledgerId,
entryId, addr, ctx);
- toSend.release();
- }));
- }
- });
- } else {
- executor.executeOrdered(ledgerId, write);
- }
- }
+        // Run the pre-write hook, store the entry on the ledger's thread, run the
+        // post-write hook, then report the outcome to the callback.
+        preWriteHook.runHook(addr, ledgerId, entryId)
+            .thenComposeAsync(
+                (ignore) -> {
+                    LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                    if (errorBookies.contains(addr)) {
+                        LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                        return FutureUtils.exception(new BKException.BKWriteException());
+                    }
+                    LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
+                    ledger.addEntry(entryId, copyData(toSend));
+                    return FutureUtils.value(null);
+                }, executor.chooseThread(ledgerId))
+            .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
+            .whenCompleteAsync((res, ex) -> {
+                    // Balance the retain() taken before the hooks ran on every
+                    // outcome: a failed hook or an errored bookie must not leak
+                    // the buffer any more than a successful write does.
+                    toSend.release();
+                    if (ex != null) {
+                        cb.writeComplete(BKException.getExceptionCode(ex, BKException.Code.WriteException),
+                                         ledgerId, entryId, addr, ctx);
+                    } else {
+                        cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
+                    }
+                }, executor.chooseThread(ledgerId));
}
@Override
@@ -184,34 +194,38 @@ public class MockBookieClient implements BookieClient {
public void readEntry(BookieSocketAddress addr, long ledgerId, long
entryId,
ReadEntryCallback cb, Object ctx, int flags, byte[]
masterKey,
boolean allowFastFail) {
- executor.executeOrdered(ledgerId,
- safeRun(() -> {
- LOG.info("[{};L{}] read entry {}", addr, ledgerId,
entryId);
- if (errorBookies.contains(addr)) {
- LOG.warn("[{};L{}] erroring read {}", addr,
ledgerId, entryId);
-
cb.readEntryComplete(BKException.Code.ReadException, ledgerId, entryId, null,
ctx);
- return;
- }
-
- LedgerData ledger = getBookieData(addr).get(ledgerId);
- if (ledger == null) {
- LOG.warn("[{};L{}] ledger not found", addr,
ledgerId);
-
cb.readEntryComplete(BKException.Code.NoSuchLedgerExistsException,
- ledgerId, entryId, null, ctx);
- return;
- }
-
- ByteBuf entry = ledger.getEntry(entryId);
- if (entry == null) {
- LOG.warn("[{};L{}] entry({}) not found", addr,
ledgerId, entryId);
-
cb.readEntryComplete(BKException.Code.NoSuchEntryException,
- ledgerId, entryId, null, ctx);
- return;
- }
-
+ preReadHook.runHook(addr, ledgerId, entryId)
+ .thenComposeAsync((res) -> {
+ LOG.info("[{};L{}] read entry {}", addr, ledgerId,
entryId);
+ if (errorBookies.contains(addr)) {
+ LOG.warn("[{};L{}] erroring read {}", addr, ledgerId,
entryId);
+ return FutureUtils.exception(new
BKException.BKReadException());
+ }
+
+ LedgerData ledger = getBookieData(addr).get(ledgerId);
+ if (ledger == null) {
+ LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
+ return FutureUtils.exception(new
BKException.BKNoSuchLedgerExistsException());
+ }
+
+ ByteBuf entry = ledger.getEntry(entryId);
+ if (entry == null) {
+ LOG.warn("[{};L{}] entry({}) not found", addr,
ledgerId, entryId);
+ return FutureUtils.exception(new
BKException.BKNoSuchEntryException());
+ }
+
+ return FutureUtils.value(entry);
+ }, executor.chooseThread(ledgerId))
+ .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId,
entryId).thenApply((res) -> buf))
+ .whenCompleteAsync((res, ex) -> {
+ if (ex != null) {
+ cb.readEntryComplete(BKException.getExceptionCode(ex,
BKException.Code.ReadException),
+ ledgerId, entryId, null, ctx);
+ } else {
cb.readEntryComplete(BKException.Code.OK,
- ledgerId, entryId, entry.slice(),
ctx);
- }));
+ ledgerId, entryId, res.slice(),
ctx);
+ }
+ }, executor.chooseThread(ledgerId));
}
@Override
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
index 8978ff3..3152a74 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java
@@ -17,6 +17,8 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -31,7 +33,9 @@ public class BookKeeperAccessor {
public static void forceRecoverLedger(LedgerHandle lh,
BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
- lh.recover(cb, null, true);
+ checkArgument(lh instanceof ReadOnlyLedgerHandle,
+ "Recovery can only run on ReadOnlyLedgerHandle");
+ ((ReadOnlyLedgerHandle) lh).recover(cb, null, true);
}
public static LedgerMetadata getLedgerMetadata(LedgerHandle lh) {