HIVE-12171 : LLAP: BuddyAllocator failures when querying uncompressed data (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cdbd1c85
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cdbd1c85
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cdbd1c85

Branch: refs/heads/master-fixed
Commit: cdbd1c8517e70614ec9dfd0bfdc978b200a946c2
Parents: a46005c
Author: Sergey Shelukhin <[email protected]>
Authored: Mon Nov 2 13:16:34 2015 -0800
Committer: Sergey Shelukhin <[email protected]>
Committed: Mon Nov 2 13:16:34 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  89 +++++++++------
 .../llap/cache/LowLevelCacheMemoryManager.java  |  12 ++
 .../hadoop/hive/llap/cache/MemoryManager.java   |   1 +
 .../hive/llap/cache/TestBuddyAllocator.java     |   6 +-
 .../hive/llap/cache/TestOrcMetadataCache.java   |   4 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 109 ++++++++++---------
 7 files changed, 144 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5198bb5..3ab73ad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2308,9 +2308,10 @@ public class HiveConf extends Configuration {
     LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
         "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
         "the largest expected ORC compression buffer size. Must be power of 2."),
-    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.io.cache.orc.arena.size", 128 * 1024 * 1024,
-        "Arena size for ORC low-level cache; cache will be allocated in arena-sized steps.\n" +
-        "Must presently be a power of two."),
+    LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+        "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
+        "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
+        "not the case, an adjusted size will be used. Using powers of 2 is recommended."),
     LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
         "Maximum size for ORC low-level cache; must be a multiple of arena size."),
     LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
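
For reference, the arena sizing that replaces the old fixed hive.llap.io.cache.orc.arena.size
setting works out as follows. This is a standalone sketch of the same arithmetic as the patch;
the ArenaSizing class and its main() harness are illustrative, not part of the patch:

    public class ArenaSizing {
      // 1Gb cap on a single arena, as in the patch; a Java array can always hold this.
      private static final int MAX_ARENA_SIZE = 1024 * 1024 * 1024;

      static int arenaSize(long maxSize, int arenaCount, int maxAllocation) {
        int size = (arenaCount == 0) ? MAX_ARENA_SIZE : (int) (maxSize / arenaCount);
        // Clamp to [maxAllocation, 1Gb], then round down to a multiple of maxAllocation.
        size = Math.max(maxAllocation, Math.min(size, MAX_ARENA_SIZE));
        return (size / maxAllocation) * maxAllocation;
      }

      public static void main(String[] args) {
        // Defaults: 1Gb cache, 8 arenas, 16Mb max allocation => 128Mb (134217728-byte) arenas.
        System.out.println(arenaSize(1024L * 1024 * 1024, 8, 16 * 1024 * 1024));
      }
    }
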
http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 2aca68d..485a145 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -40,33 +40,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private final long maxSize;
   private final boolean isDirect;
   private final LlapDaemonCacheMetrics metrics;
-
+
+  // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
+  // That is guaranteed to fit any maximum allocation.
+  private static final int MAX_ARENA_SIZE = 1024*1024*1024;
   public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
       LlapDaemonCacheMetrics metrics) {
     isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
     minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
     maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
     long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    if (LlapIoImpl.LOGL.isInfoEnabled()) {
+    int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
+    arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte")
           + " buffers; allocation sizes " + minAllocation + " - " + maxAllocation
-          + ", arena size " + arenaSize + ". total size " + maxSizeVal);
+          + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal);
     }
     if (minAllocation < 8) {
       throw new AssertionError("Min allocation must be at least 8: " + minAllocation);
     }
-    if (maxSizeVal < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
+    if (maxSizeVal < arenaSizeVal || maxAllocation < minAllocation) {
       throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
-          + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSizeVal);
+          + minAllocation + ", " + maxAllocation + ", " + arenaSizeVal + ", " + maxSizeVal);
+    }
+    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)) {
+      throw new AssertionError("Allocation sizes must be powers of two: "
+          + minAllocation + ", " + maxAllocation);
     }
-    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
-        || (Long.bitCount(arenaSize) != 1)) {
-      // Technically, arena size only needs to be divisible by maxAlloc
-      throw new AssertionError("Allocation and arena sizes must be powers of two: "
-          + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+    if ((arenaSizeVal % maxAllocation) > 0) {
+      long oldArenaSize = arenaSizeVal;
+      arenaSizeVal = (arenaSizeVal / maxAllocation) * maxAllocation;
+      LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
+          + " to be divisible by allocation size " + maxAllocation);
     }
+    arenaSize = arenaSizeVal;
     if ((maxSizeVal % arenaSize) > 0) {
       long oldMaxSize = maxSizeVal;
       maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
@@ -111,7 +121,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
     // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
     memoryManager.reserveMemory(dest.length << allocLog2, true);
-    int ix = 0;
+    int destAllocIx = 0;
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;
       dest[i] = createUnallocated(); // TODO: pool of objects?
@@ -123,22 +133,29 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     }
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     {
-      int startIndex = (int)(threadId % arenaCount), index = startIndex;
+      int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
       do {
-        int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
-        if (newIx == dest.length) return;
-        if (newIx != -1) { // TODO: check if it can still happen; count should take care of this.
-          ix = newIx;
-        }
-        ix = newIx;
+        int newDestIx = arenas[index].allocateFast(
+            index, freeListIx, dest, destAllocIx, allocationSize);
+        if (newDestIx == dest.length) return;
+        assert newDestIx != -1;
+        destAllocIx = newDestIx;
         if ((++index) == arenaCount) {
           index = 0;
         }
-      } while (index != startIndex);
+      } while (index != startArenaIx);
     }
-    // TODO: this is very hacky.
-    // We called reserveMemory so we know that somewhere in there, there's memory waiting for us.
+    // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
+    // in there, but it might be in separate small blocks. This is a complicated problem, and
+    // several solutions (in order of decreasing ugliness and increasing complexity) are: just
+    // ask to evict the exact-sized block (there may be no such block), evict from a particular
+    // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
+    // evict from specific choice of blocks next to each other or next to already-evicted block,
+    // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
+    // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
+    // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
+    // we called reserveMemory so we know that there's memory waiting for us somewhere.
     // However, we have a class of rare race conditions related to the order of locking/checking of
     // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
     // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -155,22 +172,32 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     // But for now we will just retry 5 times 0_o
     for (int attempt = 0; attempt < 5; ++attempt) {
       // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
-      for (int i = 0; i < arenaCount; ++i) {
-        int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
-        if (newIx == -1) break; // Shouldn't happen.
-        if (newIx == dest.length) return;
-        ix = newIx;
+      {
+        int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
+        do {
+          int newDestIx = arenas[arenaIx].allocateWithSplit(
+              arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+          if (newDestIx == dest.length) return;
+          assert newDestIx != -1;
+          destAllocIx = newDestIx;
+          if ((++arenaIx) == arenaCount) {
+            arenaIx = 0;
+          }
+        } while (arenaIx != startArenaIx);
       }
+
      if (attempt == 0) {
        // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-        for (int i = arenaCount; i < arenas.length; ++i) {
-          ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
-          if (ix == dest.length) return;
+        for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+          destAllocIx = arenas[arenaIx].allocateWithExpand(
+              arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+          if (destAllocIx == dest.length) return;
        }
      }
+      memoryManager.forceReservedMemory(allocationSize * (dest.length - destAllocIx));
      LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt);
    }
-    String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
+    String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length;
    LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
        + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
    throw new AllocatorOutOfMemoryException(msg);
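
The allocation path above now starts each thread at a different arena and wraps around, so
contention is spread without skipping any arena. A minimal sketch of that round-robin scan;
tryAllArenas and the IntPredicate callback are illustrative stand-ins for the
arenas[ix].allocateFast(...) calls in the patch:

    import java.util.function.IntPredicate;

    class ArenaScan {
      // Threads start at different arenas to spread pressure; 0 if there is only one arena.
      static int pickStartArena(int arenaCount) {
        long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
        return (int) (threadId % arenaCount);
      }

      // Visit every arena exactly once, wrapping around from the thread-specific start.
      static boolean tryAllArenas(int arenaCount, IntPredicate tryArena) {
        int startArenaIx = pickStartArena(arenaCount), arenaIx = startArenaIx;
        do {
          if (tryArena.test(arenaIx)) return true; // allocated everything we needed
          if ((++arenaIx) == arenaCount) arenaIx = 0;
        } while (arenaIx != startArenaIx);
        return false; // full loop, no luck; fall back to split/expand/forced eviction
      }

      public static void main(String[] args) {
        // Toy usage: the "allocation" succeeds only in arena 2 of 4.
        System.out.println(tryAllArenas(4, ix -> ix == 2)); // true
      }
    }
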
http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 4a256ee..d584ca8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -71,6 +71,8 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
         try {
           Thread.sleep(Math.min(1000, nextLog));
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
         }
       }
       continue;
@@ -90,6 +92,16 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     return true;
   }
 
+
+  @Override
+  public void forceReservedMemory(int memoryToEvict) {
+    while (memoryToEvict > 0) {
+      long evicted = evictor.evictSomeBlocks(memoryToEvict);
+      if (evicted == 0) return;
+      memoryToEvict -= evicted;
+    }
+  }
+
   @Override
   public void releaseMemory(long memoryToRelease) {
     long oldV;


http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index e1b0cb4..6cc262e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -22,4 +22,5 @@ public interface MemoryManager extends LlapOomDebugDump {
   boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
   void releaseMemory(long memUsage);
   void updateMaxSize(long maxSize);
+  void forceReservedMemory(int memoryToEvict);
 }
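
The new forceReservedMemory() contract is simply "keep evicting until the requested number of
bytes has been freed, or the policy cannot evict anything more". A toy sketch of that loop;
ToyEvictor and its step size are illustrative, whereas in the patch the evictor is the LLAP
cache policy and its evictSomeBlocks() method:

    class ToyEvictor {
      private long evictableBytes = 512 * 1024;

      // Evict up to 64Kb per call, like a policy walking its eviction list in steps.
      long evictSomeBlocks(long bytes) {
        long step = Math.min(bytes, Math.min(64 * 1024, evictableBytes));
        evictableBytes -= step;
        return step;
      }

      void forceReservedMemory(int memoryToEvict) {
        while (memoryToEvict > 0) {
          long evicted = evictSomeBlocks(memoryToEvict);
          if (evicted == 0) return; // nothing left to evict; the caller retries and may fail
          memoryToEvict -= evicted;
        }
      }

      public static void main(String[] args) {
        ToyEvictor e = new ToyEvictor();
        e.forceReservedMemory(256 * 1024); // four 64Kb eviction steps
        System.out.println(e.evictableBytes); // 262144 left of the original 524288
      }
    }
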
http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6d21997..6375996 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -58,6 +58,10 @@ public class TestBuddyAllocator {
     @Override
     public void updateMaxSize(long maxSize) {
     }
+
+    @Override
+    public void forceReservedMemory(int memoryToEvict) {
+    }
   }
 
   @Test
@@ -280,7 +284,7 @@ public class TestBuddyAllocator {
     Configuration conf = new Configuration();
     conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
     conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, arena);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
     conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
     return conf;
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index b886d77..901e58a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -78,6 +78,10 @@ public class TestOrcMetadataCache {
     @Override
     public void updateMaxSize(long maxSize) {
     }
+
+    @Override
+    public void forceReservedMemory(int memoryToEvict) {
+    }
   }
 
   @Test


http://git-wip-us.apache.org/repos/asf/hive/blob/cdbd1c85/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index e0c0743..f789a4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamD
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.DataReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.OutStream;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
@@ -751,7 +752,7 @@ class EncodedReaderImpl implements EncodedReader {
 
   /**
    * To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
-   * in segments starting w/stream start, and going for either stream size or maximum allocation.
+   * in segments starting w/stream start, and going for either stream size or some fixed size.
    * If we are not reading the entire segment's worth of data, then we will not cache the partial
    * RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
    * to handle just for this case.
@@ -777,87 +778,87 @@ class EncodedReaderImpl implements EncodedReader {
     }
     // Account for maximum cache buffer size.
     long streamLen = streamEnd - streamOffset;
-    int partSize = cache.getAllocator().getMaxAllocation(),
-        partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
-    long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
+    int partSize = determineUncompressedPartSize(), //
+        partCount = (int)(streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
 
     CacheChunk lastUncompressed = null;
     MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
+    /*
+Starting pre-read for [12187411,17107411) at start: 12187411 end: 12449555 cache buffer: 0x5f64a8f6(2)
+Processing uncompressed file data at [12187411, 12449555)
+     */
     for (int i = 0; i < partCount; ++i) {
-      long hasEntirePartTo = -1;
-      if (partOffset == current.getOffset()) {
-        hasEntirePartTo = partOffset;
+      long partOffset = streamOffset + (i * partSize),
+          partEnd = Math.min(partOffset + partSize, streamEnd);
+      long hasEntirePartTo = partOffset; // We have 0 bytes of data for this part, for now.
+      assert partOffset <= current.getOffset();
+      if (partOffset == current.getOffset() && current instanceof CacheChunk) {
         // We assume cache chunks would always match the way we read, so check and skip it.
-        if (current instanceof CacheChunk) {
-          lastUncompressed = (CacheChunk)current;
-          assert current.getOffset() == partOffset && current.getEnd() == partEnd;
-          partOffset = partEnd;
-          partEnd = Math.min(partOffset + partSize, streamEnd);
-          continue;
-        }
+        assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+        lastUncompressed = (CacheChunk)current;
+        current = current.next;
+        continue;
       }
       if (current.getOffset() >= partEnd) {
-        // We have no data at all for this part of the stream (could be unneeded), skip.
-        partOffset = partEnd;
-        partEnd = Math.min(partOffset + partSize, streamEnd);
-        continue;
+        continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
       if (toRelease == null && dataReader.isTrackingDiskRanges()) {
         toRelease = new ArrayList<ByteBuffer>();
       }
       // We have some disk buffers... see if we have entire part, etc.
-      UncompressedCacheChunk candidateCached = null;
+      UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
       while (true) {
-        if (next == null || next.getOffset() >= partEnd) {
-          if (hasEntirePartTo < partEnd && candidateCached != null) {
-            // We are missing a section at the end of the part...
-            lastUncompressed = copyAndReplaceCandidateToNonCached(
-                candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
-            candidateCached = null;
-          }
-          break;
+        boolean noMoreDataForPart = (next == null || next.getOffset() >= partEnd);
+        if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
+          // We are missing a section at the end of the part... copy the start to non-cached.
+          lastUncompressed = copyAndReplaceCandidateToNonCached(
+              candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+          candidateCached = null;
        }
        current = next;
-        boolean wasSplit = (current.getEnd() > partEnd);
-        if (wasSplit) {
+        if (noMoreDataForPart) break; // Done with this part.
+
+        boolean wasSplit = false;
+        if (current.getEnd() > partEnd) {
+          // If the current buffer contains multiple parts, split it.
          current = current.split(partEnd);
+          wasSplit = true;
        }
        if (isDebugTracingEnabled) {
          LOG.info("Processing uncompressed file data at ["
              + current.getOffset() + ", " + current.getEnd() + ")");
        }
-        BufferChunk bc = (BufferChunk)current;
+        BufferChunk curBc = (BufferChunk)current;
        if (!wasSplit && toRelease != null) {
-          toRelease.add(bc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
+          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
        }
 
        // Track if we still have the entire part.
        long hadEntirePartTo = hasEntirePartTo;
-        if (hasEntirePartTo != -1) {
-          hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
-        }
-        if (candidateCached != null && hasEntirePartTo == -1) {
-          lastUncompressed = copyAndReplaceCandidateToNonCached(
-              candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
-          candidateCached = null;
-        }
-
-        if (hasEntirePartTo != -1) {
+        // We have data until the end of current block if we had it until the beginning.
+        hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+        if (hasEntirePartTo == -1) {
+          // We don't have the entire part; copy both whatever we intended to cache, and the rest,
+          // to an allocated buffer. We could try to optimize a bit if we have contiguous buffers
+          // with gaps, but it's probably not needed.
+          if (candidateCached != null) {
+            assert hadEntirePartTo != -1;
+            copyAndReplaceCandidateToNonCached(
+                candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+            candidateCached = null;
+          }
+          lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc);
+          next = lastUncompressed.next; // There may be more data after the gap.
+        } else { // So far we have all the data from the beginning of the part.
          if (candidateCached == null) {
-            candidateCached = new UncompressedCacheChunk(bc);
+            candidateCached = new UncompressedCacheChunk(curBc);
          } else {
-            candidateCached.addChunk(bc);
+            candidateCached.addChunk(curBc);
          }
-          // We will take care of this at the end of the part, or if we find a gap.
          next = current.next;
-          continue;
        }
-        // We don't have the entire part; just copy to an allocated buffer. We could try to
-        // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
-        lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
-        next = lastUncompressed.next;
      }
      if (candidateCached != null) {
        if (toCache == null) {
@@ -908,6 +909,16 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+
+  private int determineUncompressedPartSize() {
+    // We will break the uncompressed data in the cache in the chunks that are the size
+    // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
+    // cannot allocate bigger chunks), whichever is less.
+    long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
+    int maxAllocSize = cache.getAllocator().getMaxAllocation();
+    return (int)Math.min(maxAllocSize, orcCbSizeDefault);
+  }
+
   private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
     int startPos = dest.position(), startLim = dest.limit();
     dest.put(src); // Copy uncompressed data to cache.
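
The part-size logic above splits an uncompressed stream into fixed-size parts using ceiling
division, with the part size being the smaller of the allocator's maximum allocation and the
default ORC compression buffer size. A standalone sketch of that arithmetic; the constants
mirror the defaults (16Mb max allocation, 256Kb ORC buffer) and the PartMath class is
illustrative, not part of the patch:

    public class PartMath {
      // Ceiling division: one extra part if the stream length is not a multiple of partSize.
      static int partCount(long streamLen, int partSize) {
        return (int) (streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
      }

      public static void main(String[] args) {
        int partSize = Math.min(16 * 1024 * 1024, 256 * 1024); // => 262144 bytes
        long streamLen = 1000000; // roughly 1Mb of uncompressed stream data
        int parts = partCount(streamLen, partSize); // 3 full parts + remainder => 4
        for (int i = 0; i < parts; ++i) {
          long partOffset = (long) i * partSize;
          long partEnd = Math.min(partOffset + partSize, streamLen);
          System.out.println("part " + i + ": [" + partOffset + ", " + partEnd + ")");
        }
      }
    }
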
