Repository: bookkeeper Updated Branches: refs/heads/branch-4.3 ebbb68ccd -> 86538192a
BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Sijie via mmerli) [missing files] Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/86538192 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/86538192 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/86538192 Branch: refs/heads/branch-4.3 Commit: 86538192a08b8629e9c1487dcc9afae95a51705f Parents: ebbb68c Author: Matteo Merli <[email protected]> Authored: Tue Nov 17 11:30:45 2015 -0800 Committer: Matteo Merli <[email protected]> Committed: Tue Nov 17 11:30:45 2015 -0800 ---------------------------------------------------------------------- .../bookkeeper/meta/LedgerIdGenerator.java | 41 +++++++ .../bookkeeper/meta/ZkLedgerIdGenerator.java | 120 ++++++++++++++++++ .../meta/TestZkLedgerIdGenerator.java | 122 +++++++++++++++++++ 3 files changed, 283 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/86538192/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java new file mode 100644 index 0000000..24d1f01 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.Closeable; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; + +/** + * The interface for globally unique ledger ID generation. + */ +public interface LedgerIdGenerator extends Closeable { + + /** + * Generate a globally unique ledger id. + * + * @param cb + * Callback invoked when a new ledger id is generated, return code:<ul> + * <li>{@link BKException.Code#OK} on success</li> + * <li>{@link BKException.Code#ZKException} when a new ledger id can't be generated</li> + * </ul> + */ + public void generateLedgerId(GenericCallback<Long> cb); + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/86538192/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java new file mode 100644 index 0000000..a6c5b7b --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.IOException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper-based ledger id generator class, which uses EPHEMERAL_SEQUENTIAL + * znodes with the <i>(ledgerIdGenPath)/ID-</i> prefix to generate ledger ids. Note + * that the ZooKeeper sequential counter has a format of %010d -- that is 10 digits with 0 + * (zero) padding, i.e. "<path>0000000001", so ledger id space is + * fundamentally limited to 9 billion. 
+ */ +public class ZkLedgerIdGenerator implements LedgerIdGenerator { + static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class); + + final ZooKeeper zk; + final String ledgerIdGenPath; + final String ledgerPrefix; + + public ZkLedgerIdGenerator(ZooKeeper zk, + String ledgersPath, + String idGenZnodeName) { + this.zk = zk; + if (StringUtils.isBlank(idGenZnodeName)) { + this.ledgerIdGenPath = ledgersPath; + } else { + this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName; + } + this.ledgerPrefix = this.ledgerIdGenPath + "/ID-"; + } + + @Override + public void generateLedgerId(final GenericCallback<Long> cb) { + ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, + new StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, final String idPathName) { + if (rc != KeeperException.Code.OK.intValue()) { + LOG.error("Could not generate new ledger id", + KeeperException.create(KeeperException.Code.get(rc), path)); + cb.operationComplete(BKException.Code.ZKException, null); + return; + } + + /* + * Extract ledger id from generated path + */ + long ledgerId; + try { + ledgerId = getLedgerIdFromGenPath(idPathName); + cb.operationComplete(BKException.Code.OK, ledgerId); + } catch (IOException e) { + LOG.error("Could not extract ledger-id from id gen path:" + path, e); + cb.operationComplete(BKException.Code.ZKException, null); + return; + } + + // delete the znode for id generation + zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != KeeperException.Code.OK.intValue()) { + LOG.warn("Exception during deleting znode for id generation : ", + KeeperException.create(KeeperException.Code.get(rc), path)); + } else { + LOG.debug("Deleting znode for id generation : {}", idPathName); + } + } + }, null); + } + }, null); + } + + // get ledger id from generation 
path + private long getLedgerIdFromGenPath(String nodeName) throws IOException { + long ledgerId; + try { + String parts[] = nodeName.split(ledgerPrefix); + ledgerId = Long.parseLong(parts[parts.length - 1]); + } catch (NumberFormatException e) { + throw new IOException(e); + } + return ledgerId; + } + + @Override + public void close() throws IOException { + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/86538192/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java new file mode 100644 index 0000000..708fbc7 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.meta; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.TestCase; + +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.test.ZooKeeperUtil; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestZkLedgerIdGenerator extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestZkLedgerIdGenerator.class); + + ZooKeeperUtil zkutil; + ZooKeeper zk; + + LedgerIdGenerator ledgerIdGenerator; + + @Override + @Before + public void setUp() throws Exception { + LOG.info("Setting up test"); + super.setUp(); + + zkutil = new ZooKeeperUtil(); + zkutil.startServer(); + zk = zkutil.getZooKeeperClient(); + + ledgerIdGenerator = new ZkLedgerIdGenerator(zk, + "/test-zk-ledger-id-generator", "idgen"); + } + + @Override + @After + public void tearDown() throws Exception { + LOG.info("Tearing down test"); + ledgerIdGenerator.close(); + zk.close(); + zkutil.killServer(); + + super.tearDown(); + } + + @Test(timeout=60000) + public void testGenerateLedgerId() throws Exception { + // Create *nThread* threads, each generating *nLedgers* ledger ids, + // and then check that no two ledger ids are identical. 
+ final int nThread = 2; + final int nLedgers = 2000; + final CountDownLatch countDownLatch = new CountDownLatch(nThread*nLedgers); + + final AtomicInteger errCount = new AtomicInteger(0); + final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>(); + final GenericCallback<Long> cb = new GenericCallback<Long>() { + @Override + public void operationComplete(int rc, Long result) { + if (Code.OK.intValue() == rc) { + ledgerIds.add(result); + } else { + errCount.incrementAndGet(); + } + countDownLatch.countDown(); + } + }; + + long start = System.currentTimeMillis(); + + for (int i = 0; i < nThread; i++) { + new Thread() { + @Override + public void run() { + for (int j = 0; j < nLedgers; j++) { + ledgerIdGenerator.generateLedgerId(cb); + } + } + }.start(); + } + + assertTrue("Wait ledger id generation threads to stop timeout : ", + countDownLatch.await(30, TimeUnit.SECONDS)); + LOG.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(), + System.currentTimeMillis() - start); + assertEquals("Error occur during ledger id generation : ", 0, errCount.get()); + + Set<Long> ledgers = new HashSet<Long>(); + while (!ledgerIds.isEmpty()) { + Long ledger = ledgerIds.poll(); + assertNotNull("Generated ledger id is null : ", ledger); + assertFalse("Ledger id [" + ledger + "] conflict : ", ledgers.contains(ledger)); + ledgers.add(ledger); + } + } + +}
