HIVE-14621: LLAP: setting hive.llap.io.memory.mode to 'none' causes a NullPointerException (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/406e935f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/406e935f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/406e935f Branch: refs/heads/hive-14535 Commit: 406e935f27f60bb01c53d54bdb2c91429c95207e Parents: 0705323 Author: Sergey Shelukhin <[email protected]> Authored: Mon Aug 29 15:37:36 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Mon Aug 29 15:37:36 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 6 ++-- .../hive/llap/cache/EvictionDispatcher.java | 4 +-- .../hadoop/hive/llap/cache/LowLevelCache.java | 3 ++ .../hive/llap/cache/LowLevelCacheImpl.java | 5 +-- .../hadoop/hive/llap/cache/SimpleAllocator.java | 1 + .../hive/llap/cache/SimpleBufferManager.java | 33 ++++++++++++++++-- .../hive/llap/io/api/impl/LlapIoImpl.java | 36 +++++++++----------- .../llap/io/decode/OrcColumnVectorProducer.java | 3 +- .../llap/io/encoded/OrcEncodedDataReader.java | 11 +++--- .../resources/llap-daemon-log4j2.properties | 7 ++-- 10 files changed, 71 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 14a538b..cb0d96f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2733,10 +2733,10 @@ public class HiveConf extends Configuration { "Whether the LLAP IO layer is enabled for non-vectorized queries that read inputs\n" + "that can be vectorized"), LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", 
"cache", - new StringSet("cache", "allocator", "none"), + new StringSet("cache", "none"), "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" + - "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" + - "'none' doesn't use either (this mode may result in significant performance degradation)"), + "custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" + + "significant performance degradation)"), LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(), "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" + "padded to minimum allocation. For ORC, should generally be the same as the expected\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java index 91932e2..b6fd3e3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java @@ -26,10 +26,10 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches. 
*/ public final class EvictionDispatcher implements EvictionListener { - private final LowLevelCacheImpl dataCache; + private final LowLevelCache dataCache; private final OrcMetadataCache metadataCache; - public EvictionDispatcher(LowLevelCacheImpl dataCache, OrcMetadataCache metadataCache) { + public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) { this.dataCache = dataCache; this.metadataCache = metadataCache; } http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java index 1b61a6e..19c589a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java @@ -59,4 +59,7 @@ public interface LowLevelCache { */ long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks, long baseOffset, Priority priority, LowLevelCacheCounters qfCounters); + + /** Notifies the cache that a particular buffer should be removed due to eviction. 
*/ + void notifyEvicted(MemoryBuffer buffer); } http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index 8bc675d..ea458ca 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -67,7 +67,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla this.doAssumeGranularBlocks = doAssumeGranularBlocks; } - public void init() { + public void startThreads() { if (cleanupInterval < 0) return; cleanupThread = new CleanupThread(cleanupInterval); cleanupThread.start(); @@ -368,7 +368,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla return fake; } - public final void notifyEvicted(LlapDataBuffer buffer) { + @Override + public final void notifyEvicted(MemoryBuffer buffer) { allocator.deallocateEvicted(buffer); newEvictions.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java index 526ff22..d8f59d1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java @@ -68,6 +68,7 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean { LlapDataBuffer buf = (LlapDataBuffer)buffer; ByteBuffer bb = buf.byteBuffer; 
buf.byteBuffer = null; + if (!bb.isDirect()) return; Field field = cleanerField; if (field == null) return; try { http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index b188c0e..d1eee04 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -20,12 +20,15 @@ package org.apache.hadoop.hive.llap.cache; import java.util.List; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; +import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; -import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; -public class SimpleBufferManager implements BufferUsageManager { +public class SimpleBufferManager implements BufferUsageManager, LowLevelCache { private final Allocator allocator; private final LlapDaemonCacheMetrics metrics; @@ -73,4 +76,30 @@ public class SimpleBufferManager implements BufferUsageManager { public Allocator getAllocator() { return allocator; } + + @Override + public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, + DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) { + return range; // Nothing changes - no cache. 
+ } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] chunks, long baseOffset, Priority priority, + LowLevelCacheCounters qfCounters) { + for (int i = 0; i < chunks.length; ++i) { + LlapDataBuffer buffer = (LlapDataBuffer)chunks[i]; + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time (no cache)", buffer); + } + boolean canLock = lockBuffer(buffer); + assert canLock; + } + return null; + } + + @Override + public void notifyEvicted(MemoryBuffer buffer) { + throw new UnsupportedOperationException("Buffer manager doesn't have cache"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 9deef0c..8048624 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl; import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager; import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; @@ -71,7 +72,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache"); public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking"); - private static final 
String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator"; + private static final String MODE_CACHE = "cache"; private final ColumnVectorProducer cvp; private final ExecutorService executor; @@ -82,9 +83,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { private LlapIoImpl(Configuration conf) throws IOException { String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); - boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode), - useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode); - LOG.info("Initializing LLAP IO in {} mode", ioMode); + boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode); + LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none"); String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); String sessionId = conf.get("llap.daemon.metrics.sessionid"); this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId); @@ -109,7 +109,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { sessionId); OrcMetadataCache metadataCache = null; - LowLevelCacheImpl orcCache = null; + LowLevelCache cache = null; BufferUsageManager bufferManager = null; if (useLowLevelCache) { // Memory manager uses cache policy to trigger evictions, so create the policy first. @@ -122,23 +122,21 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { // Cache uses allocator to allocate and deallocate, create allocator and then caches. 
EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics); this.allocator = allocator; - orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true); + LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl( + cacheMetrics, cachePolicy, allocator, true); + cache = cacheImpl; boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache); // And finally cache policy uses cache to notify it of eviction. The cycle is complete! - cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache)); - cachePolicy.setParentDebugDumper(orcCache); - orcCache.init(); // Start the cache threads. - bufferManager = orcCache; // Cache also serves as buffer manager. + cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache)); + cachePolicy.setParentDebugDumper(cacheImpl); + cacheImpl.startThreads(); // Start the cache threads. + bufferManager = cacheImpl; // Cache also serves as buffer manager. } else { - if (useAllocOnly) { - LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager( - conf, null, cacheMetrics); - allocator = new BuddyAllocator(conf, memManager, cacheMetrics); - } else { - allocator = new SimpleAllocator(conf); - } - bufferManager = new SimpleBufferManager(allocator, cacheMetrics); + this.allocator = new SimpleAllocator(conf); + SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics); + bufferManager = sbm; + cache = sbm; } // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); @@ -148,7 +146,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); // TODO: this should depends on input format and be in a map, or something. 
this.cvp = new OrcColumnVectorProducer( - metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics); + metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); LOG.info("LLAP IO initialized"); registerMXBeans(); http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 0a8e3df..12275ac 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LowLevelCache; -import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -48,7 +47,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer { private LlapDaemonIOMetrics ioMetrics; public OrcColumnVectorProducer(OrcMetadataCache metadataCache, - LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, + LowLevelCache lowLevelCache, BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) { LlapIoImpl.LOG.info("Initializing ORC column vector producer"); http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java 
---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 0d212ec..eb8ee6c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -835,12 +835,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { - DiskRangeList result = (lowLevelCache == null) ? range - : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData); + DiskRangeList result = lowLevelCache.getFileData( + fileKey, range, baseOffset, factory, counters, gotAllData); if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey - + ", base offset " + baseOffset + "): " - + RecordReaderUtils.stringifyDiskRanges(range.next)); + LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey + + ", base offset " + baseOffset + "): " + RecordReaderUtils.stringifyDiskRanges(range)); } if (gotAllData.value) return result; return (metadataCache == null) ? range @@ -851,7 +850,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset) { if (data != null) { - return (lowLevelCache == null) ? 
null : lowLevelCache.putFileData( + return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); } else if (metadataCache != null) { metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/main/resources/llap-daemon-log4j2.properties ---------------------------------------------------------------------- diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties index 422a92a..0c953d1 100644 --- a/llap-server/src/main/resources/llap-daemon-log4j2.properties +++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties @@ -100,7 +100,10 @@ appender.query-routing.routes.route-mdc.file-mdc.app.layout.type = PatternLayout appender.query-routing.routes.route-mdc.file-mdc.app.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n # list of all loggers -loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking +loggers = EncodedReader, NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking + +logger.EncodedReader.name = org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl +logger.EncodedReader.level = INFO logger.LlapIoImpl.name = LlapIoImpl logger.LlapIoImpl.level = INFO @@ -109,7 +112,7 @@ logger.LlapIoOrc.name = LlapIoOrc logger.LlapIoOrc.level = WARN logger.LlapIoCache.name = LlapIoCache -logger.LlapIOCache.level = WARN +logger.LlapIoCache.level = WARN logger.LlapIoLocking.name = LlapIoLocking logger.LlapIoLocking.level = WARN
