This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ba8e2  Refactored LAC cache in DbLedgerStorage
76ba8e2 is described below

commit 76ba8e294e151092abe1e195009314fdd6eaa64a
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 16 09:38:22 2018 -0700

    Refactored LAC cache in DbLedgerStorage
    
    The `TransientLedgerInfo` cache that holds the LAC (or explicit lac) for 
all ledgers has shown up in the profiler as a big contention point.
    
    ### Modifications
    
     * Only add an item in `TransientLedgerInfo` if someone has asked for the LAC 
or has set an explicit LAC. This will save a lot of memory if many ledgers do not 
require the LAC info.
     * Use `ConcurrentLongHashMap` to store the `TransientLedgerInfo`. There 
are a few advantages:
       - Key in the map is `long` and doesn't need boxing
       - The `get()` operation uses a stamped lock which is ideal in cases where the 
number of `get()` operations is much bigger than the number of `put()` operations, since 
it only needs a volatile variable read. In this case the map is only updated when new 
ledgers are added / removed, but we do a `get()` on each `addEntry()` operation.
      - Schedule periodical cleanup of the map for stale ledger info entries
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1264 from merlimat/refactor-lac-cache-db-ledger-storage
---
 .../bookie/storage/ldb/DbLedgerStorage.java        | 131 +++++++++++++--------
 .../util/collections/ConcurrentLongHashMap.java    |  54 +++++++++
 .../collections/ConcurrentLongHashMapTest.java     |  17 +++
 3 files changed, 152 insertions(+), 50 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 4beec22..ff9cd31 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -25,28 +25,27 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -74,6 +73,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +83,8 @@ import org.slf4j.LoggerFactory;
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
+    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
     /**
      * This class borrows the logic from FileInfo.
      *
@@ -93,7 +95,7 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
             implements AutoCloseable {
 
         // lac
-        private Long lac = null;
+        private volatile long lac = NOT_ASSIGNED_LAC;
         // request from explicit lac requests
         private ByteBuffer explicitLac = null;
         // is the ledger info closed?
@@ -103,6 +105,8 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         // reference to LedgerMetadataIndex
         private final LedgerMetadataIndex ledgerIndex;
 
+        private long lastAccessed;
+
         /**
          * Construct an Watchable with zero watchers.
          */
@@ -110,19 +114,21 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
             super(WATCHER_RECYCLER);
             this.ledgerId = ledgerId;
             this.ledgerIndex = ledgerIndex;
+            this.lastAccessed = System.currentTimeMillis();
         }
 
-        synchronized Long getLastAddConfirmed() {
+        long getLastAddConfirmed() {
             return lac;
         }
 
-        Long setLastAddConfirmed(long lac) {
+        long setLastAddConfirmed(long lac) {
             long lacToReturn;
             boolean changed = false;
             synchronized (this) {
-                if (null == this.lac || this.lac < lac) {
+                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
                     this.lac = lac;
                     changed = true;
+                    lastAccessed = System.currentTimeMillis();
                 }
                 lacToReturn = this.lac;
             }
@@ -135,8 +141,8 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
                                                            
Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
-            if ((null != lac && lac > previousLAC)
-                    || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+            lastAccessed = System.currentTimeMillis();
+            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || 
ledgerIndex.get(ledgerId).getFenced()) {
                 return false;
             }
 
@@ -171,10 +177,17 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
                 explicitLac.getLong();
                 explicitLacValue = explicitLac.getLong();
                 explicitLac.rewind();
+
+                lastAccessed = System.currentTimeMillis();
             }
             setLastAddConfirmed(explicitLacValue);
         }
 
+        boolean isStale() {
+            return (lastAccessed + 
TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+                    .currentTimeMillis();
+        }
+
         void notifyWatchers(long lastAddConfirmed) {
             notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
lastAddConfirmed);
         }
@@ -197,7 +210,9 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
 
     private LedgerMetadataIndex ledgerIndex;
     private EntryLocationIndex entryLocationIndex;
-    private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;
+
+    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+    private ConcurrentLongHashMap<TransientLedgerInfo> 
transientLedgerInfoCache;
 
     private GarbageCollectorThread gcThread;
 
@@ -221,8 +236,8 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
     private final ExecutorService executor = 
Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
 
     // Executor used to for db index cleanup
-    private final ExecutorService cleanupExecutor = Executors
-            .newSingleThreadExecutor(new 
DefaultThreadFactory("db-storage-cleanup"));
+    private final ScheduledExecutorService cleanupExecutor = Executors
+            .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("db-storage-cleanup"));
 
     static final String WRITE_CACHE_MAX_SIZE_MB = 
"dbStorage_writeCacheMaxSizeMb";
     static final String READ_AHEAD_CACHE_BATCH_SIZE = 
"dbStorage_readAheadCacheBatchSize";
@@ -287,23 +302,10 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         ledgerIndex = new LedgerMetadataIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
 
-        // build the ledger info cache
-        int concurrencyLevel = Math.max(1, 
Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
-        RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = 
this::handleLedgerEviction;
-        CacheBuilder<Long, TransientLedgerInfo> builder = 
CacheBuilder.newBuilder()
-            .initialCapacity(conf.getFileInfoCacheInitialCapacity())
-            .maximumSize(conf.getOpenFileLimit())
-            .concurrencyLevel(concurrencyLevel)
-            .removalListener(ledgerInfoRemovalListener);
-        if (conf.getFileInfoMaxIdleTime() > 0) {
-            builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), 
TimeUnit.SECONDS);
-        }
-        transientLedgerInfoCache = builder.build(new CacheLoader<Long, 
TransientLedgerInfo>() {
-            @Override
-            public TransientLedgerInfo load(Long key) throws Exception {
-                return new TransientLedgerInfo(key, ledgerIndex);
-            }
-        });
+        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
+                Runtime.getRuntime().availableProcessors() * 2);
+        
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, 
LEDGER_INFO_CACHING_TIME_MINUTES,
+                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, 
statsLogger);
@@ -312,14 +314,17 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
     }
 
     /**
-     * When a ledger is evicted from transient ledger info cache, we can just 
simply discard the object.
+     * Evict all the ledger info object that were not used recently.
      */
