Repository: hive Updated Branches: refs/heads/master a7927b1b8 -> 560e4feba
HIVE-12591 : LLAP cache counters displays -ve value for CacheCapacityUsed (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/560e4feb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/560e4feb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/560e4feb Branch: refs/heads/master Commit: 560e4feba66c2fc507dc2e11ded5de0fd70922a2 Parents: a7927b1 Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Dec 7 17:26:03 2015 -0800 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Dec 7 17:32:56 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hive/llap/cache/BuddyAllocator.java | 1 - .../llap/cache/LowLevelCacheMemoryManager.java | 41 +++++++++------ .../llap/metrics/LlapDaemonCacheMetrics.java | 13 ++--- .../llap/cache/TestLowLevelLrfuCachePolicy.java | 52 +++++++++++++++++--- 4 files changed, 78 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/560e4feb/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 485a145..0c96efa 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -216,7 +216,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) { LlapDataBuffer buf = (LlapDataBuffer)buffer; long memUsage = buf.getMemoryUsage(); - metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity()); arenas[buf.arenaIndex].deallocate(buf); if (doReleaseMemory) { memoryManager.releaseMemory(memUsage); http://git-wip-us.apache.org/repos/asf/hive/blob/560e4feb/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index d584ca8..8788e15 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -43,27 +43,35 @@ public class LowLevelCacheMemoryManager implements MemoryManager { this.evictor = evictor; this.usedMemory = new AtomicLong(0); this.metrics = metrics; - metrics.incrCacheCapacityTotal(maxSize); + metrics.setCacheCapacityTotal(maxSize); if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize); } } @Override - public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) { + public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? int badCallCount = 0; int nextLog = 4; - while (memoryToReserve > 0) { - long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve; + long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve; + boolean result = true; + while (remainingToReserve > 0) { + long usedMem = usedMemory.get(), newUsedMem = usedMem + remainingToReserve; if (newUsedMem <= maxSize) { - if (usedMemory.compareAndSet(usedMem, newUsedMem)) break; + if (usedMemory.compareAndSet(usedMem, newUsedMem)) { + reservedTotalMetric += remainingToReserve; + break; + } continue; } // TODO: for one-block case, we could move notification for the last block out of the loop. - long evicted = evictor.evictSomeBlocks(memoryToReserve); + long evicted = evictor.evictSomeBlocks(remainingToReserve); if (evicted == 0) { - if (!waitForEviction) return false; + if (!waitForEviction) { + result = false; + break; + } ++badCallCount; if (badCallCount == nextLog) { LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?"); @@ -72,24 +80,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager { Thread.sleep(Math.min(1000, nextLog)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return false; + result = false; + break; } } continue; } + evictedTotalMetric += evicted; badCallCount = 0; // Adjust the memory - we have to account for what we have just evicted. while (true) { - long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted); - if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) { - memoryToReserve -= reserveWithEviction; + long availableToReserveAfterEvict = maxSize - usedMem + evicted; + long reservedAfterEvict = Math.min(remainingToReserve, availableToReserveAfterEvict); + if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reservedAfterEvict)) { + remainingToReserve -= reservedAfterEvict; + reservedTotalMetric += reservedAfterEvict; break; } usedMem = usedMemory.get(); } } - metrics.incrCacheCapacityUsed(memoryToReserve); - return true; + metrics.incrCacheCapacityUsed(reservedTotalMetric - evictedTotalMetric); + return result; } @@ -103,12 +115,13 @@ public class LowLevelCacheMemoryManager implements MemoryManager { } @Override - public void releaseMemory(long memoryToRelease) { + public void releaseMemory(final long memoryToRelease) { long oldV; do { oldV = usedMemory.get(); assert oldV >= memoryToRelease; } while (!usedMemory.compareAndSet(oldV, oldV - memoryToRelease)); + metrics.incrCacheCapacityUsed(-memoryToRelease); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/560e4feb/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java index 3ffa6e0..52057e4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java @@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import com.google.common.annotations.VisibleForTesting; @@ -53,9 +54,9 @@ public class LlapDaemonCacheMetrics implements MetricsSource { @Metric MutableCounterLong cacheReadRequests; @Metric - MutableCounterLong cacheCapacityTotal; + MutableGaugeLong cacheCapacityTotal; @Metric - MutableCounterLong cacheCapacityUsed; + MutableCounterLong cacheCapacityUsed; // Not using the gauge to avoid races. @Metric MutableCounterLong cacheRequestedBytes; @Metric @@ -77,18 +78,14 @@ public class LlapDaemonCacheMetrics implements MetricsSource { return ms.register(displayName, null, new LlapDaemonCacheMetrics(displayName, sessionId)); } - public void incrCacheCapacityTotal(long delta) { - cacheCapacityTotal.incr(delta); + public void setCacheCapacityTotal(long value) { + cacheCapacityTotal.set(value); } public void incrCacheCapacityUsed(long delta) { cacheCapacityUsed.incr(delta); } - public void decrCacheCapacityUsed(int delta) { - cacheCapacityUsed.incr(-delta); - } - public void incrCacheRequestedBytes(long delta) { cacheRequestedBytes.incr(delta); } http://git-wip-us.apache.org/repos/asf/hive/blob/560e4feb/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index bb530ef..d0abfa3 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hive.llap.cache; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.mockito.stubbing.Answer; + +import java.util.concurrent.atomic.AtomicLong; + +import org.mockito.invocation.InvocationOnMock; import java.lang.reflect.Field; import java.util.ArrayList; @@ -133,7 +140,7 @@ public class TestLowLevelLrfuCachePolicy { lfu.notifyUnlock(inserted.get(i)); } } - verifyOrder(mm, lfu, et, inserted); + verifyOrder(mm, lfu, et, inserted, null); } private Configuration createConf(int min, int heapSize, Double lambda) { @@ -176,7 +183,7 @@ public class TestLowLevelLrfuCachePolicy { lru.notifyUnlock(inserted.get(i)); } } - verifyOrder(mm, lru, et, inserted); + verifyOrder(mm, lru, et, inserted, null); } @Test @@ -236,6 +243,27 @@ public class TestLowLevelLrfuCachePolicy { lrfu.notifyUnlock(locked); } + private static class MetricsMock { + public MetricsMock(AtomicLong cacheUsed, LlapDaemonCacheMetrics metricsMock) { + this.cacheUsed = cacheUsed; + this.metricsMock = metricsMock; + } + public AtomicLong cacheUsed; + public LlapDaemonCacheMetrics metricsMock; + } + + private MetricsMock createMetricsMock() { + LlapDaemonCacheMetrics metricsMock = mock(LlapDaemonCacheMetrics.class); + final AtomicLong cacheUsed = new AtomicLong(0); + doAnswer(new Answer<Object>() { + public Object answer(InvocationOnMock invocation) throws Throwable { + cacheUsed.addAndGet((Long)invocation.getArguments()[0]); + return null; + } + }).when(metricsMock).incrCacheCapacityUsed(anyLong()); + return new MetricsMock(cacheUsed, metricsMock); + } + private void testHeapSize(int heapSize) { LOG.info("Testing heap size " + heapSize); Random rdm = new Random(1234); @@ -243,8 +271,8 @@ public class TestLowLevelLrfuCachePolicy { conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf); - LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, - LlapDaemonCacheMetrics.create("test", "1")); + MetricsMock m = createMetricsMock(); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, m.metricsMock); lrfu.setEvictionListener(et); // Insert the number of elements plus 2, to trigger 2 evictions. int toEvict = 2; @@ -254,6 +282,7 @@ public class TestLowLevelLrfuCachePolicy { for (int i = 0; i < heapSize + toEvict; ++i) { LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake(); assertTrue(cache(mm, lrfu, et, buffer)); + assertEquals((long)Math.min(i + 1, heapSize), m.cacheUsed.get()); LlapDataBuffer evictedBuf = getOneEvictedBuffer(et); if (i < toEvict) { evicted[i] = buffer; @@ -275,6 +304,7 @@ public class TestLowLevelLrfuCachePolicy { for (LlapDataBuffer buf : inserted) { lock(lrfu, buf); } + assertEquals(heapSize, m.cacheUsed.get()); assertFalse(mm.reserveMemory(1, false)); if (!et.evicted.isEmpty()) { assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty()); @@ -291,24 +321,34 @@ public class TestLowLevelLrfuCachePolicy { lrfu.notifyUnlock(buf); } } - verifyOrder(mm, lrfu, et, inserted); + verifyOrder(mm, lrfu, et, inserted, m.cacheUsed); } private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu, - EvictionTracker et, ArrayList<LlapDataBuffer> inserted) { + EvictionTracker et, ArrayList<LlapDataBuffer> inserted, AtomicLong cacheUsed) { LlapDataBuffer block; // Evict all blocks. et.evicted.clear(); for (int i = 0; i < inserted.size(); ++i) { assertTrue(mm.reserveMemory(1, false)); + if (cacheUsed != null) { + assertEquals(inserted.size(), cacheUsed.get()); + } } // The map should now be empty. assertFalse(mm.reserveMemory(1, false)); + if (cacheUsed != null) { + assertEquals(inserted.size(), cacheUsed.get()); + } for (int i = 0; i < inserted.size(); ++i) { block = et.evicted.get(i); assertTrue(block.isInvalid()); assertSame(inserted.get(i), block); } + if (cacheUsed != null) { + mm.releaseMemory(inserted.size()); + assertEquals(0, cacheUsed.get()); + } } private String dumpInserted(ArrayList<LlapDataBuffer> inserted) {