This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 143bd19  Use ConcurrentLongLongHashMap in EntryLogMetadata
143bd19 is described below

commit 143bd1952851ac70ace9d16c3708ae30dad2a0aa
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 7 14:42:46 2017 -0800

    Use ConcurrentLongLongHashMap in EntryLogMetadata
    
    Porting commits
    https://github.com/yahoo/bookkeeper/commit/74afc16d1c1e93739edf4a78801c5ccb6afd3883
    and
    https://github.com/yahoo/bookkeeper/commit/eba3bbfbbaf58118c0c5736b56d3897a5928bc89
    from the Yahoo branch.
    
    Currently the `EntryLogger` keeps a `ConcurrentHashMap<Long, Long>` to
    track the amount of data stored in a single entry log file for each
    ledger.
    
    The amount of memory required is high because of object boxing: the
    `Long` objects used for key and value take 16 bytes each, and each
    hash-map node takes another ~24 bytes.
    
    Additionally, all these objects are created, kept alive for hours, and
    then garbage-collected after being tenured.
    
    By using `ConcurrentLongLongHashMap`, we eliminate the object boxing and
    the need for a per-entry "node", since it is an open-addressing hash map.
    This greatly reduces the memory used by the bookie when storing a large
    number of entries (~56 --> 16 bytes per map entry: 16 + 16 + ~24 bytes
    before, two `long` values after).
    Also, since the items are stored in a `long[]`, no per-entry objects are
    allocated, so the map does not affect GC.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #821 from merlimat/entrylog-metadata
---
 .../apache/bookkeeper/bookie/EntryLogMetadata.java | 36 ++++-----
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 90 ++++++++++++++--------
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 36 ++++-----
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 23 +++---
 4 files changed, 103 insertions(+), 82 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