-    private void handleLedgerEviction(RemovalNotification<Long, 
TransientLedgerInfo> notification) {
-        TransientLedgerInfo ledgerInfo = notification.getValue();
-        if (null == ledgerInfo || null == notification.getKey()) {
-            return;
-        }
-        ledgerInfo.close();
+    private void cleanupStaleTransientLedgerInfo() {
+        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+            boolean isStale = ledgerInfo.isStale();
+            if (isStale) {
+                ledgerInfo.close();
+            }
+
+            return isStale;
+        });
     }
 
     public void registerStats() {
@@ -437,7 +442,7 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         boolean changed = ledgerIndex.setFenced(ledgerId);
         if (changed) {
             // notify all the watchers if a ledger is fenced
-            TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.getIfPresent(ledgerId);
+            TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
             if (null != ledgerInfo) {
                 ledgerInfo.notifyWatchers(Long.MAX_VALUE);
             }
@@ -487,8 +492,7 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         }
 
         // after successfully insert the entry, update LAC and notify the 
watchers
-        transientLedgerInfoCache.getUnchecked(ledgerId)
-            .setLastAddConfirmed(lac);
+        updateCachedLacIfNeeded(ledgerId, lac);
 
         recordSuccessfulEvent(addEntryStats, startTime);
         return entryId;
@@ -854,6 +858,11 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
             LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
             listener.ledgerDeleted(ledgerId);
         }
+
+        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
+        if (tli != null) {
+            tli.close();
+        }
     }
 
     @Override
@@ -876,14 +885,14 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
 
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.getIfPresent(ledgerId);
-        Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : 
null;
-        if (null == lac) {
+        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
+        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : 
NOT_ASSIGNED_LAC;
+        if (lac == NOT_ASSIGNED_LAC) {
             ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             try {
                 bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
                 lac = bb.readLong();
-                lac = 
transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
+                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
             } finally {
                 bb.release();
             }
@@ -894,19 +903,17 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
     @Override
     public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
             Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
-        return transientLedgerInfoCache.getUnchecked(ledgerId)
-            .waitForLastAddConfirmedUpdate(previousLAC, watcher);
+        return 
getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, 
watcher);
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        transientLedgerInfoCache.getUnchecked(ledgerId)
-            .setExplicitLac(lac);
+        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.getIfPresent(ledgerId);
+        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
         if (null == ledgerInfo) {
             return null;
         } else {
@@ -914,6 +921,30 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
         }
     }
 
+    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            return tli;
+        } else {
+            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, 
ledgerIndex);
+            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
+            if (tli != null) {
+                newTli.close();
+                return tli;
+            } else {
+                return newTli;
+            }
+        }
+    }
+
+    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            tli.setLastAddConfirmed(lac);
+        }
+    }
+
+
     @Override
     public void flushEntriesLocationsIndex() throws IOException {
         // No-op. Location index is already flushed in 
updateEntriesLocations() call
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 011a666..bdfaf03 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -52,6 +52,15 @@ public class ConcurrentLongHashMap<V> {
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    /**
+     * Predicate specialization for (long, V) types.
+     *
+     * @param <V>
+     */
+    public interface LongObjectPredicate<V> {
+        boolean test(long key, V value);
+    }
+
     private final Section<V>[] sections;
 
     public ConcurrentLongHashMap() {
@@ -149,6 +158,17 @@ public class ConcurrentLongHashMap<V> {
         return getSection(h).remove(key, value, (int) h) != null;
     }
 
+    public int removeIf(LongObjectPredicate<V> predicate) {
+        checkNotNull(predicate);
+
+        int removedCount = 0;
+        for (Section<V> s : sections) {
+            removedCount += s.removeIf(predicate);
+        }
+
+        return removedCount;
+    }
+
     private Section<V> getSection(long hash) {
         // Use 32 msb out of long to get the section
         final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
@@ -375,6 +395,40 @@ public class ConcurrentLongHashMap<V> {
             }
         }
 
+        int removeIf(LongObjectPredicate<V> filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                int capacity = this.capacity;
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (storedValue != EmptyValue && storedValue != 
DeletedValue) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+
+                            V nextValueInArray = values[signSafeMod(bucket + 
1, capacity)];
+                            if (nextValueInArray == EmptyValue) {
+                                values[bucket] = (V) EmptyValue;
+                                --usedBuckets;
+                            } else {
+                                values[bucket] = (V) DeletedValue;
+                            }
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
         void clear() {
             long stamp = writeLock();
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index 702e24c..06c7667 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -117,6 +117,23 @@ public class ConcurrentLongHashMapTest {
     }
 
     @Test
+    public void testRemoveIf() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+        map.put(1L, "one");
+        map.put(2L, "two");
+        map.put(3L, "three");
+        map.put(4L, "four");
+
+        map.removeIf((k, v) -> k < 3);
+        assertFalse(map.containsKey(1L));
+        assertFalse(map.containsKey(2L));
+        assertTrue(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
+    @Test
     public void testNegativeUsedBucketCount() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to