Repository: hive Updated Branches: refs/heads/branch-2 a4b913360 -> bd32deb44
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index 390b34b..364897c 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -57,7 +57,7 @@ public class TestBuddyAllocator { isMapped = mmap; } - private static class DummyMemoryManager implements MemoryManager { + static class DummyMemoryManager implements MemoryManager { @Override public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { } @@ -76,11 +76,6 @@ public class TestBuddyAllocator { } @Override - public long forceReservedMemory(int allocationSize, int count) { - return allocationSize * count; - } - - @Override public void debugDumpShort(StringBuilder sb) { } } @@ -99,8 +94,9 @@ public class TestBuddyAllocator { @Test public void testSameSizes() throws Exception { int min = 3, max = 8, maxAlloc = 1 << max; - BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc, - tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1")); + BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, + maxAlloc, 0, tmpDir, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1"), null); for (int i = max; i >= min; --i) { allocSameSize(a, 1 << (max - i), i); } @@ -109,16 +105,18 @@ public class TestBuddyAllocator { @Test public void testMultipleArenas() throws Exception { int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5; - BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount, - tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1")); + BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, + maxAlloc * arenaCount, 0, tmpDir, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1"), null); allocSameSize(a, arenaCount * 2, allocLog2); } @Test public void testMTT() { final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3; - final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc * 8, - maxAlloc * 24, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1")); + final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, + maxAlloc * 8, maxAlloc * 24, 0, tmpDir, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1"), null); ExecutorService executor = Executors.newFixedThreadPool(3); final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1); FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() { @@ -162,8 +160,8 @@ public class TestBuddyAllocator { public void testMTTArenas() { final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4; final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, - (1 << min) * minAllocCount, tmpDir, new DummyMemoryManager(), - LlapDaemonCacheMetrics.create("test", "1")); + (1 << min) * minAllocCount, 0, tmpDir, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1"), null); ExecutorService executor = Executors.newFixedThreadPool(threadCount); final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1); Callable<Void> testCallable = new Callable<Void>() { @@ -189,7 +187,8 @@ public class TestBuddyAllocator { throw new RuntimeException(t); } } - private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + + static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { cdlIn.countDown(); try { cdlOut.await(); @@ -202,8 +201,8 @@ public class TestBuddyAllocator { int allocCount, int arenaSizeMult, int arenaCount) throws Exception { int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult; BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, arenaSize, - arenaSize * arenaCount, tmpDir , new DummyMemoryManager(), - LlapDaemonCacheMetrics.create("test", "1")); + arenaSize * arenaCount, 0, tmpDir, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1"), null); allocateUp(a, min, max, allocCount, true); allocateDown(a, min, max, allocCount, true); allocateDown(a, min, max, allocCount, false); @@ -253,19 +252,34 @@ public class TestBuddyAllocator { try { a.allocateMultiple(allocs[index], size); } catch (AllocatorOutOfMemoryException ex) { - LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDumpForOomInternal()); + LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.testDump()); throw ex; } // LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump()); for (int j = 0; j < allocCount; ++j) { MemoryBuffer mem = allocs[index][j]; long testValue = testValues[index][j] = rdm.nextLong(); - int pos = mem.getByteBufferRaw().position(); - mem.getByteBufferRaw().putLong(pos, testValue); - int halfLength = mem.getByteBufferRaw().remaining() >> 1; - if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) { - mem.getByteBufferRaw().putLong(pos + halfLength, testValue); - } + putTestValue(mem, testValue); + } + } + + public static void putTestValue(MemoryBuffer mem, long testValue) { + int pos = mem.getByteBufferRaw().position(); + mem.getByteBufferRaw().putLong(pos, testValue); + int halfLength = mem.getByteBufferRaw().remaining() >> 1; + if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) { + mem.getByteBufferRaw().putLong(pos + halfLength, testValue); + } + } + + public static void checkTestValue(MemoryBuffer mem, long testValue, String str) { + int pos = mem.getByteBufferRaw().position(); + assertEquals("Failed to match (" + pos + ") on " + str, + testValue, mem.getByteBufferRaw().getLong(pos)); + int halfLength = mem.getByteBufferRaw().remaining() >> 1; + if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) { + assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + str, + testValue, mem.getByteBufferRaw().getLong(pos + halfLength)); } } @@ -286,14 +300,9 @@ public class TestBuddyAllocator { BuddyAllocator a, MemoryBuffer[] allocs, long[] testValues) { for (int j = 0; j < allocs.length; ++j) { LlapDataBuffer mem = (LlapDataBuffer)allocs[j]; - int pos = mem.getByteBufferRaw().position(); - assertEquals("Failed to match (" + pos + ") on " + j + "/" + allocs.length, - testValues[j], mem.getByteBufferRaw().getLong(pos)); - int halfLength = mem.getByteBufferRaw().remaining() >> 1; - if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) { - assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + j + "/" - + allocs.length, testValues[j], mem.getByteBufferRaw().getLong(pos + halfLength)); - } + long testValue = testValues[j]; + String str = j + "/" + allocs.length; + checkTestValue(mem, testValue, str); a.deallocate(mem); } } http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java new file mode 100644 index 0000000..79c44a7 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java @@ -0,0 +1,470 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.cache; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.llap.cache.TestBuddyAllocator.DummyMemoryManager; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This relies on allocations basically being sequential, no internal reordering. Rather, + * the specific paths it takes do; all the scenarios should work regardless of allocation. + */ +public class TestBuddyAllocatorForceEvict { + private static final Logger LOG = LoggerFactory.getLogger(TestBuddyAllocatorForceEvict.class); + private static final DummyMemoryManager MM = new TestBuddyAllocator.DummyMemoryManager(); + private static final LlapDaemonCacheMetrics METRICS = LlapDaemonCacheMetrics.create("test", "1"); + + @Test(timeout = 6000) + public void testSimple() { + runSimpleTests(false); + runSimpleTests(true); + } + + public void runSimpleTests(boolean isBruteOnly) { + runSimple1to2Discard(create(1024, 1, 1024, true, isBruteOnly), 256); + runSimple1to2Discard(create(1024, 1, 1024, false, isBruteOnly), 256); + runSimple1to2Discard(create(512, 2, 1024, false, isBruteOnly), 256); + } + + @Test(timeout = 6000) + public void testSmallBlocks() { + runSmallBlockersTests(false); + runSmallBlockersTests(true); + } + + public void runSmallBlockersTests(boolean isBruteOnly) { + runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, false); + runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, false); + runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, true); + runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, true); + + runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, false); + runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, false); + runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, true); + runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, true); + } + + @Test(timeout = 6000) + public void testZebra() { + runZebraTests(false); + runZebraTests(true); + } + + public void runZebraTests(boolean isBruteOnly) { + runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 1); + runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 64, 8, 1); + runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 2); + runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 4); + + runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 1); + runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 64, 8, 1); + runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 2); + + runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 2); + runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 4); + } + + @Test(timeout = 6000) + public void testUnevenZebra() { + runUnevenZebraTests(false); + runUnevenZebraTests(true); + } + + public void runUnevenZebraTests(boolean isBruteOnly) { + runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly), + new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512); + runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly), + new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 }, + new int[] { 0, 2, 4, 6, 8 }, 512); + + runCustomDiscard(create(512, 2, 1024, false, isBruteOnly), + new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512); + runCustomDiscard(create(512, 2, 1024, false, isBruteOnly), + new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 }, + new int[] { 0, 2, 4, 6, 8 }, 512); + } + + @Test(timeout = 6000) + public void testComplex1() { + runComplexTests(false); + runComplexTests(true); + } + + public void runComplexTests(boolean isBruteOnly) { + runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly), + new int[] { 256, 128, 64, 64, 256, 64, 64, 128 }, + new int[] { 0, 3, 6, 7 }, 512 ); + runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly), + new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 }, + new int[] { 0, 4, 7, 8 }, 512 ); + + runCustomDiscard(create(512, 2, 1024, false, isBruteOnly), + new int[] { 256, 128, 64, 64, 256, 64, 64, 128 }, + new int[] { 0, 3, 6, 7 }, 512 ); + runCustomDiscard(create(512, 2, 1024, false, isBruteOnly), + new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 }, + new int[] { 0, 4, 7, 8 }, 512 ); + } + + static class MttTestCallableResult { + public int successes, ooms, allocSize; + @Override + public String toString() { + return "allocation size " + allocSize + ": " + successes + " allocations, " + ooms + " OOMs"; + } + } + + static class MttTestCallable implements Callable<MttTestCallableResult> { + private final CountDownLatch cdlIn, cdlOut; + private final int allocSize, allocCount, iterCount; + private final BuddyAllocator a; + + public MttTestCallable(CountDownLatch cdlIn, CountDownLatch cdlOut, BuddyAllocator a, + int allocSize, int allocCount, int iterCount) { + this.cdlIn = cdlIn; + this.cdlOut = cdlOut; + this.a = a; + this.allocSize = allocSize; + this.allocCount = allocCount; + this.iterCount = iterCount; + } + + public MttTestCallableResult call() throws Exception { + LOG.info(Thread.currentThread().getId() + " thread starts"); + TestBuddyAllocator.syncThreadStart(cdlIn, cdlOut); + MttTestCallableResult result = new MttTestCallableResult(); + result.allocSize = allocSize; + List<MemoryBuffer> allocs = new ArrayList<>(allocCount); + LlapAllocatorBuffer[] dest = new LlapAllocatorBuffer[1]; + for (int i = 0; i < iterCount; ++i) { + for (int j = 0; j < allocCount; ++j) { + try { + dest[0] = null; + a.allocateMultiple(dest, allocSize); + LlapAllocatorBuffer buf = dest[0]; + assertTrue(buf.incRef() > 0); + allocs.add(buf); + ++result.successes; + buf.decRef(); + } catch (AllocatorOutOfMemoryException ex) { + ++result.ooms; + } catch (Throwable ex) { + LOG.error("Failed", ex); + throw new Exception(ex); + } + } + for (MemoryBuffer buf : allocs) { + try { + a.deallocate(buf); + } catch (Throwable ex) { + LOG.error("Failed", ex); + throw new Exception(ex); + } + } + allocs.clear(); + } + return result; + } + } + + @Test(timeout = 200000) + public void testMtt() { + final int baseAllocSizeLog2 = 3, maxAllocSizeLog2 = 10, totalSize = 8192, + baseAllocSize = 1 << baseAllocSizeLog2, maxAllocSize = 1 << maxAllocSizeLog2; + final int threadCount = maxAllocSizeLog2 - baseAllocSizeLog2 + 1; + final int iterCount = 500; + final BuddyAllocator a = create(maxAllocSize, 4, totalSize, true, false); + ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1); + CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1); + @SuppressWarnings("unchecked") + FutureTask<MttTestCallableResult>[] allocTasks = new FutureTask[threadCount]; + FutureTask<Void> dumpTask = createAllocatorDumpTask(a); + for (int allocSize = baseAllocSize, i = 0; allocSize <= maxAllocSize; allocSize <<= 1, ++i) { + allocTasks[i] = new FutureTask<>(new MttTestCallable( + cdlIn, cdlOut, a, allocSize, totalSize / allocSize, iterCount)); + executor.execute(allocTasks[i]); + } + executor.execute(dumpTask); + + runMttTest(a, allocTasks, cdlIn, cdlOut, dumpTask, null, null, totalSize, maxAllocSize); + } + + public static void runMttTest(BuddyAllocator a, FutureTask<?>[] allocTasks, + CountDownLatch cdlIn, CountDownLatch cdlOut, FutureTask<Void> dumpTask, + FutureTask<Void> defragTask, AtomicBoolean defragStopped, int totalSize, int maxAllocSize) { + Throwable t = null; + try { + cdlIn.await(); // Wait for all threads to be ready. + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + cdlOut.countDown(); // Release them at the same time. + for (int i = 0; i < allocTasks.length; ++i) { + try { + Object result = allocTasks[i].get(); + LOG.info("" + result); + } catch (Throwable tt) { + LOG.error("Test callable failed", tt); + if (t == null) { + a.dumpTestLog(); + t = tt; + } + } + } + dumpTask.cancel(true); + if (defragTask != null) { + defragStopped.set(true); + try { + defragTask.get(); + } catch (Throwable tt) { + LOG.error("Defragmentation thread failed", t); + if (t == null) { + a.dumpTestLog(); + t = tt; + } + } + } + if (t != null) { + throw new RuntimeException("One of the errors", t); + } + // All the tasks should have deallocated their stuff. Make sure we can allocate everything. + LOG.info("Allocator state after all the tasks: " + a.testDump()); + try { + allocate(a, totalSize / maxAllocSize, maxAllocSize, 0); + } catch (Throwable tt) { + a.dumpTestLog(); + throw tt; + } + } + + public static FutureTask<Void> createAllocatorDumpTask(final BuddyAllocator a) { + return new FutureTask<Void>(new Callable<Void>() { + @Override + public Void call() throws Exception { + int logs = 40000; // Prevent excessive logging in case of deadlocks or slowness. + while ((--logs) >= 0) { + LOG.info("Allocator state (MTT): " + a.testDump()); + Thread.sleep(10); + } + return null; + } + }); + } + + private static void runCustomDiscard(BuddyAllocator a, int[] sizes, int[] dealloc, int size) { + LlapAllocatorBuffer[] initial = prepareCustomFragmentedAllocator(a, sizes, dealloc, true); + LlapAllocatorBuffer after = allocate(a, 1, size, initial.length + 1)[0]; + LOG.info("After: " + a.testDump()); + for (int i = 0; i < initial.length; ++i) { + if (initial[i] == null) continue; + checkTestValue(initial[i], i + 1, null, false); + a.deallocate(initial[i]); + } + checkTestValue(after, initial.length + 1, null, true); + a.deallocate(after); + } + + private static void runZebraDiscard( + BuddyAllocator a, int baseSize, int pairCount, int allocs) { + LlapAllocatorBuffer[] initial = prepareZebraFragmentedAllocator(a, baseSize, pairCount, true); + int allocFraction = allocs * 2; + int bigAllocSize = pairCount * 2 * baseSize / allocFraction; + LlapAllocatorBuffer[] after = allocate(a, allocs, bigAllocSize, 1 + initial.length); + LOG.info("After: " + a.testDump()); + for (int i = 0; i < pairCount; ++i) { + int ix = (i << 1) + 1; + checkTestValue(initial[ix], ix + 1, null, false); + } + checkTestValue(after[0], 1 + initial.length, null, true); + } + + public static LlapAllocatorBuffer[] prepareZebraFragmentedAllocator( + BuddyAllocator a, int baseSize, int pairCount, boolean doIncRef) { + // Allocate 1-1-... xN; free every other one, allocate N/2 (or N/4). + LlapAllocatorBuffer[] initial = allocate(a, pairCount * 2, baseSize, 1, doIncRef); + for (int i = 0; i < pairCount; ++i) { + a.deallocate(initial[i << 1]); + initial[i << 1] = null; + } + LOG.info("Before: " + a.testDump()); + a.setOomLoggingForTest(true); + return initial; + } + + private void runSimple1to2Discard(BuddyAllocator a, int baseSize) { + // Allocate 1-1-1-1; free 0&2; allocate 2 + LlapAllocatorBuffer[] initial = prepareSimpleFragmentedAllocator(a, baseSize, true); + LlapAllocatorBuffer[] after = allocate(a, 1, baseSize * 2, 1 + initial.length); + LOG.info("After: " + a.testDump()); + checkInitialValues(initial, 0, 2); + checkTestValue(after[0], 1 + initial.length, null, true); + a.deallocate(initial[0]); + a.deallocate(initial[2]); + a.deallocate(after[0]); + } + + public static LlapAllocatorBuffer[] prepareSimpleFragmentedAllocator( + BuddyAllocator a, int baseSize, boolean doIncRef) { + LlapAllocatorBuffer[] initial = allocate(a, 4, baseSize, 1, doIncRef); + checkInitialValues(initial, 0, 2); + a.deallocate(initial[1]); + a.deallocate(initial[3]); + LOG.info("Before: " + a.testDump()); + a.setOomLoggingForTest(true); + return initial; + } + + private void runSmallBlockersDiscard(BuddyAllocator a, + int baseSize, boolean deallocOneFirst, boolean deallocOneSecond) { + LlapAllocatorBuffer[] initial = prepareAllocatorWithSmallFragments( + a, baseSize, deallocOneFirst, deallocOneSecond, true); + int bigAllocSize = baseSize * 4; + LlapAllocatorBuffer[] after = allocate(a, 1, bigAllocSize, 1 + initial.length); + LOG.info("After: " + a.testDump()); + checkInitialValues(initial, 2, 4); + checkTestValue(after[0], 1 + initial.length, null, true); + } + + public static LlapAllocatorBuffer[] prepareAllocatorWithSmallFragments(BuddyAllocator a, + int baseSize, boolean deallocOneFirst, boolean deallocOneSecond, boolean doIncRef) { + // Allocate 2-1-1-2-1-1; free 0,3 and optionally 1 or 5; allocate 4 + int offset = 0; + LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[6]; + initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0]; + MemoryBuffer[] tmp = allocate(a, 2, baseSize, offset + 1); + System.arraycopy(tmp, 0, initial, offset, 2); + offset += 2; + initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0]; + tmp = allocate(a, 2, baseSize, offset + 1); + System.arraycopy(tmp, 0, initial, offset, 2); + if (deallocOneFirst) { + a.deallocate(initial[1]); + } + if (deallocOneSecond) { + a.deallocate(initial[5]); + } + a.deallocate(initial[0]); + a.deallocate(initial[3]); + LOG.info("Before: " + a.testDump()); + a.setOomLoggingForTest(true); + return initial; + } + + private static void checkInitialValues(LlapAllocatorBuffer[] bufs, int... indexes) { + for (int index : indexes) { + LlapAllocatorBuffer buf = bufs[index]; + if (!incRefIfNotEvicted(buf, false)) continue; + try { + checkTestValue(buf, index + 1, null, false); + } finally { + buf.decRef(); + } + } + } + + private static boolean incRefIfNotEvicted(LlapAllocatorBuffer buf, boolean mustExist) { + int rc = buf.tryIncRef(); + if (rc == LlapAllocatorBuffer.INCREF_FAILED) { + fail("Failed to incref (bad state) " + buf); + } + if (rc <= 0 && mustExist) { + fail("Failed to incref (evicted) " + buf); + } + return rc > 0; // We expect evicted, but not failed. + } + + private static void checkTestValue( + LlapAllocatorBuffer mem, long testValue, String str, boolean mustExist) { + if (!incRefIfNotEvicted(mem, mustExist)) return; + try { + TestBuddyAllocator.checkTestValue(mem, testValue, str); + } finally { + mem.decRef(); + } + } + + public static BuddyAllocator create(int max, int arenas, int total, boolean isShortcut, + boolean isBruteForceOnly) { + BuddyAllocator result = new BuddyAllocator(false, false, 8, max, arenas, total, 0, + null, MM, METRICS, isBruteForceOnly ? "brute" : null); + if (!isShortcut) { + result.disableDefragShortcutForTest(); + } + result.setOomLoggingForTest(false); + return result; + } + + private static LlapAllocatorBuffer[] allocate( + BuddyAllocator a, int count, int size, int baseValue) { + return allocate(a, count, size, baseValue, true); + } + + public static LlapAllocatorBuffer[] allocate( + BuddyAllocator a, int count, int size, int baseValue, boolean doIncRef) { + LlapAllocatorBuffer[] allocs = new LlapAllocatorBuffer[count]; + try { + a.allocateMultiple(allocs, size); + } catch (AllocatorOutOfMemoryException ex) { + LOG.error("Failed to allocate " + allocs.length + " of " + size + "; " + a.testDump()); + throw ex; + } + for (int i = 0; i < count; ++i) { + // Make sure buffers are eligible for discard. + if (doIncRef) { + int rc = allocs[i].incRef(); + assertTrue(rc > 0); + } + TestBuddyAllocator.putTestValue(allocs[i], baseValue + i); + if (doIncRef) { + allocs[i].decRef(); + } + } + return allocs; + } + + public static LlapAllocatorBuffer[] prepareCustomFragmentedAllocator( + BuddyAllocator a, int[] sizes, int[] dealloc, boolean doIncRef) { + LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[sizes.length]; + for (int i = 0; i < sizes.length; ++i) { + initial[i] = allocate(a, 1, sizes[i], i + 1, doIncRef)[0]; + } + for (int i = 0; i < dealloc.length; ++i) { + a.deallocate(initial[dealloc[i]]); + initial[dealloc[i]] = null; + } + LOG.info("Before: " + a.testDump()); + a.setOomLoggingForTest(true); + return initial; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java index e95f807..ab10285 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java @@ -59,7 +59,7 @@ public class TestLowLevelCacheImpl { public void allocateMultiple(MemoryBuffer[] dest, int size) { for (int i = 0; i < dest.length; ++i) { LlapDataBuffer buf = new LlapDataBuffer(); - buf.initialize(0, null, -1, size); + buf.initialize(null, -1, size); dest[i] = buf; } } @@ -86,6 +86,12 @@ public class TestLowLevelCacheImpl { public MemoryBuffer createUnallocated() { return new LlapDataBuffer(); } + + @Override + public void allocateMultiple(MemoryBuffer[] dest, int size, + BufferObjectFactory factory) throws AllocatorOutOfMemoryException { + allocateMultiple(dest, size); + } } private static class DummyCachePolicy implements LowLevelCachePolicy { @@ -116,11 +122,6 @@ public class TestLowLevelCacheImpl { } @Override - public long tryEvictContiguousData(int allocationSize, int count) { - return count * allocationSize; - } - - @Override public void debugDumpShort(StringBuilder sb) { } } @@ -249,7 +250,7 @@ Example code to test specific scenarios: evict(cache, fakes[2]); verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]); verifyCacheGet(cache, fn2, 1, 2, dr(1, 2)); - verifyRefcount(fakes, -1, 4, -1); + verifyRefcount(fakes, 0, 4, 0); } @Test @@ -372,8 +373,8 @@ Example code to test specific scenarios: continue; } ++gets; - LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer(); - assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex); + LlapAllocatorBuffer result = (LlapAllocatorBuffer)((CacheChunk)iter).getBuffer(); + assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.getArenaIndex()); cache.decRefBuffer(result); iter = iter.next; } @@ -388,7 +389,7 @@ Example code to test specific scenarios: MemoryBuffer[] buffers = new MemoryBuffer[count]; for (int j = 0; j < offsets.length; ++j) { LlapDataBuffer buf = LowLevelCacheImpl.allocateFake(); - buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]); + buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0); buffers[j] = buf; } long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null); @@ -401,7 +402,7 @@ Example code to test specific scenarios: for (int j = 0; j < offsets.length; ++j) { LlapDataBuffer buf = (LlapDataBuffer)(buffers[j]); if ((maskVal & 1) == 1) { - assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex); + assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.getArenaIndex()); } maskVal >>= 1; cache.decRefBuffer(buf); @@ -415,7 +416,7 @@ Example code to test specific scenarios: } private int makeFakeArenaIndex(int fileIndex, long offset) { - return (int)((fileIndex << 16) + offset); + return (int)((fileIndex << 12) + offset); } }; @@ -438,7 +439,7 @@ Example code to test specific scenarios: if (r instanceof CacheChunk) { LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer(); cache.decRefBuffer(result); - if (victim == null && result.invalidate()) { + if (victim == null && result.invalidate() == LlapCacheableBuffer.INVALIDATE_OK) { ++evictions; victim = result; } @@ -491,7 +492,7 @@ Example code to test specific scenarios: for (int i = 0; i < refCount; ++i) { victimBuffer.decRef(); } - assertTrue(victimBuffer.invalidate()); + assertTrue(LlapCacheableBuffer.INVALIDATE_OK == victimBuffer.invalidate()); cache.notifyEvicted(victimBuffer); } http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 210cbb0..f86a37c 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import java.lang.reflect.Field; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -205,61 +204,6 @@ public class TestLowLevelLrfuCachePolicy { assertNotSame(locked, evicted); unlock(lrfu, locked); } - - - @Test - public void testForceEvictBySize() { - int heapSize = 12; - LOG.info("Testing force-eviction out of order"); - Configuration conf = new Configuration(); - ArrayList<LlapDataBuffer> sizeTwo = new ArrayList<LlapDataBuffer>(4), - sizeOne = new ArrayList<LlapDataBuffer>(4); - conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.45f); // size of LFU heap is 4 - EvictionTracker et = new EvictionTracker(); - LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); - lrfu.setEvictionListener(et); - for (int i = 0; i < 2; ++i) { - sizeTwo.add(cacheSizeTwoFake(et, lrfu)); - for (int j = 0; j < 2; ++j) { - LlapDataBuffer fake = LowLevelCacheImpl.allocateFake(); - assertTrue(cache(null, lrfu, et, fake)); - sizeOne.add(fake); - } - sizeTwo.add(cacheSizeTwoFake(et, lrfu)); - } - // Now we should have two in the heap and two in the list, which is an implementation detail. - // Evict only big blocks. - et.evicted.clear(); - assertEquals(8, lrfu.tryEvictContiguousData(2, 4)); - for (int i = 0; i < sizeTwo.size(); ++i) { - LlapDataBuffer block = et.evicted.get(i); - assertTrue(block.isInvalid()); - assertSame(sizeTwo.get(i), block); - } - et.evicted.clear(); - // Evict small blocks when no big ones are available. - assertEquals(2, lrfu.tryEvictContiguousData(2, 1)); - for (int i = 0; i < 2; ++i) { - LlapDataBuffer block = et.evicted.get(i); - assertTrue(block.isInvalid()); - assertSame(sizeOne.get(i), block); - } - et.evicted.clear(); - // Evict the rest. - assertEquals(2, lrfu.evictSomeBlocks(3)); - for (int i = 2; i < sizeOne.size(); ++i) { - LlapDataBuffer block = et.evicted.get(i - 2); - assertTrue(block.isInvalid()); - assertSame(sizeOne.get(i), block); - } - } - - private LlapDataBuffer cacheSizeTwoFake(EvictionTracker et, LowLevelLrfuCachePolicy lrfu) { - LlapDataBuffer fake = new LlapDataBuffer(); - fake.initialize(-1, ByteBuffer.wrap(new byte[2]), 0, 2); - assertTrue(cache(null, lrfu, et, fake)); - return fake; - } // Buffers in test are fakes not linked to cache; notify cache policy explicitly. public boolean cache(LowLevelCacheMemoryManager mm, http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index 1d5954e..630f305 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; @@ -65,11 +66,6 @@ public class TestOrcMetadataCache { } @Override - public long tryEvictContiguousData(int allocationSize, int count) { - return 0; - } - - @Override public void debugDumpShort(StringBuilder sb) { } } @@ -97,11 +93,6 @@ public class TestOrcMetadataCache { } @Override - public long forceReservedMemory(int allocationSize, int count) { - return allocationSize * count; - } - - @Override public void debugDumpShort(StringBuilder sb) { } } http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 5e718c3..7ad457d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -406,7 +406,7 @@ class EncodedReaderImpl implements EncodedReader { hasError = false; } finally { // At this point, everything in the list is going to have a refcount of one. Unless it - // failed between the allocation and the incref for a single item, we should be ok. + // failed between the allocation and the incref for a single item, we should be ok. if (hasError) { releaseInitialRefcounts(toRead.next); if (toRelease != null) { @@ -456,7 +456,7 @@ class EncodedReaderImpl implements EncodedReader { if (sctx.stripeLevelStream == null) { sctx.stripeLevelStream = POOLS.csdPool.take(); // We will be using this for each RG while also sending RGs to processing. - // To avoid buffers being unlocked, run refcount one ahead; so each RG + // To avoid buffers being unlocked, run refcount one ahead; so each RG // processing will decref once, and the last one will unlock the buffers. sctx.stripeLevelStream.incRef(); // For stripe-level streams we don't need the extra refcount on the block. @@ -706,8 +706,8 @@ class EncodedReaderImpl implements EncodedReader { assert originalCbIndex >= 0; // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put, // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer - // is not in cache. - cacheWrapper.getAllocator().deallocate(getBuffer()); + // is not in cache. releaseBuffer will decref the buffer, and also deallocate. + cacheWrapper.releaseBuffer(this.buffer); cacheWrapper.reuseBuffer(replacementBuffer); // Replace the buffer in our big range list, as well as in current results. this.buffer = replacementBuffer; @@ -959,7 +959,7 @@ class EncodedReaderImpl implements EncodedReader { * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param kind + * @param kind */ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, long streamOffset, long streamEnd, Kind kind) throws IOException { @@ -1137,9 +1137,9 @@ class EncodedReaderImpl implements EncodedReader { private void allocateMultiple(MemoryBuffer[] dest, int size) { if (allocator != null) { - allocator.allocateMultiple(dest, size, isStopped); + allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped); } else { - cacheWrapper.getAllocator().allocateMultiple(dest, size); + cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory()); } } @@ -1477,7 +1477,7 @@ class EncodedReaderImpl implements EncodedReader { ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true); if (compressed.remaining() <= 0 && toRelease.remove(compressed)) { - releaseBuffer(compressed, true); // We copied the entire buffer. + releaseBuffer(compressed, true); // We copied the entire buffer. } // else there's more data to process; will be handled in next call. return cc; } http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java index 2172bd2..3aef6f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java @@ -20,10 +20,11 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; public interface StoppableAllocator extends Allocator { /** Stoppable allocate method specific to branch-2. */ - void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped) + void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped) throws AllocatorOutOfMemoryException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java index 16b9713..775233c 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java @@ -29,6 +29,10 @@ public interface Allocator { private static final long serialVersionUID = 268124648177151761L; } + public interface BufferObjectFactory { + MemoryBuffer create(); + } + /** * Allocates multiple buffers of a given size. * @param dest Array where buffers are placed. Objects are reused if already there @@ -36,14 +40,27 @@ public interface Allocator { * @param size Allocation size. * @throws AllocatorOutOfMemoryException Cannot allocate. */ + @Deprecated void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException; /** + * Allocates multiple buffers of a given size. + * @param dest Array where buffers are placed. Objects are reused if already there + * (see createUnallocated), created otherwise. + * @param size Allocation size. + * @param factory A factory to create the objects in the dest array, if needed. + * @throws AllocatorOutOfMemoryException Cannot allocate. + */ + void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory) + throws AllocatorOutOfMemoryException; + + /** * Creates an unallocated memory buffer object. This object can be passed to allocateMultiple * to allocate; this is useful if data structures are created for separate buffers that can * later be allocated together. * @return a new unallocated memory buffer */ + @Deprecated MemoryBuffer createUnallocated(); /** Deallocates a memory buffer. http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java index e53b737..552f20e 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java @@ -100,4 +100,10 @@ public interface DataCache { * @return the allocator */ Allocator getAllocator(); + + /** + * Gets the buffer object factory associated with this DataCache, to use with allocator. + * @return the factory + */ + Allocator.BufferObjectFactory getDataBufferFactory(); }
