This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 155c3c9  ISSUE #865: LongHierarchicalLedgerManager: refactor to fix 
races
155c3c9 is described below

commit 155c3c95f3fcc823432e23a93a8cd3f51ebff917
Author: Samuel Just <[email protected]>
AuthorDate: Tue Dec 19 18:42:53 2017 -0800

    ISSUE #865: LongHierarchicalLedgerManager: refactor to fix races
    
    LongHierarchicalLedgerRangeIterator.initialize() could erroneously exit
    with iteratorDone if the lexicographically first path in zk had fewer
    than 4 levels.  This can happen for a few reasons including a case where
    a client creating a ledger on that path crashed during the zk updates or
    the iterator.hasNext() call simply raced with an in-progress node
    creation or removal.  If hasNext() returns false,
    ScanAndCompareGarbageCollector will delete all ledgers on the bookie, so
    this is a fairly serious bug.
    
    Ruling out other such bugs was fairly difficult with the structure of the
    code as written, so instead use a simpler recursive iterator design with
    simpler pre/post conditions.
    
    Also, surface KeeperException.NoNodeException from
    ZkUtils.getChildrenInSingleNode so that we can actually handle it in
    LHLM.
    
    Master Issue: #865
    
    Author: Samuel Just <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #875 from athanatos/forupstream/issue-865, closes #865
---
 .../apache/bookkeeper/meta/FlatLedgerManager.java  |   3 +
 .../org/apache/bookkeeper/meta/LedgerManager.java  |   2 +-
 .../meta/LegacyHierarchicalLedgerManager.java      |   5 +
 .../meta/LongHierarchicalLedgerManager.java        | 301 +++++++---------
 .../java/org/apache/bookkeeper/util/ZkUtils.java   |   7 +-
 .../bookkeeper/meta/LedgerManagerIteratorTest.java | 390 ++++++++++++++++++++-
 .../bookkeeper/meta/LedgerManagerTestCase.java     |   5 +
 7 files changed, 536 insertions(+), 177 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index a155c4d..26f55dc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -27,6 +27,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,6 +110,8 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
                     zkActiveLedgers = ledgerListToSet(
                             ZkUtils.getChildrenInSingleNode(zk, 
ledgerRootPath), ledgerRootPath);
                     nextRange = new LedgerRange(zkActiveLedgers);
