BOOKKEEPER-552: 64 Bits Ledger ID Generation This PR supersedes #112
Instead of moving LongHierarchicalLedgerManager to HierarchicalLedgerManager, LongHierarchicalLedgerManager is still a stand-alone manager. HierarchicalLedgerManager has been moved to LegacyHierarchicalLedgerManager, and a new HierarchicalLedgerManager class has been put in its place, which is backwards-compatible with the original (now legacy) HierarchicalLedgerManager. This new HierarchicalLedgerManager leverages the new LongZkLedgerIdGenerator to generate Ids, and uses the LongHierarchicalLedgerManager to manage metadata for ledger IDs > 31 bits long. For shorter ledger ids, the LegacyHierarchicalLedgerManager is used, to keep backwards-compatibility. Author: Kyle Nusbaum <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Matteo Merli <[email protected]>, Venkateswararao Jujjuri (JV) <None> Closes #114 from knusbaum/BOOKKEEPER-552 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/057af8db Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/057af8db Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/057af8db Branch: refs/heads/master Commit: 057af8dbce6c08794eb8b46ca52ca13f222d9bbb Parents: 9c79e07 Author: Kyle Nusbaum <[email protected]> Authored: Tue May 2 14:04:57 2017 -0700 Committer: Matteo Merli <[email protected]> Committed: Tue May 2 14:04:57 2017 -0700 ---------------------------------------------------------------------- .../apache/bookkeeper/client/BKException.java | 12 + .../bookkeeper/conf/AbstractConfiguration.java | 3 + .../meta/AbstractHierarchicalLedgerManager.java | 213 +++++++++++ .../meta/AbstractZkLedgerManager.java | 3 +- .../bookkeeper/meta/FlatLedgerManager.java | 6 +- .../meta/HierarchicalLedgerManager.java | 375 +++---------------- .../meta/HierarchicalLedgerManagerFactory.java | 80 +--- .../bookkeeper/meta/LedgerManagerFactory.java | 5 +- .../meta/LegacyHierarchicalLedgerManager.java | 281 ++++++++++++++ .../LegacyHierarchicalLedgerManagerFactory.java | 100 +++++ .../meta/LongHierarchicalLedgerManager.java | 101 +++-- .../meta/LongZkLedgerIdGenerator.java | 333 ++++++++++++++++ .../bookkeeper/meta/ZkLedgerIdGenerator.java | 27 +- .../org/apache/bookkeeper/util/StringUtils.java | 11 +- .../bookkeeper/client/BookieRecoveryTest.java | 6 +- .../meta/TestLongZkLedgerIdGenerator.java | 145 +++++++ .../MultiLedgerManagerMultiDigestTestCase.java | 1 + .../test/MultiLedgerManagerTestCase.java | 1 + .../bookkeeper/test/TestBackwardCompat.java | 53 +++ 19 files changed, 1310 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index 2377c1c..aa3ec08 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -102,6 +102,8 @@ public abstract class BKException extends Exception { return new BKDuplicateEntryIdException(); case Code.TimeoutException: return new BKTimeoutException(); + case Code.LedgerIdOverflowException: + return new BKLedgerIdOverflowException(); default: return new BKUnexpectedConditionException(); } @@ -142,6 +144,8 @@ public abstract class BKException extends Exception { int UnauthorizedAccessException = -102; int UnclosedFragmentException = -103; int WriteOnReadOnlyBookieException = -104; + //-105 reserved for TooManyRequestsException + int LedgerIdOverflowException = -106; // generic exception code used to propagate in replication pipeline int ReplicationException = -200; @@ -210,6 +214,8 @@ public abstract class BKException extends Exception { return "Attempting to use an unclosed fragment; This is not safe"; case Code.WriteOnReadOnlyBookieException: return "Attempting to write on ReadOnly bookie"; + case Code.LedgerIdOverflowException: + return "Next ledgerID is too large."; case Code.ReplicationException: return "Errors in replication pipeline"; case Code.ClientClosedException: @@ -404,4 +410,10 @@ public abstract class BKException extends Exception { super(Code.TimeoutException); } } + + public static class BKLedgerIdOverflowException extends BKException { + public BKLedgerIdOverflowException() { + super(Code.LedgerIdOverflowException); + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 07e5d08..c7c50cd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -76,6 +76,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration { // Zookeeper ACL settings protected final static String ZK_ENABLE_SECURITY = "zkEnableSecurity"; + // Kluge for compatibility testing. Never set this outside tests. + public final static String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck"; + protected AbstractConfiguration() { super(); if (READ_SYSTEM_PROPERTIES) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java new file mode 100644 index 0000000..02359e0 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; +import org.apache.bookkeeper.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException.Code; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractHierarchicalLedgerManager extends AbstractZkLedgerManager { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractHierarchicalLedgerManager.class); + + /** + * Constructor + * + * @param conf + * Configuration object + * @param zk + * ZooKeeper Client Handle + */ + public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { + super(conf, zk); + } + + /** + * Process hash nodes in a given path + */ + void asyncProcessLevelNodes( + final String path, final Processor<String> processor, + final AsyncCallback.VoidCallback finalCb, final Object context, + final int successRc, final int failureRc) { + zk.sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("Error syncing path " + path + " when getting its chidren: ", + KeeperException.create(KeeperException.Code.get(rc), path)); + finalCb.processResult(failureRc, null, context); + return; + } + + zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() { + @Override + public void processResult(int rc, String path, Object ctx, + List<String> levelNodes) { + if (rc != Code.OK.intValue()) { + LOG.error("Error polling hash nodes of " + path, + KeeperException.create(KeeperException.Code.get(rc), path)); + finalCb.processResult(failureRc, null, context); + return; + } + AsyncListProcessor<String> listProcessor = + new AsyncListProcessor<String>(scheduler); + // process its children + listProcessor.process(levelNodes, processor, finalCb, + context, successRc, failureRc); + } + }, null); + } + }, null); + } + + /** + * Process list one by one in asynchronize way. Process will be stopped immediately + * when error occurred. + */ + private static class AsyncListProcessor<T> { + // use this to prevent long stack chains from building up in callbacks + ScheduledExecutorService scheduler; + + /** + * Constructor + * + * @param scheduler + * Executor used to prevent long stack chains + */ + public AsyncListProcessor(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + /** + * Process list of items + * + * @param data + * List of data to process + * @param processor + * Callback to process element of list when success + * @param finalCb + * Final callback to be called after all elements in the list are processed + * @param context + * Context of final callback + * @param successRc + * RC passed to final callback on success + * @param failureRc + * RC passed to final callback on failure + */ + public void process(final List<T> data, final Processor<T> processor, + final AsyncCallback.VoidCallback finalCb, final Object context, + final int successRc, final int failureRc) { + if (data == null || data.size() == 0) { + finalCb.processResult(successRc, null, context); + return; + } + final int size = data.size(); + final AtomicInteger current = new AtomicInteger(0); + AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != successRc) { + // terminal immediately + finalCb.processResult(failureRc, null, context); + return; + } + // process next element + int next = current.incrementAndGet(); + if (next >= size) { // reach the end of list + finalCb.processResult(successRc, null, context); + return; + } + final T dataToProcess = data.get(next); + final AsyncCallback.VoidCallback stub = this; + scheduler.submit(new Runnable() { + @Override + public final void run() { + processor.process(dataToProcess, stub); + } + }); + } + }; + T firstElement = data.get(0); + processor.process(firstElement, stubCallback); + } + } + + // get ledger from all level nodes + long getLedgerId(String...levelNodes) throws IOException { + return StringUtils.stringToHierarchicalLedgerId(levelNodes); + } + + /** + * Get all ledger ids in the given zk path. + * + * @param ledgerNodes + * List of ledgers in the given path + * example:- {L1652, L1653, L1650} + * @param path + * The zookeeper path of the ledger ids. The path should start with {@ledgerRootPath} + * example (with ledgerRootPath = /ledgers):- /ledgers/00/0053 + */ + @Override + protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) { + NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>(); + + if (!path.startsWith(ledgerRootPath)) { + LOG.warn("Ledger path [{}] is not a valid path name, it should start wth {}", path, ledgerRootPath); + return zkActiveLedgers; + } + + long ledgerIdPrefix = 0; + char ch; + for (int i = ledgerRootPath.length() + 1; i < path.length(); i++) { + ch = path.charAt(i); + if (ch < '0' || ch > '9') + continue; + ledgerIdPrefix = ledgerIdPrefix * 10 + (ch - '0'); + } + + for (String ledgerNode : ledgerNodes) { + if (isSpecialZnode(ledgerNode)) { + continue; + } + long ledgerId = ledgerIdPrefix; + for (int i = 0; i < ledgerNode.length(); i++) { + ch = ledgerNode.charAt(i); + if (ch < '0' || ch > '9') + continue; + ledgerId = ledgerId * 10 + (ch - '0'); + } + zkActiveLedgers.add(ledgerId); + } + return zkActiveLedgers; + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java index 6db3375..f5a60f6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; @@ -468,7 +469,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher { * Znode Name * @return true if the znode is a special znode otherwise false */ - protected boolean isSpecialZnode(String znode) { + protected static boolean isSpecialZnode(String znode) { if (BookKeeperConstants.AVAILABLE_NODE.equals(znode) || BookKeeperConstants.COOKIE_NODE.equals(znode) || BookKeeperConstants.LAYOUT_ZNODE.equals(znode) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java index 3172247..36db62a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java @@ -87,9 +87,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager { asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc); } - @Override - protected boolean isSpecialZnode(String znode) { - return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode); + + protected static boolean isSpecialZnode(String znode) { + return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || AbstractZkLedgerManager.isSpecialZnode(znode); } @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java index bed1627..309762b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java @@ -18,370 +18,105 @@ package org.apache.bookkeeper.meta; import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.NoSuchElementException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.StringUtils; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.zookeeper.ZooKeeper; /** - * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes. + * HierarchicalLedgerManager makes use of both LongHierarchicalLedgerManager and LegacyHierarchicalLedgerManager + * to extend the 31-bit ledger id range of the LegacyHierarchicalLedgerManager to that of the LongHierarchicalLedgerManager + * while remaining backwards-compatible with the legacy manager. + * + * In order to achieve backwards-compatibility, the HierarchicalLedgerManager forwards requests relating to ledger IDs which + * are < Integer.MAX_INT to the LegacyHierarchicalLedgerManager. The new 5-part directory structure will not appear until a + * ledger with an ID >= Integer.MAX_INT is created. * - * <p> - * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4): - * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre> - * These 3 parts are used to form the actual ledger node path used to store ledger metadata: - * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre> - * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in - * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids - * errors during garbage collection due to lists of children that are too long. + * @see LongHierarchicalLedgerManager + * @see LegacyHierarchicalLedgerManager */ -class HierarchicalLedgerManager extends AbstractZkLedgerManager { - +class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager { static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class); - static final String IDGEN_ZNODE = "idgen"; - private static final String MAX_ID_SUFFIX = "9999"; - private static final String MIN_ID_SUFFIX = "0000"; + LegacyHierarchicalLedgerManager legacyLM; + LongHierarchicalLedgerManager longLM; - /** - * Constructor - * - * @param conf - * Configuration object - * @param zk - * ZooKeeper Client Handle - */ public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { super(conf, zk); + legacyLM = new LegacyHierarchicalLedgerManager(conf, zk); + longLM = new LongHierarchicalLedgerManager (conf, zk); } @Override - public String getLedgerPath(long ledgerId) { - return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId); - } - - @Override - public long getLedgerId(String pathName) throws IOException { - if (!pathName.startsWith(ledgerRootPath)) { - throw new IOException("it is not a valid hashed path name : " + pathName); - } - String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1); - return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath); - } - - // get ledger from all level nodes - long getLedgerId(String...levelNodes) throws IOException { - return StringUtils.stringToHierarchicalLedgerId(levelNodes); - } - - // - // Active Ledger Manager - // - - /** - * Get the smallest cache id in a specified node /level1/level2 - * - * @param level1 - * 1st level node name - * @param level2 - * 2nd level node name - * @return the smallest ledger id - */ - private long getStartLedgerIdByLevel(String level1, String level2) throws IOException { - return getLedgerId(level1, level2, MIN_ID_SUFFIX); - } - - /** - * Get the largest cache id in a specified node /level1/level2 - * - * @param level1 - * 1st level node name - * @param level2 - * 2nd level node name - * @return the largest ledger id - */ - private long getEndLedgerIdByLevel(String level1, String level2) throws IOException { - return getLedgerId(level1, level2, MAX_ID_SUFFIX); - } - - @Override - public void asyncProcessLedgers(final Processor<Long> processor, - final AsyncCallback.VoidCallback finalCb, final Object context, - final int successRc, final int failureRc) { - // process 1st level nodes - asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() { - @Override - public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) { - if (isSpecialZnode(l1Node)) { - cb1.processResult(successRc, null, context); - return; - } - final String l1NodePath = ledgerRootPath + "/" + l1Node; - // process level1 path, after all children of level1 process - // it callback to continue processing next level1 node - asyncProcessLevelNodes(l1NodePath, new Processor<String>() { - @Override - public void process(String l2Node, AsyncCallback.VoidCallback cb2) { - // process level1/level2 path - String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node; - // process each ledger - // after all ledger are processed, cb2 will be call to continue processing next level2 node - asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2, - context, successRc, failureRc); - } - }, cb1, context, successRc, failureRc); - } - }, finalCb, context, successRc, failureRc); - } + public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc, + int failureRc) { + // Process the old 31-bit id ledgers first. + legacyLM.asyncProcessLedgers(processor, new VoidCallback(){ - /** - * Process hash nodes in a given path - */ - void asyncProcessLevelNodes( - final String path, final Processor<String> processor, - final AsyncCallback.VoidCallback finalCb, final Object context, - final int successRc, final int failureRc) { - zk.sync(path, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { - if (rc != Code.OK.intValue()) { - LOG.error("Error syncing path " + path + " when getting its chidren: ", - KeeperException.create(KeeperException.Code.get(rc), path)); - finalCb.processResult(failureRc, null, context); - return; + if(rc == failureRc) { + // If it fails, return the failure code to the callback + finalCb.processResult(rc, path, ctx); + } + else { + // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers + longLM.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc); } - - zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() { - @Override - public void processResult(int rc, String path, Object ctx, - List<String> levelNodes) { - if (rc != Code.OK.intValue()) { - LOG.error("Error polling hash nodes of " + path, - KeeperException.create(KeeperException.Code.get(rc), path)); - finalCb.processResult(failureRc, null, context); - return; - } - AsyncListProcessor<String> listProcessor = - new AsyncListProcessor<String>(scheduler); - // process its children - listProcessor.process(levelNodes, processor, finalCb, - context, successRc, failureRc); - } - }, null); } - }, null); - } - - /** - * Process list one by one in asynchronize way. Process will be stopped immediately - * when error occurred. - */ - private static class AsyncListProcessor<T> { - // use this to prevent long stack chains from building up in callbacks - ScheduledExecutorService scheduler; - /** - * Constructor - * - * @param scheduler - * Executor used to prevent long stack chains - */ - public AsyncListProcessor(ScheduledExecutorService scheduler) { - this.scheduler = scheduler; - } + }, context, successRc, failureRc); + } - /** - * Process list of items - * - * @param data - * List of data to process - * @param processor - * Callback to process element of list when success - * @param finalCb - * Final callback to be called after all elements in the list are processed - * @param context - * Context of final callback - * @param successRc - * RC passed to final callback on success - * @param failureRc - * RC passed to final callback on failure - */ - public void process(final List<T> data, final Processor<T> processor, - final AsyncCallback.VoidCallback finalCb, final Object context, - final int successRc, final int failureRc) { - if (data == null || data.size() == 0) { - finalCb.processResult(successRc, null, context); - return; - } - final int size = data.size(); - final AtomicInteger current = new AtomicInteger(0); - AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (rc != successRc) { - // terminal immediately - finalCb.processResult(failureRc, null, context); - return; - } - // process next element - int next = current.incrementAndGet(); - if (next >= size) { // reach the end of list - finalCb.processResult(successRc, null, context); - return; - } - final T dataToProcess = data.get(next); - final AsyncCallback.VoidCallback stub = this; - scheduler.submit(new Runnable() { - @Override - public final void run() { - processor.process(dataToProcess, stub); - } - }); - } - }; - T firstElement = data.get(0); - processor.process(firstElement, stubCallback); - } + @Override + protected String getLedgerPath(long ledgerId) { + return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId); } @Override - protected boolean isSpecialZnode(String znode) { - return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode); + protected long getLedgerId(String ledgerPath) throws IOException { + // TODO Auto-generated method stub + if (!ledgerPath.startsWith(ledgerRootPath)) { + throw new IOException("it is not a valid hashed path name : " + ledgerPath); + } + String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1); + return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath); } @Override public LedgerRangeIterator getLedgerRanges() { - return new HierarchicalLedgerRangeIterator(); + LedgerRangeIterator legacyLedgerRangeIterator = legacyLM.getLedgerRanges(); + LedgerRangeIterator longLedgerRangeIterator = longLM.getLedgerRanges(); + return new HierarchicalLedgerRangeIterator(legacyLedgerRangeIterator, longLedgerRangeIterator); } - /** - * Iterator through each metadata bucket with hierarchical mode - */ - private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator { - private Iterator<String> l1NodesIter = null; - private Iterator<String> l2NodesIter = null; - private String curL1Nodes = ""; - private boolean iteratorDone = false; - private LedgerRange nextRange = null; + private static class HierarchicalLedgerRangeIterator implements LedgerRangeIterator { - /** - * iterate next level1 znode - * - * @return false if have visited all level1 nodes - * @throws InterruptedException/KeeperException if error occurs reading zookeeper children - */ - private boolean nextL1Node() throws KeeperException, InterruptedException { - l2NodesIter = null; - while (l2NodesIter == null) { - if (l1NodesIter.hasNext()) { - curL1Nodes = l1NodesIter.next(); - } else { - return false; - } - if (isSpecialZnode(curL1Nodes)) { - continue; - } - List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null); - Collections.sort(l2Nodes); - l2NodesIter = l2Nodes.iterator(); - if (!l2NodesIter.hasNext()) { - l2NodesIter = null; - continue; - } - } - return true; - } + LedgerRangeIterator legacyLedgerRangeIterator; + LedgerRangeIterator longLedgerRangeIterator; - synchronized private void preload() throws IOException { - while (nextRange == null && !iteratorDone) { - boolean hasMoreElements = false; - try { - if (l1NodesIter == null) { - List<String> l1Nodes = zk.getChildren(ledgerRootPath, null); - Collections.sort(l1Nodes); - l1NodesIter = l1Nodes.iterator(); - hasMoreElements = nextL1Node(); - } else if (l2NodesIter == null || !l2NodesIter.hasNext()) { - hasMoreElements = nextL1Node(); - } else { - hasMoreElements = true; - } - } catch (KeeperException ke) { - throw new IOException("Error preloading next range", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while preloading", ie); - } - if (hasMoreElements) { - nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next()); - if (nextRange.size() == 0) { - nextRange = null; - } - } else { - iteratorDone = true; - } - } + HierarchicalLedgerRangeIterator(LedgerRangeIterator legacyLedgerRangeIterator, LedgerRangeIterator longLedgerRangeIterator) { + this.legacyLedgerRangeIterator = legacyLedgerRangeIterator; + this.longLedgerRangeIterator = longLedgerRangeIterator; } @Override - synchronized public boolean hasNext() throws IOException { - preload(); - return nextRange != null && !iteratorDone; + public boolean hasNext() throws IOException { + return legacyLedgerRangeIterator.hasNext() || longLedgerRangeIterator.hasNext(); } @Override - synchronized public LedgerRange next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); + public LedgerRange next() throws IOException { + if(legacyLedgerRangeIterator.hasNext()) { + return legacyLedgerRangeIterator.next(); } - LedgerRange r = nextRange; - nextRange = null; - return r; + return longLedgerRangeIterator.next(); } - /** - * Get a single node level1/level2 - * - * @param level1 - * 1st level node name - * @param level2 - * 2nd level node name - * @throws IOException - */ - LedgerRange getLedgerRangeByLevel(final String level1, final String level2) - throws IOException { - StringBuilder nodeBuilder = new StringBuilder(); - nodeBuilder.append(ledgerRootPath).append("/") - .append(level1).append("/").append(level2); - String nodePath = nodeBuilder.toString(); - List<String> ledgerNodes = null; - try { - ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath); - } catch (InterruptedException e) { - throw new IOException("Error when get child nodes from zk", e); - } - NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath); - if (LOG.isDebugEnabled()) { - LOG.debug("All active ledgers from ZK for hash node " - + level1 + "/" + level2 + " : " + zkActiveLedgers); - } - - return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true, - getEndLedgerIdByLevel(level1, level2), true)); - } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java index 084d73d..a74a633 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java @@ -1,5 +1,3 @@ -package org.apache.bookkeeper.meta; - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,89 +15,25 @@ package org.apache.bookkeeper.meta; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.bookkeeper.meta; -import java.io.IOException; import java.util.List; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZKUtil; -import org.apache.bookkeeper.replication.ReplicationException; -import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; /** - * Hierarchical Ledger Manager Factory + * Legacy Hierarchical Ledger Manager Factory */ -public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory { +public class HierarchicalLedgerManagerFactory extends LegacyHierarchicalLedgerManagerFactory { public static final String NAME = "hierarchical"; - public static final int CUR_VERSION = 1; - - AbstractConfiguration conf; - ZooKeeper zk; - - @Override - public int getCurrentVersion() { - return CUR_VERSION; - } - - @Override - public LedgerManagerFactory initialize(final AbstractConfiguration conf, - final ZooKeeper zk, - final int factoryVersion) - throws IOException { - if (CUR_VERSION != factoryVersion) { - throw new IOException("Incompatible layout version found : " - + factoryVersion); - } - this.conf = conf; - this.zk = zk; - return this; - } - - @Override - public void uninitialize() throws IOException { - // since zookeeper instance is passed from outside - // we don't need to close it here - } - + @Override public LedgerIdGenerator newLedgerIdGenerator() { List<ACL> zkAcls = ZkUtils.getACLs(conf); - return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE, zkAcls); - } - - @Override - public LedgerManager newLedgerManager() { - return new HierarchicalLedgerManager(conf, zk); - } - - @Override - public LedgerUnderreplicationManager newLedgerUnderreplicationManager() - throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{ - return new ZkLedgerUnderreplicationManager(conf, zk); - } - - @Override - public void format(AbstractConfiguration conf, ZooKeeper zk) - throws InterruptedException, KeeperException, IOException { - HierarchicalLedgerManager ledgerManager = (HierarchicalLedgerManager) newLedgerManager(); - try { - String ledgersRootPath = conf.getZkLedgersRootPath(); - List<String> children = zk.getChildren(ledgersRootPath, false); - for (String child : children) { - if (ledgerManager.isSpecialZnode(child)) { - continue; - } - ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child); - } - } finally { - ledgerManager.close(); - } - // Delete and recreate the LAYOUT information. - super.format(conf, zk); + ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls); + return new LongZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LongHierarchicalLedgerManager.IDGEN_ZNODE, subIdGenerator, zkAcls); } - + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java index 76d1572..e7cfc4c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java @@ -164,7 +164,8 @@ public abstract class LedgerManagerFactory { // handle V2 layout case if (factoryClass != null && - !layout.getManagerFactoryClass().equals(factoryClass.getName())) { + !layout.getManagerFactoryClass().equals(factoryClass.getName()) && + conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) { // Disable should ONLY happen during compatibility testing. throw new IOException("Configured layout " + factoryClass.getName() + " does not match existing layout " + layout.getManagerFactoryClass()); @@ -210,6 +211,8 @@ public abstract class LedgerManagerFactory { factoryClass = FlatLedgerManagerFactory.class; } else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) { factoryClass = HierarchicalLedgerManagerFactory.class; + } else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) { + factoryClass = LongHierarchicalLedgerManagerFactory.class; } else { throw new IOException("Unknown ledger manager type: " + lmType); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java new file mode 100644 index 0000000..8be23b2 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; +import org.apache.bookkeeper.util.StringUtils; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes. + * + * <p> + * LegacyHierarchicalLedgerManager splits the generated id into 3 parts (2-4-4): + * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre> + * These 3 parts are used to form the actual ledger node path used to store ledger metadata: + * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre> + * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in + * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids + * errors during garbage collection due to lists of children that are too long. + */ +class LegacyHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager { + + static final Logger LOG = LoggerFactory.getLogger(LegacyHierarchicalLedgerManager.class); + + static final String IDGEN_ZNODE = "idgen"; + private static final String MAX_ID_SUFFIX = "9999"; + private static final String MIN_ID_SUFFIX = "0000"; + + private static final ThreadLocal<StringBuilder> threadLocalNodeBuilder = new ThreadLocal<StringBuilder>() { + @Override + protected StringBuilder initialValue() { + return new StringBuilder(); + } + }; + + /** + * Constructor + * + * @param conf + * Configuration object + * @param zk + * ZooKeeper Client Handle + */ + public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { + super(conf, zk); + } + + @Override + public String getLedgerPath(long ledgerId) { + return ledgerRootPath + StringUtils.getShortHierarchicalLedgerPath(ledgerId); + } + + @Override + public long getLedgerId(String pathName) throws IOException { + if (!pathName.startsWith(ledgerRootPath)) { + throw new IOException("it is not a valid hashed path name : " + pathName); + } + String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1); + return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath); + } + + // + // Active Ledger Manager + // + + /** + * Get the smallest cache id in a specified node /level1/level2 + * + * @param level1 + * 1st level node name + * @param level2 + * 2nd level node name + * @return the smallest ledger id + */ + private long getStartLedgerIdByLevel(String level1, String level2) throws IOException { + return getLedgerId(level1, level2, MIN_ID_SUFFIX); + } + + /** + * Get the largest cache id in a specified node /level1/level2 + * + * @param level1 + * 1st level node name + * @param level2 + * 2nd level node name + * @return the largest ledger id + */ + private long getEndLedgerIdByLevel(String level1, String level2) throws IOException { + return getLedgerId(level1, level2, MAX_ID_SUFFIX); + } + + @Override + public void asyncProcessLedgers(final Processor<Long> processor, + final AsyncCallback.VoidCallback finalCb, final Object context, + final int successRc, final int failureRc) { + // process 1st level nodes + asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() { + @Override + public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) { + if (isSpecialZnode(l1Node)) { + cb1.processResult(successRc, null, context); + return; + } + final String l1NodePath = ledgerRootPath + "/" + l1Node; + // process level1 path, after all children of level1 process + // it callback to continue processing next level1 node + asyncProcessLevelNodes(l1NodePath, new Processor<String>() { + @Override + public void process(String l2Node, AsyncCallback.VoidCallback cb2) { + // process level1/level2 path + String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node; + // process each ledger + // after all ledger are processed, cb2 will be call to continue processing next level2 node + asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2, + context, successRc, failureRc); + } + }, cb1, context, successRc, failureRc); + } + }, finalCb, context, successRc, failureRc); + } + + protected static boolean isSpecialZnode(String znode) { + return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode) || AbstractHierarchicalLedgerManager.isSpecialZnode(znode); + } + + @Override + public LedgerRangeIterator getLedgerRanges() { + return new HierarchicalLedgerRangeIterator(); + } + + /** + * Iterator through each metadata bucket with hierarchical mode + */ + private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator { + private Iterator<String> l1NodesIter = null; + private Iterator<String> l2NodesIter = null; + private String curL1Nodes = ""; + private boolean iteratorDone = false; + private LedgerRange nextRange = null; + + /** + * iterate next level1 znode + * + * @return false if have visited all level1 nodes + * @throws InterruptedException/KeeperException if error occurs reading zookeeper children + */ + private boolean nextL1Node() throws KeeperException, InterruptedException { + l2NodesIter = null; + while (l2NodesIter == null) { + if (l1NodesIter.hasNext()) { + curL1Nodes = l1NodesIter.next(); + } else { + return false; + } + // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes) + if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) { + continue; + } + List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null); + Collections.sort(l2Nodes); + l2NodesIter = l2Nodes.iterator(); + if (!l2NodesIter.hasNext()) { + l2NodesIter = null; + continue; + } + } + return true; + } + + synchronized private void preload() throws IOException { + while (nextRange == null && !iteratorDone) { + boolean hasMoreElements = false; + try { + if (l1NodesIter == null) { + List<String> l1Nodes = zk.getChildren(ledgerRootPath, null); + Collections.sort(l1Nodes); + l1NodesIter = l1Nodes.iterator(); + hasMoreElements = nextL1Node(); + } else if (l2NodesIter == null || !l2NodesIter.hasNext()) { + hasMoreElements = nextL1Node(); + } else { + hasMoreElements = true; + } + } catch (KeeperException ke) { + throw new IOException("Error preloading next range", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while preloading", ie); + } + if (hasMoreElements) { + nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next()); + if (nextRange.size() == 0) { + nextRange = null; + } + } else { + iteratorDone = true; + } + } + } + + @Override + synchronized public boolean hasNext() throws IOException { + preload(); + return nextRange != null && !iteratorDone; + } + + @Override + synchronized public LedgerRange next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + LedgerRange r = nextRange; + nextRange = null; + return r; + } + + /** + * Get a single node level1/level2 + * + * @param level1 + * 1st level node name + * @param level2 + * 2nd level node name + * @throws IOException + */ + LedgerRange getLedgerRangeByLevel(final String level1, final String level2) + throws IOException { + StringBuilder nodeBuilder = threadLocalNodeBuilder.get(); + nodeBuilder.setLength(0); + nodeBuilder.append(ledgerRootPath).append("/") + .append(level1).append("/").append(level2); + String nodePath = nodeBuilder.toString(); + List<String> ledgerNodes = null; + try { + ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath); + } catch (InterruptedException e) { + throw new IOException("Error when get child nodes from zk", e); + } + NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath); + if (LOG.isDebugEnabled()) { + LOG.debug("All active ledgers from ZK for hash node " + + level1 + "/" + level2 + " : " + zkActiveLedgers); + } + + return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true, + getEndLedgerIdByLevel(level1, level2), true)); + } + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java new file mode 100644 index 0000000..1ac4038 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java @@ -0,0 +1,100 @@ +package org.apache.bookkeeper.meta; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.List; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZKUtil; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; + +/** + * Hierarchical Ledger Manager Factory + */ +public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory { + + public static final String NAME = "legacyhierarchical"; + public static final int CUR_VERSION = 1; + + AbstractConfiguration conf; + ZooKeeper zk; + + @Override + public int getCurrentVersion() { + return CUR_VERSION; + } + + @Override + public LedgerManagerFactory initialize(final AbstractConfiguration conf, + final ZooKeeper zk, + final int factoryVersion) + throws IOException { + if (CUR_VERSION != factoryVersion) { + throw new IOException("Incompatible layout version found : " + + factoryVersion); + } + this.conf = conf; + this.zk = zk; + return this; + } + + @Override + public void uninitialize() throws IOException { + // since zookeeper instance is passed from outside + // we don't need to close it here + } + + @Override + public LedgerIdGenerator newLedgerIdGenerator() { + List<ACL> zkAcls = ZkUtils.getACLs(conf); + return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls); + } + + @Override + public LedgerManager newLedgerManager() { + return new HierarchicalLedgerManager(conf, zk); + } + + @Override + public LedgerUnderreplicationManager newLedgerUnderreplicationManager() + throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{ + return new ZkLedgerUnderreplicationManager(conf, zk); + } + + @Override + public void format(AbstractConfiguration conf, ZooKeeper zk) + throws InterruptedException, KeeperException, IOException { + String ledgersRootPath = conf.getZkLedgersRootPath(); + List<String> children = zk.getChildren(ledgersRootPath, false); + for (String child : children) { + if (HierarchicalLedgerManager.isSpecialZnode(child)) { + continue; + } + ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child); + } + // Delete and recreate the LAYOUT information. + super.format(conf, zk); + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java index 990297f..f8f7546 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.NoSuchElementException; +import java.util.TreeSet; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; @@ -37,33 +38,36 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes. + * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 5-level hierarchical znodes. * * <p> * LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4): * * <pre> - * <level1 (3 digits)><level2 (4 digits)><level3 (4 digits)><level4 (4 digits)> - * <level5 (4 digits)> + * <level0 (3 digits)><level1 (4 digits)><level2 (4 digits)><level3 (4 digits)> + * <level4 (4 digits)> * </pre> * * These 5 parts are used to form the actual ledger node path used to store ledger metadata: * * <pre> - * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5) + * (ledgersRootPath) / level0 / level1 / level2 / level3 / L(level4) * </pre> * * E.g Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>, * which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers, * which avoids errors during garbage collection due to lists of children that are too long. + * */ -class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { +class LongHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager { static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class); + static final String IDGEN_ZNODE = "idgen-long"; private static final String MAX_ID_SUFFIX = "9999"; private static final String MIN_ID_SUFFIX = "0000"; + /** * Constructor * @@ -77,11 +81,6 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { } @Override - public String getLedgerPath(long ledgerId) { - return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId); - } - - @Override public long getLedgerId(String pathName) throws IOException { if (!pathName.startsWith(ledgerRootPath)) { throw new IOException("it is not a valid hashed path name : " + pathName); @@ -89,54 +88,68 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1); return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath); } + + @Override + public String getLedgerPath(long ledgerId) { + return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId); + } // // Active Ledger Manager // /** - * Get the smallest cache id in a specified node /level1/level2/level3/level4 + * Get the smallest cache id in a specified node /level0/level1/level2/level3 * - * @param level1 + * @param level0 * 1st level node name - * @param level2 + * @param level1 * 2nd level node name - * @param level3 + * @param level2 * 3rd level node name - * @param level4 + * @param level3 * 4th level node name * @return the smallest ledger id */ - private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4) + private long getStartLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException { - return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX); + return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX); } /** - * Get the largest cache id in a specified node /level1/level2/level3/level4 + * Get the largest cache id in a specified node /level0/level1/level2/level3 * - * @param level1 + * @param level0 * 1st level node name - * @param level2 + * @param level1 * 2nd level node name - * @param level3 + * @param level2 * 3rd level node name - * @param level4 + * @param level3 * 4th level node name * @return the largest ledger id */ - private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException { - return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX); + private long getEndLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException { + return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX); } @Override public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) { + + // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers asyncProcessLevelNodes(ledgerRootPath, new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context, successRc, failureRc); } + protected static boolean isSpecialZnode(String znode) { + // Check nextnode length. All paths in long hierarchical format (3-4-4-4-4) + // are at least 3 characters long. This prevents picking up any old-style + // hierarchical paths (2-4-4) + return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || znode.length() < 3; + } + private class RecursiveProcessor implements Processor<String> { private final int level; private final String path; @@ -167,7 +180,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { context, successRc, failureRc); } else { // process each ledger after all ledger are processed, cb will be call to continue processing next - // level5 node + // level4 node asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc); } } @@ -194,7 +207,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null)); } - private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException { + synchronized private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException { List<String> levelNodes = zk.getChildren(path, null); Collections.sort(levelNodes); if (level == 0) { @@ -217,6 +230,9 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { } String curLNode = curLevelNodes.get(level); if (curLNode != null) { + // Traverse down through levels 0-3 + // The nextRange becomes a listing of the children + // in the level4 directory. if (level != 3) { String nextLevelPath = path + "/" + curLNode; initialize(nextLevelPath, level + 1); @@ -229,7 +245,13 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { } } - private boolean moveToNext(int level) throws KeeperException, InterruptedException { + private void clearHigherLevels(int level) { + for(int i = level+1; i < 4; i++) { + curLevelNodes.set(i, null); + } + } + + synchronized private boolean moveToNext(int level) throws KeeperException, InterruptedException { Iterator<String> curLevelNodesIter = levelNodesIter.get(level); boolean movedToNextNode = false; if (level == 0) { @@ -239,6 +261,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { continue; } else { curLevelNodes.set(level, nextNode); + clearHigherLevels(level); movedToNextNode = true; break; } @@ -247,6 +270,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { if (curLevelNodesIter.hasNext()) { String nextNode = curLevelNodesIter.next(); curLevelNodes.set(level, nextNode); + clearHigherLevels(level); movedToNextNode = true; } else { movedToNextNode = moveToNext(level - 1); @@ -261,6 +285,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { levelNodesIter.set(level, newCurLevelNodesIter); if (newCurLevelNodesIter.hasNext()) { curLevelNodes.set(level, newCurLevelNodesIter.next()); + clearHigherLevels(level); movedToNextNode = true; } } @@ -306,15 +331,15 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { return r; } - LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException { - String level1 = curLevelNodes.get(0); - String level2 = curLevelNodes.get(1); - String level3 = curLevelNodes.get(2); - String level4 = curLevelNodes.get(3); + private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException { + String level0 = curLevelNodes.get(0); + String level1 = curLevelNodes.get(1); + String level2 = curLevelNodes.get(2); + String level3 = curLevelNodes.get(3); StringBuilder nodeBuilder = new StringBuilder(); - nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/") - .append(level3).append("/").append(level4); + nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/") + .append(level2).append("/").append(level3); String nodePath = nodeBuilder.toString(); List<String> ledgerNodes = null; try { @@ -324,11 +349,11 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager { } NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath); if (LOG.isDebugEnabled()) { - LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/" - + level4 + " : " + zkActiveLedgers); + LOG.debug("All active ledgers from ZK for hash node " + level0 + "/" + level1 + "/" + level2 + "/" + + level3 + " : " + zkActiveLedgers); } - return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true, - getEndLedgerIdByLevel(level1, level2, level3, level4), true)); + return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, level2, level3), true, + getEndLedgerIdByLevel(level0, level1, level2, level3), true)); } } }
