Repository: hive
Updated Branches:
  refs/heads/llap 1972e8432 -> fc9f75836
HIVE-11200 : LLAP: Cache BuddyAllocator throws NPE (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc9f7583
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc9f7583
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc9f7583

Branch: refs/heads/llap
Commit: fc9f7583660adc498a1985639adb2570cee391df
Parents: 1972e84
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Jul 13 11:53:45 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Jul 13 11:53:45 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 87 +++++++++++++++-----
 .../hive/llap/cache/TestBuddyAllocator.java     | 31 +++++++
 2 files changed, 97 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 3631418..fca6249 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -118,6 +118,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     }
     // First try to quickly lock some of the correct-sized free lists and allocate from them.
     int arenaCount = allocatedArenas.get();
+    if (arenaCount < 0) {
+      arenaCount = -arenaCount - 1; // Next arena is being allocated.
+    }
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     {
       int startIndex = (int)(threadId % arenaCount), index = startIndex;
@@ -317,8 +320,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     FreeList freeList = freeLists[freeListIx];
     int remaining = -1;
     freeList.lock.lock();
-    // TODO: write some comments for this method
     try {
+      // Try to allocate from target-sized free list, maybe we'll get lucky.
      ix = allocateFromFreeListUnderLock(
          arenaIx, freeList, freeListIx, dest, ix, allocationSize);
      remaining = dest.length - ix;
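A note on the encoding this fix introduces: the single allocatedArenas AtomicInteger now packs two facts, a non-negative value n meaning n arenas are usable, and a negative value -(n + 1) meaning arena n is still being allocated, so every reader must decode negative values before using them as a count, as the hunk above now does. Below is a minimal standalone sketch of that protocol; all class and method names are invented for illustration, and only the decode and the two compareAndSet transitions are taken from this diff.

import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch (invented names, not code from the patch) of the
// sign-encoded arena counter: a value n >= 0 means n arenas are usable;
// a value -(n + 1) means arena n is currently being allocated.
public class ArenaCounterDemo {
  private final AtomicInteger allocatedArenas = new AtomicInteger(0);

  // Decode the count of arenas that are safe to use, as the allocation path above does.
  int usableArenaCount() {
    int arenaCount = allocatedArenas.get();
    if (arenaCount < 0) {
      arenaCount = -arenaCount - 1; // Next arena is being allocated; exclude it.
    }
    return arenaCount;
  }

  // Claim the right to allocate arena arenaIx (mirrors the first CAS in the patch).
  boolean tryStartAllocating(int arenaIx) {
    return allocatedArenas.compareAndSet(arenaIx, -arenaIx - 1);
  }

  // Publish arena arenaIx as fully allocated (mirrors the committing CAS).
  void finishAllocating(int arenaIx) {
    boolean committed = allocatedArenas.compareAndSet(-arenaIx - 1, arenaIx + 1);
    assert committed;
  }

  public static void main(String[] args) {
    ArenaCounterDemo demo = new ArenaCounterDemo();
    System.out.println(demo.usableArenaCount());   // 0: nothing allocated yet.
    boolean claimed = demo.tryStartAllocating(0);  // Counter becomes -1.
    System.out.println(claimed + " " + demo.usableArenaCount()); // true 0
    demo.finishAllocating(0);                      // Counter becomes 1.
    System.out.println(demo.usableArenaCount());   // 1
  }
}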
@@ -326,9 +329,12 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     } finally {
       freeList.lock.unlock();
     }
-    byte headerData = makeHeader(freeListIx, true);
-    int headerStep = 1 << freeListIx;
-    int splitListIx = freeListIx + 1;
+    byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
+    int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
+    int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
+    // Each iteration of this loop tries to split blocks from one level of the free list into
+    // target size blocks; if we cannot satisfy the allocation from the free list containing the
+    // blocks of a particular size, we'll try to split yet larger blocks, until we run out.
     while (remaining > 0 && splitListIx < freeLists.length) {
       int splitWaysLog2 = (splitListIx - freeListIx);
       assert splitWaysLog2 > 0;
@@ -338,28 +344,33 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       FreeList splitList = freeLists[splitListIx];
       splitList.lock.lock();
       try {
-        int headerIx = splitList.listHead;
+        int headerIx = splitList.listHead; // Index of the next free block to split.
         while (headerIx >= 0 && remaining > 0) {
           int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
-          int toTake = Math.min(splitWays, remaining); // We split it splitWays and take toTake.
+          // We will split the block at headerIx [splitWays] ways, and take [toTake] blocks,
+          // which will leave [lastSplitBlocksRemaining] free blocks of target size.
+          int toTake = Math.min(splitWays, remaining);
           remaining -= toTake;
           lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
-          // Take toTake blocks by splitting the block at origOffset.
+          // Take toTake blocks by splitting the block at offset.
           for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
             headers[headerIx] = headerData;
             // TODO: this could be done out of the lock, we only need to take the blocks out.
             ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
           }
           lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
-          headerIx = data.getInt(origOffset + 4); // Get next item from the free list.
+          headerIx = getNextFreeListItem(origOffset);
         }
         replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
       } finally {
         splitList.lock.unlock();
       }
       if (remaining == 0) {
-        // We have just obtained all we needed by splitting at lastSplitBlockOffset; now
-        // we need to put the space remaining from that block into lower free lists.
+        // We have just obtained all we needed by splitting some block; now we need
+        // to put the space remaining from that block into lower free lists.
+        // We'll put at most one block into each list, since 2 blocks can always be combined
+        // to make a larger-level block. Each bit in the remaining target-sized blocks count
+        // is one block in a list offset from target-sized list by bit index.
         int newListIndex = freeListIx;
         while (lastSplitBlocksRemaining > 0) {
           if ((lastSplitBlocksRemaining & 1) == 1) {
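The remainder-handling comment above is terse, and the body of that loop is cut off by the next hunk, so here is a hedged standalone illustration of the arithmetic it describes (all names invented, not the patch's code): the leftover target-sized blocks after a split form one contiguous, suitably aligned run, so each set bit b of the leftover count yields exactly one buddy block for the free list b levels above the target list.

// Standalone illustration (invented names) of distributing a split's leftover
// blocks back to the free lists, one buddy block per set bit of the count.
public class SplitRemainderDemo {
  static void distributeRemainder(int remaining, int targetListIx, int firstBlockIx) {
    int listIx = targetListIx;  // List that holds single target-sized blocks.
    int blockIx = firstBlockIx; // Position of the leftovers, in target-sized units.
    while (remaining > 0) {
      if ((remaining & 1) == 1) {
        int sizeInTargetUnits = 1 << (listIx - targetListIx);
        System.out.println("free block of " + sizeInTargetUnits
            + " target unit(s) at index " + blockIx + " -> list " + listIx);
        blockIx += sizeInTargetUnits; // Skip past the block just freed.
      }
      remaining >>>= 1;
      ++listIx;
    }
  }

  public static void main(String[] args) {
    // Example: a block was split 8 ways and 3 pieces were taken, so 5 pieces
    // (binary 101) remain starting at index 3: one 1-unit block at index 3
    // goes to the target list, and one aligned 4-unit block at index 4 goes
    // to the list two levels up.
    distributeRemainder(5, 3, 3);
  }
}

In the patch the positions are tracked in smallest-block header units (headerStep) rather than target units, but the one-block-per-set-bit idea is the same.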
@@ -394,17 +405,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
   private int allocateWithExpand(
       int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
-    if (data == null) {
-      synchronized (this) {
-        // Never goes from non-null to null, so this is the only place we need sync.
-        if (data == null) {
-          init();
-          allocatedArenas.incrementAndGet();
-          metrics.incrAllocatedArena();
+    while (true) {
+      int arenaCount = allocatedArenas.get(), allocArenaCount = arenaCount;
+      if (arenaCount < 0) {
+        allocArenaCount = -arenaCount - 1; // Someone is allocating an arena.
+      }
+      if (allocArenaCount > arenaIx) {
+        // Someone already allocated this arena; just do the usual thing.
+        return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+      }
+      if ((arenaIx + 1) == -arenaCount) {
+        // Someone is allocating this arena. Wait a bit and recheck.
+        try {
+          synchronized (this) {
+            this.wait(100);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Restore interrupt, won't handle here.
         }
+        continue;
       }
+      // Either this arena is being allocated, or it is already allocated, or it is next. The
+      // caller should not try to allocate another arena before waiting for the previous one.
+      assert arenaCount == arenaIx :
+        "Arena count " + arenaCount + " but " + arenaIx + " is not being allocated";
+      if (!allocatedArenas.compareAndSet(arenaCount, -arenaCount - 1)) {
+        continue; // CAS race, look again.
+      }
+      assert data == null;
+      init();
+      boolean isCommitted = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
+      assert isCommitted;
+      synchronized (this) {
+        this.notifyAll();
+      }
+      metrics.incrAllocatedArena();
+      return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
     }
-    return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
   }
 
   public int offsetFromHeaderIndex(int lastSplitNextHeader) {
@@ -418,7 +455,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       int offset = offsetFromHeaderIndex(current);
       // No one else has this either allocated or in a different free list; no sync needed.
       headers[current] = makeHeader(freeListIx, true);
-      current = data.getInt(offset + 4);
+      current = getNextFreeListItem(offset);
       ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
       ++ix;
     }
@@ -426,6 +463,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     return ix;
   }
 
+  private int getPrevFreeListItem(int offset) {
+    return data.getInt(offset);
+  }
+
+  private int getNextFreeListItem(int offset) {
+    return data.getInt(offset + 4);
+  }
+
   private byte makeHeader(int freeListIx, boolean isInUse) {
     return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
   }
@@ -462,7 +507,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
     if (freeList.listHead >= 0) {
       int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
-      assert data.getInt(oldHeadOffset) == -1;
+      assert getPrevFreeListItem(oldHeadOffset) == -1;
       data.putInt(oldHeadOffset, headerIx);
     }
     int offset = offsetFromHeaderIndex(headerIx);
@@ -473,7 +518,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
   private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
     int bOffset = offsetFromHeaderIndex(headerIx),
-        bpHeaderIx = data.getInt(bOffset), bnHeaderIx = data.getInt(bOffset + 4);
+        bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
     if (freeList.listHead == headerIx) {
       assert bpHeaderIx == -1;
       freeList.listHead = bnHeaderIx;
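The new getPrevFreeListItem/getNextFreeListItem helpers name the convention the surrounding hunks rely on: free blocks form an intrusive doubly-linked list whose links live in the free space itself, an int prev header index at offset +0 and an int next header index at offset +4 within the arena's ByteBuffer, with -1 as the terminator. A self-contained sketch of that layout, using invented names and a toy block size:

import java.nio.ByteBuffer;

// Sketch (invented demo, same layout as the helpers above) of the intrusive
// free list: a free block's first 8 bytes inside the arena buffer store its
// list links, prev header index at +0 and next header index at +4, with -1
// terminating the list in either direction.
public class FreeListLayoutDemo {
  static int getPrevFreeListItem(ByteBuffer data, int offset) {
    return data.getInt(offset);
  }

  static int getNextFreeListItem(ByteBuffer data, int offset) {
    return data.getInt(offset + 4);
  }

  public static void main(String[] args) {
    final int blockSize = 16; // Invented; stands in for offsetFromHeaderIndex().
    ByteBuffer data = ByteBuffer.allocate(4 * blockSize);
    // Link two free blocks: header 0 <-> header 2.
    data.putInt(0 * blockSize, -1);     // prev of head: none.
    data.putInt(0 * blockSize + 4, 2);  // next of head: header 2.
    data.putInt(2 * blockSize, 0);      // prev of second: header 0.
    data.putInt(2 * blockSize + 4, -1); // next of second: end of list.

    // Walk the list head-to-tail, like the allocator's free-list walk above.
    for (int headerIx = 0; headerIx >= 0;
        headerIx = getNextFreeListItem(data, headerIx * blockSize)) {
      System.out.println("free block at header " + headerIx);
    }
  }
}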
http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 7d265c1..50d5e19 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -136,6 +136,37 @@ public class TestBuddyAllocator {
     }
   }
 
+  @Test
+  public void testMTTArenas() {
+    final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
+    Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, (1 << min) * minAllocCount);
+    final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"));
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
+    Callable<Void> testCallable = new Callable<Void>() {
+      public Void call() throws Exception {
+        syncThreadStart(cdlIn, cdlOut);
+        allocSameSize(a, minAllocCount / threadCount, min);
+        return null;
+      }
+    };
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] allocTasks = new FutureTask[threadCount];
+    for (int i = 0; i < threadCount; ++i) {
+      allocTasks[i] = new FutureTask<>(testCallable);
+      executor.execute(allocTasks[i]);
+    }
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
+      for (int i = 0; i < threadCount; ++i) {
+        allocTasks[i].get();
+      }
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
   private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
     cdlIn.countDown();
     try {
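The cdlIn/cdlOut pair in this test is a standard latch pattern for starting threads simultaneously, which is what makes the racy arena-allocation path likely to be exercised. A runnable distillation of just that pattern, with an invented worker body standing in for allocSameSize:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Runnable sketch (invented worker body) of the test's latch pattern: each
// worker signals readiness on cdlIn and then parks on cdlOut, so the main
// thread can release all of them in the same instant and maximize contention
// on the code under test.
public class LatchStartDemo {
  public static void main(String[] args) throws Exception {
    final int threadCount = 4;
    final CountDownLatch cdlIn = new CountDownLatch(threadCount);
    final CountDownLatch cdlOut = new CountDownLatch(1);
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
    Callable<Void> task = new Callable<Void>() {
      public Void call() throws Exception {
        cdlIn.countDown(); // This worker is ready.
        cdlOut.await();    // Block until all workers are released together.
        // The racy work under test (e.g. concurrent allocations) goes here.
        System.out.println(Thread.currentThread().getName() + " released");
        return null;
      }
    };
    List<Future<Void>> futures = new ArrayList<Future<Void>>(threadCount);
    for (int i = 0; i < threadCount; ++i) {
      futures.add(executor.submit(task));
    }
    cdlIn.await();      // Wait for every worker to be ready.
    cdlOut.countDown(); // Release them at the same time.
    for (Future<Void> f : futures) {
      f.get();          // Rethrows anything a worker threw.
    }
    executor.shutdown();
  }
}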