This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 387dc83 ISSUE #228: BookKeeper Server: Index Page Management Memory
Growth
387dc83 is described below
commit 387dc83d6d2db6a1d30bac3216a92f875083d144
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 17 13:53:38 2017 -0700
ISSUE #228: BookKeeper Server: Index Page Management Memory Growth
Descriptions of the changes in this PR:
** Improvements **
- Never delete a LedgerEntryPage that has been allocated.
- Track free LedgerEntryPage instances in a separate list
- Use pages from the free pages list as though they were being freshly
allocated
- This guarantees that the direct buffer allocation never exceeds the
allocated space for index pages
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>
This closes #229 from sijie/lru_related_changes, closes #228
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 10 ++
.../bookkeeper/bookie/IndexInMemPageMgr.java | 186 +++++++++++++++------
.../bookkeeper/bookie/IndexPersistenceMgr.java | 12 +-
.../apache/bookkeeper/bookie/LedgerEntryPage.java | 27 ++-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 2 +-
5 files changed, 180 insertions(+), 57 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index dc16b52..e0690fb 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -86,6 +86,9 @@ public interface BookKeeperServerStats {
// Ledger Storage Stats
String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
String STORAGE_GET_ENTRY = "STORAGE_GET_ENTRY";
+ /** Ledger Cache Stats **/
+ String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
+ /** SkipList Stats **/
String SKIP_LIST_GET_ENTRY = "SKIP_LIST_GET_ENTRY";
String SKIP_LIST_PUT_ENTRY = "SKIP_LIST_PUT_ENTRY";
String SKIP_LIST_SNAPSHOT = "SKIP_LIST_SNAPSHOT";
@@ -95,8 +98,15 @@ public interface BookKeeperServerStats {
String JOURNAL_QUEUE_SIZE = "JOURNAL_QUEUE_SIZE";
String READ_BYTES = "READ_BYTES";
String WRITE_BYTES = "WRITE_BYTES";
+ /** Ledger Cache Counters **/
+ String LEDGER_CACHE_HIT = "LEDGER_CACHE_HIT";
+ String LEDGER_CACHE_MISS = "LEDGER_CACHE_MISS";
+ /** Compaction/Garbage Collection Related Counters **/
String NUM_MINOR_COMP = "NUM_MINOR_COMP";
String NUM_MAJOR_COMP = "NUM_MAJOR_COMP";
+ /** Index Related Counters **/
+ String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
+ String INDEX_INMEM_ILLEGAL_STATE_DELETE =
"INDEX_INMEM_ILLEGAL_STATE_DELETE";
String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index b9b774f..361bc6e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -20,8 +20,12 @@
*/
package org.apache.bookkeeper.bookie;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DirectMemoryUtils;
import org.slf4j.Logger;
@@ -42,6 +46,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_DELETE;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_RESET;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_HIT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_MISS;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_READ_PAGE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_INDEX_PAGES;
class IndexInMemPageMgr {
@@ -51,14 +60,20 @@ class IndexInMemPageMgr {
private static class InMemPageCollection implements LEPStateChangeCallback
{
- ConcurrentMap<Long, ConcurrentMap<Long,LedgerEntryPage>> pages;
+ final ConcurrentMap<Long, ConcurrentMap<Long,LedgerEntryPage>> pages;
+ final Map<EntryKey, LedgerEntryPage> lruCleanPageMap;
+ final ConcurrentLinkedQueue<LedgerEntryPage> listOfFreePages;
- Map<EntryKey, LedgerEntryPage> lruCleanPageMap;
+ // Stats
+ final Counter illegalStateResetCounter;
+ final Counter illegalStateDeleteCounter;
- public InMemPageCollection() {
+ public InMemPageCollection(StatsLogger statsLogger) {
pages = new ConcurrentHashMap<Long,
ConcurrentMap<Long,LedgerEntryPage>>();
- lruCleanPageMap =
- Collections.synchronizedMap(new LinkedHashMap<EntryKey,
LedgerEntryPage>(16, 0.75f, true));
+ lruCleanPageMap = Collections.synchronizedMap(new
LinkedHashMap<EntryKey, LedgerEntryPage>(16, 0.75f, true));
+ listOfFreePages = new ConcurrentLinkedQueue<LedgerEntryPage>();
+ illegalStateResetCounter =
statsLogger.getCounter(INDEX_INMEM_ILLEGAL_STATE_RESET);
+ illegalStateDeleteCounter =
statsLogger.getCounter(INDEX_INMEM_ILLEGAL_STATE_DELETE);
}
/**
@@ -140,18 +155,28 @@ class IndexInMemPageMgr {
* Ledger id
* @returns number of pages removed
*/
- private int removeEntriesForALedger(long ledgerId) {
+ private void removeEntriesForALedger(long ledgerId) {
// remove pages first to avoid page flushed when deleting file info
ConcurrentMap<Long, LedgerEntryPage> lPages =
pages.remove(ledgerId);
if (null != lPages) {
- for (long entryId: lPages.keySet()) {
+ for (Map.Entry<Long, LedgerEntryPage> pageEntry :
lPages.entrySet()) {
+ long entryId = pageEntry.getKey();
synchronized(lruCleanPageMap) {
lruCleanPageMap.remove(new EntryKey(ledgerId,
entryId));
}
+
+ LedgerEntryPage lep = pageEntry.getValue();
+ // Cannot imagine under what circumstances we would have a
null entry here
+ // Just being safe
+ if (null != lep) {
+ if (lep.inUse()) {
+ illegalStateDeleteCounter.inc();
+ }
+ listOfFreePages.add(lep);
+ }
}
- return lPages.size();
+
}
- return 0;
}
/**
@@ -232,7 +257,13 @@ class IndexInMemPageMgr {
* @returns LedgerEntryPage if present
*/
LedgerEntryPage grabCleanPage(long ledgerId, long firstEntry) {
- LedgerEntryPage lep = null;
+ LedgerEntryPage lep = listOfFreePages.poll();
+ if (null != lep) {
+ lep.resetPage();
+ lep.setLedgerAndFirstEntry(ledgerId, firstEntry);
+ lep.usePage();
+ return lep;
+ }
while (lruCleanPageMap.size() > 0) {
lep = null;
synchronized(lruCleanPageMap) {
@@ -286,6 +317,15 @@ class IndexInMemPageMgr {
return lep;
}
+ public void addToListOfFreePages(LedgerEntryPage lep) {
+ if ((null == lep) || lep.inUse()) {
+ illegalStateResetCounter.inc();
+ }
+ if (null != lep) {
+ listOfFreePages.add(lep);
+ }
+ }
+
@Override
public void onSetInUse(LedgerEntryPage lep) {
removeFromCleanPageList(lep);
@@ -325,6 +365,11 @@ class IndexInMemPageMgr {
private final ConcurrentLinkedQueue<Long> ledgersToFlush = new
ConcurrentLinkedQueue<Long>();
private final ConcurrentSkipListSet<Long> ledgersFlushing = new
ConcurrentSkipListSet<Long>();
+ // Stats
+ private final Counter ledgerCacheHitCounter;
+ private final Counter ledgerCacheMissCounter;
+ private final OpStatsLogger ledgerCacheReadPageStats;
+
public IndexInMemPageMgr(int pageSize,
int entriesPerPage,
ServerConfiguration conf,
@@ -333,7 +378,7 @@ class IndexInMemPageMgr {
this.pageSize = pageSize;
this.entriesPerPage = entriesPerPage;
this.indexPersistenceManager = indexPersistenceManager;
- this.pageMapAndList = new InMemPageCollection();
+ this.pageMapAndList = new InMemPageCollection(statsLogger);
long maxDirectMemory = DirectMemoryUtils.maxDirectMemory();
@@ -346,17 +391,23 @@ class IndexInMemPageMgr {
LOG.info("maxDirectMemory = {}, pageSize = {}, pageLimit = {}",
new Object[] { maxDirectMemory, pageSize, pageLimit });
// Expose Stats
- statsLogger.registerGauge(NUM_INDEX_PAGES, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getNumUsedPages();
- }
- });
+ this.ledgerCacheHitCounter = statsLogger.getCounter(LEDGER_CACHE_HIT);
+ this.ledgerCacheMissCounter =
statsLogger.getCounter(LEDGER_CACHE_MISS);
+ this.ledgerCacheReadPageStats =
statsLogger.getOpStatsLogger(LEDGER_CACHE_READ_PAGE);
+ // Export sampled stats for index pages, ledgers.
+ statsLogger.registerGauge(
+ NUM_INDEX_PAGES,
+ new Gauge<Integer>() {
+ @Override
+ public Integer getDefaultValue() {
+ return 0;
+ }
+ @Override
+ public Integer getSample() {
+ return getNumUsedPages();
+ }
+ }
+ );
}
/**
@@ -387,7 +438,31 @@ class IndexInMemPageMgr {
return pageCount.get();
}
- LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean
onlyDirty) {
+ /**
+ * Get the ledger entry page for a given <i>pageEntry</i>.
+ *
+ * @param ledger
+ * ledger id
+ * @param pageEntry
+ * first entry id of a given page
+ * @return ledger entry page
+ * @throws IOException
+ */
+ public LedgerEntryPage getLedgerEntryPage(long ledger,
+ long pageEntry) throws
IOException {
+ LedgerEntryPage lep = getLedgerEntryPageFromCache(ledger, pageEntry,
false);
+ if (lep == null) {
+ ledgerCacheMissCounter.inc();
+ lep = grabLedgerEntryPage(ledger, pageEntry);
+ } else {
+ ledgerCacheHitCounter.inc();
+ }
+ return lep;
+ }
+
+ LedgerEntryPage getLedgerEntryPageFromCache(long ledger,
+ long firstEntry,
+ boolean onlyDirty) {
LedgerEntryPage lep = pageMapAndList.getPage(ledger, firstEntry);
if (onlyDirty && null != lep && lep.isClean()) {
return null;
@@ -415,33 +490,37 @@ class IndexInMemPageMgr {
// should get the up to date page from the persistence manager
// before we put it into table otherwise we would put
// an empty page in it
- indexPersistenceManager.updatePage(lep);
- LedgerEntryPage oldLep;
- if (lep != (oldLep = pageMapAndList.putPage(lep))) {
- lep.releasePage();
- // Decrement the page count because we couldn't put this lep
in the page cache.
- pageCount.decrementAndGet();
- // Increment the use count of the old lep because this is
unexpected
- oldLep.usePage();
- lep = oldLep;
+ Stopwatch readPageStopwatch = Stopwatch.createStarted();
+ boolean isNewPage = indexPersistenceManager.updatePage(lep);
+ if (!isNewPage) {
+ ledgerCacheReadPageStats.registerSuccessfulEvent(
+ readPageStopwatch.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
} catch (IOException ie) {
// if we grab a clean page, but failed to update the page
- // we are exhausting the count of ledger entry pages.
- // since this page will be never used, so we need to decrement
- // page count of ledger cache.
- lep.releasePage();
- pageCount.decrementAndGet();
+ // we should put this page in the free page list so that it
+ // can be reassigned to the next grabPage request
+ lep.releasePageNoCallback();
+ pageMapAndList.addToListOfFreePages(lep);
throw ie;
}
+ LedgerEntryPage oldLep;
+ if (lep != (oldLep = pageMapAndList.putPage(lep))) {
+ // if we grab a clean page, but failed to put it in the cache
+ // we should put this page in the free page list so that it
+ // can be reassigned to the next grabPage request
+ lep.releasePageNoCallback();
+ pageMapAndList.addToListOfFreePages(lep);
+ // Increment the use count of the old lep because this is
unexpected
+ oldLep.usePage();
+ lep = oldLep;
+ }
return lep;
}
void removePagesForLedger(long ledgerId) {
- int removedPageCount =
pageMapAndList.removeEntriesForALedger(ledgerId);
- if (pageCount.addAndGet(-removedPageCount) < 0) {
- throw new RuntimeException("Page count of ledger cache has been
decremented to be less than zero.");
- }
+ pageMapAndList.removeEntriesForALedger(ledgerId);
ledgersToFlush.remove(ledgerId);
}
@@ -524,7 +603,7 @@ class IndexInMemPageMgr {
List<LedgerEntryPage> entries = new
ArrayList<LedgerEntryPage>(firstEntryList.size());
try {
for(Long firstEntry: firstEntryList) {
- LedgerEntryPage lep = getLedgerEntryPage(ledger, firstEntry,
true);
+ LedgerEntryPage lep = getLedgerEntryPageFromCache(ledger,
firstEntry, true);
if (lep != null) {
entries.add(lep);
}
@@ -542,13 +621,16 @@ class IndexInMemPageMgr {
// find the id of the first entry of the page that has the entry
// we are looking for
long pageEntry = entry - offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
- if (lep == null) {
- lep = grabLedgerEntryPage(ledger, pageEntry);
+ LedgerEntryPage lep = null;
+ try {
+ lep = getLedgerEntryPage(ledger, pageEntry);
+ assert lep != null;
+ lep.setOffset(offset, offsetInPage *
LedgerEntryPage.getIndexEntrySize());
+ } finally {
+ if (null != lep) {
+ lep.releasePage();
+ }
}
- assert lep != null;
- lep.setOffset(offset, offsetInPage *
LedgerEntryPage.getIndexEntrySize());
- lep.releasePage();
}
long getEntryOffset(long ledger, long entry) throws IOException {
@@ -556,12 +638,10 @@ class IndexInMemPageMgr {
// find the id of the first entry of the page that has the entry
// we are looking for
long pageEntry = entry - offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+ LedgerEntryPage lep = null;
try {
- if (lep == null) {
- lep = grabLedgerEntryPage(ledger, pageEntry);
- }
- return lep.getOffset(offsetInPage *
LedgerEntryPage.getIndexEntrySize());
+ lep = getLedgerEntryPage(ledger, pageEntry);
+ return lep.getOffset(offsetInPage *
LedgerEntryPage.getIndexEntrySize());
} finally {
if (lep != null) {
lep.releasePage();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 9a34101..6332996 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -629,7 +629,15 @@ public class IndexPersistenceMgr {
}
}
- void updatePage(LedgerEntryPage lep) throws IOException {
+ /**
+ * Update the ledger entry page
+ *
+ * @param lep
+ * ledger entry page
+ * @return true if it is a new page, otherwise false.
+ * @throws IOException
+ */
+ boolean updatePage(LedgerEntryPage lep) throws IOException {
if (!lep.isClean()) {
throw new IOException("Trying to update a dirty page");
}
@@ -639,8 +647,10 @@ public class IndexPersistenceMgr {
long pos = lep.getFirstEntryPosition();
if (pos >= fi.size()) {
lep.zeroPage();
+ return true;
} else {
lep.readPage(fi);
+ return false;
}
} finally {
if (fi != null) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index 5aee2fe..ea96dff 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -43,7 +43,7 @@ public class LedgerEntryPage {
volatile private EntryKey entryKey = new EntryKey(-1,
BookieProtocol.INVALID_ENTRY_ID);
private final ByteBuffer page;
volatile private boolean clean = true;
- private final AtomicInteger useCount = new AtomicInteger();
+ private final AtomicInteger useCount = new AtomicInteger(0);
private final AtomicInteger version = new AtomicInteger(0);
volatile private int last = -1; // Last update position
private final LEPStateChangeCallback callback;
@@ -66,6 +66,21 @@ public class LedgerEntryPage {
}
}
+ // Except for not allocating a new direct byte buffer; this should do
everything that
+ // the constructor does
+ public void resetPage() {
+ page.clear();
+ ZeroBuffer.put(page);
+ last = -1;
+ entryKey = new EntryKey(-1, BookieProtocol.INVALID_ENTRY_ID);
+ clean = true;
+ useCount.set(0);
+ if (null != this.callback) {
+ callback.onResetInUse(this);
+ }
+ }
+
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -84,12 +99,20 @@ public class LedgerEntryPage {
}
}
+ public void releasePageNoCallback() {
+ releasePageInternal(false);
+ }
+
public void releasePage() {
+ releasePageInternal(true);
+ }
+
+ private void releasePageInternal(boolean shouldCallback) {
int newUseCount = useCount.decrementAndGet();
if (newUseCount < 0) {
throw new IllegalStateException("Use count has gone below 0");
}
- if ((null != callback) && (newUseCount == 0)) {
+ if (shouldCallback && (null != callback) && (newUseCount == 0)) {
callback.onResetInUse(this);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 41ab89c..8f4aacd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -374,7 +374,7 @@ public class LedgerCacheTest {
public void testSyncThreadNPE() throws IOException {
newLedgerCache();
try {
- ((LedgerCacheImpl)
ledgerCache).getIndexPageManager().getLedgerEntryPage(0L, 0L, true);
+ ((LedgerCacheImpl)
ledgerCache).getIndexPageManager().getLedgerEntryPageFromCache(0L, 0L, true);
} catch (Exception e) {
LOG.error("Exception when trying to get a ledger entry page", e);
fail("Shouldn't have thrown an exception");
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.