sijie closed pull request #875: Issue 865: LongHierarchicalLedgerManager: 
refactor to fix races
URL: https://github.com/apache/bookkeeper/pull/875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index a155c4d28..26f55dc9e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -27,6 +27,7 @@
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,6 +110,8 @@ private synchronized void preload() throws IOException {
                     zkActiveLedgers = ledgerListToSet(
                             ZkUtils.getChildrenInSingleNode(zk, 
ledgerRootPath), ledgerRootPath);
                     nextRange = new LedgerRange(zkActiveLedgers);
+                } catch (KeeperException.NoNodeException e) {
+                    throw new IOException("Path does not exist: " + 
ledgerRootPath, e);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     throw new IOException("Error when get child nodes from 
zk", ie);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 3d8b693eb..6b56a3e6e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -202,7 +202,7 @@ public Long end() {
         /**
          * Get the next element.
          *
-         * @return the next element.
+         * @return the next element, the LedgerRange returned must be non-empty
          * @throws IOException thrown when there is a problem accessing the 
ledger
          * metadata store. It is critical that it doesn't return false
          * in the case it fails to access the ledger metadata store. Otherwise 
it
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index 11e2314f3..ea85a8da6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -261,6 +262,10 @@ LedgerRange getLedgerRangeByLevel(final String level1, 
final String level2)
             List<String> ledgerNodes = null;
             try {
                 ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (KeeperException.NoNodeException e) {
+                /* If the node doesn't exist, we must have raced with a 
recursive node removal, just
+                 * return an empty list. */
+                ledgerNodes = new ArrayList<>();
             } catch (InterruptedException e) {
                 throw new IOException("Error when get child nodes from zk", e);
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index d9f2c1f21..e5bdb1c41 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -22,8 +22,8 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -61,9 +61,6 @@
     static final Logger LOG = 
LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
 
     static final String IDGEN_ZNODE = "idgen-long";
-    private static final String MAX_ID_SUFFIX = "9999";
-    private static final String MIN_ID_SUFFIX = "0000";
-
 
     /**
      * Constructor.
@@ -95,41 +92,6 @@ public String getLedgerPath(long ledgerId) {
     // Active Ledger Manager
     //
 
-    /**
-     * Get the smallest cache id in a specified node 
/level0/level1/level2/level3.
-     *
-     * @param level0
-     *            1st level node name
-     * @param level1
-     *            2nd level node name
-     * @param level2
-     *            3rd level node name
-     * @param level3
-     *            4th level node name
-     * @return the smallest ledger id
-     */
-    private long getStartLedgerIdByLevel(String level0, String level1, String 
level2, String level3)
-            throws IOException {
-        return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
-    }
-
-    /**
-     * Get the largest cache id in a specified node 
/level0/level1/level2/level3.
-     *
-     * @param level0
-     *            1st level node name
-     * @param level1
-     *            2nd level node name
-     * @param level2
-     *            3rd level node name
-     * @param level3
-     *            4th level node name
-     * @return the largest ledger id
-     */
-    private long getEndLedgerIdByLevel(String level0, String level1, String 
level2, String level3) throws IOException {
-        return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
-    }
-
     @Override
     public void asyncProcessLedgers(final Processor<Long> processor, final 
AsyncCallback.VoidCallback finalCb,
             final Object context, final int successRc, final int failureRc) {
@@ -188,170 +150,165 @@ public LedgerRangeIterator getLedgerRanges() {
         return new LongHierarchicalLedgerRangeIterator();
     }
 
+
     /**
-     * Iterator through each metadata bucket with hierarchical mode.
+     * Iterates recursively through each metadata bucket.
      */
     private class LongHierarchicalLedgerRangeIterator implements 
LedgerRangeIterator {
-        private List<Iterator<String>> levelNodesIter;
-        private List<String> curLevelNodes;
-
-        private boolean initialized = false;
-        private boolean iteratorDone = false;
-        private LedgerRange nextRange = null;
+        LedgerRangeIterator rootIterator;
 
-        private LongHierarchicalLedgerRangeIterator() {
-            levelNodesIter = new 
ArrayList<Iterator<String>>(Collections.nCopies(4, (Iterator<String>) null));
-            curLevelNodes = new ArrayList<String>(Collections.nCopies(4, 
(String) null));
+        /**
+         * Returns all children with path as a parent.  If path is 
non-existent,
+         * returns an empty list anyway (after all, there are no children 
there).
+         * Maps all exceptions (other than NoNode) to IOException in keeping 
with
+         * LedgerRangeIterator.
+         *
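+         * @param path znode path whose children are listed
+         * @return sorted list of the names of all children of path (empty if path does not exist)
+         * @throws IOException on any failure other than the node not existing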
+         */
+        List<String> getChildrenAt(String path) throws IOException {
+            try {
+                List<String> children = ZkUtils.getChildrenInSingleNode(zk, 
path);
+                Collections.sort(children);
+                return children;
+            } catch (KeeperException.NoNodeException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("NoNodeException at path {}, assumed race with 
deletion", path);
+                }
+                return new ArrayList<>();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while reading ledgers at 
path " + path, ie);
+            }
         }
 
-        private synchronized void initialize(String path, int level) throws 
KeeperException, InterruptedException,
-                IOException {
-            List<String> levelNodes = zk.getChildren(path, null);
-            Collections.sort(levelNodes);
-            if (level == 0) {
-                Iterator<String> l0NodesIter = levelNodes.iterator();
-                levelNodesIter.set(0, l0NodesIter);
-                while (l0NodesIter.hasNext()) {
-                    String curL0Node = l0NodesIter.next();
-                    if (!isSpecialZnode(curL0Node)) {
-                        curLevelNodes.set(0, curL0Node);
-                        break;
-                    }
-                }
-            } else {
-                Iterator<String> lNodesIter = levelNodes.iterator();
-                levelNodesIter.set(level, lNodesIter);
-                if (lNodesIter.hasNext()) {
-                    String curLNode = lNodesIter.next();
-                    curLevelNodes.set(level, curLNode);
+        /**
+         * Represents the ledger range rooted at a leaf node, returns at most 
one LedgerRange.
+         */
+        class LeafIterator implements LedgerRangeIterator {
+            // Null iff iteration is complete
+            LedgerRange range;
+
+            LeafIterator(String path) throws IOException {
+                List<String> ledgerLeafNodes = getChildrenAt(path);
+                Set<Long> ledgerIds = ledgerListToSet(ledgerLeafNodes, path);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("All active ledgers from ZK for hash node {}: 
{}", path, ledgerIds);
                 }
+                if (!ledgerIds.isEmpty()) {
+                    range = new LedgerRange(ledgerIds);
+                } // else, hasNext() should return false so that advance will 
skip us and move on
             }
-            String curLNode = curLevelNodes.get(level);
-            if (curLNode != null) {
-                // Traverse down through levels 0-3
-                // The nextRange becomes a listing of the children
-                // in the level4 directory.
-                if (level != 3) {
-                    String nextLevelPath = path + "/" + curLNode;
-                    initialize(nextLevelPath, level + 1);
-                } else {
-                    nextRange = getLedgerRangeByLevel(curLevelNodes);
-                    initialized = true;
+
+            @Override
+            public boolean hasNext() throws IOException {
+                return range != null;
+            }
+
+            @Override
+            public LedgerRange next() throws IOException {
+                if (range == null) {
+                    throw new NoSuchElementException(
+                            "next() must only be called if hasNext() is true");
                 }
-            } else {
-                iteratorDone = true;
+                LedgerRange ret = range;
+                range = null;
+                return ret;
             }
         }
 
-        private void clearHigherLevels(int level) {
-            for (int i = level + 1; i < 4; i++) {
-                curLevelNodes.set(i, null);
+
+        /**
+         * The main constraint is that between calls one of two things must be 
true.
+         * 1) nextLevelIterator is null and thisLevelIterator.hasNext() == 
false: iteration complete, hasNext()
+         *    returns false
+         * 2) nextLevelIterator is non-null: nextLevelIterator.hasNext() must 
return true and nextLevelIterator.next()
+         *    must return the next LedgerRange
+         * The above means that nextLevelIterator != null ==> 
nextLevelIterator.hasNext()
+         * It also means that hasNext() iff nextLevelIterator != null
+         */
+        private class InnerIterator implements LedgerRangeIterator {
+            final String path;
+            final int level;
+
+            // Always non-null
+            final Iterator<String> thisLevelIterator;
+            // non-null iff nextLevelIterator.hasNext() is true
+            LedgerRangeIterator nextLevelIterator;
+
+            /**
+             * Builds InnerIterator.
+             *
+             * @param path Subpath for thisLevelIterator
+             * @param level Level of thisLevelIterator (must be <= 3)
+             * @throws IOException
+             */
+            InnerIterator(String path, int level) throws IOException {
+                this.path = path;
+                this.level = level;
+                thisLevelIterator = getChildrenAt(path).iterator();
+                advance();
             }
-        }
 
-        private synchronized boolean moveToNext(int level) throws 
KeeperException, InterruptedException {
-            Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
-            boolean movedToNextNode = false;
-            if (level == 0) {
-                while (curLevelNodesIter.hasNext()) {
-                    String nextNode = curLevelNodesIter.next();
-                    if (isSpecialZnode(nextNode)) {
+            /**
+             * Resolves the difference between cases 1 and 2 after 
nextLevelIterator is exhausted.
+             * Pre-condition: nextLevelIterator == null, thisLevelIterator != 
null
+             * Post-condition: nextLevelIterator == null && 
!thisLevelIterator.hasNext() OR
+             *                 nextLevelIterator.hasNext() == true and 
nextLevelIterator.next()
+             *                 yields the next result of next()
+             * @throws IOException Exception representing error
+             */
+            void advance() throws IOException {
+                while (thisLevelIterator.hasNext()) {
+                    String node = thisLevelIterator.next();
+                    if (level == 0 && isSpecialZnode(node)) {
                         continue;
-                    } else {
-                        curLevelNodes.set(level, nextNode);
-                        clearHigherLevels(level);
-                        movedToNextNode = true;
-                        break;
                     }
-                }
-            } else {
-                if (curLevelNodesIter.hasNext()) {
-                    String nextNode = curLevelNodesIter.next();
-                    curLevelNodes.set(level, nextNode);
-                    clearHigherLevels(level);
-                    movedToNextNode = true;
-                } else {
-                    movedToNextNode = moveToNext(level - 1);
-                    if (movedToNextNode) {
-                        StringBuilder path = new StringBuilder(ledgerRootPath);
-                        for (int i = 0; i < level; i++) {
-                            path = 
path.append("/").append(curLevelNodes.get(i));
-                        }
-                        List<String> newCurLevelNodesList = 
zk.getChildren(path.toString(), null);
-                        Collections.sort(newCurLevelNodesList);
-                        Iterator<String> newCurLevelNodesIter = 
newCurLevelNodesList.iterator();
-                        levelNodesIter.set(level, newCurLevelNodesIter);
-                        if (newCurLevelNodesIter.hasNext()) {
-                            curLevelNodes.set(level, 
newCurLevelNodesIter.next());
-                            clearHigherLevels(level);
-                            movedToNextNode = true;
-                        }
+                    LedgerRangeIterator nextIterator = level < 3
+                            ? new InnerIterator(path + "/" + node, level + 1)
+                            : new LeafIterator(path + "/" + node);
+                    if (nextIterator.hasNext()) {
+                        nextLevelIterator = nextIterator;
+                        break;
                     }
                 }
             }
-            return movedToNextNode;
-        }
 
-        private synchronized void preload() throws IOException, 
KeeperException, InterruptedException {
-            if (!iteratorDone && !initialized) {
-                initialize(ledgerRootPath, 0);
+            @Override
+            public boolean hasNext() throws IOException {
+                return nextLevelIterator != null;
             }
-            while (((nextRange == null) || (nextRange.size() == 0)) && 
!iteratorDone) {
-                boolean movedToNextNode = moveToNext(3);
-                if (movedToNextNode) {
-                    nextRange = getLedgerRangeByLevel(curLevelNodes);
-                } else {
-                    iteratorDone = true;
+
+            @Override
+            public LedgerRange next() throws IOException {
+                LedgerRange ret = nextLevelIterator.next();
+                if (!nextLevelIterator.hasNext()) {
+                    nextLevelIterator = null;
+                    advance();
                 }
+                return ret;
             }
         }
 
-        @Override
-        public synchronized boolean hasNext() throws IOException {
-            try {
-                preload();
-            } catch (KeeperException ke) {
-                throw new IOException("Error preloading next range", ke);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted while preloading", ie);
+        private LongHierarchicalLedgerRangeIterator() {}
+
+        private void bootstrap() throws IOException {
+            if (rootIterator == null) {
+                rootIterator = new InnerIterator(ledgerRootPath, 0);
             }
-            return nextRange != null && !iteratorDone;
         }
 
         @Override
-        public synchronized LedgerRange next() throws IOException {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            LedgerRange r = nextRange;
-            nextRange = null;
-            return r;
+        public synchronized boolean hasNext() throws IOException {
+            bootstrap();
+            return rootIterator.hasNext();
         }
 
-        private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) 
throws IOException {
-            String level0 = curLevelNodes.get(0);
-            String level1 = curLevelNodes.get(1);
-            String level2 = curLevelNodes.get(2);
-            String level3 = curLevelNodes.get(3);
-
-            StringBuilder nodeBuilder = new StringBuilder();
-            
nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
-                    .append(level2).append("/").append(level3);
-            String nodePath = nodeBuilder.toString();
-            List<String> ledgerNodes = null;
-            try {
-                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
-            } catch (InterruptedException e) {
-                throw new IOException("Error when get child nodes from zk", e);
-            }
-            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, 
nodePath);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node " + level0 
+ "/" + level1 + "/" + level2 + "/"
-                        + level3 + " : " + zkActiveLedgers);
-            }
-            return new 
LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, 
level2, level3), true,
-                    getEndLedgerIdByLevel(level0, level1, level2, level3), 
true));
+        @Override
+        public synchronized LedgerRange next() throws IOException {
+            bootstrap();
+            return rootIterator.next();
         }
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
index 7fceff686..e03777b1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
@@ -223,7 +223,7 @@ public void processResult(int rc2, String path, Object ctx) 
{
      * @throws IOException
      */
     public static List<String> getChildrenInSingleNode(final ZooKeeper zk, 
final String node)
-            throws InterruptedException, IOException {
+            throws InterruptedException, IOException, 
KeeperException.NoNodeException {
         final GetChildrenCtx ctx = new GetChildrenCtx();
         getChildrenInSingleNode(zk, node, new GenericCallback<List<String>>() {
             @Override
@@ -244,11 +244,12 @@ public void operationComplete(int rc, List<String> 
ledgers) {
                 ctx.wait();
             }
         }
-        if (Code.OK.intValue() != ctx.rc) {
+        if (Code.NONODE.intValue() == ctx.rc) {
+            throw new KeeperException.NoNodeException("Got NoNode on call to 
getChildren on path " + node);
+        } else if (Code.OK.intValue() != ctx.rc) {
             throw new IOException("Error on getting children from node " + 
node);
         }
         return ctx.children;
-
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index ce76e3e84..239bc0e9d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -22,11 +22,37 @@
 package org.apache.bookkeeper.meta;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
 import org.junit.Test;
 
+
 /**
  * Test the ledger manager iterator.
  */
@@ -35,6 +61,104 @@ public LedgerManagerIteratorTest(Class<? extends 
LedgerManagerFactory> lmFactory
         super(lmFactoryCls);
     }
 
+    final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+
+    Runnable safeWrapper(Runnable r) {
+        return () -> {
+            try {
+                r.run();
+            } catch (Throwable e) {
+                exceptions.add(e);
+            }
+        };
+    }
+
+    @After
+    public void throwAsyncErrors() throws Throwable {
+        while (exceptions.peek() != null) {
+            throw exceptions.remove();
+        }
+    }
+
+    class RCCheckCB implements GenericCallback<Void> {
+        private final String opType;
+        private final CountDownLatch latch;
+        private final Optional<Integer> rcExpected;
+        private final long ledgerId;
+
+        public RCCheckCB(String opType, CountDownLatch latch, 
Optional<Integer> rcExpected, long ledgerId) {
+            this.opType = opType;
+            this.latch = latch;
+            this.rcExpected = rcExpected;
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public void operationComplete(int rc, Void result) {
+            safeWrapper(() -> {
+                try {
+                    rcExpected.map((Integer expected) -> {
+                        assertEquals(
+                                "Incorrect rc on ledger: " + ledgerId + ", op 
type: " + opType,
+                                expected.longValue(), rc);
+                        return null;
+                    });
+                } finally {
+                    latch.countDown();
+                }
+            }).run();
+        }
+    }
+
+    /**
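+     * Remove ledger using lm synchronously.
+     *
+     * @param lm ledger manager used to remove the ledger metadata
+     * @param ledgerId id of the ledger to remove
+     * @param rcExpected return code expected, empty to skip the check
+     * @throws Throwable if interrupted while waiting, or if an asynchronous callback recorded a failure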
+     */
+    void removeLedger(LedgerManager lm, Long ledgerId, Optional<Integer> 
rcExpected) throws Throwable {
+        CountDownLatch latch = new CountDownLatch(1);
+        lm.removeLedgerMetadata(
+                ledgerId, Version.ANY, new RCCheckCB("removeLedger", latch, 
rcExpected, ledgerId));
+        latch.await();
+        throwAsyncErrors();
+
+    }
+
+    /**
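+     * Create ledger using lm synchronously.
+     *
+     * @param lm ledger manager used to create the ledger metadata
+     * @param ledgerId id of the ledger to create
+     * @param rcExpected return code expected, empty to skip the check
+     * @throws Throwable if interrupted while waiting, or if an asynchronous callback recorded a failure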
+     */
+    void createLedger(LedgerManager lm, Long ledgerId, Optional<Integer> 
rcExpected) throws Throwable {
+        LedgerMetadata meta = new LedgerMetadata(
+                3, 3, 2,
+                BookKeeper.DigestType.CRC32, "passwd".getBytes());
+        CountDownLatch latch = new CountDownLatch(1);
+        lm.createLedgerMetadata(
+                ledgerId, meta, new RCCheckCB("createLedger", latch, 
rcExpected, ledgerId));
+        latch.await();
+        throwAsyncErrors();
+    }
+
+    static Set<Long> ledgerRangeToSet(LedgerRangeIterator lri) throws 
IOException {
+        Set<Long> ret = new TreeSet<>();
+        long last = -1;
+        while (lri.hasNext()) {
+            LedgerManager.LedgerRange lr = lri.next();
+            assertFalse("ledger range must not be empty", 
lr.getLedgers().isEmpty());
+            assertTrue("ledger ranges must not overlap", last < lr.start());
+            ret.addAll(lr.getLedgers());
+            last = lr.end();
+        }
+        return ret;
+    }
+
     @Test
     public void testIterateNoLedgers() throws Exception {
         LedgerManager lm = getLedgerManager();
@@ -45,6 +169,270 @@ public void testIterateNoLedgers() throws Exception {
         }
 
         assertEquals(false, lri.hasNext());
-        assertEquals(false, lri.hasNext());
+    }
+
+    @Test
+    public void testSingleLedger() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        long id = 2020202;
+        createLedger(lm, id, Optional.of(BKException.Code.OK));
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> lids = ledgerRangeToSet(lri);
+        assertEquals(lids.size(), 1);
+        assertEquals(lids.iterator().next().longValue(), id);
+    }
+
+    @Test
+    public void testTwoLedgers() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>(Arrays.asList(101010101L, 2020340302L));
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds);
+    }
+
+    @Test
+    public void testSeveralContiguousLedgers() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>();
+        for (long i = 0; i < 2000; ++i) {
+            createLedger(lm, i, Optional.of(BKException.Code.OK));
+            ids.add(i);
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds);
+    }
+
+    @Test
+    public void testRemovalOfNodeJustTraversed() throws Throwable {
+        if (baseConf.getLedgerManagerFactoryClass()
+                != LongHierarchicalLedgerManagerFactory.class) {
+            return;
+        }
+        LedgerManager lm = getLedgerManager();
+
+        /* For LHLM, the first two should be leaves on the same node, the third should be on an
+         * adjacent level 4 node.  Removing all 3 once the iterator returns the range containing
+         * first should result in the whole tree path ending at that node disappearing.  If this
+         * happens after the iterator stops at that leaf, it should result in a few NoNode errors
+         * (handled silently) as the iterator falls back up the tree to the next path.
+         */
+        Set<Long> toRemove = new TreeSet<>(
+                Arrays.asList(
+                        3394498498348983841L,
+                        3394498498348983842L,
+                        3394498498348993841L));
+
+        long first = 2345678901234567890L;
+        // Nodes which should be listed anyway
+        Set<Long> mustHave = new TreeSet<>(
+                Arrays.asList(
+                        first,
+                        6334994393848474732L));
+
+        Set<Long> ids = new TreeSet<>();
+        ids.addAll(toRemove);
+        ids.addAll(mustHave);
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        Set<Long> found = new TreeSet<>();
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        while (lri.hasNext()) {
+            LedgerManager.LedgerRange lr = lri.next();
+            found.addAll(lr.getLedgers());
+
+            if (lr.getLedgers().contains(first)) {
+                for (long id: toRemove) {
+                    removeLedger(lm, id, Optional.of(BKException.Code.OK));
+                }
+                toRemove.clear();
+            }
+        }
+
+        for (long id: mustHave) {
+            assertTrue(found.contains(id));
+        }
+    }
+
+    /**
+     * Creates four ledgers plus one extra znode path under /ledgers that
+     * holds no ledger created by this test, then checks that scanning the
+     * ledger ranges returns exactly the four created ids and that no
+     * returned range is empty.
+     *
+     * <p>The test body is skipped unless the configured factory is
+     * LongHierarchicalLedgerManagerFactory.
+     */
+    @Test
+    public void validateEmptyL4PathSkipped() throws Throwable {
+        boolean isLongHierarchical = baseConf.getLedgerManagerFactoryClass()
+                == LongHierarchicalLedgerManagerFactory.class;
+        if (!isLongHierarchical) {
+            return;
+        }
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> expectedIds = new TreeSet<>(
+                Arrays.asList(
+                        2345678901234567890L,
+                        3394498498348983841L,
+                        6334994393848474732L,
+                        7349370101927398483L));
+        for (Long ledgerId : expectedIds) {
+            createLedger(lm, ledgerId, Optional.of(BKException.Code.OK));
+        }
+
+        // Empty L4 path, must be skipped
+        String emptyL4Path = "/ledgers/633/4994/3938/4948";
+        ZkUtils.createFullPathOptimistic(
+                zkc, emptyL4Path, "data".getBytes(),
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // First pass: the set of returned ids must match what was created.
+        LedgerRangeIterator allRanges = lm.getLedgerRanges();
+        assertNotNull(allRanges);
+        assertEquals(expectedIds, ledgerRangeToSet(allRanges));
+
+        // Second pass: count ranges that carry no ledgers at all.
+        int emptyRanges = 0;
+        for (LedgerRangeIterator it = lm.getLedgerRanges(); it.hasNext();) {
+            if (it.next().getLedgers().isEmpty()) {
+                emptyRanges++;
+            }
+        }
+        assertEquals(0, emptyRanges);
+    }
+
+    @Test
+    public void testWithSeveralIncompletePaths() throws Throwable {
+        if (baseConf.getLedgerManagerFactoryClass()
+                != LongHierarchicalLedgerManagerFactory.class) {
+            return;
+        }
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>(
+                Arrays.asList(
+                        2345678901234567890L,
+                        3394498498348983841L,
+                        6334994393848474732L,
+                        7349370101927398483L));
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        String paths[] = {
+                "/ledgers/000/0000/0000", // top level, W-4292762
+                "/ledgers/234/5678/9999", // shares two path segments with the 
first one, comes after
+                "/ledgers/339/0000/0000", // shares one path segment with the 
second one, comes first
+                "/ledgers/633/4994/3938/0000", // shares three path segments 
with the third one, comes first
+                "/ledgers/922/3372/0000/0000", // close to max long, at end
+
+        };
+        for (String path : paths) {
+            ZkUtils.createFullPathOptimistic(
+                    zkc,
+                    path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds);
+    }
+
+    /**
+     * Stress test for concurrent scans and modifications.
+     *
+     * <p>A fixed set of ledgers is created up front. Then, for roughly two
+     * seconds, writer threads keep creating and removing other ledgers
+     * while checker threads repeatedly scan all ledger ranges and assert
+     * that every ledger from the initial set is still reported. Each thread
+     * uses its own ledger manager from getIndependentLedgerManager().
+     */
+    @Test
+    public void checkConcurrentModifications() throws Throwable {
+        final int numWriters = 10;
+        final int numCheckers = 10;
+        final int numLedgers = 100;
+        final long runtime = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
+        final boolean longRange =
+                baseConf.getLedgerManagerFactoryClass()
+                        == LongHierarchicalLedgerManagerFactory.class;
+
+        final Set<Long> mustExist = new TreeSet<>();
+        LedgerManager lm = getLedgerManager();
+        Random rng = new Random();
+        for (int i = 0; i < numLedgers; ++i) {
+            long lid;
+            do {
+                // Clear the sign bit instead of calling Math.abs, which
+                // returns a negative value for Long.MIN_VALUE.
+                lid = rng.nextLong() & Long.MAX_VALUE;
+                if (!longRange) {
+                    lid %= 1000000;
+                }
+                // Re-draw on collision so each id is created exactly once;
+                // the short-range id space is only 1,000,000 wide.
+            } while (!mustExist.add(lid));
+            createLedger(lm, lid, Optional.of(BKException.Code.OK));
+        }
+
+        final long start = MathUtils.nowInNano();
+        // All worker threads block on this latch so they start together.
+        final CountDownLatch latch = new CountDownLatch(1);
+        ArrayList<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < numWriters; ++i) {
+            Thread thread = new Thread(safeWrapper(() -> {
+                LedgerManager writerLM = getIndependentLedgerManager();
+                Random writerRNG = new Random(rng.nextLong());
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    fail("Writer interrupted");
+                }
+                while (MathUtils.elapsedNanos(start) < runtime) {
+                    // Pick an id outside the protected set so writers never
+                    // touch a ledger the checkers expect to find.
+                    long candidate = 0;
+                    do {
+                        candidate = writerRNG.nextLong() & Long.MAX_VALUE;
+                        if (!longRange) {
+                            candidate %= 1000000;
+                        }
+                    } while (mustExist.contains(candidate));
+                    try {
+                        // Optional.empty(): no specific result code is
+                        // expected, since writers may race on the same id.
+                        createLedger(writerLM, candidate, Optional.empty());
+                        removeLedger(writerLM, candidate, Optional.empty());
+                    } catch (Throwable e) {
+                        fail("Got exception thrashing store: " + e.toString());
+                    }
+                }
+            }));
+            thread.start();
+            threads.add(thread);
+        }
+
+        for (int i = 0; i < numCheckers; ++i) {
+            Thread thread = new Thread(safeWrapper(() -> {
+                LedgerManager checkerLM = getIndependentLedgerManager();
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    fail("Checker interrupted");
+                }
+                while (MathUtils.elapsedNanos(start) < runtime) {
+                    try {
+                        LedgerRangeIterator lri = checkerLM.getLedgerRanges();
+                        Set<Long> returnedIds = ledgerRangeToSet(lri);
+                        for (long id: mustExist) {
+                            assertTrue(returnedIds.contains(id));
+                        }
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        fail("Got exception scanning ledgers: "
+                                + e.toString());
+                    }
+                }
+            }));
+            thread.start();
+            threads.add(thread);
+        }
+
+        latch.countDown();
+        for (Thread thread: threads) {
+            thread.join();
+        }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index ea02a3055..1ecd9f3b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -69,6 +69,10 @@ public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls,
         baseClientConf.setLedgerManagerFactoryClass(lmFactoryCls);
     }
 
+    public LedgerManager getIndependentLedgerManager() {
+        return ledgerManagerFactory.newLedgerManager();
+    }
+
     public LedgerManager getLedgerManager() {
         if (null == ledgerManager) {
             ledgerManager = ledgerManagerFactory.newLedgerManager();
@@ -88,6 +92,7 @@ public LedgerIdGenerator getLedgerIdGenerator() throws IOException {
         return Arrays.asList(new Object[][] {
             { FlatLedgerManagerFactory.class },
             { HierarchicalLedgerManagerFactory.class },
+            { LegacyHierarchicalLedgerManagerFactory.class },
             { LongHierarchicalLedgerManagerFactory.class },
             { MSLedgerManagerFactory.class },
         });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to