This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 60ff226  ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes
60ff226 is described below

commit 60ff22670f885058ebedb19c19ff40856e74b53c
Author: cguttapalem <[email protected]>
AuthorDate: Wed Jan 24 21:56:26 2018 -0800

    ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes
    
    Descriptions of the changes in this PR:
    
    - fix for Bug 2 of 1014. Fixing race condition with node removal in 
'asyncProcessLedgers'
     and making it robust to concurrent modifications like deletions of ledgers.
    - fix for Bug 1 of 1014, is already made as part of Issue #978 (decouple 
metaformat cmd)
     feature.
    - testcases to validate the fixes.
    
    Master Issue: #1014
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1048 from reddycharan/asyncprocessfix, closes #1014
---
 .../meta/AbstractHierarchicalLedgerManager.java    | 12 ++++-
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  5 +-
 .../bookkeeper/meta/LedgerManagerIteratorTest.java | 58 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 4 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 510f04c..1aa8e15 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -63,7 +63,11 @@ public abstract class AbstractHierarchicalLedgerManager 
extends AbstractZkLedger
         zk.sync(path, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
-                if (rc != Code.OK.intValue()) {
+                if (rc == Code.NONODE.intValue()) {
+                    // Raced with node removal
+                    finalCb.processResult(successRc, null, context);
+                    return;
+                } else if (rc != Code.OK.intValue()) {
                     LOG.error("Error syncing path " + path + " when getting 
its chidren: ",
                               
KeeperException.create(KeeperException.Code.get(rc), path));
                     finalCb.processResult(failureRc, null, context);
@@ -74,7 +78,11 @@ public abstract class AbstractHierarchicalLedgerManager 
extends AbstractZkLedger
                     @Override
                     public void processResult(int rc, String path, Object ctx,
                                               List<String> levelNodes) {
-                        if (rc != Code.OK.intValue()) {
+                        if (rc == Code.NONODE.intValue()) {
+                            // Raced with node removal
+                            finalCb.processResult(successRc, null, context);
+                            return;
+                        } else if (rc != Code.OK.intValue()) {
                             LOG.error("Error polling hash nodes of " + path,
                                       
KeeperException.create(KeeperException.Code.get(rc), path));
                             finalCb.processResult(failureRc, null, context);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 56fd035..6f1872c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -466,7 +466,10 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
         ZkUtils.getChildrenInSingleNode(zk, path, new 
GenericCallback<List<String>>() {
             @Override
             public void operationComplete(int rc, List<String> ledgerNodes) {
-                if (Code.OK.intValue() != rc) {
+                if (Code.NONODE.intValue() == rc) {
+                    finalCb.processResult(successRc, null, ctx);
+                    return;
+                } else if (Code.OK.intValue() != rc) {
                     finalCb.processResult(failureRc, null, ctx);
                     return;
                 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index b3aa425..7d421aa 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -37,9 +37,11 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -162,6 +164,24 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         return ret;
     }
 
+    static Set<Long> getLedgerIdsByUsingAsyncProcessLedgers(LedgerManager lm) 
throws InterruptedException{
+        Set<Long> ledgersReadAsync = ConcurrentHashMap.newKeySet();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger finalRC = new AtomicInteger();
+
+        lm.asyncProcessLedgers((ledgerId, callback) -> {
+            ledgersReadAsync.add(ledgerId);
+            callback.processResult(BKException.Code.OK, null, null);
+        }, (rc, s, obj) -> {
+            finalRC.set(rc);
+            latch.countDown();
+        }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+        latch.await();
+        assertEquals("Final RC of asyncProcessLedgers", BKException.Code.OK, 
finalRC.get());
+        return ledgersReadAsync;
+    }
+
     @Test
     public void testIterateNoLedgers() throws Exception {
         LedgerManager lm = getLedgerManager();
@@ -186,6 +206,9 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         Set<Long> lids = ledgerRangeToSet(lri);
         assertEquals(lids.size(), 1);
         assertEquals(lids.iterator().next().longValue(), id);
+
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", lids, 
ledgersReadAsync);
     }
 
     @Test
@@ -201,6 +224,9 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, 
ledgersReadAsync);
     }
 
     @Test
@@ -217,6 +243,9 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, 
ledgersReadAsync);
     }
 
     @Test
@@ -306,6 +335,9 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
 
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, 
ledgersReadAsync);
+
         lri = lm.getLedgerRanges();
         int emptyRanges = 0;
         while (lri.hasNext()) {
@@ -352,6 +384,9 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, 
ledgersReadAsync);
     }
 
     @Test
@@ -423,7 +458,12 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
                         for (long id: mustExist) {
                             assertTrue(returnedIds.contains(id));
                         }
-                    } catch (IOException e) {
+
+                        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(checkerLM);
+                        for (long id: mustExist) {
+                            assertTrue(ledgersReadAsync.contains(id));
+                        }
+                    } catch (IOException | InterruptedException e) {
                         e.printStackTrace();
                         fail("Got exception scanning ledgers: " + 
e.toString());
                     }
@@ -517,4 +557,20 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain 
all the invalid znodes created",
                 
childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
     }
+
+    @Test
+    public void hierarchicalLedgerManagerAsyncProcessLedgersTest() throws 
Throwable {
+        
Assume.assumeTrue(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class));
+        LedgerManager lm = getLedgerManager();
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+
+        Set<Long> ledgerIds = new TreeSet<>(Arrays.asList(1234L, 
123456789123456789L));
+        for (Long ledgerId : ledgerIds) {
+            createLedger(lm, ledgerId, Optional.of(BKException.Code.OK));
+        }
+        Set<Long> ledgersReadThroughIterator = ledgerRangeToSet(lri);
+        assertEquals("Comparing LedgersIds read through Iterator", ledgerIds, 
ledgersReadThroughIterator);
+        Set<Long> ledgersReadAsync = 
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ledgerIds, 
ledgersReadAsync);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to