+                } catch (KeeperException.NoNodeException e) {
+                    throw new IOException("Path does not exist: " + 
ledgerRootPath, e);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     throw new IOException("Error when get child nodes from 
zk", ie);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 3d8b693..6b56a3e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -202,7 +202,7 @@ public interface LedgerManager extends Closeable {
         /**
          * Get the next element.
          *
-         * @return the next element.
+         * @return the next element, the LedgerRange returned must be non-empty
          * @throws IOException thrown when there is a problem accessing the 
ledger
          * metadata store. It is critical that it doesn't return false in the 
case
          * in the case it fails to access the ledger metadata store. Otherwise 
it
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index 11e2314..ea85a8d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -261,6 +262,10 @@ class LegacyHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager
             List<String> ledgerNodes = null;
             try {
                 ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (KeeperException.NoNodeException e) {
+                /* If the node doesn't exist, we must have raced with a 
recursive node removal, just
+                 * return an empty list. */
+                ledgerNodes = new ArrayList<>();
             } catch (InterruptedException e) {
                 throw new IOException("Error when get child nodes from zk", e);
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index d9f2c1f..e5bdb1c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -61,9 +61,6 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
     static final Logger LOG = 
LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
 
     static final String IDGEN_ZNODE = "idgen-long";
-    private static final String MAX_ID_SUFFIX = "9999";
-    private static final String MIN_ID_SUFFIX = "0000";
-
 
     /**
      * Constructor.
@@ -95,41 +92,6 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
     // Active Ledger Manager
     //
 
-    /**
-     * Get the smallest cache id in a specified node 
/level0/level1/level2/level3.
-     *
-     * @param level0
-     *            1st level node name
-     * @param level1
-     *            2nd level node name
-     * @param level2
-     *            3rd level node name
-     * @param level3
-     *            4th level node name
-     * @return the smallest ledger id
-     */
-    private long getStartLedgerIdByLevel(String level0, String level1, String 
level2, String level3)
-            throws IOException {
-        return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
-    }
-
-    /**
-     * Get the largest cache id in a specified node 
/level0/level1/level2/level3.
-     *
-     * @param level0
-     *            1st level node name
-     * @param level1
-     *            2nd level node name
-     * @param level2
-     *            3rd level node name
-     * @param level3
-     *            4th level node name
-     * @return the largest ledger id
-     */
-    private long getEndLedgerIdByLevel(String level0, String level1, String 
level2, String level3) throws IOException {
-        return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
-    }
-
     @Override
     public void asyncProcessLedgers(final Processor<Long> processor, final 
AsyncCallback.VoidCallback finalCb,
             final Object context, final int successRc, final int failureRc) {
@@ -188,170 +150,165 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
         return new LongHierarchicalLedgerRangeIterator();
     }
 
+
     /**
-     * Iterator through each metadata bucket with hierarchical mode.
+     * Iterates recursively through each metadata bucket.
      */
     private class LongHierarchicalLedgerRangeIterator implements 
LedgerRangeIterator {
-        private List<Iterator<String>> levelNodesIter;
-        private List<String> curLevelNodes;
-
-        private boolean initialized = false;
-        private boolean iteratorDone = false;
-        private LedgerRange nextRange = null;
+        LedgerRangeIterator rootIterator;
 
-        private LongHierarchicalLedgerRangeIterator() {
-            levelNodesIter = new 
ArrayList<Iterator<String>>(Collections.nCopies(4, (Iterator<String>) null));
-            curLevelNodes = new ArrayList<String>(Collections.nCopies(4, 
(String) null));
+        /**
+         * Returns all children with path as a parent.  If path is 
non-existent,
+         * returns an empty list anyway (after all, there are no children 
there).
+         * Maps all exceptions (other than NoNode) to IOException in keeping 
with
+         * LedgerRangeIterator.
+         *
+         * @param path
+         * @return Iterator into set of all children with path as a parent
+         * @throws IOException
+         */
+        List<String> getChildrenAt(String path) throws IOException {
+            try {
+                List<String> children = ZkUtils.getChildrenInSingleNode(zk, 
path);
+                Collections.sort(children);
+                return children;
+            } catch (KeeperException.NoNodeException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("NoNodeException at path {}, assumed race with 
deletion", path);
+                }
+                return new ArrayList<>();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while reading ledgers at 
path " + path, ie);
+            }
         }
 
-        private synchronized void initialize(String path, int level) throws 
KeeperException, InterruptedException,
-                IOException {
-            List<String> levelNodes = zk.getChildren(path, null);
-            Collections.sort(levelNodes);
-            if (level == 0) {
-                Iterator<String> l0NodesIter = levelNodes.iterator();
-                levelNodesIter.set(0, l0NodesIter);
-                while (l0NodesIter.hasNext()) {
-                    String curL0Node = l0NodesIter.next();
-                    if (!isSpecialZnode(curL0Node)) {
-                        curLevelNodes.set(0, curL0Node);
-                        break;
-                    }
-                }
-            } else {
-                Iterator<String> lNodesIter = levelNodes.iterator();
-                levelNodesIter.set(level, lNodesIter);
-                if (lNodesIter.hasNext()) {
-                    String curLNode = lNodesIter.next();
-                    curLevelNodes.set(level, curLNode);
+        /**
+         * Represents the ledger range rooted at a leaf node, returns at most 
one LedgerRange.
+         */
+        class LeafIterator implements LedgerRangeIterator {
+            // Null iff iteration is complete
+            LedgerRange range;
+
+            LeafIterator(String path) throws IOException {
+                List<String> ledgerLeafNodes = getChildrenAt(path);
+                Set<Long> ledgerIds = ledgerListToSet(ledgerLeafNodes, path);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("All active ledgers from ZK for hash node {}: 
{}", path, ledgerIds);
                 }
+                if (!ledgerIds.isEmpty()) {
+                    range = new LedgerRange(ledgerIds);
+                } // else, hasNext() should return false so that advance will 
skip us and move on
             }
-            String curLNode = curLevelNodes.get(level);
-            if (curLNode != null) {
-                // Traverse down through levels 0-3
-                // The nextRange becomes a listing of the children
-                // in the level4 directory.
-                if (level != 3) {
-                    String nextLevelPath = path + "/" + curLNode;
-                    initialize(nextLevelPath, level + 1);
-                } else {
-                    nextRange = getLedgerRangeByLevel(curLevelNodes);
-                    initialized = true;
+
+            @Override
+            public boolean hasNext() throws IOException {
+                return range != null;
+            }
+
+            @Override
+            public LedgerRange next() throws IOException {
+                if (range == null) {
+                    throw new NoSuchElementException(
+                            "next() must only be called if hasNext() is true");
                 }
-            } else {
-                iteratorDone = true;
+                LedgerRange ret = range;
+                range = null;
+                return ret;
             }
         }
 
-        private void clearHigherLevels(int level) {
-            for (int i = level + 1; i < 4; i++) {
-                curLevelNodes.set(i, null);
+
+        /**
+         * The main constraint is that between calls one of two things must be 
true.
+         * 1) nextLevelIterator is null and thisLevelIterator.hasNext() == 
false: iteration complete, hasNext()
+         *    returns false
+         * 2) nextLevelIterator is non-null: nextLevelIterator.hasNext() must 
return true and nextLevelIterator.next()
+         *    must return the next LedgerRange
+         * The above means that nextLevelIterator != null ==> 
nextLevelIterator.hasNext()
+         * It also means that hasNext() iff nextLevelIterator != null
+         */
+        private class InnerIterator implements LedgerRangeIterator {
+            final String path;
+            final int level;
+
+            // Always non-null
+            final Iterator<String> thisLevelIterator;
+            // non-null iff nextLevelIterator.hasNext() is true
+            LedgerRangeIterator nextLevelIterator;
+
+            /**
+             * Builds InnerIterator.
+             *
+             * @param path Subpath for thisLevelIterator
+             * @param level Level of thisLevelIterator (must be <= 3)
+             * @throws IOException
+             */
+            InnerIterator(String path, int level) throws IOException {
+                this.path = path;
+                this.level = level;
+                thisLevelIterator = getChildrenAt(path).iterator();
+                advance();
             }
-        }
 
-        private synchronized boolean moveToNext(int level) throws 
KeeperException, InterruptedException {
-            Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
-            boolean movedToNextNode = false;
-            if (level == 0) {
-                while (curLevelNodesIter.hasNext()) {
-                    String nextNode = curLevelNodesIter.next();
-                    if (isSpecialZnode(nextNode)) {
+            /**
+             * Resolves the difference between cases 1 and 2 after 
nextLevelIterator is exhausted.
+             * Pre-condition: nextLevelIterator == null, thisLevelIterator != 
null
+             * Post-condition: nextLevelIterator == null && 
!thisLevelIterator.hasNext() OR
+             *                 nextLevelIterator.hasNext() == true and 
nextLevelIterator.next()
+             *                 yields the next result of next()
+             * @throws IOException Exception representing error
+             */
+            void advance() throws IOException {
+                while (thisLevelIterator.hasNext()) {
+                    String node = thisLevelIterator.next();
+                    if (level == 0 && isSpecialZnode(node)) {
                         continue;
-                    } else {
-                        curLevelNodes.set(level, nextNode);
-                        clearHigherLevels(level);
-                        movedToNextNode = true;
-                        break;
                     }
-                }
-            } else {
-                if (curLevelNodesIter.hasNext()) {
-                    String nextNode = curLevelNodesIter.next();
-                    curLevelNodes.set(level, nextNode);
-                    clearHigherLevels(level);
-                    movedToNextNode = true;
-                } else {
-                    movedToNextNode = moveToNext(level - 1);
-                    if (movedToNextNode) {
-                        StringBuilder path = new StringBuilder(ledgerRootPath);
-                        for (int i = 0; i < level; i++) {
-                            path = 
path.append("/").append(curLevelNodes.get(i));
-                        }
-                        List<String> newCurLevelNodesList = 
zk.getChildren(path.toString(), null);
-                        Collections.sort(newCurLevelNodesList);
-                        Iterator<String> newCurLevelNodesIter = 
newCurLevelNodesList.iterator();
-                        levelNodesIter.set(level, newCurLevelNodesIter);
-                        if (newCurLevelNodesIter.hasNext()) {
-                            curLevelNodes.set(level, 
newCurLevelNodesIter.next());
-                            clearHigherLevels(level);
-                            movedToNextNode = true;
-                        }
+                    LedgerRangeIterator nextIterator = level < 3
+                            ? new InnerIterator(path + "/" + node, level + 1)
+                            : new LeafIterator(path + "/" + node);
+                    if (nextIterator.hasNext()) {
+                        nextLevelIterator = nextIterator;
+                        break;
                     }
                 }
             }
-            return movedToNextNode;
-        }
 
-        private synchronized void preload() throws IOException, 
KeeperException, InterruptedException {
-            if (!iteratorDone && !initialized) {
-                initialize(ledgerRootPath, 0);
+            @Override
+            public boolean hasNext() throws IOException {
+                return nextLevelIterator != null;
             }
-            while (((nextRange == null) || (nextRange.size() == 0)) && 
!iteratorDone) {
-                boolean movedToNextNode = moveToNext(3);
-                if (movedToNextNode) {
-                    nextRange = getLedgerRangeByLevel(curLevelNodes);
-                } else {
-                    iteratorDone = true;
+
+            @Override
+            public LedgerRange next() throws IOException {
+                LedgerRange ret = nextLevelIterator.next();
+                if (!nextLevelIterator.hasNext()) {
+                    nextLevelIterator = null;
+                    advance();
                 }
+                return ret;
             }
         }
 
-        @Override
-        public synchronized boolean hasNext() throws IOException {
-            try {
-                preload();
-            } catch (KeeperException ke) {
-                throw new IOException("Error preloading next range", ke);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted while preloading", ie);
+        private LongHierarchicalLedgerRangeIterator() {}
+
+        private void bootstrap() throws IOException {
+            if (rootIterator == null) {
+                rootIterator = new InnerIterator(ledgerRootPath, 0);
             }
-            return nextRange != null && !iteratorDone;
         }
 
         @Override
-        public synchronized LedgerRange next() throws IOException {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            LedgerRange r = nextRange;
-            nextRange = null;
-            return r;
+        public synchronized boolean hasNext() throws IOException {
+            bootstrap();
+            return rootIterator.hasNext();
         }
 
-        private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) 
throws IOException {
-            String level0 = curLevelNodes.get(0);
-            String level1 = curLevelNodes.get(1);
-            String level2 = curLevelNodes.get(2);
-            String level3 = curLevelNodes.get(3);
-
-            StringBuilder nodeBuilder = new StringBuilder();
-            
nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
-                    .append(level2).append("/").append(level3);
-            String nodePath = nodeBuilder.toString();
-            List<String> ledgerNodes = null;
-            try {
-                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
-            } catch (InterruptedException e) {
-                throw new IOException("Error when get child nodes from zk", e);
-            }
-            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, 
nodePath);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node " + level0 
+ "/" + level1 + "/" + level2 + "/"
-                        + level3 + " : " + zkActiveLedgers);
-            }
-            return new 
LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, 
level2, level3), true,
-                    getEndLedgerIdByLevel(level0, level1, level2, level3), 
true));
+        @Override
+        public synchronized LedgerRange next() throws IOException {
+            bootstrap();
+            return rootIterator.next();
         }
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
index 7fceff6..e03777b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
@@ -223,7 +223,7 @@ public class ZkUtils {
      * @throws IOException
      */
     public static List<String> getChildrenInSingleNode(final ZooKeeper zk, 
final String node)
-            throws InterruptedException, IOException {
+            throws InterruptedException, IOException, 
KeeperException.NoNodeException {
         final GetChildrenCtx ctx = new GetChildrenCtx();
         getChildrenInSingleNode(zk, node, new GenericCallback<List<String>>() {
             @Override
@@ -244,11 +244,12 @@ public class ZkUtils {
                 ctx.wait();
             }
         }
-        if (Code.OK.intValue() != ctx.rc) {
+        if (Code.NONODE.intValue() == ctx.rc) {
+            throw new KeeperException.NoNodeException("Got NoNode on call to 
getChildren on path " + node);
+        } else if (Code.OK.intValue() != ctx.rc) {
             throw new IOException("Error on getting children from node " + 
node);
         }
         return ctx.children;
-
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index ce76e3e..239bc0e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -22,11 +22,37 @@
 package org.apache.bookkeeper.meta;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
 import org.junit.Test;
 
+
 /**
  * Test the ledger manager iterator.
  */
@@ -35,6 +61,104 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         super(lmFactoryCls);
     }
 
+    final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+
+    Runnable safeWrapper(Runnable r) {
+        return () -> {
+            try {
+                r.run();
+            } catch (Throwable e) {
+                exceptions.add(e);
+            }
+        };
+    }
+
+    @After
+    public void throwAsyncErrors() throws Throwable {
+        while (exceptions.peek() != null) {
+            throw exceptions.remove();
+        }
+    }
+
+    class RCCheckCB implements GenericCallback<Void> {
+        private final String opType;
+        private final CountDownLatch latch;
+        private final Optional<Integer> rcExpected;
+        private final long ledgerId;
+
+        public RCCheckCB(String opType, CountDownLatch latch, 
Optional<Integer> rcExpected, long ledgerId) {
+            this.opType = opType;
+            this.latch = latch;
+            this.rcExpected = rcExpected;
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public void operationComplete(int rc, Void result) {
+            safeWrapper(() -> {
+                try {
+                    rcExpected.map((Integer expected) -> {
+                        assertEquals(
+                                "Incorrect rc on ledger: " + ledgerId + ", op 
type: " + opType,
+                                expected.longValue(), rc);
+                        return null;
+                    });
+                } finally {
+                    latch.countDown();
+                }
+            }).run();
+        }
+    }
+
+    /**
+     * Remove ledger using lm syncronously.
+     *
+     * @param lm
+     * @param ledgerId
+     * @param rcExpected return value expected, -1 to ignore
+     * @throws InterruptedException
+     */
+    void removeLedger(LedgerManager lm, Long ledgerId, Optional<Integer> 
rcExpected) throws Throwable {
+        CountDownLatch latch = new CountDownLatch(1);
+        lm.removeLedgerMetadata(
+                ledgerId, Version.ANY, new RCCheckCB("removeLedger", latch, 
rcExpected, ledgerId));
+        latch.await();
+        throwAsyncErrors();
+
+    }
+
+    /**
+     * Create ledger using lm syncronously.
+     *
+     * @param lm
+     * @param ledgerId
+     * @param rcExpected return value expected, -1 to ignore
+     * @throws InterruptedException
+     */
+    void createLedger(LedgerManager lm, Long ledgerId, Optional<Integer> 
rcExpected) throws Throwable {
+        LedgerMetadata meta = new LedgerMetadata(
+                3, 3, 2,
+                BookKeeper.DigestType.CRC32, "passwd".getBytes());
+        CountDownLatch latch = new CountDownLatch(1);
+        lm.createLedgerMetadata(
+                ledgerId, meta, new RCCheckCB("createLedger", latch, 
rcExpected, ledgerId));
+        latch.await();
+        throwAsyncErrors();
+    }
+
+    static Set<Long> ledgerRangeToSet(LedgerRangeIterator lri) throws 
IOException {
+        Set<Long> ret = new TreeSet<>();
+        long last = -1;
+        while (lri.hasNext()) {
+            LedgerManager.LedgerRange lr = lri.next();
+            assertFalse("ledger range must not be empty", 
lr.getLedgers().isEmpty());
+            assertTrue("ledger ranges must not overlap", last < lr.start());
+            ret.addAll(lr.getLedgers());
+            last = lr.end();
+        }
+        return ret;
+    }
+
     @Test
     public void testIterateNoLedgers() throws Exception {
         LedgerManager lm = getLedgerManager();
@@ -45,6 +169,270 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
         }
 
         assertEquals(false, lri.hasNext());
-        assertEquals(false, lri.hasNext());
+    }
+
+    @Test
+    public void testSingleLedger() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        long id = 2020202;
+        createLedger(lm, id, Optional.of(BKException.Code.OK)); // creation must succeed
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> lids = ledgerRangeToSet(lri);
+        assertEquals(1, lids.size()); // expected value first, so a failure message reads correctly
+        assertEquals(id, lids.iterator().next().longValue()); // the only listed id is the one created
+    }
+
+    @Test
+    public void testTwoLedgers() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>(Arrays.asList(101010101L, 2020340302L));
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK)); // creation must succeed
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds); // the iterator must list exactly the created ids
+    }
+
+    @Test
+    public void testSeveralContiguousLedgers() throws Throwable {
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>();
+        for (long i = 0; i < 2000; ++i) { // contiguous ids 0..1999
+            createLedger(lm, i, Optional.of(BKException.Code.OK));
+            ids.add(i);
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds); // every created id, and nothing else, must be listed
+    }
+
+    @Test
+    public void testRemovalOfNodeJustTraversed() throws Throwable {
+        if (baseConf.getLedgerManagerFactoryClass()
+                != LongHierarchicalLedgerManagerFactory.class) {
+            return;
+        }
+        LedgerManager lm = getLedgerManager();
+
+        /* For LHLM, first two should be leaves on the same node, third should be on adjacent level 4 node.
+         * Removing all 3 once the iterator returns the range holding `first` should result in the whole tree
+         * path ending at that node disappearing.  If this happens after the iterator stops at that leaf, it
+         * should result in a few NoNode errors (presumably handled silently) as the iterator falls back up
+         * the tree to the next path.
+         */
+        Set<Long> toRemove = new TreeSet<>(
+                Arrays.asList(
+                        3394498498348983841L,
+                        3394498498348983842L,
+                        3394498498348993841L));
+
+        long first = 2345678901234567890L;
+        // Nodes which should be listed anyway
+        Set<Long> mustHave = new TreeSet<>(
+                Arrays.asList(
+                        first,
+                        6334994393848474732L));
+
+        Set<Long> ids = new TreeSet<>();
+        ids.addAll(toRemove);
+        ids.addAll(mustHave);
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        Set<Long> found = new TreeSet<>();
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        while (lri.hasNext()) {
+            LedgerManager.LedgerRange lr = lri.next();
+            found.addAll(lr.getLedgers());
+
+            if (lr.getLedgers().contains(first)) {
+                for (long id: toRemove) {
+                    removeLedger(lm, id, Optional.of(BKException.Code.OK));
+                }
+                toRemove.clear(); // later iterations then have nothing left to remove
+            }
+        }
+
+        for (long id: mustHave) {
+            assertTrue(found.contains(id)); // ids that were never removed must still be reported
+        }
+    }
+
+    @Test
+    public void validateEmptyL4PathSkipped() throws Throwable {
+        if (baseConf.getLedgerManagerFactoryClass()
+                != LongHierarchicalLedgerManagerFactory.class) {
+            return; // only exercised with the long hierarchical ledger manager
+        }
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>(
+                Arrays.asList(
+                        2345678901234567890L,
+                        3394498498348983841L,
+                        6334994393848474732L,
+                        7349370101927398483L));
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        String paths[] = {
+                "/ledgers/633/4994/3938/4948", // Empty L4 path, must be 
skipped
+
+        };
+
+        for (String path : paths) {
+            ZkUtils.createFullPathOptimistic(
+                    zkc,
+                    path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds); // the extra znode must not add or hide any ledger id
+
+        lri = lm.getLedgerRanges(); // second pass: count empty ranges directly
+        int emptyRanges = 0;
+        while (lri.hasNext()) {
+            if (lri.next().getLedgers().isEmpty()) {
+                emptyRanges++;
+            }
+        }
+        assertEquals(0, emptyRanges);
+    }
+
+    @Test
+    public void testWithSeveralIncompletePaths() throws Throwable {
+        if (baseConf.getLedgerManagerFactoryClass()
+                != LongHierarchicalLedgerManagerFactory.class) {
+            return; // only exercised with the long hierarchical ledger manager
+        }
+        LedgerManager lm = getLedgerManager();
+
+        Set<Long> ids = new TreeSet<>(
+                Arrays.asList(
+                        2345678901234567890L,
+                        3394498498348983841L,
+                        6334994393848474732L,
+                        7349370101927398483L));
+        for (Long id: ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        String paths[] = {
+                "/ledgers/000/0000/0000", // top level, W-4292762
+                "/ledgers/234/5678/9999", // shares two path segments with the 
first one, comes after
+                "/ledgers/339/0000/0000", // shares one path segment with the 
second one, comes first
+                "/ledgers/633/4994/3938/0000", // shares three path segments 
with the third one, comes first
+                "/ledgers/922/3372/0000/0000", // close to max long, at end
+
+        };
+        for (String path : paths) {
+            ZkUtils.createFullPathOptimistic(
+                    zkc,
+                    path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+        assertNotNull(lri);
+        Set<Long> returnedIds = ledgerRangeToSet(lri);
+        assertEquals(ids, returnedIds); // incomplete paths must not add or hide any ledger id
+    }
+
+    @Test
+    public void checkConcurrentModifications() throws Throwable {
+        final int numWriters = 10;
+        final int numCheckers = 10;
+        final int numLedgers = 100;
+        final long runtime = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
+        final boolean longRange =
+                baseConf.getLedgerManagerFactoryClass() == LongHierarchicalLedgerManagerFactory.class;
+
+        final Set<Long> mustExist = new TreeSet<>(); // stable ids; only read once the threads start
+        LedgerManager lm = getLedgerManager();
+        Random rng = new Random();
+        while (mustExist.size() < numLedgers) {
+            long lid = rng.nextLong() & Long.MAX_VALUE; // Math.abs(Long.MIN_VALUE) would stay negative
+            if (!longRange) {
+                lid %= 1000000;
+            }
+            if (mustExist.add(lid)) { // a repeated random id would make the OK-expected create fail
+                createLedger(lm, lid, Optional.of(BKException.Code.OK));
+            }
+        }
+
+        final long start = MathUtils.nowInNano();
+        final CountDownLatch latch = new CountDownLatch(1); // released once all threads are started
+        ArrayList<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < numWriters; ++i) {
+            Thread thread = new Thread(safeWrapper(() -> {
+                LedgerManager writerLM = getIndependentLedgerManager();
+                Random writerRNG = new Random(rng.nextLong());
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    fail("Writer interrupted");
+                }
+                while (MathUtils.elapsedNanos(start) < runtime) {
+                    long candidate = 0;
+                    do { // never touch an id the checkers expect to see
+                        candidate = writerRNG.nextLong() & Long.MAX_VALUE;
+                        if (!longRange) {
+                            candidate %= 1000000;
+                        }
+                    } while (mustExist.contains(candidate));
+                    try {
+                        createLedger(writerLM, candidate, Optional.empty());
+                        removeLedger(writerLM, candidate, Optional.empty());
+                    } catch (Throwable e) {
+                        fail("Got exception thrashing store: " + e.toString());
+                    }
+                }
+            }));
+            thread.start();
+            threads.add(thread);
+        }
+
+        for (int i = 0; i < numCheckers; ++i) {
+            Thread thread = new Thread(safeWrapper(() -> {
+                LedgerManager checkerLM = getIndependentLedgerManager();
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    fail("Checker interrupted");
+                }
+                while (MathUtils.elapsedNanos(start) < runtime) {
+                    try {
+                        LedgerRangeIterator lri = checkerLM.getLedgerRanges();
+                        Set<Long> returnedIds = ledgerRangeToSet(lri);
+                        for (long id: mustExist) { // stable ledgers must show up in every scan
+                            assertTrue(returnedIds.contains(id));
+                        }
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        fail("Got exception scanning ledgers: " + e.toString());
+                    }
+                }
+            }));
+            thread.start();
+            threads.add(thread);
+        }
+
+        latch.countDown();
+        for (Thread thread: threads) {
+            thread.join();
+        }
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index ea02a30..1ecd9f3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -69,6 +69,10 @@ public abstract class LedgerManagerTestCase extends 
BookKeeperClusterTestCase {
         baseClientConf.setLedgerManagerFactoryClass(lmFactoryCls);
     }
 
+    public LedgerManager getIndependentLedgerManager() {
+        return ledgerManagerFactory.newLedgerManager(); // new instance per call, not the cached ledgerManager
+    }
+
     public LedgerManager getLedgerManager() {
         if (null == ledgerManager) {
             ledgerManager = ledgerManagerFactory.newLedgerManager();
@@ -88,6 +92,7 @@ public abstract class LedgerManagerTestCase extends 
BookKeeperClusterTestCase {
         return Arrays.asList(new Object[][] {
             { FlatLedgerManagerFactory.class },
             { HierarchicalLedgerManagerFactory.class },
+            { LegacyHierarchicalLedgerManagerFactory.class },
             { LongHierarchicalLedgerManagerFactory.class },
             { MSLedgerManagerFactory.class },
         });

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to