Repository: bookkeeper Updated Branches: refs/heads/master 40af841a3 -> dd575a16d
BOOKKEEPER-895: GC ledgers that are no longer a part of the ensemble Author: Siddharth Boobna <[email protected]> Reviewers: Matteo Merli <[email protected]>, Guo Sijie <[email protected]> Closes #25 from sboobna/BOOKKEEPER-895 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/dd575a16 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/dd575a16 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/dd575a16 Branch: refs/heads/master Commit: dd575a16decbeaab1f230db271bc400d7e434216 Parents: 40af841 Author: Siddharth Boobna <[email protected]> Authored: Wed Apr 27 00:55:19 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Wed Apr 27 00:55:19 2016 -0700 ---------------------------------------------------------------------- bookkeeper-server/conf/bk_server.conf | 5 + .../bookie/GarbageCollectorThread.java | 2 +- .../bookie/ScanAndCompareGarbageCollector.java | 128 +++++++++- .../bookkeeper/conf/ServerConfiguration.java | 32 ++- .../meta/ZkLedgerUnderreplicationManager.java | 119 +++++---- .../bookkeeper/util/BookKeeperConstants.java | 1 + .../bookie/TestGcOverreplicatedLedger.java | 240 +++++++++++++++++++ .../apache/bookkeeper/meta/GcLedgersTest.java | 119 +-------- .../bookkeeper/meta/LedgerManagerTestCase.java | 118 ++++++++- .../test/BookKeeperClusterTestCase.java | 17 ++ 10 files changed, 619 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/conf/bk_server.conf ---------------------------------------------------------------------- diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf index f73e633..b3f1637 100644 --- a/bookkeeper-server/conf/bk_server.conf +++ b/bookkeeper-server/conf/bk_server.conf @@ -162,6 +162,11 @@ ledgerDirectories=/tmp/bk-data # interval if there is enough disk capacity. # gcWaitTime=1000 +# How long the interval to trigger next garbage collection of overreplicated +# ledgers, in milliseconds [Default: 1 day]. This should not be run very frequently since we read +# the metadata for all the ledgers on the bookie from zk +# gcOverreplicatedLedgerWaitTime=86400000 + # How long the interval to flush ledger index pages to disk, in milliseconds # Flushing index files will introduce much random disk I/O. # If separating journal dir and ledger dirs each on different devices, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 4d7da25..2821ec8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -244,7 +244,7 @@ public class GarbageCollectorThread extends BookieThread { } }; - this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage); + this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf); // compaction parameters minorCompactionThreshold = conf.getMinorCompactionThreshold(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index 2b4e3c0..05cd958 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -21,12 +21,27 @@ package org.apache.bookkeeper.bookie; +import java.io.IOException; +import java.util.ArrayList; import java.util.NavigableSet; import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,12 +64,32 @@ import com.google.common.collect.Sets; public class ScanAndCompareGarbageCollector implements GarbageCollector{ static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class); + static final int MAX_CONCURRENT_ZK_REQUESTS = 1000; + private final LedgerManager ledgerManager; private final CompactableLedgerStorage ledgerStorage; - - public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage) { + private final ServerConfiguration conf; + private final BookieSocketAddress selfBookieAddress; + private ZooKeeper zk = null; + private boolean enableGcOverReplicatedLedger; + private final long gcOverReplicatedLedgerIntervalMillis; + private long lastOverReplicatedLedgerGcTimeMillis; + private final String zkLedgersRootPath; + + public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage, + ServerConfiguration conf) throws IOException { this.ledgerManager = ledgerManager; this.ledgerStorage = ledgerStorage; + this.conf = conf; + this.selfBookieAddress = Bookie.getBookieAddress(conf); + this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis(); + this.lastOverReplicatedLedgerGcTimeMillis = MathUtils.now(); + if (gcOverReplicatedLedgerIntervalMillis > 0) { + this.enableGcOverReplicatedLedger = true; + } + this.zkLedgersRootPath = conf.getZkLedgersRootPath(); + LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval=" + + gcOverReplicatedLedgerIntervalMillis); } @Override @@ -75,6 +110,22 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{ long lastEnd = -1; + long curTime = MathUtils.now(); + boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime + - lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis); + if (checkOverreplicatedLedgers) { + zk = ZooKeeperClient.newBuilder().connectString(conf.getZkServers()) + .sessionTimeoutMs(conf.getZkTimeout()).build(); + // remove all the overreplicated ledgers from the local bookie + Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner); + if (overReplicatedLedgers.isEmpty()) { + LOG.info("No over-replicated ledgers found."); + } else { + LOG.info("Removed over-replicated ledgers: {}", overReplicatedLedgers); + } + lastOverReplicatedLedgerGcTimeMillis = MathUtils.now(); + } + while(ledgerRangeIterator.hasNext()) { LedgerRange lRange = ledgerRangeIterator.next(); @@ -100,8 +151,79 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{ } catch (Exception e) { // ignore exception, collecting garbage next time LOG.warn("Exception when iterating over the metadata {}", e); + } finally { + if (zk != null) { + try { + zk.close(); + } catch (InterruptedException e) { + LOG.error("Error closing zk session", e); + } + zk = null; + } } } -} + private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner) + throws InterruptedException, KeeperException { + final Set<Long> overReplicatedLedgers = Sets.newHashSet(); + final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS); + final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size()); + for (final Long ledgerId : bkActiveledgers) { + try { + // check if the ledger is being replicated already by the replication worker + if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) { + latch.countDown(); + continue; + } + // we try to acquire the underreplicated ledger lock to not let the bookie replicate the ledger that is + // already being checked for deletion, since that might change the ledger ensemble to include the + // current bookie again and, in that case, we cannot remove the ledger from local storage + ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId); + semaphore.acquire(); + ledgerManager.readLedgerMetadata(ledgerId, new GenericCallback<LedgerMetadata>() { + + @Override + public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { + if (rc == BKException.Code.OK) { + // do not delete a ledger that is not closed, since the ensemble might change again and + // include the current bookie while we are deleting it + if (!ledgerMetadata.isClosed()) { + release(); + return; + } + SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles(); + for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) { + // check if this bookie is supposed to have this ledger + if (ensemble.contains(selfBookieAddress)) { + release(); + return; + } + } + // this bookie is not supposed to have this ledger, thus we can delete this ledger now + overReplicatedLedgers.add(ledgerId); + garbageCleaner.clean(ledgerId); + } + release(); + } + private void release() { + semaphore.release(); + latch.countDown(); + try { + ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(zk, zkLedgersRootPath, + ledgerId); + } catch (Exception e) { + LOG.error("Error removing underreplicated lock for ledger {}", ledgerId, e); + } + } + }); + } catch (Exception e) { + LOG.error("Exception when iterating through the ledgers to check for over-replication", e); + latch.countDown(); + } + } + latch.await(); + bkActiveledgers.removeAll(overReplicatedLedgers); + return overReplicatedLedgers; + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index c717cd8..7d9b697 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -19,8 +19,10 @@ package org.apache.bookkeeper.conf; import java.io.File; import java.util.List; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.Beta; + import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -51,6 +53,7 @@ public class ServerConfiguration extends AbstractConfiguration { // Gc Parameters protected final static String GC_WAIT_TIME = "gcWaitTime"; protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace"; + protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime"; // Sync Parameters protected final static String FLUSH_INTERVAL = "flushInterval"; // Bookie death watch interval @@ -210,10 +213,37 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Get wait time in millis for garbage collection of overreplicated ledgers + * + * @return gc wait time + */ + public long getGcOverreplicatedLedgerWaitTimeMillis() { + return this.getLong(GC_OVERREPLICATED_LEDGER_WAIT_TIME, TimeUnit.DAYS.toMillis(1)); + } + + /** + * Set wait time for garbage collection of overreplicated ledgers. Default: 1 day + * + * A ledger can be overreplicated under the following circumstances: + * 1. The ledger with few entries has bk1 and bk2 as its ensemble. + * 2. bk1 crashes. + * 3. bk3 replicates the ledger from bk2 and updates the ensemble to bk2 and bk3. + * 4. bk1 comes back up. + * 5. Now there are 3 copies of the ledger. + * + * @param gcWaitTime + * @return server configuration + */ + public ServerConfiguration setGcOverreplicatedLedgerWaitTime(long gcWaitTime, TimeUnit unit) { + this.setProperty(GC_OVERREPLICATED_LEDGER_WAIT_TIME, Long.toString(unit.toMillis(gcWaitTime))); + return this; + } + + /** * Get flush interval. Default value is 10 second. It isn't useful to decrease * this value, since ledger storage only checkpoints when an entry logger file * is rolled. - * + * * @return flush interval */ public int getFlushInterval() { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index a4600bc..c49c5a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -18,51 +18,45 @@ package org.apache.bookkeeper.meta; +import static com.google.common.base.Charsets.UTF_8; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.net.DNS; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat; +import org.apache.bookkeeper.proto.DataFormats.LockDataFormat; +import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; import org.apache.bookkeeper.replication.ReplicationEnableCb; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.ZkUtils; - -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat; -import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; -import org.apache.bookkeeper.proto.DataFormats.LockDataFormat; -import org.apache.bookkeeper.conf.AbstractConfiguration; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.TextFormat; import com.google.common.base.Joiner; -import static com.google.common.base.Charsets.UTF_8; - -import java.net.UnknownHostException; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.List; -import java.util.Collections; -import java.util.Arrays; -import java.util.Deque; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Queue; -import java.util.ArrayList; - -import java.util.regex.Pattern; -import java.util.regex.Matcher; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.protobuf.TextFormat; /** * ZooKeeper implementation of underreplication manager. @@ -84,6 +78,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa static final String LAYOUT="BASIC"; static final int LAYOUT_VERSION=1; + public static final byte[] LOCK_DATA = getLockData(); + private static class Lock { private final String lockZNode; private final int ledgerZNodeVersion; @@ -103,22 +99,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa private final String urLedgerPath; private final String urLockPath; private final String layoutZNode; - private final LockDataFormat lockData; private final ZooKeeper zkc; public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc) throws KeeperException, InterruptedException, ReplicationException.CompatibilityException { - basePath = conf.getZkLedgersRootPath() + '/' - + BookKeeperConstants.UNDER_REPLICATION_NODE; + basePath = getBasePath(conf.getZkLedgersRootPath()); layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE; urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH; - urLockPath = basePath + "/locks"; + urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK; idExtractionPattern = Pattern.compile("urL(\\d+)$"); this.zkc = zkc; + checkLayout(); + } + + public static String getBasePath(String rootPath) { + return String.format("%s/%s", rootPath, BookKeeperConstants.UNDER_REPLICATION_NODE); + } + + public static String getUrLockPath(String rootPath) { + return String.format("%s/%s", getBasePath(rootPath), BookKeeperConstants.UNDER_REPLICATION_LOCK); + } + + public static byte[] getLockData() { LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder(); try { lockDataBuilder.setBookieId(DNS.getDefaultHost("default")); @@ -126,9 +132,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // if we cant get the address, ignore. it's optional // in the data structure in any case } - lockData = lockDataBuilder.build(); - - checkLayout(); + return TextFormat.printToString(lockDataBuilder.build()).getBytes(UTF_8); } private void checkLayout() @@ -212,6 +216,10 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa return String.format("%s/urL%010d", getParentZnodePath(base, ledgerId), ledgerId); } + public static String getUrLedgerLockZnode(String base, long ledgerId) { + return String.format("%s/urL%010d", base, ledgerId); + } + private String getUrLedgerZnode(long ledgerId) { return getUrLedgerZnode(urLedgerPath, ledgerId); } @@ -399,8 +407,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } long ledgerId = getLedgerId(tryChild); - zkc.create(lockPath, TextFormat.printToString(lockData).getBytes(UTF_8), - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zkc.create(lockPath, LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion())); return ledgerId; } catch (KeeperException.NodeExistsException nee) { @@ -627,4 +634,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa "Interrupted while contacting zookeeper", ie); } } + + /** + * Check whether the ledger is being replicated by any bookie + */ + public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) + throws KeeperException, + InterruptedException { + return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null; + } + + /** + * Acquire the underreplicated ledger lock + */ + public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) + throws KeeperException, InterruptedException { + ZkUtils.createFullPathOptimistic(zkc, getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), + LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + + /** + * Release the underreplicated ledger lock if it exists + */ + public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) + throws InterruptedException, KeeperException { + if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) { + zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1); + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java index 2c2ba38..987de7a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java @@ -38,6 +38,7 @@ public class BookKeeperConstants { public static final String AVAILABLE_NODE = "available"; public static final String COOKIE_NODE = "cookies"; public static final String UNDER_REPLICATION_NODE = "underreplication"; + public static final String UNDER_REPLICATION_LOCK = "locks"; public static final String DISABLE_NODE = "disable"; public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers"; public static final String LAYOUT_ZNODE = "LAYOUT"; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java new file mode 100644 index 0000000..5004817 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java @@ -0,0 +1,240 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.FlatLedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerManagerTestCase; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.util.SnapshotMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class TestGcOverreplicatedLedger extends LedgerManagerTestCase { + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + ledgerManager = ledgerManagerFactory.newLedgerManager(); + activeLedgers = new SnapshotMap<Long, Boolean>(); + } + + public TestGcOverreplicatedLedger(Class<? extends LedgerManagerFactory> lmFactoryCls) { + super(lmFactoryCls, 3); + } + + @Parameters + public static Collection<Object[]> configs() { + return Arrays.asList(new Object[][] { { FlatLedgerManagerFactory.class } }); + } + + @Test(timeout = 60000) + public void testGcOverreplicatedLedger() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes()); + activeLedgers.put(lh.getId(), true); + + final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null); + final CountDownLatch latch = new CountDownLatch(1); + ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() { + + @Override + public void operationComplete(int rc, LedgerMetadata result) { + if (rc == BKException.Code.OK) { + newLedgerMetadata.set(result); + } + latch.countDown(); + } + }); + latch.await(); + if (newLedgerMetadata.get() == null) { + Assert.fail("No ledger metadata found"); + } + BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get()); + ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble); + bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS); + + lh.close(); + + final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage(); + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage, + bkConf); + Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1); + garbageCollector.gc(new GarbageCleaner() { + + @Override + public void clean(long ledgerId) { + try { + mockLedgerStorage.deleteLedger(ledgerId); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + }); + + Assert.assertFalse(activeLedgers.containsKey(lh.getId())); + } + + @Test(timeout = 60000) + public void testNoGcOfLedger() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes()); + activeLedgers.put(lh.getId(), true); + + final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null); + final CountDownLatch latch = new CountDownLatch(1); + ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() { + + @Override + public void operationComplete(int rc, LedgerMetadata result) { + if (rc == BKException.Code.OK) { + newLedgerMetadata.set(result); + } + latch.countDown(); + } + }); + latch.await(); + if (newLedgerMetadata.get() == null) { + Assert.fail("No ledger metadata found"); + } + BookieSocketAddress address = null; + SortedMap<Long, ArrayList<BookieSocketAddress>> ensembleMap = newLedgerMetadata.get().getEnsembles(); + for (ArrayList<BookieSocketAddress> ensemble : ensembleMap.values()) { + address = ensemble.get(0); + } + ServerConfiguration bkConf = getBkConf(address); + bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS); + + lh.close(); + + final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage(); + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage, + bkConf); + Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1); + garbageCollector.gc(new GarbageCleaner() { + + @Override + public void clean(long ledgerId) { + try { + mockLedgerStorage.deleteLedger(ledgerId); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + }); + + Assert.assertTrue(activeLedgers.containsKey(lh.getId())); + } + + @Test(timeout = 60000) + public void testNoGcIfLedgerBeingReplicated() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes()); + activeLedgers.put(lh.getId(), true); + + final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null); + final CountDownLatch latch = new CountDownLatch(1); + ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() { + + @Override + public void operationComplete(int rc, LedgerMetadata result) { + if (rc == BKException.Code.OK) { + newLedgerMetadata.set(result); + } + latch.countDown(); + } + }); + latch.await(); + if (newLedgerMetadata.get() == null) { + Assert.fail("No ledger metadata found"); + } + BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get()); + ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble); + bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS); + + lh.close(); + + ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zkc, baseConf.getZkLedgersRootPath(), + lh.getId()); + + final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage(); + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage, + bkConf); + Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1); + garbageCollector.gc(new GarbageCleaner() { + + @Override + public void clean(long ledgerId) { + try { + mockLedgerStorage.deleteLedger(ledgerId); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + }); + + Assert.assertTrue(activeLedgers.containsKey(lh.getId())); + } + + private BookieSocketAddress getBookieNotInEnsemble(LedgerMetadata ledgerMetadata) throws UnknownHostException { + List<BookieSocketAddress> allAddresses = Lists.newArrayList(); + for (BookieServer bk : bs) { + allAddresses.add(bk.getLocalAddress()); + } + SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles(); + for (ArrayList<BookieSocketAddress> fragmentEnsembles : ensembles.values()) { + for (BookieSocketAddress ensemble : fragmentEnsembles) { + allAddresses.remove(ensemble); + } + } + Assert.assertEquals(allAddresses.size(), 1); + return allAddresses.get(0); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index 3875ec4..d5866a5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -21,15 +21,18 @@ package org.apache.bookkeeper.meta; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -39,21 +42,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; -import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.CheckpointSource; -import org.apache.bookkeeper.bookie.BookieException; -import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollector; -import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerMetadata; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -62,8 +56,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; - /** * Test garbage collection ledgers in ledger manager */ @@ -175,7 +167,7 @@ public class GcLedgersTest extends LedgerManagerTestCase { final CountDownLatch endLatch = new CountDownLatch(2); final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage(); final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(), - mockLedgerStorage); + mockLedgerStorage, baseConf); Thread gcThread = new Thread() { @Override public void run() { @@ -246,7 +238,7 @@ public class GcLedgersTest extends LedgerManagerTestCase { createLedgers(numLedgers, createdLedgers); final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(), - new MockLedgerStorage()); + new MockLedgerStorage(), baseConf); GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() { @Override public void clean(long ledgerId) { @@ -282,7 +274,7 @@ public class GcLedgersTest extends LedgerManagerTestCase { createLedgers(numLedgers, createdLedgers); final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(), - new MockLedgerStorage()); + new MockLedgerStorage(), baseConf); GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() { @Override public void clean(long ledgerId) { @@ -309,97 +301,4 @@ public class GcLedgersTest extends LedgerManagerTestCase { assertEquals("Should have cleaned something", 1, cleaned.size()); assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0)); } - - class MockLedgerStorage implements CompactableLedgerStorage { - - @Override - public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, - LedgerDirsManager indexDirsManager, - CheckpointSource checkpointSource, StatsLogger statsLogger) - throws IOException {} - - @Override - public void start() { - } - - @Override - public void shutdown() throws InterruptedException { - } - - @Override - public boolean ledgerExists(long ledgerId) throws IOException { - return false; - } - - @Override - public boolean setFenced(long ledgerId) throws IOException { - return false; - } - - @Override - public boolean isFenced(long ledgerId) throws IOException { - return false; - } - - @Override - public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { - } - - @Override - public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { - return null; - } - - @Override - public long addEntry(ByteBuffer entry) throws IOException { - return 0; - } - - @Override - public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { - return null; - } - - @Override - public void flush() throws IOException { - } - - @Override - public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { - return null; - } - - @Override - public void deleteLedger(long ledgerId) throws IOException { - activeLedgers.remove(ledgerId); - } - - @Override - public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { - NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot(); - Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot - .subMap(firstLedgerId, true, lastLedgerId, false); - - return subBkActiveLedgers.keySet(); - } - - @Override - public BKMBeanInfo getJMXBean() { - return null; - } - - @Override - public EntryLogger getEntryLogger() { - return null; - } - - @Override - public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException { - } - - @Override - public void flushEntriesLocationsIndex() throws IOException { - } - } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index dae61bc..1e7c9a6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -22,9 +22,22 @@ package org.apache.bookkeeper.meta; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.Map; +import java.util.NavigableMap; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.jmx.BKMBeanInfo; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.SnapshotMap; import org.junit.After; @@ -42,13 +55,17 @@ import org.slf4j.LoggerFactory; public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class); - LedgerManagerFactory ledgerManagerFactory; - LedgerManager ledgerManager = null; - LedgerIdGenerator ledgerIdGenerator = null; - SnapshotMap<Long, Boolean> activeLedgers = null; + protected LedgerManagerFactory ledgerManagerFactory; + protected LedgerManager ledgerManager = null; + protected LedgerIdGenerator ledgerIdGenerator = null; + protected SnapshotMap<Long, Boolean> activeLedgers = null; public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) { - super(0); + this(lmFactoryCls, 0); + } + + public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls, int numBookies) { + super(numBookies); activeLedgers = new SnapshotMap<Long, Boolean>(); baseConf.setLedgerManagerFactoryClass(lmFactoryCls); } @@ -93,4 +110,95 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { super.tearDown(); } + public class MockLedgerStorage implements CompactableLedgerStorage { + + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException { + } + + @Override + public void start() { + } + + @Override + public void shutdown() throws InterruptedException { + } + + @Override + public boolean ledgerExists(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean setFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean isFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + } + + @Override + public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { + return null; + } + + @Override + public long addEntry(ByteBuffer entry) throws IOException { + return 0; + } + + @Override + public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + return null; + } + + @Override + public void flush() throws IOException { + } + + @Override + public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { + return null; + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + activeLedgers.remove(ledgerId); + } + + @Override + public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { + NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot(); + Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot + .subMap(firstLedgerId, true, lastLedgerId, false); + + return subBkActiveLedgers.keySet(); + } + + @Override + public BKMBeanInfo getJMXBean() { + return null; + } + + @Override + public EntryLogger getEntryLogger() { + return null; + } + + @Override + public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException { + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 28de0b2..efb8375 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -224,6 +224,23 @@ public abstract class BookKeeperClusterTestCase { } /** + * Get bookie configuration for bookie + */ + public ServerConfiguration getBkConf(BookieSocketAddress addr) throws Exception { + int bkIndex = 0; + for (BookieServer server : bs) { + if (server.getLocalAddress().equals(addr)) { + break; + } + ++bkIndex; + } + if (bkIndex < bs.size()) { + return bsConfs.get(bkIndex); + } + return null; + } + + /** * Kill a bookie by its socket address. Also, stops the autorecovery process * for the corresponding bookie server, if isAutoRecoveryEnabled is true. *
