This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4c64b3c changingEnsemble should be negated before calling unset
success
4c64b3c is described below
commit 4c64b3c78d1120d3421a9f50abb9ec9ca97e8864
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Dec 6 14:20:22 2018 +0100
changingEnsemble should be negated before calling unset success
If the first pending add op is completed, but does not have the
replaced bookie in its write set, callbacks are triggered straight
away.
Previously this would then hang forever, as the changingEnsemble would
be true. This patch sets changingEnsemble to false before calling
unsetSuccessAndSendWriteRequest so that if callbacks are triggered
straight away, they can actually complete. It also moves the call to
unsetSuccessAndSendWriteRequest outside of the metadataLock so that
the callbacks don't run inside the lock.
Reviewers: Sijie Guo <[email protected]>
This closes #1857 from ivankelly/block-order
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 11 ++++--
.../bookkeeper/client/HandleFailuresTest.java | 46 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 075788b..6f0af27 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1904,6 +1904,8 @@ public class LedgerHandle implements WriteHandle {
LOG.debug("{}[attempt:{}] Success updating
metadata.", logContext, attempts.get());
}
+ List<BookieSocketAddress> newEnsemble = null;
+ Set<Integer> replaced = null;
synchronized (metadataLock) {
if (!delayedWriteFailedBookies.isEmpty()) {
Map<Integer, BookieSocketAddress> toReplace =
new HashMap<>(delayedWriteFailedBookies);
@@ -1911,13 +1913,16 @@ public class LedgerHandle implements WriteHandle {
ensembleChangeLoop(origEnsemble, toReplace);
} else {
- List<BookieSocketAddress> newEnsemble =
getCurrentEnsemble();
- Set<Integer> replaced =
EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
+ newEnsemble = getCurrentEnsemble();
+ replaced =
EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
LOG.info("New Ensemble: {} for ledger: {}",
newEnsemble, ledgerId);
- unsetSuccessAndSendWriteRequest(newEnsemble,
replaced);
+
changingEnsemble = false;
}
}
+ if (newEnsemble != null) { // unsetSuccess outside of
lock
+ unsetSuccessAndSendWriteRequest(newEnsemble,
replaced);
+ }
}
}, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
index 9f95a94..1b8220c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
@@ -447,4 +447,50 @@ public class HandleFailuresTest {
Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(0L),
Lists.newArrayList(b1, b2, b3));
Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(1L),
Lists.newArrayList(b1, b2, b4));
}
+
+ @Test
+ public void testHandleFailureBookieNotInWriteSet() throws Exception {
+ MockClientContext clientCtx = MockClientContext.create();
+ Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
+ LedgerMetadataBuilder.create()
+
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+ .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));
+ clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+ CompletableFuture<Void> b1Delay = new CompletableFuture<>();
+ // Delay the first write to b1, then error it
+ clientCtx.getMockBookieClient().setPreWriteHook((bookie, ledgerId,
entryId) -> {
+ if (bookie.equals(b1)) {
+ return b1Delay;
+ } else {
+ return FutureUtils.value(null);
+ }
+ });
+
+ CompletableFuture<Void> changeInProgress = new CompletableFuture<>();
+ CompletableFuture<Void> blockEnsembleChange = new
CompletableFuture<>();
+ clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata)
-> {
+ changeInProgress.complete(null);
+ return blockEnsembleChange;
+ });
+
+ LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md,
BookKeeper.DigestType.CRC32C,
+ ClientUtil.PASSWD, WriteFlag.NONE);
+ log.info("b2 should be enough to complete first add");
+ lh.append("entry1".getBytes());
+
+ log.info("when b1 completes with failure, handleFailures should kick
off");
+ b1Delay.completeExceptionally(new BKException.BKWriteException());
+
+ log.info("write second entry, should have enough bookies, but blocks
completion on failure handling");
+ CompletableFuture<?> e2 = lh.appendAsync("entry2".getBytes());
+ changeInProgress.get();
+ assertEventuallyTrue("e2 should eventually complete", () ->
lh.pendingAddOps.peek().completed);
+ Assert.assertFalse("e2 shouldn't be completed to client", e2.isDone());
+ blockEnsembleChange.complete(null); // allow ensemble change to
continue
+
+ log.info("e2 should complete");
+ e2.get(10, TimeUnit.SECONDS);
+ }
+
}