Repository: bookkeeper Updated Branches: refs/heads/master 0583175de -> bf8ef14bc
BOOKKEEPER-852: Release LedgerDescriptor and master-key objects when not used anymore Maps with ledger descriptors and master-keys are not cleaned after a ledger gets deleted. For this PR, please only take a look at the last commit 18e3455. The other 2 commits are already in separate PRs. I'll rebase this PR once they'll get merged. Author: Matteo Merli <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #78 from merlimat/bk-ledger-descriptor Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/bf8ef14b Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/bf8ef14b Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/bf8ef14b Branch: refs/heads/master Commit: bf8ef14bc1bfdde6c6c8edcd7f380b4beca023f6 Parents: 0583175 Author: Matteo Merli <[email protected]> Authored: Tue Mar 28 13:37:16 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Mar 28 13:37:16 2017 -0700 ---------------------------------------------------------------------- .../org/apache/bookkeeper/bookie/Bookie.java | 31 ++--- .../bookkeeper/bookie/HandleFactoryImpl.java | 55 ++++---- .../bookie/InterleavedLedgerStorage.java | 13 ++ .../apache/bookkeeper/bookie/LedgerStorage.java | 12 ++ .../bookkeeper/bookie/LedgerStorageTest.java | 53 ++++++++ .../bookkeeper/bookie/TestSyncThread.java | 4 + .../apache/bookkeeper/meta/GcLedgersTest.java | 124 ++++++++++++++++++- .../bookkeeper/meta/LedgerManagerTestCase.java | 5 + 8 files changed, 256 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 0338db7..61ba9b1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -32,8 +32,6 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,7 +54,6 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -65,6 +62,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; @@ -141,7 +139,7 @@ public class Bookie extends BookieCriticalThread { BookieBean jmxBookieBean; BKMBeanInfo jmxLedgerStorageBean; - final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<Long, byte[]>(); + private final ConcurrentLongHashMap<byte[]> masterKeyCache = new ConcurrentLongHashMap<>(); final protected String zkBookieRegPath; final protected String zkBookieReadOnlyPath; @@ -1345,23 +1343,26 @@ public class Bookie extends BookieCriticalThread { * * @throws BookieException if masterKey does not match the master key of the ledger */ - private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey) + private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, final byte[] masterKey) throws IOException, BookieException { - long ledgerId = entry.getLong(); + final long ledgerId = entry.getLong(); LedgerDescriptor l = handles.getHandle(ledgerId, masterKey); - if (!masterKeyCache.containsKey(ledgerId)) { - // new handle, we should add the key to journal ensure we can rebuild - ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length); - bb.putLong(ledgerId); - bb.putLong(METAENTRY_ID_LEDGER_KEY); - bb.putInt(masterKey.length); - bb.put(masterKey); - bb.flip(); + if (masterKeyCache.get(ledgerId) == null) { + // Force the load into masterKey cache + byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey); + if (oldValue == null) { + // new handle, we should add the key to journal ensure we can rebuild + ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length); + bb.putLong(ledgerId); + bb.putLong(METAENTRY_ID_LEDGER_KEY); + bb.putInt(masterKey.length); + bb.put(masterKey); + bb.flip(); - if (null == masterKeyCache.putIfAbsent(ledgerId, masterKey)) { getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(), null); } } + return l; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java index 45be763..2bc72e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java @@ -22,47 +22,52 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -class HandleFactoryImpl implements HandleFactory { - ConcurrentMap<Long, LedgerDescriptor> ledgers = new ConcurrentHashMap<Long, LedgerDescriptor>(); - ConcurrentMap<Long, LedgerDescriptor> readOnlyLedgers - = new ConcurrentHashMap<Long, LedgerDescriptor>(); +import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; + +class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener { + private final ConcurrentLongHashMap<LedgerDescriptor> ledgers; + private final ConcurrentLongHashMap<LedgerDescriptor> readOnlyLedgers; final LedgerStorage ledgerStorage; HandleFactoryImpl(LedgerStorage ledgerStorage) { this.ledgerStorage = ledgerStorage; + this.ledgers = new ConcurrentLongHashMap<>(); + this.readOnlyLedgers = new ConcurrentLongHashMap<>(); + + ledgerStorage.registerLedgerDeletionListener(this); } @Override - public LedgerDescriptor getHandle(long ledgerId, byte[] masterKey) - throws IOException, BookieException { - LedgerDescriptor handle = null; - if (null == (handle = ledgers.get(ledgerId))) { - // LedgerDescriptor#create sets the master key in the ledger storage, calling it - // twice on the same ledgerId is safe because it eventually puts a value in the ledger cache - // that guarantees synchronized access across all cached entries. - handle = ledgers.putIfAbsent(ledgerId, LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage)); - if (null == handle) { - handle = ledgers.get(ledgerId); - } + public LedgerDescriptor getHandle(final long ledgerId, final byte[] masterKey) throws IOException, BookieException { + LedgerDescriptor handle = ledgers.get(ledgerId); + + if (handle == null) { + handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage); + ledgers.putIfAbsent(ledgerId, handle); } + handle.checkAccess(masterKey); return handle; } @Override - public LedgerDescriptor getReadOnlyHandle(long ledgerId) - throws IOException, Bookie.NoLedgerException { - LedgerDescriptor handle = null; - if (null == (handle = readOnlyLedgers.get(ledgerId))) { - handle = readOnlyLedgers.putIfAbsent(ledgerId, LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage)); - if (null == handle) { - handle = readOnlyLedgers.get(ledgerId); - } + public LedgerDescriptor getReadOnlyHandle(final long ledgerId) throws IOException, Bookie.NoLedgerException { + LedgerDescriptor handle = readOnlyLedgers.get(ledgerId); + + if (handle == null) { + handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage); + readOnlyLedgers.putIfAbsent(ledgerId, handle); } + return handle; } + + @Override + public void ledgerDeleted(long ledgerId) { + ledgers.remove(ledgerId); + readOnlyLedgers.remove(ledgerId); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 308110b..afd65dc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.Map; @@ -46,6 +47,8 @@ import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET; @@ -82,6 +85,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry LedgerCache ledgerCache; private CheckpointSource checkpointSource; protected final CheckpointHolder checkpointHolder = new CheckpointHolder(); + private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists.newCopyOnWriteArrayList(); // A sorted map to stored all active ledger ids protected final SnapshotMap<Long, Boolean> activeLedgers; @@ -374,6 +378,10 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry public void deleteLedger(long ledgerId) throws IOException { activeLedgers.remove(ledgerId); ledgerCache.deleteLedger(ledgerId); + + for (LedgerDeletionListener listener : ledgerDeletionListeners) { + listener.ledgerDeleted(ledgerId); + } } @Override @@ -414,6 +422,11 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry return ledgerCache.getJMXBean(); } + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + ledgerDeletionListeners.add(listener); + } + protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throws IOException { processEntry(ledgerId, entryId, entry, true); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 84a309f..fbdd6b9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -145,6 +145,18 @@ public interface LedgerStorage { */ void deleteLedger(long ledgerId) throws IOException; + public static interface LedgerDeletionListener { + void ledgerDeleted(long ledgerId); + } + + /** + * Register a listener for ledgers deletion notifications + * + * @param listener + * object that will be notified every time a ledger is deleted + */ + void registerLedgerDeletionListener(LedgerDeletionListener listener); + /** * Get the JMX management bean for this LedgerStorage */ http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java new file mode 100644 index 0000000..d81e4d0 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java @@ -0,0 +1,53 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie; + +import java.util.concurrent.CountDownLatch; + +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Assert; +import org.junit.Test; + +public class LedgerStorageTest extends BookKeeperClusterTestCase { + public LedgerStorageTest() { + super(1); + } + + @Test(timeout = 20000) + public void testLedgerDeleteNotification() throws Exception { + LedgerStorage ledgerStorage = bs.get(0).getBookie().ledgerStorage; + + long deletedLedgerId = 5; + ledgerStorage.setMasterKey(deletedLedgerId, new byte[0]); + + CountDownLatch counter = new CountDownLatch(1); + + ledgerStorage.registerLedgerDeletionListener(ledgerId -> { + Assert.assertEquals(deletedLedgerId, ledgerId); + + counter.countDown(); + }); + + ledgerStorage.deleteLedger(deletedLedgerId); + + counter.await(); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 1ce30e9..3672985 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -350,6 +350,10 @@ public class TestSyncThread { @Override public BKMBeanInfo getJMXBean() { return null; } + + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + } } private static class DummyLedgerDirsListener http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index c1e8bde..280db05 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -28,11 +28,14 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -42,15 +45,24 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollector; +import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Version; import org.junit.Test; import org.slf4j.Logger; @@ -299,6 +311,116 @@ public class GcLedgersTest extends LedgerManagerTestCase { removeLedger(first); garbageCollector.gc(cleaner); assertEquals("Should have cleaned something", 1, cleaned.size()); - assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0)); + assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0)); + } + + class MockLedgerStorage implements CompactableLedgerStorage { + + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException { + } + + @Override + public void start() { + } + + @Override + public void shutdown() throws InterruptedException { + } + + @Override + public long getLastAddConfirmed(long ledgerId) throws IOException { + return 0; + } + + @Override + public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException { + } + + @Override + public ByteBuffer getExplicitLac(long ledgerId) { + return null; + } + + @Override + public boolean ledgerExists(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean setFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean isFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + } + + @Override + public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { + return null; + } + + @Override + public long addEntry(ByteBuffer entry) throws IOException { + return 0; + } + + @Override + public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + return null; + } + + @Override + public void flush() throws IOException { + } + + @Override + public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { + return null; + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + activeLedgers.remove(ledgerId); + } + + @Override + public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { + NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot(); + Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot + .subMap(firstLedgerId, true, lastLedgerId, false); + + return subBkActiveLedgers.keySet(); + } + + @Override + public BKMBeanInfo getJMXBean() { + return null; + } + + @Override + public EntryLogger getEntryLogger() { + return null; + } + + @Override + public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException { + } + + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf8ef14b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 4c2ddaa..8d13102 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.stats.StatsLogger; @@ -176,6 +177,10 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { } @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + } + + @Override public void deleteLedger(long ledgerId) throws IOException { activeLedgers.remove(ledgerId); }