index 461736c..ad6d87d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.LongPredicate;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 
 /**
  * Records the total size, remaining size and the set of ledgers that comprise 
a entry log.
@@ -31,32 +32,19 @@ public class EntryLogMetadata {
     private final long entryLogId;
     private long totalSize;
     private long remainingSize;
-    private ConcurrentHashMap<Long, Long> ledgersMap;
+    private final ConcurrentLongLongHashMap ledgersMap;
 
     public EntryLogMetadata(long logId) {
         this.entryLogId = logId;
 
         totalSize = remainingSize = 0;
-        ledgersMap = new ConcurrentHashMap<Long, Long>();
+        ledgersMap = new ConcurrentLongLongHashMap(256, 1);
     }
 
     public void addLedgerSize(long ledgerId, long size) {
         totalSize += size;
         remainingSize += size;
-        Long ledgerSize = ledgersMap.get(ledgerId);
-        if (null == ledgerSize) {
-            ledgerSize = 0L;
-        }
-        ledgerSize += size;
-        ledgersMap.put(ledgerId, ledgerSize);
-    }
-
-    public void removeLedger(long ledgerId) {
-        Long size = ledgersMap.remove(ledgerId);
-        if (null == size) {
-            return;
-        }
-        remainingSize -= size;
+        ledgersMap.addAndGet(ledgerId, size);
     }
 
     public boolean containsLedger(long ledgerId) {
@@ -86,10 +74,20 @@ public class EntryLogMetadata {
         return remainingSize;
     }
 
-    Map<Long, Long> getLedgersMap() {
+    ConcurrentLongLongHashMap getLedgersMap() {
         return ledgersMap;
     }
 
+    public void removeLedgerIf(LongPredicate predicate) {
+        ledgersMap.removeIf((ledgerId, size) -> {
+            boolean shouldRemove = predicate.test(ledgerId);
+            if (shouldRemove) {
+                remainingSize -= size;
+            }
+            return shouldRemove;
+        });
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index c520b3d..c00c84c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
-
 import static 
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
 import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
@@ -52,7 +51,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -62,9 +60,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,7 +107,7 @@ public class EntryLogger {
             entryLogMetadata.addLedgerSize(ledgerId, entrySize);
         }
 
-        public Map<Long, Long> getLedgersMap() {
+        public ConcurrentLongLongHashMap getLedgersMap() {
             return entryLogMetadata.getLedgersMap();
         }
     }
@@ -507,38 +509,60 @@ public class EntryLogger {
     private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
         long ledgerMapOffset = entryLogChannel.position();
 
-        Map<Long, Long> ledgersMap = entryLogChannel.getLedgersMap();
-
-        Iterator<Entry<Long, Long>> iterator = 
ledgersMap.entrySet().iterator();
-        int numberOfLedgers = ledgersMap.size();
-        int remainingLedgers = numberOfLedgers;
+        ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
+        int numberOfLedgers = (int) ledgersMap.size();
 
         // Write the ledgers map into several batches
-        while (iterator.hasNext()) {
-            // Start new batch
-            int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-            ByteBuffer serializedMap = ByteBuffer.allocate(ledgerMapSize);
-
-            serializedMap.putInt(ledgerMapSize - 4);
-            serializedMap.putLong(INVALID_LID);
-            serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
-            serializedMap.putInt(batchSize);
-
-            // Dump all ledgers for this batch
-            for (int i = 0; i < batchSize; i++) {
-                Entry<Long, Long> entry = iterator.next();
-                long ledgerId = entry.getKey();
-                long size = entry.getValue();
-
-                serializedMap.putLong(ledgerId);
-                serializedMap.putLong(size);
-                --remainingLedgers;
-            }
+        final AtomicLong currentOffset = new AtomicLong(ledgerMapOffset);
+        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+        final ByteBuffer serializedMap = ByteBuffer.allocate(maxMapSize);
+
+        try {
+            ledgersMap.forEach(new BiConsumerLong() {
+                int remainingLedgers = numberOfLedgers;
+                boolean startNewBatch = true;
+                int remainingInBatch = 0;
+
+                @Override
+                public void accept(long ledgerId, long size) {
+                    if (startNewBatch) {
+                        int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
+                        int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+                        serializedMap.clear();
+                        serializedMap.putInt(ledgerMapSize - 4);
+                        serializedMap.putLong(INVALID_LID);
+                        serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
+                        serializedMap.putInt(batchSize);
+
+                        startNewBatch = false;
+                        remainingInBatch = batchSize;
+                    }
+                    // Dump the ledger in the current batch
+                    serializedMap.putLong(ledgerId);
+                    serializedMap.putLong(size);
+                    --remainingLedgers;
+
+                    if (--remainingInBatch == 0) {
+                        // Close current batch
+                        serializedMap.flip();
+                        try {
+                            int written = 
entryLogChannel.fileChannel.write(serializedMap, currentOffset.get());
+                            currentOffset.addAndGet(written);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
 
-            // Close current batch
-            serializedMap.flip();
-            entryLogChannel.fileChannel.write(serializedMap);
+                        startNewBatch = true;
+                    }
+                }
+            });
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw e;
+            }
         }
 
         // Update the headers with the map offset and count of ledgers
@@ -1278,7 +1302,7 @@ public class EntryLogger {
 
             @Override
             public boolean accept(long ledgerId) {
-                return true;
+                return ledgerId > 0;
             }
         });
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 570011c..617a656 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -323,26 +323,24 @@ public class GarbageCollectorThread extends SafeRunnable {
      */
     private void doGcEntryLogs() {
         // Loop through all of the entry logs and remove the non-active 
ledgers.
-        for (Map.Entry<Long, EntryLogMetadata> entry :  
entryLogMetaMap.entrySet()) {
-            long entryLogId = entry.getKey();
-            EntryLogMetadata meta = entry.getValue();
-            for (Long entryLogLedger : meta.getLedgersMap().keySet()) {
+        entryLogMetaMap.forEach((entryLogId, meta) -> {
+            meta.removeLedgerIf((entryLogLedger) -> {
                 // Remove the entry log ledger from the set if it isn't active.
-                try {
-                    if (!ledgerStorage.ledgerExists(entryLogLedger)) {
-                        meta.removeLedger(entryLogLedger);
-                    }
-                } catch (IOException e) {
-                    LOG.error("Error reading from ledger storage", e);
-                }
-            }
-            if (meta.isEmpty()) {
-                // This means the entry log is not associated with any active 
ledgers anymore.
-                // We can remove this entry log file now.
-                LOG.info("Deleting entryLogId " + entryLogId + " as it has no 
active ledgers!");
-                removeEntryLog(entryLogId);
-            }
-        }
+               try {
+                   return !ledgerStorage.ledgerExists(entryLogLedger);
+               } catch (IOException e) {
+                   LOG.error("Error reading from ledger storage", e);
+                   return false;
+               }
+           });
+
+           if (meta.isEmpty()) {
+               // This means the entry log is not associated with any active 
ledgers anymore.
+               // We can remove this entry log file now.
+               LOG.info("Deleting entryLogId " + entryLogId + " as it has no 
active ledgers!");
+               removeEntryLog(entryLogId);
+           }
+        });
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index db161f5..527762b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -95,9 +96,9 @@ public class EntryLogTest {
 
         EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
-        assertNotNull(meta.getLedgersMap().get(1L));
-        assertNull(meta.getLedgersMap().get(2L));
-        assertNotNull(meta.getLedgersMap().get(3L));
+        assertTrue(meta.getLedgersMap().containsKey(1L));
+        assertFalse(meta.getLedgersMap().containsKey(2L));
+        assertTrue(meta.getLedgersMap().containsKey(3L));
     }
 
     private ByteBuf generateEntry(long ledger, long entry) {
@@ -251,10 +252,10 @@ public class EntryLogTest {
 
         EntryLogMetadata meta = logger.extractEntryLogMetadataFromIndex(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
-        assertEquals(60, meta.getLedgersMap().get(1L).longValue());
-        assertEquals(30, meta.getLedgersMap().get(2L).longValue());
-        assertEquals(30, meta.getLedgersMap().get(3L).longValue());
-        assertNull(meta.getLedgersMap().get(4L));
+        assertEquals(60, meta.getLedgersMap().get(1L));
+        assertEquals(30, meta.getLedgersMap().get(2L));
+        assertEquals(30, meta.getLedgersMap().get(3L));
+        assertFalse(meta.getLedgersMap().containsKey(4L));
         assertEquals(120, meta.getTotalSize());
         assertEquals(120, meta.getRemainingSize());
     }
@@ -303,10 +304,10 @@ public class EntryLogTest {
         // Public method should succeed by falling back to scanning the file
         EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
-        assertEquals(60, meta.getLedgersMap().get(1L).longValue());
-        assertEquals(30, meta.getLedgersMap().get(2L).longValue());
-        assertEquals(30, meta.getLedgersMap().get(3L).longValue());
-        assertNull(meta.getLedgersMap().get(4L));
+        assertEquals(60, meta.getLedgersMap().get(1L));
+        assertEquals(30, meta.getLedgersMap().get(2L));
+        assertEquals(30, meta.getLedgersMap().get(3L));
+        assertFalse(meta.getLedgersMap().containsKey(4L));
         assertEquals(120, meta.getTotalSize());
         assertEquals(120, meta.getRemainingSize());
     }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to