Repository: hive
Updated Branches:
  refs/heads/master 87ce36b45 -> 5b2124b99
HIVE-20244 : forward port HIVE-19704 to master (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b2124b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b2124b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b2124b9

Branch: refs/heads/master
Commit: 5b2124b99fee059d3e8ce71e8ac646a9faed84d5
Parents: 87ce36b
Author: sergey <[email protected]>
Authored: Tue Jul 31 14:14:18 2018 -0700
Committer: sergey <[email protected]>
Committed: Tue Jul 31 14:14:18 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 17 +++--
 .../llap/cache/LowLevelCacheMemoryManager.java  | 51 +++++++------
 .../hadoop/hive/llap/cache/MemoryManager.java   |  4 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 14 ++--
 .../llap/io/encoded/SerDeEncodedDataReader.java | 34 ++++++---
 .../hive/llap/io/metadata/MetadataCache.java    | 76 ++++++++++++--------
 .../hive/llap/cache/TestBuddyAllocator.java     |  3 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java | 12 ++--
 .../hive/llap/cache/TestOrcMetadataCache.java   | 19 +++--
 .../hadoop/hive/llap/LlapCacheAwareFs.java      |  2 +
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  3 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 31 +++++---
 .../ql/io/orc/encoded/StoppableAllocator.java   | 30 ++++++++
 .../vector/VectorizedParquetRecordReader.java   |  5 +-
 .../hive/common/io/FileMetadataCache.java       | 17 +++--
 15 files changed, 223 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index e3ce2e7..a27964f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -45,9 +46,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public final class BuddyAllocator - implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapIoDebugDump { + implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump { private final Arena[] arenas; private final AtomicInteger allocatedArenas = new AtomicInteger(0); @@ -224,16 +226,23 @@ public final class BuddyAllocator return (int)arenaSizeVal; } + + @VisibleForTesting @Override public void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException { - allocateMultiple(dest, size, null); + allocateMultiple(dest, size, null, null); } - // TODO: would it make sense to return buffers asynchronously? 
@Override public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory) throws AllocatorOutOfMemoryException { + allocateMultiple(dest, size, factory, null); + } + + @Override + public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped) + throws AllocatorOutOfMemoryException { assert size > 0 : "size is " + size; if (size > maxAllocation) { throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation); @@ -243,7 +252,7 @@ public final class BuddyAllocator int allocationSize = 1 << allocLog2; // If using async, we could also reserve one by one. - memoryManager.reserveMemory(dest.length << allocLog2); + memoryManager.reserveMemory(dest.length << allocLog2, isStopped); for (int i = 0; i < dest.length; ++i) { if (dest[i] != null) continue; // Note: this is backward compat only. Should be removed with createUnallocated. http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 4297cfc..c5b5bf2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.llap.cache; -import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import com.google.common.annotations.VisibleForTesting; + /** * Implementation of memory manager for low level cache. Note that memory is released during * reserve most of the time, by calling the evictor to evict some memory. releaseMemory is @@ -49,21 +49,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager { } } + public static class ReserveFailedException extends RuntimeException { + private static final long serialVersionUID = 1L; + public ReserveFailedException(AtomicBoolean isStopped) { + super("Cannot reserve memory" + + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "") + + ((isStopped != null && isStopped.get()) ? "; thread stopped" : "")); + } + } @Override - public void reserveMemory(final long memoryToReserve) { - boolean result = reserveMemory(memoryToReserve, true); + public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) { + boolean result = reserveMemory(memoryToReserve, true, isStopped); if (result) return; // Can only happen if there's no evictor, or if thread is interrupted. - throw new RuntimeException("Cannot reserve memory" - + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")); + throw new ReserveFailedException(isStopped); } @VisibleForTesting - public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { + public boolean reserveMemory(final long memoryToReserve, + boolean waitForEviction, AtomicBoolean isStopped) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? int badCallCount = 0; - int nextLog = 4; long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve; boolean result = true; while (remainingToReserve > 0) { @@ -79,21 +86,23 @@ public class LowLevelCacheMemoryManager implements MemoryManager { // TODO: for one-block case, we could move notification for the last block out of the loop. 
long evicted = evictor.evictSomeBlocks(remainingToReserve); if (evicted == 0) { + ++badCallCount; if (!waitForEviction) { result = false; break; } - ++badCallCount; - if (badCallCount == nextLog) { - LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?"); - nextLog <<= 1; - try { - Thread.sleep(Math.min(1000, nextLog)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - result = false; - break; - } + + if (isStopped != null && isStopped.get()) { + result = false; + break; + } + try { + Thread.sleep(badCallCount > 9 ? 1000 : (1 << badCallCount)); + } catch (InterruptedException e) { + LlapIoImpl.LOG.warn("Thread interrupted"); // We currently don't expect this. + Thread.currentThread().interrupt(); + result = false; + break; } continue; } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index 542041d..fedade5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; + public interface MemoryManager { void releaseMemory(long memUsage); void updateMaxSize(long maxSize); - void reserveMemory(long memoryToReserve); + void reserveMemory(long memoryToReserve, AtomicBoolean isStopped); } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index b76b0de..e8a3b40 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -173,7 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> * Contains only stripes that are read, and only columns included. null => read all RGs. */ private boolean[][] stripeRgs; - private volatile boolean isStopped = false; + private AtomicBoolean isStopped = new AtomicBoolean(false); @SuppressWarnings("unused") private volatile boolean isPaused = false; @@ -240,7 +241,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public void stop() { LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override @@ -436,6 +437,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> stripeReader = orcReader.encodedReader( fileKey, dw, dw, useObjectPools ? 
POOL_FACTORY : null, trace, useCodecPool, cacheTag); stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); + stripeReader.setStopped(isStopped); } private void recordReaderTime(long startTime) { @@ -454,7 +456,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LOG.info("Encoded data reader is stopping"); tracePool.offer(trace); cleanupReaders(); @@ -584,7 +586,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> ensureOrcReader(); ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter(); if (hasCache) { - tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag); + tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, isStopped); metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer. } FileTail ft = orcReader.getFileTail(); @@ -677,7 +679,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> assert footerRange.next == null; // Can only happens w/zcr for a single input buffer. if (hasCache) { LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail( - stripeKey, footerRange.getData().duplicate(), cacheTag); + stripeKey, footerRange.getData().duplicate(), cacheTag, isStopped); metadataCache.decRefBuffer(cacheBuf); // We don't use this one. 
} ByteBuffer bb = footerRange.getData().duplicate(); @@ -918,7 +920,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag); } else if (metadataCache != null) { - metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); + metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped); } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 5b54af5..2576175 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -150,7 +152,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final String cacheTag; private final FileSystem fs; - private volatile boolean isStopped = false; + private 
AtomicBoolean isStopped = new AtomicBoolean(false); private final Deserializer sourceSerDe; private final InputFormat<?, ?> sourceInputFormat; private final Reporter reporter; @@ -245,7 +247,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> @Override public void stop() { LlapIoImpl.LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override @@ -344,16 +346,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final Map<StreamName, OutputReceiver> streams = new HashMap<>(); private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>(); private final boolean doesSourceHaveIncludes; + private final AtomicBoolean isStopped; public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds, boolean[] writerIncludes, boolean doesSourceHaveIncludes, - Allocator.BufferObjectFactory bufferFactory) { + Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) { this.bufferManager = bufferManager; assert writerIncludes != null; // Taken care of on higher level. 
this.writerIncludes = writerIncludes; this.doesSourceHaveIncludes = doesSourceHaveIncludes; this.columnIds = columnIds; this.bufferFactory = bufferFactory; + this.isStopped = isStopped; startStripe(); } @@ -440,7 +444,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> if (LlapIoImpl.LOG.isTraceEnabled()) { LlapIoImpl.LOG.trace("Creating cache receiver for " + name); } - CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, bufferFactory, name); + CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, bufferFactory, isStopped); or = cor; List<CacheOutputReceiver> list = colStreams.get(name.getColumn()); if (list == null) { @@ -597,12 +601,17 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private List<MemoryBuffer> buffers = null; private int lastBufferPos = -1; private boolean suppressed = false; + private final AtomicBoolean isStopped; + private final StoppableAllocator allocator; public CacheOutputReceiver(BufferUsageManager bufferManager, - BufferObjectFactory bufferFactory, StreamName name) { + StreamName name, BufferObjectFactory bufferFactory, AtomicBoolean isStopped) { this.bufferManager = bufferManager; this.bufferFactory = bufferFactory; + Allocator alloc = bufferManager.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null; this.name = name; + this.isStopped = isStopped; } public void clear() { @@ -617,6 +626,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> lastBufferPos = -1; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, bufferFactory, isStopped); + } else { + bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory); + } + } + + @Override public void output(ByteBuffer buffer) throws IOException { // TODO: avoid put() by working directly in OutStream? 
@@ -640,7 +658,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> boolean isNewBuffer = (lastBufferPos == -1); if (isNewBuffer) { MemoryBuffer[] dest = new MemoryBuffer[1]; - bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory); + allocateMultiple(dest, size); LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0]; bb = newBuffer.getByteBufferRaw(); lastBufferPos = bb.position(); @@ -1417,7 +1435,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> // TODO: move this into ctor? EW would need to create CacheWriter then List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds; writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes, - writer.isOnlyWritingIncludedColumns(), bufferFactory), daemonConf, split.getPath()); + writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath()); if (writer instanceof VectorDeserializeOrcWriter) { VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer; asyncWriter.startAsync(new AsyncCacheDataCallback()); @@ -1673,7 +1691,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LlapIoImpl.LOG.info("SerDe-based data reader is stopping"); cleanup(true); return true; http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java index 426d599..2b3bca6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -28,11 +28,13 
@@ import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata = @@ -51,10 +54,10 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors; private final MemoryManager memoryManager; private final LowLevelCachePolicy policy; - private final EvictionAwareAllocator allocator; + private final BuddyAllocator allocator; private final LlapDaemonCacheMetrics metrics; - public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager, + public MetadataCache(BuddyAllocator allocator, MemoryManager memoryManager, LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) { this.memoryManager = memoryManager; this.allocator = allocator; @@ -64,7 +67,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { ? 
new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null; } - public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { + public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) { if (estimateErrors == null) return; OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); boolean isNew = false; @@ -76,7 +79,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { errorData.addError(range.getOffset(), range.getLength(), baseOffset); } long memUsage = errorData.estimateMemoryUsage(); - memoryManager.reserveMemory(memUsage); + memoryManager.reserveMemory(memUsage, isStopped); OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); if (old != null) { errorData = old; @@ -150,34 +153,49 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { } @Override - public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) { - return putInternal(fileKey, tailBuffer, null); + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer) { + return putInternal(fileKey, tailBuffer, null, null); } @Override - public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) { - return putInternal(fileKey, tailBuffer, tag); + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer, String tag) { + return putInternal(fileKey, tailBuffer, tag, null); + } + + @Override + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is) throws IOException { + return putFileMetadata(fileKey, length, is, null, null); } public LlapBufferOrBuffers putStripeTail( - OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) { - return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag); + OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + return 
putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped); } @Override - public LlapBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is) throws IOException { - return putFileMetadata(fileKey, length, is, null); + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is, String tag) throws IOException { + return putFileMetadata(fileKey, length, is, tag, null); + } + + + @Override + public LlapBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + return putInternal(fileKey, tailBuffer, tag, isStopped); } @Override - public LlapBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is, String tag) throws IOException { + public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is, + String tag, AtomicBoolean isStopped) throws IOException { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). 
LlapBufferOrBuffers oldVal = metadata.get(fileKey); if (oldVal == null) { - result = wrapBbForFile(result, fileKey, length, is, tag); + result = wrapBbForFile(result, fileKey, length, is, tag, isStopped); if (!lockBuffer(result, false)) { throw new AssertionError("Cannot lock a newly created value " + result); } @@ -198,7 +216,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { @SuppressWarnings({ "rawtypes", "unchecked" }) private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, - Object fileKey, int length, InputStream stream, String tag) throws IOException { + Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException { if (result != null) return result; int maxAlloc = allocator.getMaxAllocation(); LlapMetadataBuffer<Object>[] largeBuffers = null; @@ -207,7 +225,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { for (int i = 0; i < largeBuffers.length; ++i) { largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey, tag); } - allocator.allocateMultiple(largeBuffers, maxAlloc, null); + allocator.allocateMultiple(largeBuffers, maxAlloc, null, isStopped); for (int i = 0; i < largeBuffers.length; ++i) { readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]); } @@ -218,7 +236,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { } else { LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1]; smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag); - allocator.allocateMultiple(smallBuffer, length, null); + allocator.allocateMultiple(smallBuffer, length, null, isStopped); readIntoCacheBuffer(stream, smallSize, smallBuffer[0]); if (largeBuffers == null) { return smallBuffer[0]; // This is the overwhelmingly common case. 
@@ -243,12 +261,12 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { bb.position(pos); } - private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) { + private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapBufferOrBuffers oldVal = metadata.get(key); if (oldVal == null) { - result = wrapBb(result, key, tailBuffer, tag); + result = wrapBb(result, key, tailBuffer, tag, isStopped); oldVal = metadata.putIfAbsent(key, result); if (oldVal == null) { cacheInPolicy(result); // Cached successfully, add to policy. @@ -306,11 +324,11 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { } private <T> LlapBufferOrBuffers wrapBb( - LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) { + LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { if (result != null) return result; if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { // The common case by far. 
- return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer); + return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer, isStopped); } else { int allocCount = determineAllocCount(tailBuffer); @SuppressWarnings("unchecked") @@ -318,22 +336,24 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { for (int i = 0; i < allocCount; ++i) { results[i] = new LlapMetadataBuffer<T>(key, tag); } - wrapLargeBb(results, tailBuffer); + wrapLargeBb(results, tailBuffer, isStopped); return new LlapMetadataBuffers<T>(results); } } - private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer) { + private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer, + AtomicBoolean isStopped) { // Note: we pass in null factory because we allocate objects here. We could also pass a // per-call factory that would set fileKey; or set it after put. - allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null); + allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null, isStopped); return putBufferToDest(tailBuffer.duplicate(), result); } - private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer) { + private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer, + AtomicBoolean isStopped) { // Note: we pass in null factory because we allocate objects here. We could also pass a // per-call factory that would set fileKey; or set it after put. 
- allocator.allocateMultiple(results, allocator.getMaxAllocation(), null); + allocator.allocateMultiple(results, allocator.getMaxAllocation(), null, isStopped); ByteBuffer src = tailBuffer.duplicate(); int pos = src.position(), remaining = src.remaining(); for (int i = 0; i < results.length; ++i) { http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index 1e6f3ac..b3179c0 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; @@ -58,7 +59,7 @@ public class TestBuddyAllocator { static class DummyMemoryManager implements MemoryManager { @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 6eb2eb5..923042d 100644 --- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -85,7 +85,7 @@ public class TestLowLevelLrfuCachePolicy { listLock.unlock(); } // Now try to evict with locked buffer still in the list. - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); assertSame(buffer2, et.evicted.get(0)); unlock(lrfu, buffer1); } @@ -237,7 +237,7 @@ public class TestLowLevelLrfuCachePolicy { // Lock the lowest priority buffer; try to evict - we'll evict some other buffer. LlapDataBuffer locked = inserted.get(0); lock(lrfu, locked); - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); LlapDataBuffer evicted = et.evicted.get(0); assertNotNull(evicted); assertTrue(evicted.isInvalid()); @@ -248,7 +248,7 @@ public class TestLowLevelLrfuCachePolicy { // Buffers in test are fakes not linked to cache; notify cache policy explicitly. public boolean cache(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) { - if (mm != null && !mm.reserveMemory(1, false)) { + if (mm != null && !mm.reserveMemory(1, false, null)) { return false; } buffer.incRef(); @@ -337,7 +337,7 @@ public class TestLowLevelLrfuCachePolicy { lock(lrfu, buf); } assertEquals(heapSize, m.cacheUsed.get()); - assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (!et.evicted.isEmpty()) { assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty()); } @@ -362,13 +362,13 @@ public class TestLowLevelLrfuCachePolicy { // Evict all blocks. et.evicted.clear(); for (int i = 0; i < inserted.size(); ++i) { - assertTrue(mm.reserveMemory(1, false)); + assertTrue(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } } // The map should now be empty. 
- assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index df20f20..aa9d6ed 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -21,10 +21,14 @@ import static org.junit.Assert.*; import java.nio.ByteBuffer; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; @@ -75,8 +79,11 @@ public class TestOrcMetadataCache { } private static class DummyMemoryManager implements MemoryManager { + private int allocs; + @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { + ++allocs; } @Override @@ -102,11 +109,11 @@ public class TestOrcMetadataCache { ByteBuffer smallBuffer = ByteBuffer.allocate(MAX_ALLOC - 1); rdm.nextBytes(smallBuffer.array()); - LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer); + LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer, null, null); 
cache.decRefBuffer(result); ByteBuffer cacheBuf = result.getSingleBuffer().getByteBufferDup(); assertEquals(smallBuffer, cacheBuf); - result = cache.putFileMetadata(fileKey1, smallBuffer); + result = cache.putFileMetadata(fileKey1, smallBuffer, null, null); cache.decRefBuffer(result); cacheBuf = result.getSingleBuffer().getByteBufferDup(); assertEquals(smallBuffer, cacheBuf); @@ -120,7 +127,7 @@ public class TestOrcMetadataCache { ByteBuffer largeBuffer = ByteBuffer.allocate((int)(MAX_ALLOC * 2.5)); rdm.nextBytes(largeBuffer.array()); - result = cache.putFileMetadata(fileKey1, largeBuffer); + result = cache.putFileMetadata(fileKey1, largeBuffer, null, null); cache.decRefBuffer(result); assertNull(result.getSingleBuffer()); assertEquals(largeBuffer, extractResultBbs(result)); @@ -162,13 +169,13 @@ public class TestOrcMetadataCache { Object fileKey1 = new Object(); // Note: incomplete CBs are always an exact match. - cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0); + cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0, null); cp.verifyEquals(1); DiskRangeList result = cache.getIncompleteCbs( fileKey1, new DiskRangeList(0, 3), 0, gotAllData); assertTrue(gotAllData.value); verifyResult(result, INCOMPLETE, 0, 3); - cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0); + cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0, null); cp.verifyEquals(3); DiskRangeList ranges = new DiskRangeList(0, 3); ranges.insertAfter(new DiskRangeList(4, 6)); http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java index dcb24b80..8370aa6 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -284,6 +284,7 @@ public class LlapCacheAwareFs extends FileSystem { int extraOffsetInChunk = 0; if (maxAlloc < chunkLength) { largeBuffers = new MemoryBuffer[largeBufCount]; + // Note: we don't use StoppableAllocator here - this is not on an IO thread. allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory()); for (int i = 0; i < largeBuffers.length; ++i) { // By definition here we copy up to the limit of the buffer. @@ -301,6 +302,7 @@ public class LlapCacheAwareFs extends FileSystem { largeBuffers = null; if (smallSize > 0) { smallBuffer = new MemoryBuffer[1]; + // Note: we don't use StoppableAllocator here - this is not on an IO thread. allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory()); ByteBuffer bb = smallBuffer[0].getByteBufferRaw(); copyDiskDataToCacheBuffer(array, http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index f6b949e..f3699f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; @@ -68,4 +69,6 @@ public interface EncodedReader { void readIndexStreams(OrcIndex index, StripeInformation stripe, List<OrcProto.Stream> streams, boolean[] included, boolean[] sargColumns) 
throws IOException; + + void setStopped(AtomicBoolean isStopped); } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 63d1387..1b11e0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -25,11 +25,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.IdentityHashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -146,6 +148,8 @@ class EncodedReaderImpl implements EncodedReader { private final TypeDescription fileSchema; private final WriterVersion version; private final String tag; + private AtomicBoolean isStopped; + private StoppableAllocator allocator; public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version, @@ -162,6 +166,8 @@ class EncodedReaderImpl implements EncodedReader { this.bufferSize = bufferSize; this.rowIndexStride = strideRate; this.cacheWrapper = cacheWrapper; + Allocator alloc = cacheWrapper.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? 
(StoppableAllocator) alloc : null; this.dataReader = dataReader; this.trace = trace; this.tag = tag; @@ -897,8 +903,7 @@ class EncodedReaderImpl implements EncodedReader { } boolean isAllocated = false; try { - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, - cacheWrapper.getDataBufferFactory()); + allocateMultiple(targetBuffers, bufferSize); isAllocated = true; } finally { // toDecompress/targetBuffers contents are actually already added to some structures that @@ -1206,8 +1211,7 @@ class EncodedReaderImpl implements EncodedReader { cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, - (int)(partCount == 1 ? streamLen : partSize), cacheWrapper.getDataBufferFactory()); + allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); // 4. Now copy the data into cache buffers. ix = 0; @@ -1260,8 +1264,7 @@ class EncodedReaderImpl implements EncodedReader { // non-cached. Since we are at the first gap, the previous stuff must be contiguous. 
singleAlloc[0] = null; trace.logPartialUncompressedData(partOffset, candidateEnd, true); - cacheWrapper.getAllocator().allocateMultiple( - singleAlloc, (int)(candidateEnd - partOffset), cacheWrapper.getDataBufferFactory()); + allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -1270,12 +1273,19 @@ class EncodedReaderImpl implements EncodedReader { return tcc; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped); + } else { + cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory()); + } + } + private CacheChunk copyAndReplaceUncompressedToNonCached( BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) { singleAlloc[0] = null; trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false); - cacheWrapper.getAllocator().allocateMultiple( - singleAlloc, bc.getLength(), cacheWrapper.getDataBufferFactory()); + allocateMultiple(singleAlloc, bc.getLength()); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -2152,4 +2162,9 @@ class EncodedReaderImpl implements EncodedReader { return false; } } + + @Override + public void setStopped(AtomicBoolean isStopped) { + this.isStopped = isStopped; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java new file mode 100644 index 0000000..0806d78 --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc.encoded; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; + +public interface StoppableAllocator extends Allocator { + /** Stoppable allocate method specific to branch-2. 
*/ + void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, + AtomicBoolean isStopped) throws AllocatorOutOfMemoryException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index f64efe2..8c49056 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -253,7 +253,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration); requestedSchema = DataWritableReadSupport .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration); - + Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag); this.reader = new ParquetFileReader( configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns()); @@ -317,7 +317,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase if (LOG.isInfoEnabled()) { LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey); } - footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag); + // Note: we don't pass in isStopped here - this is not on an IO thread. 
+ footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null); try { return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java index d1da7f5..e4aa888 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.common.io; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.io.IOException; import java.io.InputStream; @@ -39,6 +40,13 @@ public interface FileMetadataCache { @Deprecated MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer); + @Deprecated + MemoryBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is, String tag) throws IOException; + + @Deprecated + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); + /** * Releases the buffer returned from getFileMetadata or putFileMetadata method. * @param buffer The buffer to release. @@ -54,8 +62,9 @@ public interface FileMetadataCache { * @return The buffer or buffers representing the cached footer. * The caller must decref this buffer when done. 
*/ - MemoryBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is, String tag) throws IOException; + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, + String tag, AtomicBoolean isStopped); - MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); -} \ No newline at end of file + MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is, String tag, AtomicBoolean isStopped) throws IOException; +}
