This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 143bd19 Use ConcurrentLongLongHashMap in EntryLogMetadata
143bd19 is described below
commit 143bd1952851ac70ace9d16c3708ae30dad2a0aa
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 7 14:42:46 2017 -0800
Use ConcurrentLongLongHashMap in EntryLogMetadata
Porting commits
https://github.com/yahoo/bookkeeper/commit/74afc16d1c1e93739edf4a78801c5ccb6afd3883
and
https://github.com/yahoo/bookkeeper/commit/eba3bbfbbaf58118c0c5736b56d3897a5928bc89
from the Yahoo branch.
Currently the `EntryLogger` keeps a `ConcurrentHashMap<Long, Long>` to
track, for each ledger, the amount of data stored in a single entry log
file.
The amount of memory required is high because of object boxing. Each
`Long` used for a key or a value takes 16 bytes, and each hash-map node
takes another ~24 bytes.
Additionally, all these objects are created, kept alive for hours, and only
garbage-collected after being tenured.
By using `ConcurrentLongLongHashMap`, we eliminate the object boxing and
the need for a per-entry "node", since it is an open-addressing hash map. This
greatly reduces the memory used by the bookie when storing a large number of
entries (~56 --> 16 bytes per entry).
Also, since the items are stored in a `long[]`, no per-entry objects are
allocated, so the map does not add GC pressure.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>
This closes #821 from merlimat/entrylog-metadata
---
.../apache/bookkeeper/bookie/EntryLogMetadata.java | 36 ++++-----
.../org/apache/bookkeeper/bookie/EntryLogger.java | 90 ++++++++++++++--------
.../bookkeeper/bookie/GarbageCollectorThread.java | 36 ++++-----
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 23 +++---
4 files changed, 103 insertions(+), 82 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
index 461736c..ad6d87d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -21,8 +21,9 @@
package org.apache.bookkeeper.bookie;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.LongPredicate;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
/**
* Records the total size, remaining size and the set of ledgers that comprise
a entry log.
@@ -31,32 +32,19 @@ public class EntryLogMetadata {
private final long entryLogId;
private long totalSize;
private long remainingSize;
- private ConcurrentHashMap<Long, Long> ledgersMap;
+ private final ConcurrentLongLongHashMap ledgersMap;
public EntryLogMetadata(long logId) {
this.entryLogId = logId;
totalSize = remainingSize = 0;
- ledgersMap = new ConcurrentHashMap<Long, Long>();
+ ledgersMap = new ConcurrentLongLongHashMap(256, 1);
}
public void addLedgerSize(long ledgerId, long size) {
totalSize += size;
remainingSize += size;
- Long ledgerSize = ledgersMap.get(ledgerId);
- if (null == ledgerSize) {
- ledgerSize = 0L;
- }
- ledgerSize += size;
- ledgersMap.put(ledgerId, ledgerSize);
- }
-
- public void removeLedger(long ledgerId) {
- Long size = ledgersMap.remove(ledgerId);
- if (null == size) {
- return;
- }
- remainingSize -= size;
+ ledgersMap.addAndGet(ledgerId, size);
}
public boolean containsLedger(long ledgerId) {
@@ -86,10 +74,20 @@ public class EntryLogMetadata {
return remainingSize;
}
- Map<Long, Long> getLedgersMap() {
+ ConcurrentLongLongHashMap getLedgersMap() {
return ledgersMap;
}
+ public void removeLedgerIf(LongPredicate predicate) {
+ ledgersMap.removeIf((ledgerId, size) -> {
+ boolean shouldRemove = predicate.test(ledgerId);
+ if (shouldRemove) {
+ remainingSize -= size;
+ }
+ return shouldRemove;
+ });
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index c520b3d..c00c84c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,7 +22,6 @@
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
-
import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
import static
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
@@ -52,7 +51,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -62,9 +60,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import
org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,7 +107,7 @@ public class EntryLogger {
entryLogMetadata.addLedgerSize(ledgerId, entrySize);
}
- public Map<Long, Long> getLedgersMap() {
+ public ConcurrentLongLongHashMap getLedgersMap() {
return entryLogMetadata.getLedgersMap();
}
}
@@ -507,38 +509,60 @@ public class EntryLogger {
private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws
IOException {
long ledgerMapOffset = entryLogChannel.position();
- Map<Long, Long> ledgersMap = entryLogChannel.getLedgersMap();
-
- Iterator<Entry<Long, Long>> iterator =
ledgersMap.entrySet().iterator();
- int numberOfLedgers = ledgersMap.size();
- int remainingLedgers = numberOfLedgers;
+ ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
+ int numberOfLedgers = (int) ledgersMap.size();
// Write the ledgers map into several batches
- while (iterator.hasNext()) {
- // Start new batch
- int batchSize = Math.min(remainingLedgers,
LEDGERS_MAP_MAX_BATCH_SIZE);
- int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * batchSize;
- ByteBuffer serializedMap = ByteBuffer.allocate(ledgerMapSize);
-
- serializedMap.putInt(ledgerMapSize - 4);
- serializedMap.putLong(INVALID_LID);
- serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
- serializedMap.putInt(batchSize);
-
- // Dump all ledgers for this batch
- for (int i = 0; i < batchSize; i++) {
- Entry<Long, Long> entry = iterator.next();
- long ledgerId = entry.getKey();
- long size = entry.getValue();
-
- serializedMap.putLong(ledgerId);
- serializedMap.putLong(size);
- --remainingLedgers;
- }
+ final AtomicLong currentOffset = new AtomicLong(ledgerMapOffset);
+ final int maxMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+ final ByteBuffer serializedMap = ByteBuffer.allocate(maxMapSize);
+
+ try {
+ ledgersMap.forEach(new BiConsumerLong() {
+ int remainingLedgers = numberOfLedgers;
+ boolean startNewBatch = true;
+ int remainingInBatch = 0;
+
+ @Override
+ public void accept(long ledgerId, long size) {
+ if (startNewBatch) {
+ int batchSize = Math.min(remainingLedgers,
LEDGERS_MAP_MAX_BATCH_SIZE);
+ int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+ serializedMap.clear();
+ serializedMap.putInt(ledgerMapSize - 4);
+ serializedMap.putLong(INVALID_LID);
+ serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
+ serializedMap.putInt(batchSize);
+
+ startNewBatch = false;
+ remainingInBatch = batchSize;
+ }
+ // Dump the ledger in the current batch
+ serializedMap.putLong(ledgerId);
+ serializedMap.putLong(size);
+ --remainingLedgers;
+
+ if (--remainingInBatch == 0) {
+ // Close current batch
+ serializedMap.flip();
+ try {
+ int written =
entryLogChannel.fileChannel.write(serializedMap, currentOffset.get());
+ currentOffset.addAndGet(written);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
- // Close current batch
- serializedMap.flip();
- entryLogChannel.fileChannel.write(serializedMap);
+ startNewBatch = true;
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw e;
+ }
}
// Update the headers with the map offset and count of ledgers
@@ -1278,7 +1302,7 @@ public class EntryLogger {
@Override
public boolean accept(long ledgerId) {
- return true;
+ return ledgerId > 0;
}
});
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 570011c..617a656 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -323,26 +323,24 @@ public class GarbageCollectorThread extends SafeRunnable {
*/
private void doGcEntryLogs() {
// Loop through all of the entry logs and remove the non-active
ledgers.
- for (Map.Entry<Long, EntryLogMetadata> entry :
entryLogMetaMap.entrySet()) {
- long entryLogId = entry.getKey();
- EntryLogMetadata meta = entry.getValue();
- for (Long entryLogLedger : meta.getLedgersMap().keySet()) {
+ entryLogMetaMap.forEach((entryLogId, meta) -> {
+ meta.removeLedgerIf((entryLogLedger) -> {
// Remove the entry log ledger from the set if it isn't active.
- try {
- if (!ledgerStorage.ledgerExists(entryLogLedger)) {
- meta.removeLedger(entryLogLedger);
- }
- } catch (IOException e) {
- LOG.error("Error reading from ledger storage", e);
- }
- }
- if (meta.isEmpty()) {
- // This means the entry log is not associated with any active
ledgers anymore.
- // We can remove this entry log file now.
- LOG.info("Deleting entryLogId " + entryLogId + " as it has no
active ledgers!");
- removeEntryLog(entryLogId);
- }
- }
+ try {
+ return !ledgerStorage.ledgerExists(entryLogLedger);
+ } catch (IOException e) {
+ LOG.error("Error reading from ledger storage", e);
+ return false;
+ }
+ });
+
+ if (meta.isEmpty()) {
+ // This means the entry log is not associated with any active
ledgers anymore.
+ // We can remove this entry log file now.
+ LOG.info("Deleting entryLogId " + entryLogId + " as it has no
active ledgers!");
+ removeEntryLog(entryLogId);
+ }
+ });
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index db161f5..527762b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -95,9 +96,9 @@ public class EntryLogTest {
EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
LOG.info("Extracted Meta From Entry Log {}", meta);
- assertNotNull(meta.getLedgersMap().get(1L));
- assertNull(meta.getLedgersMap().get(2L));
- assertNotNull(meta.getLedgersMap().get(3L));
+ assertTrue(meta.getLedgersMap().containsKey(1L));
+ assertFalse(meta.getLedgersMap().containsKey(2L));
+ assertTrue(meta.getLedgersMap().containsKey(3L));
}
private ByteBuf generateEntry(long ledger, long entry) {
@@ -251,10 +252,10 @@ public class EntryLogTest {
EntryLogMetadata meta = logger.extractEntryLogMetadataFromIndex(0L);
LOG.info("Extracted Meta From Entry Log {}", meta);
- assertEquals(60, meta.getLedgersMap().get(1L).longValue());
- assertEquals(30, meta.getLedgersMap().get(2L).longValue());
- assertEquals(30, meta.getLedgersMap().get(3L).longValue());
- assertNull(meta.getLedgersMap().get(4L));
+ assertEquals(60, meta.getLedgersMap().get(1L));
+ assertEquals(30, meta.getLedgersMap().get(2L));
+ assertEquals(30, meta.getLedgersMap().get(3L));
+ assertFalse(meta.getLedgersMap().containsKey(4L));
assertEquals(120, meta.getTotalSize());
assertEquals(120, meta.getRemainingSize());
}
@@ -303,10 +304,10 @@ public class EntryLogTest {
// Public method should succeed by falling back to scanning the file
EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
LOG.info("Extracted Meta From Entry Log {}", meta);
- assertEquals(60, meta.getLedgersMap().get(1L).longValue());
- assertEquals(30, meta.getLedgersMap().get(2L).longValue());
- assertEquals(30, meta.getLedgersMap().get(3L).longValue());
- assertNull(meta.getLedgersMap().get(4L));
+ assertEquals(60, meta.getLedgersMap().get(1L));
+ assertEquals(30, meta.getLedgersMap().get(2L));
+ assertEquals(30, meta.getLedgersMap().get(3L));
+ assertFalse(meta.getLedgersMap().containsKey(4L));
assertEquals(120, meta.getTotalSize());
assertEquals(120, meta.getRemainingSize());
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].