Repository: hbase Updated Branches: refs/heads/0.98 364fa3f7f -> 7ad167875
HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in MemStoreLAB Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7ad16787 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7ad16787 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7ad16787 Branch: refs/heads/0.98 Commit: 7ad167875e3bbef084939fef099106a45ae4ebf5 Parents: 364fa3f Author: Yu Li <[email protected]> Authored: Wed Jul 13 10:05:24 2016 +0800 Committer: Yu Li <[email protected]> Committed: Wed Jul 13 10:05:24 2016 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/MemStoreChunkPool.java | 17 +++++ .../hadoop/hbase/regionserver/MemStoreLAB.java | 28 +++++++- .../hbase/regionserver/TestMemStoreLAB.java | 76 +++++++++++++++++++- 3 files changed, 117 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7ad16787/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index be03488..19dc956 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -121,6 +122,13 @@ public class MemStoreChunkPool { return; } chunks.drainTo(reclaimedChunks, maxNumToPutback); + // clear reference of any non-reclaimable chunks + if (chunks.size() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue"); + } + chunks.clear(); + } } /** @@ -216,4 +224,13 @@ public class MemStoreChunkPool { return globalInstance; } + int getMaxCount() { + return this.maxCount; + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/7ad16787/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 4f776a6..43f58d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -53,9 +55,11 @@ import com.google.common.base.Preconditions; */ @InterfaceAudience.Private public class MemStoreLAB { + static final Log LOG = LogFactory.getLog(MemStoreLAB.class); + private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>(); - // A queue of chunks contained by this memstore - private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>(); + // A queue of chunks contained by this memstore, used with chunk pool + private BlockingQueue<Chunk> chunkQueue = null; final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize"; final static int CHUNK_SIZE_DEFAULT = 2048 * 1024; @@ -89,6 +93,12 @@ public class MemStoreLAB { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); this.chunkPool = pool; + // currently chunkQueue is only used for chunkPool + if (this.chunkPool != null) { + // set queue length to chunk pool max count to avoid keeping reference of + // too many non-reclaimable chunks + chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount()); + } // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument( @@ -164,6 +174,8 @@ public class MemStoreLAB { * Try to retire the current chunk if it is still * <code>c</code>. Postcondition is that curChunk.get() * != c + * @param c the chunk to retire + * @return true if we won the race to retire the chunk */ private void tryRetireChunk(Chunk c) { curChunk.compareAndSet(c, null); @@ -195,7 +207,12 @@ public class MemStoreLAB { // we won race - now we need to actually do the expensive // allocation step c.init(); - this.chunkQueue.add(c); + if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + chunkQueue.size()); + } + } return c; } else if (chunkPool != null) { chunkPool.putbackChunk(c); @@ -210,6 +227,11 @@ public class MemStoreLAB { return this.curChunk.get(); } + @VisibleForTesting + BlockingQueue<Chunk> getChunkQueue() { + return this.chunkQueue; + } + /** * A chunk of memory out of which allocations are sliced. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/7ad16787/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 111be87..ba814ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; @@ -36,6 +38,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; + import org.junit.experimental.categories.Category; @Category(SmallTests.class) @@ -148,7 +151,78 @@ public class TestMemStoreLAB { } } - + + /** + * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure + * there's no memory leak (HBASE-16195) + * @throws Exception if any error occurred + */ + @Test + public void testLABChunkQueue() throws Exception { + MemStoreLAB mslab = new MemStoreLAB(); + // by default setting, there should be no chunk queue initialized + assertNull(mslab.getChunkQueue()); + // reset mslab with chunk pool + Configuration conf = HBaseConfiguration.create(); + conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); + // set chunk size to default max alloc size, so we could easily trigger chunk retirement + conf.setLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); + // reconstruct mslab + MemStoreChunkPool.clearDisableFlag(); + mslab = new MemStoreLAB(conf, MemStoreChunkPool.getPool(conf)); + // launch multiple threads to trigger frequent chunk retirement + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 10; i++) { + threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i)); + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(1000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + while (threadsRunning) { + for (Thread thread : threads) { + if (thread.isAlive()) { + threadsRunning = true; + break; + } + } + threadsRunning = false; + } + // close the mslab + mslab.close(); + // make sure all chunks reclaimed or removed from chunk queue + int queueLength = mslab.getChunkQueue().size(); + assertTrue("All chunks in chunk queue should be reclaimed or removed" + + " after mslab closed but actually: " + queueLength, queueLength == 0); + } + + private Thread getChunkQueueTestThread(final MemStoreLAB mslab, String threadName) { + Thread thread = new Thread() { + boolean stopped = false; + + @Override + public void run() { + while (!stopped) { + // keep triggering chunk retirement + mslab.allocateBytes(MemStoreLAB.MAX_ALLOC_DEFAULT - 1); + } + } + + @Override + public void interrupt() { + this.stopped = true; + } + }; + thread.setName(threadName); + thread.setDaemon(true); + return thread; + } + private static class AllocRecord implements Comparable<AllocRecord>{ private final Allocation alloc; private final int size;
