This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c64b3c  changingEnsemble should be negated before calling unset success
4c64b3c is described below

commit 4c64b3c78d1120d3421a9f50abb9ec9ca97e8864
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Dec 6 14:20:22 2018 +0100

    changingEnsemble should be negated before calling unset success
    
    If the first pending add op is completed, but does not have the
    replaced bookie in its write set, callbacks are triggered straight
    away.
    
    Previously the pending add would then hang forever, because
    changingEnsemble was still true when those callbacks ran. This patch
    sets changingEnsemble to false before calling
    unsetSuccessAndSendWriteRequest, so that if callbacks are triggered
    straight away, they can actually complete. It also moves the call to
    unsetSuccessAndSendWriteRequest outside of the metadataLock, so that
    the callbacks don't run while the lock is held.
    
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1857 from ivankelly/block-order
---
 .../org/apache/bookkeeper/client/LedgerHandle.java | 11 ++++--
 .../bookkeeper/client/HandleFailuresTest.java      | 46 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 075788b..6f0af27 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1904,6 +1904,8 @@ public class LedgerHandle implements WriteHandle {
                             LOG.debug("{}[attempt:{}] Success updating 
metadata.", logContext, attempts.get());
                         }
 
+                        List<BookieSocketAddress> newEnsemble = null;
+                        Set<Integer> replaced = null;
                         synchronized (metadataLock) {
                             if (!delayedWriteFailedBookies.isEmpty()) {
                                 Map<Integer, BookieSocketAddress> toReplace = 
new HashMap<>(delayedWriteFailedBookies);
@@ -1911,13 +1913,16 @@ public class LedgerHandle implements WriteHandle {
 
                                 ensembleChangeLoop(origEnsemble, toReplace);
                             } else {
-                                List<BookieSocketAddress> newEnsemble = 
getCurrentEnsemble();
-                                Set<Integer> replaced = 
EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
+                                newEnsemble = getCurrentEnsemble();
+                                replaced = 
EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
                                 LOG.info("New Ensemble: {} for ledger: {}", 
newEnsemble, ledgerId);
-                                unsetSuccessAndSendWriteRequest(newEnsemble, 
replaced);
+
                                 changingEnsemble = false;
                             }
                         }
+                        if (newEnsemble != null) { // unsetSuccess outside of 
lock
+                            unsetSuccessAndSendWriteRequest(newEnsemble, 
replaced);
+                        }
                     }
             }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
index 9f95a94..1b8220c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/HandleFailuresTest.java
@@ -447,4 +447,50 @@ public class HandleFailuresTest {
         Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(0L), 
Lists.newArrayList(b1, b2, b3));
         Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(1L), 
Lists.newArrayList(b1, b2, b4));
     }
+
+    @Test
+    public void testHandleFailureBookieNotInWriteSet() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
+                LedgerMetadataBuilder.create()
+                
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+                .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        CompletableFuture<Void> b1Delay = new CompletableFuture<>();
+        // Delay the first write to b1, then error it
+        clientCtx.getMockBookieClient().setPreWriteHook((bookie, ledgerId, 
entryId) -> {
+                if (bookie.equals(b1)) {
+                    return b1Delay;
+                } else {
+                    return FutureUtils.value(null);
+                }
+            });
+
+        CompletableFuture<Void> changeInProgress = new CompletableFuture<>();
+        CompletableFuture<Void> blockEnsembleChange = new 
CompletableFuture<>();
+        clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) 
-> {
+                changeInProgress.complete(null);
+                return blockEnsembleChange;
+            });
+
+        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, 
BookKeeper.DigestType.CRC32C,
+                                           ClientUtil.PASSWD, WriteFlag.NONE);
+        log.info("b2 should be enough to complete first add");
+        lh.append("entry1".getBytes());
+
+        log.info("when b1 completes with failure, handleFailures should kick 
off");
+        b1Delay.completeExceptionally(new BKException.BKWriteException());
+
+        log.info("write second entry, should have enough bookies, but blocks 
completion on failure handling");
+        CompletableFuture<?> e2 = lh.appendAsync("entry2".getBytes());
+        changeInProgress.get();
+        assertEventuallyTrue("e2 should eventually complete", () -> 
lh.pendingAddOps.peek().completed);
+        Assert.assertFalse("e2 shouldn't be completed to client", e2.isDone());
+        blockEnsembleChange.complete(null); // allow ensemble change to 
continue
+
+        log.info("e2 should complete");
+        e2.get(10, TimeUnit.SECONDS);
+    }
+
 }

Reply via email to