Repository: bookkeeper Updated Branches: refs/heads/master a59bd5687 -> 5662416d8
BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5662416d Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5662416d Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5662416d Branch: refs/heads/master Commit: 5662416d8ecef535fb089baa0a10e0dae08ae805 Parents: a59bd56 Author: Sijie Guo <si...@apache.org> Authored: Tue Sep 15 01:38:26 2015 -0700 Committer: Sijie Guo <si...@apache.org> Committed: Tue Sep 15 01:38:26 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/bookkeeper/client/BKException.java | 9 ++ .../apache/bookkeeper/client/BookKeeper.java | 9 +- .../bookkeeper/client/LedgerCreateOp.java | 41 ++++-- .../bookkeeper/conf/AbstractConfiguration.java | 2 +- .../meta/AbstractZkLedgerManager.java | 28 ++++ .../bookkeeper/meta/CleanupLedgerManager.java | 6 +- .../bookkeeper/meta/FlatLedgerManager.java | 37 ------ .../meta/FlatLedgerManagerFactory.java | 5 + .../meta/HierarchicalLedgerManager.java | 95 +------------- .../meta/HierarchicalLedgerManagerFactory.java | 9 +- .../bookkeeper/meta/LedgerIdGenerator.java | 41 ++++++ .../apache/bookkeeper/meta/LedgerManager.java | 43 +++--- .../bookkeeper/meta/LedgerManagerFactory.java | 8 ++ .../bookkeeper/meta/MSLedgerManagerFactory.java | 130 +++++++------------ .../bookkeeper/meta/ZkLedgerIdGenerator.java | 120 +++++++++++++++++ .../bookkeeper/bookie/CompactionTest.java | 2 +- .../client/TestWatchEnsembleChange.java | 91 +++++++------ .../apache/bookkeeper/meta/GcLedgersTest.java | 40 ++++-- .../bookkeeper/meta/LedgerManagerTestCase.java | 15 ++- .../meta/TestZkLedgerIdGenerator.java | 122 +++++++++++++++++ 21 files changed, 556 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 046fab3..36d4372 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -100,6 +100,8 @@ Trunk (unreleased changes) BOOKKEEPER-760: Don't close PCBC proactively if bookies disappeared from zookeeper znodes (sijie via fpj) + BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie) + bookkeeper-server: BOOKKEEPER-695: Some entry logs are not removed from the bookie storage (Matteo Merli via sijie) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index c5be32d..b43506d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -124,6 +124,7 @@ public abstract class BKException extends Exception { int MetadataVersionException = -17; int MetaStoreException = -18; int ClientClosedException = -19; + int LedgerExistException = -20; int IllegalOpException = -100; int LedgerFencedException = -101; @@ -170,6 +171,8 @@ public abstract class BKException extends Exception { return "Error while using ZooKeeper"; case Code.MetaStoreException: return "Error while using MetaStore"; + case Code.LedgerExistException: + return "Ledger existed"; case Code.LedgerRecoveryException: return "Error while recovering ledger"; case Code.LedgerClosedException: @@ -301,6 +304,12 @@ public abstract class BKException extends Exception { } } + public static class BKLedgerExistException extends BKException { + public BKLedgerExistException() { + 
super(Code.LedgerExistException); + } + } + public static class BKLedgerRecoveryException extends BKException { public BKLedgerRecoveryException() { super(Code.LedgerRecoveryException); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index f74639b..6fe1371 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.meta.CleanupLedgerManager; +import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.proto.BookieClient; @@ -48,7 +49,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.configuration.ConfigurationException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; -import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.slf4j.Logger; @@ -100,6 +100,7 @@ public class BookKeeper { // Ledger manager responsible for how to store ledger meta data final LedgerManagerFactory ledgerManagerFactory; final LedgerManager ledgerManager; + final LedgerIdGenerator ledgerIdGenerator; // Ensemble Placement Policy final EnsemblePlacementPolicy placementPolicy; @@ -305,6 +306,7 
@@ public class BookKeeper { // initialize ledger manager this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk); this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()); + this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator(); } private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf) @@ -333,6 +335,10 @@ public class BookKeeper { return ledgerManager; } + LedgerIdGenerator getLedgerIdGenerator() { + return ledgerIdGenerator; + } + /** * There are 2 digest types that can be used for verification. The CRC32 is * cheap to compute but does not protect against byzantine bookies (i.e., a @@ -809,6 +815,7 @@ public class BookKeeper { // Close ledger manage so all pending metadata requests would be failed // which will reject any incoming metadata requests. ledgerManager.close(); + ledgerIdGenerator.close(); ledgerManagerFactory.uninitialize(); } catch (IOException ie) { LOG.error("Failed to close ledger manager : ", ie); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index 9cda8ca..7c181b5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.meta.LedgerIdGenerator; import 
org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -39,13 +40,14 @@ import org.slf4j.LoggerFactory; * Encapsulates asynchronous ledger create operation * */ -class LedgerCreateOp implements GenericCallback<Long> { +class LedgerCreateOp implements GenericCallback<Void> { static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class); CreateCallback cb; LedgerMetadata metadata; LedgerHandle lh; + Long ledgerId; Object ctx; byte[] passwd; BookKeeper bk; @@ -60,12 +62,14 @@ class LedgerCreateOp implements GenericCallback<Long> { * BookKeeper object * @param ensembleSize * ensemble size - * @param quorumSize - * quorum size + * @param writeQuorumSize + * write quorum size + * @param ackQuorumSize + * ack quorum size * @param digestType * digest type, either MAC or CRC32 * @param passwd - * passowrd + * password * @param cb * callback implementation * @param ctx @@ -111,16 +115,37 @@ class LedgerCreateOp implements GenericCallback<Long> { */ metadata.addEnsemble(0L, ensemble); - // create a ledger with metadata - bk.getLedgerManager().createLedger(metadata, this); + createLedger(); + } + + void createLedger() { + // generate a ledger id and then create the ledger with metadata + final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator(); + ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() { + @Override + public void operationComplete(int rc, Long ledgerId) { + if (BKException.Code.OK != rc) { + createComplete(rc, null); + return; + } + + LedgerCreateOp.this.ledgerId = ledgerId; + // create a ledger with metadata + bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this); + } + }); } /** * Callback when created ledger. 
*/ @Override - public void operationComplete(int rc, Long ledgerId) { - if (BKException.Code.OK != rc) { + public void operationComplete(int rc, Void result) { + if (BKException.Code.LedgerExistException == rc) { + // retry to generate a new ledger id + createLedger(); + return; + } else if (BKException.Code.OK != rc) { createComplete(rc, null); return; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 3ec2b5a..2e66806 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -178,7 +178,7 @@ public abstract class AbstractConfiguration extends CompositeConfiguration { public String getZkAvailableBookiesPath() { return getZkLedgersRootPath() + "/" + AVAILABLE_NODE; } - + /** * Set the max entries to keep in fragment for re-replication. 
If fragment * has more entries than this count, then the original fragment will be http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java index f3f680d..6636506 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java @@ -42,11 +42,14 @@ import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -213,6 +216,31 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher { } } + @Override + public void createLedgerMetadata(final long ledgerId, final LedgerMetadata metadata, + final GenericCallback<Void> ledgerCb) { + String ledgerPath = getLedgerPath(ledgerId); + StringCallback scb = new StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (rc == Code.OK.intValue()) { + // update version + metadata.setVersion(new ZkVersion(0)); + ledgerCb.operationComplete(BKException.Code.OK, null); + } else if 
(rc == Code.NODEEXISTS.intValue()) { + LOG.warn("Failed to create ledger metadata for {} which already exist", ledgerId); + ledgerCb.operationComplete(BKException.Code.LedgerExistException, null); + } else { + LOG.error("Could not create node for ledger {}", ledgerId, + KeeperException.create(Code.get(rc), path)); + ledgerCb.operationComplete(BKException.Code.ZKException, null); + } + } + }; + ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, scb, null); + } + /** * Removes ledger metadata from ZooKeeper if version matches. * http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java index a7fbcf5..961e0d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java @@ -98,15 +98,15 @@ public class CleanupLedgerManager implements LedgerManager { } @Override - public void createLedger(LedgerMetadata metadata, - GenericCallback<Long> cb) { + public void createLedgerMetadata(long lid, LedgerMetadata metadata, + GenericCallback<Void> cb) { closeLock.readLock().lock(); try { if (closed) { cb.operationComplete(BKException.Code.ClientClosedException, null); return; } - underlying.createLedger(metadata, new CleanupGenericCallback<Long>(cb)); + underlying.createLedgerMetadata(lid, metadata, new CleanupGenericCallback<Void>(cb)); } finally { closeLock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java 
---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java index 2bc4258..6bd3216 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java @@ -22,19 +22,11 @@ import java.io.IOException; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.conf.AbstractConfiguration; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.AsyncCallback.StringCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +52,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager { * Configuration object * @param zk * ZooKeeper Client Handle - * @param ledgerRootPath - * ZooKeeper Path to store ledger metadata * @throws IOException when version is not compatible */ public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { @@ -71,33 +61,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager { } @Override - public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) { - StringCallback scb = new StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, - String name) { 
- if (Code.OK.intValue() != rc) { - LOG.error("Could not create node for ledger", - KeeperException.create(KeeperException.Code.get(rc), path)); - cb.operationComplete(BKException.Code.ZKException, null); - } else { - // update znode status - metadata.setVersion(new ZkVersion(0)); - try { - long ledgerId = getLedgerId(name); - cb.operationComplete(BKException.Code.OK, ledgerId); - } catch (IOException ie) { - LOG.error("Could not extract ledger-id from path:" + name, ie); - cb.operationComplete(BKException.Code.ZKException, null); - } - } - } - }; - ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, metadata.serialize(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, scb, null); - } - - @Override public String getLedgerPath(long ledgerId) { StringBuilder sb = new StringBuilder(); sb.append(ledgerPrefix) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java index db16d26..46f8b9b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java @@ -64,6 +64,11 @@ public class FlatLedgerManagerFactory extends LedgerManagerFactory { } @Override + public LedgerIdGenerator newLedgerIdGenerator() { + return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), null); + } + + @Override public LedgerManager newLedgerManager() { return new FlatLedgerManager(conf, zk); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java 
---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java index 7f2df73..bc62af4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java @@ -23,23 +23,16 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.NoSuchElementException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.conf.AbstractConfiguration; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.AsyncCallback.StringCallback; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,10 +41,7 @@ import org.slf4j.LoggerFactory; * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes. * * <p> - * Hierarchical Ledger Manager first obtain a global unique id from zookeeper using a EPHEMERAL_SEQUENTIAL - * znode <i>(ledgersRootPath)/ledgers/idgen/ID-</i>. - * Since zookeeper sequential counter has a format of %10d -- that is 10 digits with 0 (zero) padding, i.e. 
- * "<path>0000000001", HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4): + * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4): * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre> * These 3 parts are used to form the actual ledger node path used to store ledger metadata: * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre> @@ -64,13 +54,9 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager { static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class); static final String IDGEN_ZNODE = "idgen"; - static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-"; private static final String MAX_ID_SUFFIX = "9999"; private static final String MIN_ID_SUFFIX = "0000"; - // Path to generate global id - private final String idGenPath; - /** * Constructor * @@ -81,83 +67,6 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager { */ public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { super(conf, zk); - - this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX; - } - - @Override - public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) { - ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, final String idPathName) { - if (rc != KeeperException.Code.OK.intValue()) { - LOG.error("Could not generate new ledger id", - KeeperException.create(KeeperException.Code.get(rc), path)); - ledgerCb.operationComplete(BKException.Code.ZKException, null); - return; - } - /* - * Extract ledger id from gen path - */ - long ledgerId; - try { - ledgerId = getLedgerIdFromGenPath(idPathName); - } catch (IOException e) { - LOG.error("Could not extract ledger-id from id gen path:" + path, e); - ledgerCb.operationComplete(BKException.Code.ZKException, null); - 
return; - } - String ledgerPath = getLedgerPath(ledgerId); - final long lid = ledgerId; - StringCallback scb = new StringCallback() { - @Override - public void processResult(int rc, String path, - Object ctx, String name) { - if (rc != KeeperException.Code.OK.intValue()) { - LOG.error("Could not create node for ledger", - KeeperException.create(KeeperException.Code.get(rc), path)); - ledgerCb.operationComplete(BKException.Code.ZKException, null); - } else { - // update version - metadata.setVersion(new ZkVersion(0)); - ledgerCb.operationComplete(BKException.Code.OK, lid); - } - } - }; - ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null); - // delete the znode for id generation - scheduler.submit(new Runnable() { - @Override - public void run() { - zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (rc != KeeperException.Code.OK.intValue()) { - LOG.warn("Exception during deleting znode for id generation : ", - KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - LOG.debug("Deleting znode for id generation : {}", idPathName); - } - } - }, null); - } - }); - } - }, null); - } - - // get ledger id from generation path - private long getLedgerIdFromGenPath(String nodeName) throws IOException { - long ledgerId; - try { - String parts[] = nodeName.split(IDGENERATION_PREFIX); - ledgerId = Long.parseLong(parts[parts.length - 1]); - } catch (NumberFormatException e) { - throw new IOException(e); - } - return ledgerId; } @Override @@ -304,7 +213,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager { * Callback to process element of list when success * @param finalCb * Final callback to be called after all elements in the list are processed - * @param contxt + * @param context * Context of final callback * @param successRc * RC passed to final callback on success 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java index b843e99..a165b0d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java @@ -64,6 +64,11 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory { } @Override + public LedgerIdGenerator newLedgerIdGenerator() { + return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE); + } + + @Override public LedgerManager newLedgerManager() { return new HierarchicalLedgerManager(conf, zk); } @@ -81,8 +86,7 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory { String ledgersRootPath = conf.getZkLedgersRootPath(); List<String> children = zk.getChildren(ledgersRootPath, false); for (String child : children) { - if (!HierarchicalLedgerManager.IDGEN_ZNODE.equals(child) - && ledgerManager.isSpecialZnode(child)) { + if (ledgerManager.isSpecialZnode(child)) { continue; } ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child); @@ -90,4 +94,5 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory { // Delete and recreate the LAYOUT information. 
super.format(conf, zk); } + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java new file mode 100644 index 0000000..24d1f01 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.meta; + +import java.io.Closeable; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; + +/** + * The interface for global unique ledger ID generation + */ +public interface LedgerIdGenerator extends Closeable { + + /** + * generate a global unique ledger id + * + * @param cb + * Callback when a new ledger id is generated, return code:<ul> + * <li>{@link BKException.Code.OK} if success</li> + * <li>{@link BKException.Code.ZKException} when can't generate new ledger id</li> + * </ul> + */ + public void generateLedgerId(GenericCallback<Long> cb); + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java index 7229028..fe3c2cf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java @@ -42,16 +42,21 @@ import org.apache.bookkeeper.versioning.Version; public interface LedgerManager extends Closeable { /** - * Create a new ledger with provided metadata + * Create a new ledger with provided ledger id and metadata * + * @param ledgerId + * Ledger id provided to be created * @param metadata - * Metadata provided when creating a new ledger + * Metadata provided when creating the new ledger * @param cb - * Callback when creating a new ledger. - * {@link BKException.Code.ZKException} return code when can't generate - * or extract new ledger id + * Callback when creating a new ledger. 
Return code:<ul> + * <li>{@link BKException.Code.OK} if success</li> + * <li>{@link BKException.Code.LedgerExistException} if given ledger id exist</li> + * <li>{@link BKException.Code.ZKException}/{@link BKException.Code.MetaStoreException} + * for other issue</li> + * </ul> */ - public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb); + public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb); /** * Remove a specified ledger metadata by ledgerId and version. @@ -61,10 +66,12 @@ public interface LedgerManager extends Closeable { * @param version * Ledger metadata version * @param cb - * Callback when removed ledger metadata. - * {@link BKException.Code.MetadataVersionException} return code when version doesn't match, - * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist, - * {@link BKException.Code.ZKException} return code when other issues happen. + * Callback when remove ledger metadata. Return code:<ul> + * <li>{@link BKException.Code.OK} if success</li> + * <li>{@link BKException.Code.MetadataVersionException} if version doesn't match</li> + * <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li> + * <li>{@link BKException.Code.ZKException} for other issue</li> + * </ul> */ public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb); @@ -74,9 +81,11 @@ public interface LedgerManager extends Closeable { * @param ledgerId * Ledger Id * @param readCb - * Callback when read ledger metadata. - * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist, - * {@link BKException.Code.ZKException} return code when other issues happen. + * Callback when read ledger metadata. 
Return code:<ul> + * <li>{@link BKException.Code.OK} if success</li> + * <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li> + * <li>{@link BKException.Code.ZKException} for other issue</li> + * </ul> */ public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb); @@ -88,9 +97,11 @@ public interface LedgerManager extends Closeable { * @param metadata * Ledger Metadata to write * @param cb - * Callback when finished writing ledger metadata. - * {@link BKException.Code.MetadataVersionException} return code when version doesn't match, - * {@link BKException.Code.ZKException} return code when other issues happen. + * Callback when finished writing ledger metadata. Return code:<ul> + * <li>{@link BKException.Code.OK} if success</li> + * <li>{@link BKException.Code.MetadataVersionException} if version in metadata doesn't match</li> + * <li>{@link BKException.Code.ZKException} for other issue</li> + * </ul> */ public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java index 7c3cf5c..3a53623 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java @@ -68,6 +68,14 @@ public abstract class LedgerManagerFactory { public abstract void uninitialize() throws IOException; /** + * Return the ledger id generator, which is used for global unique ledger id + * generation. + * + * @return ledger id generator. 
+ */ + public abstract LedgerIdGenerator newLedgerIdGenerator(); + + /** * return ledger manager for client-side to manage ledger metadata. * * @return ledger manager http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java index 2510b89..9f7ef38 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.conf.AbstractConfiguration; -import org.apache.bookkeeper.meta.AbstractZkLedgerManager.ReadLedgerMetadataTask; import org.apache.bookkeeper.metastore.MSException; import org.apache.bookkeeper.metastore.MSWatchedEvent; import org.apache.bookkeeper.metastore.MetaStore; @@ -46,23 +45,20 @@ import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback; import org.apache.bookkeeper.metastore.MetastoreException; import org.apache.bookkeeper.metastore.MetastoreFactory; import org.apache.bookkeeper.metastore.MetastoreScannableTable; +import org.apache.bookkeeper.metastore.MetastoreTable; import org.apache.bookkeeper.metastore.MetastoreTableItem; +import org.apache.bookkeeper.metastore.MetastoreUtils; import org.apache.bookkeeper.metastore.MetastoreWatcher; -import org.apache.bookkeeper.metastore.MSWatchedEvent.EventType; import org.apache.bookkeeper.metastore.Value; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.util.StringUtils; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.AsyncCallback.StringCallback; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,6 +176,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { } } + @Override + public LedgerIdGenerator newLedgerIdGenerator() { + return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), MsLedgerManager.IDGEN_ZNODE); + } + static class MsLedgerManager implements LedgerManager, MetastoreWatcher { final ZooKeeper zk; final AbstractConfiguration conf; @@ -189,15 +190,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { final int maxEntriesPerScan; static final String IDGEN_ZNODE = "ms-idgen"; - static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-"; // ledger metadata listeners protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners = new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>(); - // Path to generate global id - private final String idGenPath; - // we use this to prevent long stack chains from building up in // callbacks ScheduledExecutorService scheduler; @@ -266,7 +263,6 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { // configuration settings maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan(); - this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX; ThreadFactoryBuilder tfb = new ThreadFactoryBuilder() 
.setNameFormat("MSLedgerManagerScheduler-%d"); this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb @@ -346,76 +342,28 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { } @Override - public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) { - ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, final String idPathName) { - if (rc != KeeperException.Code.OK.intValue()) { - LOG.error("Could not generate new ledger id", - KeeperException.create(KeeperException.Code.get(rc), path)); - ledgerCb.operationComplete(BKException.Code.ZKException, null); - return; - } - /* - * Extract ledger id from gen path - */ - long ledgerId; - try { - ledgerId = getLedgerIdFromGenPath(idPathName); - } catch (IOException e) { - LOG.error("Could not extract ledger-id from id gen path:" + path, e); - ledgerCb.operationComplete(BKException.Code.ZKException, null); - return; - } - - final long lid = ledgerId; - MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() { - @Override - public void complete(int rc, Version version, Object ctx) { - if (MSException.Code.BadVersion.getCode() == rc) { - ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null); - return; - } - if (MSException.Code.OK.getCode() != rc) { - ledgerCb.operationComplete(BKException.Code.MetaStoreException, null); - return; - } - LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid, - version }); - // update version - metadata.setVersion(version); - ledgerCb.operationComplete(BKException.Code.OK, lid); - } - }; - - ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()), - Version.NEW, msCallback, null); - zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() { - @Override - public void 
processResult(int rc, String path, Object ctx) { - if (rc != KeeperException.Code.OK.intValue()) { - LOG.warn("Exception during deleting znode for id generation : ", - KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - LOG.debug("Deleting znode for id generation : {}", idPathName); - } - } - }, null); - } - }, null); - } + public void createLedgerMetadata(final long lid, final LedgerMetadata metadata, + final GenericCallback<Void> ledgerCb) { + MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() { + @Override + public void complete(int rc, Version version, Object ctx) { + if (MSException.Code.BadVersion.getCode() == rc) { + ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null); + return; + } + if (MSException.Code.OK.getCode() != rc) { + ledgerCb.operationComplete(BKException.Code.MetaStoreException, null); + return; + } + LOG.debug("Create ledger {} with version {} successfully.", lid, version); + // update version + metadata.setVersion(version); + ledgerCb.operationComplete(BKException.Code.OK, null); + } + }; - // get ledger id from generation path - private long getLedgerIdFromGenPath(String nodeName) throws IOException { - long ledgerId; - try { - String parts[] = nodeName.split(IDGENERATION_PREFIX); - ledgerId = Long.parseLong(parts[parts.length - 1]); - } catch (NumberFormatException e) { - throw new IOException(e); - } - return ledgerId; + ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()), + Version.NEW, msCallback, null); } @Override @@ -688,7 +636,7 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { * @param finalCb * Final callback to be called after all elements in the list * are processed - * @param contxt + * @param context * Context of final callback * @param successRc * RC passed to final callback on success @@ -730,4 +678,24 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory { } } + @Override + public void 
format(AbstractConfiguration conf, ZooKeeper zk) throws InterruptedException, + KeeperException, IOException { + MetastoreTable ledgerTable; + try { + ledgerTable = metastore.createScannableTable(TABLE_NAME); + } catch (MetastoreException mse) { + throw new IOException("Failed to instantiate table " + TABLE_NAME + " in metastore " + + metastore.getName()); + } + try { + MetastoreUtils.cleanTable(ledgerTable, conf.getMetastoreMaxEntriesPerScan()); + } catch (MSException mse) { + throw new IOException("Exception when cleanning up table " + TABLE_NAME, mse); + } + LOG.info("Finished cleaning up table {}.", TABLE_NAME); + // Delete and recreate the LAYOUT information. + super.format(conf, zk); + } + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java new file mode 100644 index 0000000..a6c5b7b --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.IOException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper based ledger id generator class, which uses EPHEMERAL_SEQUENTIAL + * znodes with <i>(ledgerIdGenPath)/ID-</i> prefix to generate ledger ids. Note + * that the zookeeper sequential counter has a format of %010d -- that is 10 digits with 0 + * (zero) padding, i.e. "<path>0000000001", so ledger id space is + * fundamentally limited to 9 billion. 
+ */ +public class ZkLedgerIdGenerator implements LedgerIdGenerator { + static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class); + + final ZooKeeper zk; + final String ledgerIdGenPath; + final String ledgerPrefix; + + public ZkLedgerIdGenerator(ZooKeeper zk, + String ledgersPath, + String idGenZnodeName) { + this.zk = zk; + if (StringUtils.isBlank(idGenZnodeName)) { + this.ledgerIdGenPath = ledgersPath; + } else { + this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName; + } + this.ledgerPrefix = this.ledgerIdGenPath + "/ID-"; + } + + @Override + public void generateLedgerId(final GenericCallback<Long> cb) { + ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, + new StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, final String idPathName) { + if (rc != KeeperException.Code.OK.intValue()) { + LOG.error("Could not generate new ledger id", + KeeperException.create(KeeperException.Code.get(rc), path)); + cb.operationComplete(BKException.Code.ZKException, null); + return; + } + + /* + * Extract ledger id from generated path + */ + long ledgerId; + try { + ledgerId = getLedgerIdFromGenPath(idPathName); + cb.operationComplete(BKException.Code.OK, ledgerId); + } catch (IOException e) { + LOG.error("Could not extract ledger-id from id gen path:" + path, e); + cb.operationComplete(BKException.Code.ZKException, null); + return; + } + + // delete the znode for id generation + zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != KeeperException.Code.OK.intValue()) { + LOG.warn("Exception during deleting znode for id generation : ", + KeeperException.create(KeeperException.Code.get(rc), path)); + } else { + LOG.debug("Deleting znode for id generation : {}", idPathName); + } + } + }, null); + } + }, null); + } + + // get ledger id from generation 
path + private long getLedgerIdFromGenPath(String nodeName) throws IOException { + long ledgerId; + try { + String parts[] = nodeName.split(ledgerPrefix); + ledgerId = Long.parseLong(parts[parts.length - 1]); + } catch (NumberFormatException e) { + throw new IOException(e); + } + return ledgerId; + } + + @Override + public void close() throws IOException { + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 101fdac..16bd4da 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -465,7 +465,7 @@ public class CompactionTest extends BookKeeperClusterTestCase { private LedgerManager getLedgerManager(final Set<Long> ledgers) { LedgerManager manager = new LedgerManager() { @Override - public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) { + public void createLedgerMetadata(long lid, LedgerMetadata metadata, GenericCallback<Void> cb) { unsupported(); } @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java index eb833a3..df74339 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java @@ -23,11 +23,13 @@ package org.apache.bookkeeper.client; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.meta.FlatLedgerManagerFactory; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.MSLedgerManagerFactory; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.ReflectionUtils; @@ -104,45 +106,54 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase { @Test(timeout = 60000) public void testWatchMetadataRemoval() throws Exception { - LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls); - factory.initialize(baseConf, super.zkc, factory.getCurrentVersion()); - LedgerManager manager = factory.newLedgerManager(); - final ByteBuffer bbLedgerId = ByteBuffer.allocate(8); - final CountDownLatch createLatch = new CountDownLatch(1); - final CountDownLatch removeLatch = new CountDownLatch(1); - - manager.createLedger( new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()), - new BookkeeperInternalCallbacks.GenericCallback<Long>(){ - - @Override - public void operationComplete(int rc, Long result) { - bbLedgerId.putLong(result); - bbLedgerId.flip(); - createLatch.countDown(); - } - }); - assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS)); - final long createdLid = bbLedgerId.getLong(); - - manager.registerLedgerMetadataListener( createdLid, - new 
LedgerMetadataListener() { - - @Override - public void onChanged( long ledgerId, LedgerMetadata metadata ) { - assertEquals(ledgerId, createdLid); - assertEquals(metadata, null); - removeLatch.countDown(); - } - }); - - manager.removeLedgerMetadata( createdLid, Version.ANY, - new BookkeeperInternalCallbacks.GenericCallback<Void>() { - - @Override - public void operationComplete(int rc, Void result) { - assertEquals(rc, BKException.Code.OK); - } - }); - assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS)); + LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls); + factory.initialize(baseConf, super.zkc, factory.getCurrentVersion()); + final LedgerManager manager = factory.newLedgerManager(); + LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator(); + + final ByteBuffer bbLedgerId = ByteBuffer.allocate(8); + final CountDownLatch createLatch = new CountDownLatch(1); + final CountDownLatch removeLatch = new CountDownLatch(1); + + idGenerator.generateLedgerId(new GenericCallback<Long>() { + @Override + public void operationComplete(int rc, final Long lid) { + manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()), + new BookkeeperInternalCallbacks.GenericCallback<Void>(){ + + @Override + public void operationComplete(int rc, Void result) { + bbLedgerId.putLong(lid); + bbLedgerId.flip(); + createLatch.countDown(); + } + }); + + } + }); + + assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS)); + final long createdLid = bbLedgerId.getLong(); + + manager.registerLedgerMetadataListener( createdLid, + new LedgerMetadataListener() { + + @Override + public void onChanged( long ledgerId, LedgerMetadata metadata ) { + assertEquals(ledgerId, createdLid); + assertEquals(metadata, null); + removeLatch.countDown(); + } + }); + + manager.removeLedgerMetadata( createdLid, Version.ANY, + new BookkeeperInternalCallbacks.GenericCallback<Void>() { + + @Override + public void operationComplete(int rc, Void 
result) { + assertEquals(rc, BKException.Code.OK); + } + }); + assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index 19aab44..de352b5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.meta; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -63,23 +64,38 @@ public class GcLedgersTest extends LedgerManagerTestCase { /** * Create ledgers */ - private void createLedgers(int numLedgers, final Set<Long> createdLedgers) { + private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException { final AtomicInteger expected = new AtomicInteger(numLedgers); for (int i=0; i<numLedgers; i++) { - getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), - new GenericCallback<Long>() { + getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() { @Override - public void operationComplete(int rc, Long ledgerId) { - if (rc == BKException.Code.OK) { - activeLedgers.put(ledgerId, true); - createdLedgers.add(ledgerId); - } - synchronized (expected) { - int num = expected.decrementAndGet(); - if (num == 0) { - expected.notify(); + public void operationComplete(int rc, final Long ledgerId) { + if (BKException.Code.OK != rc) { + synchronized (expected) { + int num = expected.decrementAndGet(); + if (num == 0) { + expected.notify(); + } } + return; } + + 
getLedgerManager().createLedgerMetadata(ledgerId, + new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() { + @Override + public void operationComplete(int rc, Void result) { + if (rc == BKException.Code.OK) { + activeLedgers.put(ledgerId, true); + createdLedgers.add(ledgerId); + } + synchronized (expected) { + int num = expected.decrementAndGet(); + if (num == 0) { + expected.notify(); + } + } + } + }); } }); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index b95d2db..dae61bc 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -21,20 +21,19 @@ package org.apache.bookkeeper.meta; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.SnapshotMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test case to run over serveral ledger managers @@ -45,6 +44,7 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { LedgerManagerFactory ledgerManagerFactory; LedgerManager ledgerManager = null; + LedgerIdGenerator ledgerIdGenerator = null; SnapshotMap<Long, Boolean> activeLedgers = null; public LedgerManagerTestCase(Class<? 
extends LedgerManagerFactory> lmFactoryCls) { @@ -60,6 +60,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { return ledgerManager; } + public LedgerIdGenerator getLedgerIdGenerator() throws IOException { + if (null == ledgerIdGenerator) { + ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator(); + } + return ledgerIdGenerator; + } + @Parameters public static Collection<Object[]> configs() { return Arrays.asList(new Object[][] { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java new file mode 100644 index 0000000..708fbc7 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.meta; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.TestCase; + +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.test.ZooKeeperUtil; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestZkLedgerIdGenerator extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestZkLedgerIdGenerator.class); + + ZooKeeperUtil zkutil; + ZooKeeper zk; + + LedgerIdGenerator ledgerIdGenerator; + + @Override + @Before + public void setUp() throws Exception { + LOG.info("Setting up test"); + super.setUp(); + + zkutil = new ZooKeeperUtil(); + zkutil.startServer(); + zk = zkutil.getZooKeeperClient(); + + ledgerIdGenerator = new ZkLedgerIdGenerator(zk, + "/test-zk-ledger-id-generator", "idgen"); + } + + @Override + @After + public void tearDown() throws Exception { + LOG.info("Tearing down test"); + ledgerIdGenerator.close(); + zk.close(); + zkutil.killServer(); + + super.tearDown(); + } + + @Test(timeout=60000) + public void testGenerateLedgerId() throws Exception { + // Create *nThread* threads each generate *nLedgers* ledger id, + // and then check there is no identical ledger id. 
+ final int nThread = 2; + final int nLedgers = 2000; + final CountDownLatch countDownLatch = new CountDownLatch(nThread*nLedgers); + + final AtomicInteger errCount = new AtomicInteger(0); + final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>(); + final GenericCallback<Long> cb = new GenericCallback<Long>() { + @Override + public void operationComplete(int rc, Long result) { + if (Code.OK.intValue() == rc) { + ledgerIds.add(result); + } else { + errCount.incrementAndGet(); + } + countDownLatch.countDown(); + } + }; + + long start = System.currentTimeMillis(); + + for (int i = 0; i < nThread; i++) { + new Thread() { + @Override + public void run() { + for (int j = 0; j < nLedgers; j++) { + ledgerIdGenerator.generateLedgerId(cb); + } + } + }.start(); + } + + assertTrue("Wait ledger id generation threads to stop timeout : ", + countDownLatch.await(30, TimeUnit.SECONDS)); + LOG.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(), + System.currentTimeMillis() - start); + assertEquals("Error occur during ledger id generation : ", 0, errCount.get()); + + Set<Long> ledgers = new HashSet<Long>(); + while (!ledgerIds.isEmpty()) { + Long ledger = ledgerIds.poll(); + assertNotNull("Generated ledger id is null : ", ledger); + assertFalse("Ledger id [" + ledger + "] conflict : ", ledgers.contains(ledger)); + ledgers.add(ledger); + } + } + +}