This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 76ba8e2 Refactored LAC cache in DbLedgerStorage
76ba8e2 is described below
commit 76ba8e294e151092abe1e195009314fdd6eaa64a
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 16 09:38:22 2018 -0700
Refactored LAC cache in DbLedgerStorage
The `TransientLedgerInfo` cache that holds the LAC (or explicit LAC) for
all ledgers has shown up in the profiler as a significant contention point.
### Modifications
* Only add an item to the `TransientLedgerInfo` cache if someone has asked for
the LAC or has set an explicit LAC. This saves a lot of memory when many
ledgers never require the LAC info.
* Use `ConcurrentLongHashMap` to store the `TransientLedgerInfo`. There
are a few advantages (see the sketch after this list):
  - The map key is a primitive `long` and doesn't need boxing
  - The `get()` operation uses a stamped lock, which is ideal when the number of
`get()` calls is much larger than the number of `put()` calls, since a read only
needs a volatile variable read. In this case the map is only updated when ledgers
are added / removed, but we do a `get()` on each `addEntry()` operation.
  - Schedule periodic cleanup of the map for stale ledger info entries
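
For reference, this is roughly the pattern the patch follows: a lazy get-or-create
on the read path and a scheduled `removeIf()` pass for eviction. The sketch below is
illustrative only; `LedgerInfo`, `cache`, `getOrAdd` and `cleanupStale` are simplified
stand-ins for the actual members in `DbLedgerStorage` shown in the diff:

```java
// Illustrative sketch only -- names are simplified stand-ins for the code in the diff.
ConcurrentLongHashMap<LedgerInfo> cache =
        new ConcurrentLongHashMap<>(16 * 1024, Runtime.getRuntime().availableProcessors() * 2);

LedgerInfo getOrAdd(long ledgerId) {
    LedgerInfo existing = cache.get(ledgerId);   // fast path: stamped-lock read, no key boxing
    if (existing != null) {
        return existing;
    }
    LedgerInfo created = new LedgerInfo(ledgerId);
    LedgerInfo raced = cache.putIfAbsent(ledgerId, created);
    if (raced != null) {                         // another thread inserted first
        created.close();
        return raced;
    }
    return created;
}

// Runs periodically on the cleanup executor to drop entries not accessed recently.
void cleanupStale() {
    cache.removeIf((ledgerId, info) -> {
        boolean stale = info.isStale();
        if (stale) {
            info.close();
        }
        return stale;
    });
}
```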
Author: Matteo Merli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1264 from merlimat/refactor-lac-cache-db-ledger-storage
---
.../bookie/storage/ldb/DbLedgerStorage.java | 131 +++++++++++++--------
.../util/collections/ConcurrentLongHashMap.java | 54 +++++++++
.../collections/ConcurrentLongHashMapTest.java | 17 +++
3 files changed, 152 insertions(+), 50 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 4beec22..ff9cd31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -25,28 +25,27 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
@@ -74,6 +73,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +83,8 @@ import org.slf4j.LoggerFactory;
*/
public class DbLedgerStorage implements CompactableLedgerStorage {
+ private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
/**
* This class borrows the logic from FileInfo.
*
@@ -93,7 +95,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
implements AutoCloseable {
// lac
- private Long lac = null;
+ private volatile long lac = NOT_ASSIGNED_LAC;
// request from explicit lac requests
private ByteBuffer explicitLac = null;
// is the ledger info closed?
@@ -103,6 +105,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
// reference to LedgerMetadataIndex
private final LedgerMetadataIndex ledgerIndex;
+ private long lastAccessed;
+
/**
* Construct an Watchable with zero watchers.
*/
@@ -110,19 +114,21 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
super(WATCHER_RECYCLER);
this.ledgerId = ledgerId;
this.ledgerIndex = ledgerIndex;
+ this.lastAccessed = System.currentTimeMillis();
}
- synchronized Long getLastAddConfirmed() {
+ long getLastAddConfirmed() {
return lac;
}
- Long setLastAddConfirmed(long lac) {
+ long setLastAddConfirmed(long lac) {
long lacToReturn;
boolean changed = false;
synchronized (this) {
- if (null == this.lac || this.lac < lac) {
+ if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
this.lac = lac;
changed = true;
+ lastAccessed = System.currentTimeMillis();
}
lacToReturn = this.lac;
}
@@ -135,8 +141,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
- if ((null != lac && lac > previousLAC)
- || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+ lastAccessed = System.currentTimeMillis();
+ if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
return false;
}
@@ -171,10 +177,17 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
explicitLac.getLong();
explicitLacValue = explicitLac.getLong();
explicitLac.rewind();
+
+ lastAccessed = System.currentTimeMillis();
}
setLastAddConfirmed(explicitLacValue);
}
+ boolean isStale() {
+ return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+ .currentTimeMillis();
+ }
+
void notifyWatchers(long lastAddConfirmed) {
notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
}
@@ -197,7 +210,9 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
private LedgerMetadataIndex ledgerIndex;
private EntryLocationIndex entryLocationIndex;
- private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;
+
+ private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+ private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
private GarbageCollectorThread gcThread;
@@ -221,8 +236,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
// Executor used to for db index cleanup
- private final ExecutorService cleanupExecutor = Executors
- .newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+ private final ScheduledExecutorService cleanupExecutor = Executors
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
@@ -287,23 +302,10 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
- // build the ledger info cache
- int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
- RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = this::handleLedgerEviction;
- CacheBuilder<Long, TransientLedgerInfo> builder = CacheBuilder.newBuilder()
- .initialCapacity(conf.getFileInfoCacheInitialCapacity())
- .maximumSize(conf.getOpenFileLimit())
- .concurrencyLevel(concurrencyLevel)
- .removalListener(ledgerInfoRemovalListener);
- if (conf.getFileInfoMaxIdleTime() > 0) {
- builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), TimeUnit.SECONDS);
- }
- transientLedgerInfoCache = builder.build(new CacheLoader<Long, TransientLedgerInfo>() {
- @Override
- public TransientLedgerInfo load(Long key) throws Exception {
- return new TransientLedgerInfo(key, ledgerIndex);
- }
- });
+ transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
+ Runtime.getRuntime().availableProcessors() * 2);
+
+ cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
+ LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
entryLogger = new EntryLogger(conf, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
@@ -312,14 +314,17 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
}
/**
- * When a ledger is evicted from transient ledger info cache, we can just simply discard the object.
+ * Evict all the ledger info object that were not used recently.
*/
- private void handleLedgerEviction(RemovalNotification<Long, TransientLedgerInfo> notification) {
- TransientLedgerInfo ledgerInfo = notification.getValue();
- if (null == ledgerInfo || null == notification.getKey()) {
- return;
- }
- ledgerInfo.close();
+ private void cleanupStaleTransientLedgerInfo() {
+ transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+ boolean isStale = ledgerInfo.isStale();
+ if (isStale) {
+ ledgerInfo.close();
+ }
+
+ return isStale;
+ });
}
public void registerStats() {
@@ -437,7 +442,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
boolean changed = ledgerIndex.setFenced(ledgerId);
if (changed) {
// notify all the watchers if a ledger is fenced
- TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+ TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
if (null != ledgerInfo) {
ledgerInfo.notifyWatchers(Long.MAX_VALUE);
}
@@ -487,8 +492,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
}
// after successfully insert the entry, update LAC and notify the watchers
- transientLedgerInfoCache.getUnchecked(ledgerId)
- .setLastAddConfirmed(lac);
+ updateCachedLacIfNeeded(ledgerId, lac);
recordSuccessfulEvent(addEntryStats, startTime);
return entryId;
@@ -854,6 +858,11 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
listener.ledgerDeleted(ledgerId);
}
+
+ TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
+ if (tli != null) {
+ tli.close();
+ }
}
@Override
@@ -876,14 +885,14 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
- TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
- Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : null;
- if (null == lac) {
+ TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
+ long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : NOT_ASSIGNED_LAC;
+ if (lac == NOT_ASSIGNED_LAC) {
ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
try {
bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
lac = bb.readLong();
- lac = transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
+ lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
} finally {
bb.release();
}
@@ -894,19 +903,17 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
@Override
public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
- return transientLedgerInfoCache.getUnchecked(ledgerId)
- .waitForLastAddConfirmedUpdate(previousLAC, watcher);
+ return getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, watcher);
}
@Override
public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
- transientLedgerInfoCache.getUnchecked(ledgerId)
- .setExplicitLac(lac);
+ getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
}
@Override
public ByteBuf getExplicitLac(long ledgerId) {
- TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+ TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
if (null == ledgerInfo) {
return null;
} else {
@@ -914,6 +921,30 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
}
}
+ private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
+ TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+ if (tli != null) {
+ return tli;
+ } else {
+ TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, ledgerIndex);
+ tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
+ if (tli != null) {
+ newTli.close();
+ return tli;
+ } else {
+ return newTli;
+ }
+ }
+ }
+
+ private void updateCachedLacIfNeeded(long ledgerId, long lac) {
+ TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+ if (tli != null) {
+ tli.setLastAddConfirmed(lac);
+ }
+ }
+
+
@Override
public void flushEntriesLocationsIndex() throws IOException {
// No-op. Location index is already flushed in updateEntriesLocations() call
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 011a666..bdfaf03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -52,6 +52,15 @@ public class ConcurrentLongHashMap<V> {
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ /**
+ * Predicate specialization for (long, V) types.
+ *
+ * @param <V>
+ */
+ public interface LongObjectPredicate<V> {
+ boolean test(long key, V value);
+ }
+
private final Section<V>[] sections;
public ConcurrentLongHashMap() {
@@ -149,6 +158,17 @@ public class ConcurrentLongHashMap<V> {
return getSection(h).remove(key, value, (int) h) != null;
}
+ public int removeIf(LongObjectPredicate<V> predicate) {
+ checkNotNull(predicate);
+
+ int removedCount = 0;
+ for (Section<V> s : sections) {
+ removedCount += s.removeIf(predicate);
+ }
+
+ return removedCount;
+ }
+
private Section<V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
@@ -375,6 +395,40 @@ public class ConcurrentLongHashMap<V> {
}
}
+ int removeIf(LongObjectPredicate<V> filter) {
+ long stamp = writeLock();
+
+ int removedCount = 0;
+ try {
+ // Go through all the buckets for this section
+ int capacity = this.capacity;
+ for (int bucket = 0; bucket < capacity; bucket++) {
+ long storedKey = keys[bucket];
+ V storedValue = values[bucket];
+
+ if (storedValue != EmptyValue && storedValue != DeletedValue) {
+ if (filter.test(storedKey, storedValue)) {
+ // Removing item
+ --size;
+ ++removedCount;
+
+ V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+ if (nextValueInArray == EmptyValue) {
+ values[bucket] = (V) EmptyValue;
+ --usedBuckets;
+ } else {
+ values[bucket] = (V) DeletedValue;
+ }
+ }
+ }
+ }
+
+ return removedCount;
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
void clear() {
long stamp = writeLock();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index 702e24c..06c7667 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -117,6 +117,23 @@ public class ConcurrentLongHashMapTest {
}
@Test
+ public void testRemoveIf() {
+ ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+ map.put(1L, "one");
+ map.put(2L, "two");
+ map.put(3L, "three");
+ map.put(4L, "four");
+
+ map.removeIf((k, v) -> k < 3);
+ assertFalse(map.containsKey(1L));
+ assertFalse(map.containsKey(2L));
+ assertTrue(map.containsKey(3L));
+ assertTrue(map.containsKey(4L));
+ assertEquals(2, map.size());
+ }
+
+ @Test
public void testNegativeUsedBucketCount() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
--
To stop receiving notification emails like this one, please contact
[email protected].