This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 60ff226 ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes
60ff226 is described below
commit 60ff22670f885058ebedb19c19ff40856e74b53c
Author: cguttapalem <[email protected]>
AuthorDate: Wed Jan 24 21:56:26 2018 -0800
ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes
Descriptions of the changes in this PR:
- Fix for Bug 2 of issue #1014: fix the race condition with node removal in
'asyncProcessLedgers'
and make it robust to concurrent modifications such as ledger deletions.
- The fix for Bug 1 of issue #1014 was already made as part of the Issue #978 (decouple
metaformat cmd)
feature.
- Add test cases to validate the fixes.
Master Issue: #1014
Author: cguttapalem <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1048 from reddycharan/asyncprocessfix, closes #1014
---
.../meta/AbstractHierarchicalLedgerManager.java | 12 ++++-
.../bookkeeper/meta/AbstractZkLedgerManager.java | 5 +-
.../bookkeeper/meta/LedgerManagerIteratorTest.java | 58 +++++++++++++++++++++-
3 files changed, 71 insertions(+), 4 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 510f04c..1aa8e15 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -63,7 +63,11 @@ public abstract class AbstractHierarchicalLedgerManager
extends AbstractZkLedger
zk.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
+ if (rc == Code.NONODE.intValue()) {
+ // Raced with node removal
+ finalCb.processResult(successRc, null, context);
+ return;
+ } else if (rc != Code.OK.intValue()) {
LOG.error("Error syncing path " + path + " when getting
its chidren: ",
KeeperException.create(KeeperException.Code.get(rc), path));
finalCb.processResult(failureRc, null, context);
@@ -74,7 +78,11 @@ public abstract class AbstractHierarchicalLedgerManager
extends AbstractZkLedger
@Override
public void processResult(int rc, String path, Object ctx,
List<String> levelNodes) {
- if (rc != Code.OK.intValue()) {
+ if (rc == Code.NONODE.intValue()) {
+ // Raced with node removal
+ finalCb.processResult(successRc, null, context);
+ return;
+ } else if (rc != Code.OK.intValue()) {
LOG.error("Error polling hash nodes of " + path,
KeeperException.create(KeeperException.Code.get(rc), path));
finalCb.processResult(failureRc, null, context);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 56fd035..6f1872c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -466,7 +466,10 @@ public abstract class AbstractZkLedgerManager implements
LedgerManager, Watcher
ZkUtils.getChildrenInSingleNode(zk, path, new
GenericCallback<List<String>>() {
@Override
public void operationComplete(int rc, List<String> ledgerNodes) {
- if (Code.OK.intValue() != rc) {
+ if (Code.NONODE.intValue() == rc) {
+ finalCb.processResult(successRc, null, ctx);
+ return;
+ } else if (Code.OK.intValue() != rc) {
finalCb.processResult(failureRc, null, ctx);
return;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index b3aa425..7d421aa 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -37,9 +37,11 @@ import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -162,6 +164,24 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
return ret;
}
+ static Set<Long> getLedgerIdsByUsingAsyncProcessLedgers(LedgerManager lm)
throws InterruptedException{
+ Set<Long> ledgersReadAsync = ConcurrentHashMap.newKeySet();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger finalRC = new AtomicInteger();
+
+ lm.asyncProcessLedgers((ledgerId, callback) -> {
+ ledgersReadAsync.add(ledgerId);
+ callback.processResult(BKException.Code.OK, null, null);
+ }, (rc, s, obj) -> {
+ finalRC.set(rc);
+ latch.countDown();
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+ latch.await();
+ assertEquals("Final RC of asyncProcessLedgers", BKException.Code.OK,
finalRC.get());
+ return ledgersReadAsync;
+ }
+
@Test
public void testIterateNoLedgers() throws Exception {
LedgerManager lm = getLedgerManager();
@@ -186,6 +206,9 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
Set<Long> lids = ledgerRangeToSet(lri);
assertEquals(lids.size(), 1);
assertEquals(lids.iterator().next().longValue(), id);
+
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", lids,
ledgersReadAsync);
}
@Test
@@ -201,6 +224,9 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
+
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", ids,
ledgersReadAsync);
}
@Test
@@ -217,6 +243,9 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
+
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", ids,
ledgersReadAsync);
}
@Test
@@ -306,6 +335,9 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", ids,
ledgersReadAsync);
+
lri = lm.getLedgerRanges();
int emptyRanges = 0;
while (lri.hasNext()) {
@@ -352,6 +384,9 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
assertNotNull(lri);
Set<Long> returnedIds = ledgerRangeToSet(lri);
assertEquals(ids, returnedIds);
+
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", ids,
ledgersReadAsync);
}
@Test
@@ -423,7 +458,12 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
for (long id: mustExist) {
assertTrue(returnedIds.contains(id));
}
- } catch (IOException e) {
+
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(checkerLM);
+ for (long id: mustExist) {
+ assertTrue(ledgersReadAsync.contains(id));
+ }
+ } catch (IOException | InterruptedException e) {
e.printStackTrace();
fail("Got exception scanning ledgers: " +
e.toString());
}
@@ -517,4 +557,20 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain
all the invalid znodes created",
childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
}
+
+ @Test
+ public void hierarchicalLedgerManagerAsyncProcessLedgersTest() throws
Throwable {
+
Assume.assumeTrue(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class));
+ LedgerManager lm = getLedgerManager();
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+
+ Set<Long> ledgerIds = new TreeSet<>(Arrays.asList(1234L,
123456789123456789L));
+ for (Long ledgerId : ledgerIds) {
+ createLedger(lm, ledgerId, Optional.of(BKException.Code.OK));
+ }
+ Set<Long> ledgersReadThroughIterator = ledgerRangeToSet(lri);
+ assertEquals("Comparing LedgersIds read through Iterator", ledgerIds,
ledgersReadThroughIterator);
+ Set<Long> ledgersReadAsync =
getLedgerIdsByUsingAsyncProcessLedgers(lm);
+ assertEquals("Comparing LedgersIds read asynchronously", ledgerIds,
ledgersReadAsync);
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].