Repository: bookkeeper Updated Branches: refs/heads/master 92722ee9c -> 8729d12be
BOOKKEEPER-883: Test timeout in bookkeeper-benchmark Problem: The BenchReadThroughputLatency is tight with FlatLedgerManager. so lots of assumptions are made based on how the znodes are changed when ledgers are created. There was a change introduced LedgerIdGenerator, which broke the assumptions that made by BenchReadThroughputLatency. Fix: - Use a hashset to cache processed ledgers on reacting on children changes - Remove unpredictable test on next ledger - Fix an error logging on FlatLedgerManager processing ledgers Author: Sijie Guo <[email protected]> Reviewers: Matteo Merli <[email protected]> Closes #10 from sijie/BOOKKEEPER-883 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/8729d12b Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/8729d12b Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/8729d12b Branch: refs/heads/master Commit: 8729d12be50295086e7440dfa5d0256abb7688d5 Parents: 92722ee Author: Sijie Guo <[email protected]> Authored: Mon Feb 8 23:12:03 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Mon Feb 8 23:12:03 2016 -0800 ---------------------------------------------------------------------- .../benchmark/BenchReadThroughputLatency.java | 68 ++++++++++---------- .../bookkeeper/benchmark/TestBenchmark.java | 54 +--------------- .../bookkeeper/meta/FlatLedgerManager.java | 5 ++ .../bookkeeper/meta/ZkLedgerIdGenerator.java | 4 +- 4 files changed, 45 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/8729d12b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java ---------------------------------------------------------------------- diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java index d5baaa4..1cdd564 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java @@ -19,42 +19,34 @@ */ package org.apache.bookkeeper.benchmark; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.AsyncCallback.AddCallback; - +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.PosixParser; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Enumeration; -import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.ArrayList; +import java.util.Set; import java.util.regex.Pattern; import java.util.regex.Matcher; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.cli.ParseException; - import static com.google.common.base.Charsets.UTF_8; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BenchReadThroughputLatency { static final Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class); @@ -90,7 +82,7 @@ public class BenchReadThroughputLatency { try { bk = new BookKeeper(conf); while (true) { - lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, + lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, passwd); long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit); if (lastConfirmed == lastRead) { @@ -154,7 +146,7 @@ public class BenchReadThroughputLatency { @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Options options = new Options(); - options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " + options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " + " Cannot be used with -listen"); options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully"); options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')"); @@ -207,11 +199,12 @@ public class BenchReadThroughputLatency { } } }); + final Set<String> processedLedgers = new HashSet<String>(); try { zk.register(new Watcher() { public void process(WatchedEvent event) { try { - if (event.getState() == Event.KeeperState.SyncConnected + if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.None) { connectedLatch.countDown(); } else if (event.getType() == Event.EventType.NodeCreated @@ -229,22 +222,29 @@ public class BenchReadThroughputLatency { ledgers.add(child); } } - Collections.sort(ledgers, ZK_LEDGER_COMPARE); - String last = ledgers.get(ledgers.size() - 1); - final Matcher m = LEDGER_PATTERN.matcher(last); - if (m.find()) { - int ledgersLeft = numLedgers.decrementAndGet(); - Thread t = new Thread() { - public void run() { - readLedger(conf, Long.valueOf(m.group(1)), passwd); + for (String ledger : ledgers) { + synchronized (processedLedgers) { + if (processedLedgers.contains(ledger)) { + continue; + } + final Matcher m = LEDGER_PATTERN.matcher(ledger); + if (m.find()) { + int ledgersLeft = numLedgers.decrementAndGet(); + final Long ledgerId = Long.valueOf(m.group(1)); + processedLedgers.add(ledger); + Thread t = new Thread() { + public void run() { + readLedger(conf, ledgerId, passwd); + } + }; + t.start(); + if (ledgersLeft <= 0) { + shutdownLatch.countDown(); } - }; - t.start(); - if (ledgersLeft <= 0) { - shutdownLatch.countDown(); + } else { + LOG.error("Cant file ledger id in {}", ledger); + } } - } else { - LOG.error("Cant file ledger id in {}", last); } } else { LOG.warn("Unknown event {}", event); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/8729d12b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java ---------------------------------------------------------------------- diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java index ec3cd61..f5108ec 100644 --- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java +++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java @@ -19,29 +19,16 @@ */ package org.apache.bookkeeper.benchmark; -import org.junit.BeforeClass; -import org.junit.AfterClass; -import org.junit.Test; -import org.junit.Assert; - import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.LocalBookKeeper; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class TestBenchmark extends BookKeeperClusterTestCase { @@ -114,7 +101,7 @@ public class TestBenchmark extends BookKeeperClusterTestCase { if (!t.isAlive()) { break; } - Thread.sleep(1000); // wait for 10 seconds for reading to finish + Thread.sleep(100); } Assert.assertFalse("Thread should be finished", t.isAlive()); @@ -122,40 +109,5 @@ public class TestBenchmark extends BookKeeperClusterTestCase { BenchReadThroughputLatency.main(new String[] { "--zookeeper", zkUtil.getZooKeeperConnectString(), "--ledger", String.valueOf(lastLedgerId)}); - - final long nextLedgerId = lastLedgerId+1; - t = new Thread() { - public void run() { - try { - BenchReadThroughputLatency.main(new String[] { - "--zookeeper", zkUtil.getZooKeeperConnectString(), - "--ledger", String.valueOf(nextLedgerId)}); - } catch (Throwable t) { - LOG.error("Error reading", t); - threwException.set(true); - } - } - }; - t.start(); - - Assert.assertTrue("Thread should be running", t.isAlive()); - BookKeeper bk = new BookKeeper(zkUtil.getZooKeeperConnectString()); - LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes()); - try { - for (int j = 0; j < 100; j++) { - lh.addEntry(data); - } - } finally { - lh.close(); - bk.close(); - } - for (int i = 0; i < 60; i++) { - if (!t.isAlive()) { - break; - } - Thread.sleep(1000); // wait for 10 seconds for reading to finish - } - Assert.assertFalse("Thread should be finished", t.isAlive()); - Assert.assertFalse("A thread has thrown an exception, check logs", threwException.get()); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/8729d12b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java index 6bd3216..3172247 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java @@ -88,6 +88,11 @@ class FlatLedgerManager extends AbstractZkLedgerManager { } @Override + protected boolean isSpecialZnode(String znode) { + return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode); + } + + @Override public LedgerRangeIterator getLedgerRanges() { return new LedgerRangeIterator() { // single iterator, can visit only one time http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/8729d12b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java index a6c5b7b..b54c891 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java @@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory; public class ZkLedgerIdGenerator implements LedgerIdGenerator { static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class); + static final String LEDGER_ID_GEN_PREFIX = "ID-"; + final ZooKeeper zk; final String ledgerIdGenPath; final String ledgerPrefix; @@ -55,7 +57,7 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator { } else { this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName; } - this.ledgerPrefix = this.ledgerIdGenPath + "/ID-"; + this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX; } @Override
