This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 155c3c9 ISSUE #865: LongHierarchicalLedgerManager: refactor to fix
races
155c3c9 is described below
commit 155c3c95f3fcc823432e23a93a8cd3f51ebff917
Author: Samuel Just <[email protected]>
AuthorDate: Tue Dec 19 18:42:53 2017 -0800
ISSUE #865: LongHierarchicalLedgerManager: refactor to fix races
LongHierarchicalLedgerRangeIterator.initialize() could erroneously exit
with iteratorDone if the lexicographically first path in zk had fewer
than 4 levels. This can happen for a few reasons including a case where
a client creating a ledger on that path crashed during the zk updates or
the iterator.hasNext() call simply raced with an in-progress node
creation or removal. If hasNext() returns false,
ScanAndCompareGarbageCollector will delete all ledgers on the bookie, so
this is a fairly serious bug.
Ruling out other such bugs was fairly difficult with the structure of the
code as written, so instead use a simpler recursive iterator design with
simpler pre/post conditions.
Also, surface KeeperException.NoNodeException from
ZkUtils.getChildrenInSingleNode so that we can actually handle it in
LHLM.
Master Issue: #865
Author: Samuel Just <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #875 from athanatos/forupstream/issue-865, closes #865
---
.../apache/bookkeeper/meta/FlatLedgerManager.java | 3 +
.../org/apache/bookkeeper/meta/LedgerManager.java | 2 +-
.../meta/LegacyHierarchicalLedgerManager.java | 5 +
.../meta/LongHierarchicalLedgerManager.java | 301 +++++++---------
.../java/org/apache/bookkeeper/util/ZkUtils.java | 7 +-
.../bookkeeper/meta/LedgerManagerIteratorTest.java | 390 ++++++++++++++++++++-
.../bookkeeper/meta/LedgerManagerTestCase.java | 5 +
7 files changed, 536 insertions(+), 177 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index a155c4d..26f55dc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -27,6 +27,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,6 +110,8 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
zkActiveLedgers = ledgerListToSet(
ZkUtils.getChildrenInSingleNode(zk,
ledgerRootPath), ledgerRootPath);
nextRange = new LedgerRange(zkActiveLedgers);
+ } catch (KeeperException.NoNodeException e) {
+ throw new IOException("Path does not exist: " +
ledgerRootPath, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Error when get child nodes from
zk", ie);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 3d8b693..6b56a3e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -202,7 +202,7 @@ public interface LedgerManager extends Closeable {
/**
* Get the next element.
*
- * @return the next element.
+ * @return the next element, the LedgerRange returned must be non-empty
* @throws IOException thrown when there is a problem accessing the
ledger
* metadata store. It is critical that it doesn't return false in the
case
* in the case it fails to access the ledger metadata store. Otherwise
it
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index 11e2314..ea85a8d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -261,6 +262,10 @@ class LegacyHierarchicalLedgerManager extends
AbstractHierarchicalLedgerManager
List<String> ledgerNodes = null;
try {
ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+ } catch (KeeperException.NoNodeException e) {
+ /* If the node doesn't exist, we must have raced with a
recursive node removal, just
+ * return an empty list. */
+ ledgerNodes = new ArrayList<>();
} catch (InterruptedException e) {
throw new IOException("Error when get child nodes from zk", e);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index d9f2c1f..e5bdb1c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
import java.util.NoSuchElementException;
+import java.util.Set;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -61,9 +61,6 @@ class LongHierarchicalLedgerManager extends
AbstractHierarchicalLedgerManager {
static final Logger LOG =
LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
static final String IDGEN_ZNODE = "idgen-long";
- private static final String MAX_ID_SUFFIX = "9999";
- private static final String MIN_ID_SUFFIX = "0000";
-
/**
* Constructor.
@@ -95,41 +92,6 @@ class LongHierarchicalLedgerManager extends
AbstractHierarchicalLedgerManager {
// Active Ledger Manager
//
- /**
- * Get the smallest cache id in a specified node
/level0/level1/level2/level3.
- *
- * @param level0
- * 1st level node name
- * @param level1
- * 2nd level node name
- * @param level2
- * 3rd level node name
- * @param level3
- * 4th level node name
- * @return the smallest ledger id
- */
- private long getStartLedgerIdByLevel(String level0, String level1, String
level2, String level3)
- throws IOException {
- return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
- }
-
- /**
- * Get the largest cache id in a specified node
/level0/level1/level2/level3.
- *
- * @param level0
- * 1st level node name
- * @param level1
- * 2nd level node name
- * @param level2
- * 3rd level node name
- * @param level3
- * 4th level node name
- * @return the largest ledger id
- */
- private long getEndLedgerIdByLevel(String level0, String level1, String
level2, String level3) throws IOException {
- return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
- }
-
@Override
public void asyncProcessLedgers(final Processor<Long> processor, final
AsyncCallback.VoidCallback finalCb,
final Object context, final int successRc, final int failureRc) {
@@ -188,170 +150,165 @@ class LongHierarchicalLedgerManager extends
AbstractHierarchicalLedgerManager {
return new LongHierarchicalLedgerRangeIterator();
}
+
/**
- * Iterator through each metadata bucket with hierarchical mode.
+ * Iterates recursively through each metadata bucket.
*/
private class LongHierarchicalLedgerRangeIterator implements
LedgerRangeIterator {
- private List<Iterator<String>> levelNodesIter;
- private List<String> curLevelNodes;
-
- private boolean initialized = false;
- private boolean iteratorDone = false;
- private LedgerRange nextRange = null;
+ LedgerRangeIterator rootIterator;
- private LongHierarchicalLedgerRangeIterator() {
- levelNodesIter = new
ArrayList<Iterator<String>>(Collections.nCopies(4, (Iterator<String>) null));
- curLevelNodes = new ArrayList<String>(Collections.nCopies(4,
(String) null));
+ /**
+ * Returns all children with path as a parent. If path is
non-existent,
+ * returns an empty list anyway (after all, there are no children
there).
+ * Maps all exceptions (other than NoNode) to IOException in keeping
with
+ * LedgerRangeIterator.
+ *
+ * @param path
+ * @return sorted list of all children with path as a parent (empty if path does not exist)
+ * @throws IOException
+ */
+ List<String> getChildrenAt(String path) throws IOException {
+ try {
+ List<String> children = ZkUtils.getChildrenInSingleNode(zk,
path);
+ Collections.sort(children);
+ return children;
+ } catch (KeeperException.NoNodeException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NoNodeException at path {}, assumed race with
deletion", path);
+ }
+ return new ArrayList<>();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while reading ledgers at
path " + path, ie);
+ }
}
- private synchronized void initialize(String path, int level) throws
KeeperException, InterruptedException,
- IOException {
- List<String> levelNodes = zk.getChildren(path, null);
- Collections.sort(levelNodes);
- if (level == 0) {
- Iterator<String> l0NodesIter = levelNodes.iterator();
- levelNodesIter.set(0, l0NodesIter);
- while (l0NodesIter.hasNext()) {
- String curL0Node = l0NodesIter.next();
- if (!isSpecialZnode(curL0Node)) {
- curLevelNodes.set(0, curL0Node);
- break;
- }
- }
- } else {
- Iterator<String> lNodesIter = levelNodes.iterator();
- levelNodesIter.set(level, lNodesIter);
- if (lNodesIter.hasNext()) {
- String curLNode = lNodesIter.next();
- curLevelNodes.set(level, curLNode);
+ /**
+ * Represents the ledger range rooted at a leaf node, returns at most
one LedgerRange.
+ */
+ class LeafIterator implements LedgerRangeIterator {
+ // Null iff iteration is complete
+ LedgerRange range;
+
+ LeafIterator(String path) throws IOException {
+ List<String> ledgerLeafNodes = getChildrenAt(path);
+ Set<Long> ledgerIds = ledgerListToSet(ledgerLeafNodes, path);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from ZK for hash node {}:
{}", path, ledgerIds);
}
+ if (!ledgerIds.isEmpty()) {
+ range = new LedgerRange(ledgerIds);
+ } // else, hasNext() should return false so that advance will
skip us and move on
}
- String curLNode = curLevelNodes.get(level);
- if (curLNode != null) {
- // Traverse down through levels 0-3
- // The nextRange becomes a listing of the children
- // in the level4 directory.
- if (level != 3) {
- String nextLevelPath = path + "/" + curLNode;
- initialize(nextLevelPath, level + 1);
- } else {
- nextRange = getLedgerRangeByLevel(curLevelNodes);
- initialized = true;
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return range != null;
+ }
+
+ @Override
+ public LedgerRange next() throws IOException {
+ if (range == null) {
+ throw new NoSuchElementException(
+ "next() must only be called if hasNext() is true");
}
- } else {
- iteratorDone = true;
+ LedgerRange ret = range;
+ range = null;
+ return ret;
}
}
- private void clearHigherLevels(int level) {
- for (int i = level + 1; i < 4; i++) {
- curLevelNodes.set(i, null);
+
+ /**
+ * The main constraint is that between calls one of two things must be
true.
+ * 1) nextLevelIterator is null and thisLevelIterator.hasNext() ==
false: iteration complete, hasNext()
+ * returns false
+ * 2) nextLevelIterator is non-null: nextLevelIterator.hasNext() must
return true and nextLevelIterator.next()
+ * must return the next LedgerRange
+ * The above means that nextLevelIterator != null ==>
nextLevelIterator.hasNext()
+ * It also means that hasNext() iff nextLevelIterator != null
+ */
+ private class InnerIterator implements LedgerRangeIterator {
+ final String path;
+ final int level;
+
+ // Always non-null
+ final Iterator<String> thisLevelIterator;
+ // non-null iff nextLevelIterator.hasNext() is true
+ LedgerRangeIterator nextLevelIterator;
+
+ /**
+ * Builds InnerIterator.
+ *
+ * @param path Subpath for thisLevelIterator
+ * @param level Level of thisLevelIterator (must be <= 3)
+ * @throws IOException
+ */
+ InnerIterator(String path, int level) throws IOException {
+ this.path = path;
+ this.level = level;
+ thisLevelIterator = getChildrenAt(path).iterator();
+ advance();
}
- }
- private synchronized boolean moveToNext(int level) throws
KeeperException, InterruptedException {
- Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
- boolean movedToNextNode = false;
- if (level == 0) {
- while (curLevelNodesIter.hasNext()) {
- String nextNode = curLevelNodesIter.next();
- if (isSpecialZnode(nextNode)) {
+ /**
+ * Resolves the difference between cases 1 and 2 after
nextLevelIterator is exhausted.
+ * Pre-condition: nextLevelIterator == null, thisLevelIterator !=
null
+ * Post-condition: nextLevelIterator == null &&
!thisLevelIterator.hasNext() OR
+ * nextLevelIterator.hasNext() == true and
nextLevelIterator.next()
+ * yields the next result of next()
+ * @throws IOException Exception representing error
+ */
+ void advance() throws IOException {
+ while (thisLevelIterator.hasNext()) {
+ String node = thisLevelIterator.next();
+ if (level == 0 && isSpecialZnode(node)) {
continue;
- } else {
- curLevelNodes.set(level, nextNode);
- clearHigherLevels(level);
- movedToNextNode = true;
- break;
}
- }
- } else {
- if (curLevelNodesIter.hasNext()) {
- String nextNode = curLevelNodesIter.next();
- curLevelNodes.set(level, nextNode);
- clearHigherLevels(level);
- movedToNextNode = true;
- } else {
- movedToNextNode = moveToNext(level - 1);
- if (movedToNextNode) {
- StringBuilder path = new StringBuilder(ledgerRootPath);
- for (int i = 0; i < level; i++) {
- path =
path.append("/").append(curLevelNodes.get(i));
- }
- List<String> newCurLevelNodesList =
zk.getChildren(path.toString(), null);
- Collections.sort(newCurLevelNodesList);
- Iterator<String> newCurLevelNodesIter =
newCurLevelNodesList.iterator();
- levelNodesIter.set(level, newCurLevelNodesIter);
- if (newCurLevelNodesIter.hasNext()) {
- curLevelNodes.set(level,
newCurLevelNodesIter.next());
- clearHigherLevels(level);
- movedToNextNode = true;
- }
+ LedgerRangeIterator nextIterator = level < 3
+ ? new InnerIterator(path + "/" + node, level + 1)
+ : new LeafIterator(path + "/" + node);
+ if (nextIterator.hasNext()) {
+ nextLevelIterator = nextIterator;
+ break;
}
}
}
- return movedToNextNode;
- }
- private synchronized void preload() throws IOException,
KeeperException, InterruptedException {
- if (!iteratorDone && !initialized) {
- initialize(ledgerRootPath, 0);
+ @Override
+ public boolean hasNext() throws IOException {
+ return nextLevelIterator != null;
}
- while (((nextRange == null) || (nextRange.size() == 0)) &&
!iteratorDone) {
- boolean movedToNextNode = moveToNext(3);
- if (movedToNextNode) {
- nextRange = getLedgerRangeByLevel(curLevelNodes);
- } else {
- iteratorDone = true;
+
+ @Override
+ public LedgerRange next() throws IOException {
+ LedgerRange ret = nextLevelIterator.next();
+ if (!nextLevelIterator.hasNext()) {
+ nextLevelIterator = null;
+ advance();
}
+ return ret;
}
}
- @Override
- public synchronized boolean hasNext() throws IOException {
- try {
- preload();
- } catch (KeeperException ke) {
- throw new IOException("Error preloading next range", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while preloading", ie);
+ private LongHierarchicalLedgerRangeIterator() {}
+
+ private void bootstrap() throws IOException {
+ if (rootIterator == null) {
+ rootIterator = new InnerIterator(ledgerRootPath, 0);
}
- return nextRange != null && !iteratorDone;
}
@Override
- public synchronized LedgerRange next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- LedgerRange r = nextRange;
- nextRange = null;
- return r;
+ public synchronized boolean hasNext() throws IOException {
+ bootstrap();
+ return rootIterator.hasNext();
}
- private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes)
throws IOException {
- String level0 = curLevelNodes.get(0);
- String level1 = curLevelNodes.get(1);
- String level2 = curLevelNodes.get(2);
- String level3 = curLevelNodes.get(3);
-
- StringBuilder nodeBuilder = new StringBuilder();
-
nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
- .append(level2).append("/").append(level3);
- String nodePath = nodeBuilder.toString();
- List<String> ledgerNodes = null;
- try {
- ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
- } catch (InterruptedException e) {
- throw new IOException("Error when get child nodes from zk", e);
- }
- NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes,
nodePath);
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK for hash node " + level0
+ "/" + level1 + "/" + level2 + "/"
- + level3 + " : " + zkActiveLedgers);
- }
- return new
LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1,
level2, level3), true,
- getEndLedgerIdByLevel(level0, level1, level2, level3),
true));
+ @Override
+ public synchronized LedgerRange next() throws IOException {
+ bootstrap();
+ return rootIterator.next();
}
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
index 7fceff6..e03777b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
@@ -223,7 +223,7 @@ public class ZkUtils {
* @throws IOException
*/
public static List<String> getChildrenInSingleNode(final ZooKeeper zk,
final String node)
- throws InterruptedException, IOException {
+ throws InterruptedException, IOException,
KeeperException.NoNodeException {
final GetChildrenCtx ctx = new GetChildrenCtx();
getChildrenInSingleNode(zk, node, new GenericCallback<List<String>>() {
@Override
@@ -244,11 +244,12 @@ public class ZkUtils {
ctx.wait();
}
}
- if (Code.OK.intValue() != ctx.rc) {
+ if (Code.NONODE.intValue() == ctx.rc) {
+ throw new KeeperException.NoNodeException("Got NoNode on call to
getChildren on path " + node);
+ } else if (Code.OK.intValue() != ctx.rc) {
throw new IOException("Error on getting children from node " +
node);
}
return ctx.children;
-
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index ce76e3e..239bc0e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -22,11 +22,37 @@
package org.apache.bookkeeper.meta;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
import org.junit.Test;
+
/**
* Test the ledger manager iterator.
*/
@@ -35,6 +61,104 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
super(lmFactoryCls);
}
+ final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+
+ Runnable safeWrapper(Runnable r) {
+ return () -> {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ };
+ }
+
+ @After
+ public void throwAsyncErrors() throws Throwable {
+ while (exceptions.peek() != null) {
+ throw exceptions.remove();
+ }
+ }
+
+ class RCCheckCB implements GenericCallback<Void> {
+ private final String opType;
+ private final CountDownLatch latch;
+ private final Optional<Integer> rcExpected;
+ private final long ledgerId;
+
+ public RCCheckCB(String opType, CountDownLatch latch,
Optional<Integer> rcExpected, long ledgerId) {
+ this.opType = opType;
+ this.latch = latch;
+ this.rcExpected = rcExpected;
+ this.ledgerId = ledgerId;
+ }
+
+ @Override
+ public void operationComplete(int rc, Void result) {
+ safeWrapper(() -> {
+ try {
+ rcExpected.map((Integer expected) -> {
+ assertEquals(
+ "Incorrect rc on ledger: " + ledgerId + ", op
type: " + opType,
+ expected.longValue(), rc);
+ return null;
+ });
+ } finally {
+ latch.countDown();
+ }
+ }).run();
+ }
+ }
+
+ /**
+ * Remove ledger using lm synchronously.
+ *
+ * @param lm
+ * @param ledgerId
+ * @param rcExpected return value expected, empty to skip the check
+ * @throws InterruptedException
+ */
+ void removeLedger(LedgerManager lm, Long ledgerId, Optional<Integer>
rcExpected) throws Throwable {
+ CountDownLatch latch = new CountDownLatch(1);
+ lm.removeLedgerMetadata(
+ ledgerId, Version.ANY, new RCCheckCB("removeLedger", latch,
rcExpected, ledgerId));
+ latch.await();
+ throwAsyncErrors();
+
+ }
+
+ /**
+ * Create ledger using lm synchronously.
+ *
+ * @param lm
+ * @param ledgerId
+ * @param rcExpected return value expected, empty to skip the check
+ * @throws InterruptedException
+ */
+ void createLedger(LedgerManager lm, Long ledgerId, Optional<Integer>
rcExpected) throws Throwable {
+ LedgerMetadata meta = new LedgerMetadata(
+ 3, 3, 2,
+ BookKeeper.DigestType.CRC32, "passwd".getBytes());
+ CountDownLatch latch = new CountDownLatch(1);
+ lm.createLedgerMetadata(
+ ledgerId, meta, new RCCheckCB("createLedger", latch,
rcExpected, ledgerId));
+ latch.await();
+ throwAsyncErrors();
+ }
+
+ static Set<Long> ledgerRangeToSet(LedgerRangeIterator lri) throws
IOException {
+ Set<Long> ret = new TreeSet<>();
+ long last = -1;
+ while (lri.hasNext()) {
+ LedgerManager.LedgerRange lr = lri.next();
+ assertFalse("ledger range must not be empty",
lr.getLedgers().isEmpty());
+ assertTrue("ledger ranges must not overlap", last < lr.start());
+ ret.addAll(lr.getLedgers());
+ last = lr.end();
+ }
+ return ret;
+ }
+
@Test
public void testIterateNoLedgers() throws Exception {
LedgerManager lm = getLedgerManager();
@@ -45,6 +169,270 @@ public class LedgerManagerIteratorTest extends
LedgerManagerTestCase {
}
assertEquals(false, lri.hasNext());
- assertEquals(false, lri.hasNext());
+ }
+
+ @Test
+ public void testSingleLedger() throws Throwable {
+ LedgerManager lm = getLedgerManager();
+
+ long id = 2020202;
+ createLedger(lm, id, Optional.of(BKException.Code.OK));
+
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ assertNotNull(lri);
+ Set<Long> lids = ledgerRangeToSet(lri);
+ assertEquals(lids.size(), 1);
+ assertEquals(lids.iterator().next().longValue(), id);
+ }
+
+ @Test
+ public void testTwoLedgers() throws Throwable {
+ LedgerManager lm = getLedgerManager();
+
+ Set<Long> ids = new TreeSet<>(Arrays.asList(101010101L, 2020340302L));
+ for (Long id: ids) {
+ createLedger(lm, id, Optional.of(BKException.Code.OK));
+ }
+
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ assertNotNull(lri);
+ Set<Long> returnedIds = ledgerRangeToSet(lri);
+ assertEquals(ids, returnedIds);
+ }
+
+ @Test
+ public void testSeveralContiguousLedgers() throws Throwable {
+ LedgerManager lm = getLedgerManager();
+
+ Set<Long> ids = new TreeSet<>();
+ for (long i = 0; i < 2000; ++i) {
+ createLedger(lm, i, Optional.of(BKException.Code.OK));
+ ids.add(i);
+ }
+
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ assertNotNull(lri);
+ Set<Long> returnedIds = ledgerRangeToSet(lri);
+ assertEquals(ids, returnedIds);
+ }
+
+ @Test
+ public void testRemovalOfNodeJustTraversed() throws Throwable {
+ if (baseConf.getLedgerManagerFactoryClass()
+ != LongHierarchicalLedgerManagerFactory.class) {
+ return;
+ }
+ LedgerManager lm = getLedgerManager();
+
+ /* For LHLM, first two should be leaves on the same node, third
should be on adjacent level 4 node.
+ * Removing all 3 once the iterator hits the first should result in
the whole tree path ending
+ * at that node disappearing. If this happens after the iterator
stops at that leaf, it should
+ * result in a few NoNode errors (handled silently) as the
iterator falls back up the tree
+ * to the next path.
+ */
+ Set<Long> toRemove = new TreeSet<>(
+ Arrays.asList(
+ 3394498498348983841L,
+ 3394498498348983842L,
+ 3394498498348993841L));
+
+ long first = 2345678901234567890L;
+ // Nodes which should be listed anyway
+ Set<Long> mustHave = new TreeSet<>(
+ Arrays.asList(
+ first,
+ 6334994393848474732L));
+
+ Set<Long> ids = new TreeSet<>();
+ ids.addAll(toRemove);
+ ids.addAll(mustHave);
+ for (Long id: ids) {
+ createLedger(lm, id, Optional.of(BKException.Code.OK));
+ }
+
+ Set<Long> found = new TreeSet<>();
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ while (lri.hasNext()) {
+ LedgerManager.LedgerRange lr = lri.next();
+ found.addAll(lr.getLedgers());
+
+ if (lr.getLedgers().contains(first)) {
+ for (long id: toRemove) {
+ removeLedger(lm, id, Optional.of(BKException.Code.OK));
+ }
+ toRemove.clear();
+ }
+ }
+
+ for (long id: mustHave) {
+ assertTrue(found.contains(id));
+ }
+ }
+
+ @Test
+ public void validateEmptyL4PathSkipped() throws Throwable {
+ if (baseConf.getLedgerManagerFactoryClass()
+ != LongHierarchicalLedgerManagerFactory.class) {
+ return;
+ }
+ LedgerManager lm = getLedgerManager();
+
+ Set<Long> ids = new TreeSet<>(
+ Arrays.asList(
+ 2345678901234567890L,
+ 3394498498348983841L,
+ 6334994393848474732L,
+ 7349370101927398483L));
+ for (Long id: ids) {
+ createLedger(lm, id, Optional.of(BKException.Code.OK));
+ }
+
+ String paths[] = {
+ "/ledgers/633/4994/3938/4948", // Empty L4 path, must be
skipped
+
+ };
+
+ for (String path : paths) {
+ ZkUtils.createFullPathOptimistic(
+ zkc,
+ path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ assertNotNull(lri);
+ Set<Long> returnedIds = ledgerRangeToSet(lri);
+ assertEquals(ids, returnedIds);
+
+ lri = lm.getLedgerRanges();
+ int emptyRanges = 0;
+ while (lri.hasNext()) {
+ if (lri.next().getLedgers().isEmpty()) {
+ emptyRanges++;
+ }
+ }
+ assertEquals(0, emptyRanges);
+ }
+
+ @Test
+ public void testWithSeveralIncompletePaths() throws Throwable {
+ if (baseConf.getLedgerManagerFactoryClass()
+ != LongHierarchicalLedgerManagerFactory.class) {
+ return;
+ }
+ LedgerManager lm = getLedgerManager();
+
+ Set<Long> ids = new TreeSet<>(
+ Arrays.asList(
+ 2345678901234567890L,
+ 3394498498348983841L,
+ 6334994393848474732L,
+ 7349370101927398483L));
+ for (Long id: ids) {
+ createLedger(lm, id, Optional.of(BKException.Code.OK));
+ }
+
+ String paths[] = {
+ "/ledgers/000/0000/0000", // top level, W-4292762
+ "/ledgers/234/5678/9999", // shares two path segments with the
first one, comes after
+ "/ledgers/339/0000/0000", // shares one path segment with the
second one, comes first
+ "/ledgers/633/4994/3938/0000", // shares three path segments
with the third one, comes first
+ "/ledgers/922/3372/0000/0000", // close to max long, at end
+
+ };
+ for (String path : paths) {
+ ZkUtils.createFullPathOptimistic(
+ zkc,
+ path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+
+ LedgerRangeIterator lri = lm.getLedgerRanges();
+ assertNotNull(lri);
+ Set<Long> returnedIds = ledgerRangeToSet(lri);
+ assertEquals(ids, returnedIds);
+ }
+
+ @Test
+ public void checkConcurrentModifications() throws Throwable {
+ final int numWriters = 10;
+ final int numCheckers = 10;
+ final int numLedgers = 100;
+ final long runtime = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
+ final boolean longRange =
+ baseConf.getLedgerManagerFactoryClass() ==
LongHierarchicalLedgerManagerFactory.class;
+
+ final Set<Long> mustExist = new TreeSet<>();
+ LedgerManager lm = getLedgerManager();
+ Random rng = new Random();
+ for (int i = 0; i < numLedgers; ++i) {
+ long lid = Math.abs(rng.nextLong());
+ if (!longRange) {
+ lid %= 1000000;
+ }
+ createLedger(lm, lid, Optional.of(BKException.Code.OK));
+ mustExist.add(lid);
+ }
+
+ final long start = MathUtils.nowInNano();
+ final CountDownLatch latch = new CountDownLatch(1);
+ ArrayList<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < numWriters; ++i) {
+ Thread thread = new Thread(safeWrapper(() -> {
+ LedgerManager writerLM = getIndependentLedgerManager();
+ Random writerRNG = new Random(rng.nextLong());
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ fail("Checker interrupted");
+ }
+ while (MathUtils.elapsedNanos(start) < runtime) {
+ long candidate = 0;
+ do {
+ candidate = Math.abs(writerRNG.nextLong());
+ if (!longRange) {
+ candidate %= 1000000;
+ }
+ } while (mustExist.contains(candidate));
+ try {
+ createLedger(writerLM, candidate, Optional.empty());
+ removeLedger(writerLM, candidate, Optional.empty());
+ } catch (Throwable e) {
+ fail("Got exception thrashing store: " + e.toString());
+ }
+ }
+ }));
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < numCheckers; ++i) {
+ Thread thread = new Thread(safeWrapper(() -> {
+ LedgerManager checkerLM = getIndependentLedgerManager();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ fail("Checker interrupted");
+ e.printStackTrace();
+ }
+ while (MathUtils.elapsedNanos(start) < runtime) {
+ try {
+ LedgerRangeIterator lri = checkerLM.getLedgerRanges();
+ Set<Long> returnedIds = ledgerRangeToSet(lri);
+ for (long id: mustExist) {
+ assertTrue(returnedIds.contains(id));
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Got exception scanning ledgers: " +
e.toString());
+ }
+ }
+ }));
+ thread.start();
+ threads.add(thread);
+ }
+
+ latch.countDown();
+ for (Thread thread: threads) {
+ thread.join();
+ }
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index ea02a30..1ecd9f3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -69,6 +69,10 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
baseClientConf.setLedgerManagerFactoryClass(lmFactoryCls);
}
+ public LedgerManager getIndependentLedgerManager() {
+ return ledgerManagerFactory.newLedgerManager();
+ }
+
public LedgerManager getLedgerManager() {
if (null == ledgerManager) {
ledgerManager = ledgerManagerFactory.newLedgerManager();
@@ -88,6 +92,7 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
return Arrays.asList(new Object[][] {
{ FlatLedgerManagerFactory.class },
{ HierarchicalLedgerManagerFactory.class },
+ { LegacyHierarchicalLedgerManagerFactory.class },
{ LongHierarchicalLedgerManagerFactory.class },
{ MSLedgerManagerFactory.class },
});
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].