HIVE-17006: LLAP: Parquet caching v1 (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e673a73 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e673a73 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e673a73 Branch: refs/heads/master Commit: 9e673a73d28ef047c5d4299b415c306423f68c71 Parents: 642acdf Author: sergey <[email protected]> Authored: Thu Aug 31 14:21:04 2017 -0700 Committer: sergey <[email protected]> Committed: Thu Aug 31 14:21:04 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 35 + .../test/resources/testconfiguration.properties | 1 + .../apache/hadoop/hive/llap/io/api/LlapIo.java | 4 +- .../hive/llap/cache/EvictionDispatcher.java | 13 +- .../hive/llap/cache/LlapAllocatorBuffer.java | 8 +- .../hive/llap/io/api/impl/LlapIoImpl.java | 99 ++- .../io/metadata/ParquetMetadataCacheImpl.java | 353 ++++++++ .../hadoop/hive/llap/LlapCacheAwareFs.java | 422 +++++++++ .../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 3 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 103 ++- .../io/LlapCacheOnlyInputFormatInterface.java | 28 + .../ql/io/parquet/MapredParquetInputFormat.java | 12 +- .../parquet/VectorizedParquetInputFormat.java | 22 +- .../vector/ParquetFooterInputFromCache.java | 196 +++++ .../vector/VectorizedParquetRecordReader.java | 169 +++- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 28 +- .../clientpositive/parquet_ppd_decimal.q | 1 + .../clientpositive/parquet_predicate_pushdown.q | 1 + .../test/queries/clientpositive/parquet_types.q | 1 + .../parquet_types_vectorization.q | 1 + .../queries/clientpositive/vectorized_parquet.q | 1 + .../clientpositive/vectorized_parquet_types.q | 1 + .../llap/parquet_predicate_pushdown.q.out | 18 +- .../llap/parquet_types_vectorization.q.out | 849 +++++++++++++++++++ .../llap/vector_partitioned_date_time.q.out | 16 +- .../llap/vectorized_parquet.q.out | 2 +- .../llap/vectorized_parquet_types.q.out | 2 +- .../hive/common/io/FileMetadataCache.java | 51 ++ .../io/encoded/MemoryBufferOrBuffers.java | 24 + 29 files changed, 2360 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index e784797..0feff59 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.common; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -1033,4 +1036,36 @@ public final class FileUtils { return result; } + /** + * Reads length bytes of data from the stream into the byte buffer. + * @param stream Stream to read from. + * @param length The number of bytes to read. + * @param bb The buffer to read into; the data is written at current position and then the + * position is incremented by length. 
+ * @throws EOFException the length bytes cannot be read. The buffer position is not modified. + */ + public static void readFully(InputStream stream, int length, ByteBuffer bb) throws IOException { + byte[] b = null; + int offset = 0; + if (bb.hasArray()) { + b = bb.array(); + offset = bb.arrayOffset() + bb.position(); + } else { + b = new byte[bb.remaining()]; + } + int fullLen = length; + while (length > 0) { + int result = stream.read(b, offset, length); + if (result < 0) { + throw new EOFException("Reading " + fullLen + " bytes"); + } + offset += result; + length -= result; + } + if (!bb.hasArray()) { + bb.put(b); + } else { + bb.position(bb.position() + fullLen); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 6f2efa7..f452341 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -75,6 +75,7 @@ minillap.shared.query.files=insert_into1.q,\ orc_merge4.q,\ orc_merge_diff_fs.q,\ parallel_colstats.q,\ + parquet_types_vectorization.q,\ unionDistinct_1.q,\ union_type_chk.q,\ cte_2.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index 42129b7..d2f9e6c 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; public interface LlapIo<T> { - InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde); + InputFormat<NullWritable, T> getInputFormat( + InputFormat<?, ?> sourceInputFormat, Deserializer serde); void close(); String getMemoryInfo(); + void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat); } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java index 0cbc8f6..c5248ce 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl; +import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFileMetadataBuffer; /** * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches. 
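The double dispatch referred to above, in a minimal self-contained sketch (simplified stand-in names, not the actual LlapCacheableBuffer/LlapFileMetadataBuffer/EvictionDispatcher types); this is the pattern the hunk below extends with a Parquet-metadata overload:

    abstract class CacheableBuffer {
      // First dispatch: the cache policy only knows it evicted "some" buffer.
      abstract void notifyEvicted(Dispatcher dispatcher);
    }

    class FileMetadataBuffer extends CacheableBuffer {
      @Override
      void notifyEvicted(Dispatcher dispatcher) {
        // Second dispatch: 'this' has a concrete static type here, so the call binds
        // to the FileMetadataBuffer overload and the owning cache gets notified.
        dispatcher.notifyEvicted(this);
      }
    }

    class Dispatcher {
      void notifyEvicted(CacheableBuffer buffer) {
        buffer.notifyEvicted(this); // route through the buffer's dynamic type
      }
      void notifyEvicted(FileMetadataBuffer buffer) {
        // the real dispatcher forwards to the (Parquet) metadata cache here
      }
    }
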
@@ -31,11 +33,15 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD private final SerDeLowLevelCacheImpl serdeCache; private final OrcMetadataCache metadataCache; private final EvictionAwareAllocator allocator; + // TODO# temporary, will be merged with OrcMetadataCache after HIVE-15665. + private final ParquetMetadataCacheImpl parquetMetadataCache; public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache, - OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) { + OrcMetadataCache metadataCache, EvictionAwareAllocator allocator, + ParquetMetadataCacheImpl parquetMetadataCache) { this.dataCache = dataCache; this.metadataCache = metadataCache; + this.parquetMetadataCache = parquetMetadataCache; this.serdeCache = serdeCache; this.allocator = allocator; } @@ -45,10 +51,13 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD buffer.notifyEvicted(this); // This will call one of the specific notifyEvicted overloads. } + public void notifyEvicted(LlapFileMetadataBuffer buffer) { + this.parquetMetadataCache.notifyEvicted(buffer); + } + public void notifyEvicted(LlapSerDeDataBuffer buffer) { serdeCache.notifyEvicted(buffer); allocator.deallocateEvicted(buffer); - } public void notifyEvicted(LlapDataBuffer buffer) { http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java index 52144c2..8f57a04 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java @@ -131,15 +131,17 @@ public abstract class LlapAllocatorBuffer extends LlapCacheableBuffer implements long newState, oldState; do { oldState = state.get(); + // We have to check it here since invalid decref will overflow. 
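+ // (The refcount check removed further down ran only after compareAndSet had already published the updated state, which is too late to catch this.)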
+ int oldRefCount = State.getRefCount(oldState); + if (oldRefCount == 0) { + throw new AssertionError("Invalid decRef when refCount is 0: " + this); + } newState = State.decRefCount(oldState); } while (!state.compareAndSet(oldState, newState)); int newRefCount = State.getRefCount(newState); if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount); } - if (newRefCount < 0) { - throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this); - } return newRefCount; } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 35b9d1f..f42622b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -34,11 +34,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.FileMetadataCache; +import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump; import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl; @@ -49,24 +56,30 @@ import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; import org.apache.hadoop.hive.llap.cache.SimpleAllocator; import org.apache.hadoop.hive.llap.cache.SimpleBufferManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import 
org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hive.common.util.FixedSizedObjectPool; + import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -86,8 +99,13 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { private ObjectName buddyAllocatorMXBean; private final Allocator allocator; private final LlapOomDebugDump memoryDump; + private final FileMetadataCache fileMetadataCache; + private final LowLevelCache dataCache; + private final BufferUsageManager bufferManager; + private final Configuration daemonConf; private LlapIoImpl(Configuration conf) throws IOException { + this.daemonConf = conf; String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode); LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none"); @@ -115,7 +133,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { sessionId); OrcMetadataCache metadataCache = null; - LowLevelCache cache = null; SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null; boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED); @@ -154,16 +171,22 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { this.memoryDump = allocator; LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl( cacheMetrics, cachePolicy, allocator, true); - cache = cacheImpl; + dataCache = cacheImpl; if (isEncodeEnabled) { SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl( cacheMetrics, cachePolicy, allocator); serdeCache = serdeCacheImpl; } + boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache); + // TODO# temporary, see comments there + ParquetMetadataCacheImpl parquetMc = new ParquetMetadataCacheImpl( + allocator, memManager, cachePolicy, cacheMetrics); + fileMetadataCache = parquetMc; // And finally cache policy uses cache to notify it of eviction. The cycle is complete! - EvictionDispatcher e = new EvictionDispatcher(cache, serdeCache, metadataCache, allocator); + EvictionDispatcher e = new EvictionDispatcher( + dataCache, serdeCache, metadataCache, allocator, parquetMc); if (isSplitCache) { metaCachePolicy.setEvictionListener(e); metaCachePolicy.setParentDebugDumper(e); @@ -172,14 +195,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { cachePolicy.setParentDebugDumper(e); cacheImpl.startThreads(); // Start the cache threads. - bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager. + bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager. bufferManagerGeneric = serdeCache; } else { this.allocator = new SimpleAllocator(conf); memoryDump = null; + fileMetadataCache = null; SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics); - bufferManagerOrc = bufferManagerGeneric = sbm; - cache = sbm; + bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm; + dataCache = sbm; } // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) 
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); @@ -190,7 +214,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf); // TODO: this should depends on input format and be in a map, or something. this.orcCvp = new OrcColumnVectorProducer( - metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool); + metadataCache, dataCache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool); this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer( serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null; LOG.info("LLAP IO initialized"); @@ -210,10 +234,9 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { return sb.toString(); } - @SuppressWarnings("rawtypes") @Override public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat( - InputFormat sourceInputFormat, Deserializer sourceSerDe) { + InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe) { ColumnVectorProducer cvp = genericCvp; if (sourceInputFormat instanceof OrcInputFormat) { cvp = orcCvp; // Special-case for ORC. @@ -233,4 +256,62 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { } executor.shutdownNow(); } + + + @Override + public void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat) { + LlapCacheOnlyInputFormatInterface cacheIf = (LlapCacheOnlyInputFormatInterface)inputFormat; + cacheIf.injectCaches(fileMetadataCache, + new GenericDataCache(dataCache, bufferManager), daemonConf); + } + + private class GenericDataCache implements DataCache, BufferObjectFactory { + private final LowLevelCache lowLevelCache; + private final BufferUsageManager bufferManager; + + public GenericDataCache(LowLevelCache lowLevelCache, BufferUsageManager bufferManager) { + this.lowLevelCache = lowLevelCache; + this.bufferManager = bufferManager; + } + + @Override + public DiskRangeList getFileData(Object fileKey, DiskRangeList range, + long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { + // TODO: we currently pass null counters because this doesn't use LlapRecordReader. + // Create counters for non-elevator-using fragments also? 
+ return lowLevelCache.getFileData(fileKey, range, baseOffset, factory, null, gotAllData); + } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset) { + return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null); + } + + @Override + public void releaseBuffer(MemoryBuffer buffer) { + bufferManager.decRefBuffer(buffer); + } + + @Override + public void reuseBuffer(MemoryBuffer buffer) { + boolean isReused = bufferManager.incRefBuffer(buffer); + assert isReused; + } + + @Override + public Allocator getAllocator() { + return bufferManager.getAllocator(); + } + + @Override + public BufferObjectFactory getDataBufferFactory() { + return this; + } + + @Override + public MemoryBuffer create() { + return new LlapDataBuffer(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java new file mode 100644 index 0000000..b61a8ca --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.metadata; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.io.FileMetadataCache; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; +import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; +import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer; +import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump; +import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; +import org.apache.hadoop.hive.llap.cache.MemoryManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.parquet.io.SeekableInputStream; + +// TODO# merge with OrcMetadataCache (and rename) after HIVE-15665. Shares a lot of the code. 
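The cache below keys raw Parquet footer bytes by file key and stores them in allocator-managed buffers that the reader must explicitly unlock. A schematic fragment of the intended get / put-on-miss / decRef lifecycle (a sketch, not the actual VectorizedParquetRecordReader code; it assumes the file system, path, file key and footer offset/length are already known and the usual Hadoop/Hive imports are in scope):

    static void accessFooter(FileMetadataCache cache, FileSystem fs, Path path,
        Object fileKey, long footerOffset, int footerLength) throws IOException {
      MemoryBufferOrBuffers footer = cache.getFileMetadata(fileKey);
      if (footer == null) {
        // Miss (or the cached entry was being evicted): read the footer from the file;
        // putFileMetadata copies it into cache buffers and returns them locked.
        try (FSDataInputStream in = fs.open(path)) {
          in.seek(footerOffset);
          footer = cache.putFileMetadata(fileKey, footerLength, in);
        }
      }
      try {
        // Consume the locked buffer(s); in this patch the reader wraps them in
        // ParquetFooterInputFromCache and hands that to the Parquet footer parser.
      } finally {
        cache.decRefBuffer(footer); // drop the lock so the entry can age out normally
      }
    }
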
+public class ParquetMetadataCacheImpl implements LlapOomDebugDump, FileMetadataCache { + private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata = + new ConcurrentHashMap<>(); + + private final MemoryManager memoryManager; + private final LowLevelCachePolicy policy; + private final EvictionAwareAllocator allocator; + private final LlapDaemonCacheMetrics metrics; + + public ParquetMetadataCacheImpl(EvictionAwareAllocator allocator, MemoryManager memoryManager, + LowLevelCachePolicy policy, LlapDaemonCacheMetrics metrics) { + this.memoryManager = memoryManager; + this.allocator = allocator; + this.policy = policy; + this.metrics = metrics; + } + + public void notifyEvicted(LlapFileMetadataBuffer buffer) { + LlapBufferOrBuffers removed = metadata.remove(buffer.getFileKey()); + if (removed == null) return; + if (removed.getSingleBuffer() != null) { + assert removed.getSingleBuffer() == buffer; + return; + } + discardMultiBuffer(removed); + } + + @Override + public LlapBufferOrBuffers getFileMetadata(Object fileKey) { + LlapBufferOrBuffers result = metadata.get(fileKey); + if (result == null) return null; + if (!lockBuffer(result, true)) { + // No need to discard the buffer we cannot lock - eviction takes care of that. + metadata.remove(fileKey, result); + return null; + } + return result; + } + + @Override + public LlapBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is) throws IOException { + LlapBufferOrBuffers result = null; + while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). + LlapBufferOrBuffers oldVal = metadata.get(fileKey); + if (oldVal == null) { + result = wrapBbForFile(result, fileKey, length, is); + if (!lockBuffer(result, false)) { + throw new AssertionError("Cannot lock a newly created value " + result); + } + oldVal = metadata.putIfAbsent(fileKey, result); + if (oldVal == null) { + cacheInPolicy(result); // Cached successfully, add to policy. + return result; + } + } + if (lockOldVal(fileKey, result, oldVal)) { + return oldVal; + } + // We found some old value but couldn't incRef it; remove it. + metadata.remove(fileKey, oldVal); + } + } + + private void cacheInPolicy(LlapBufferOrBuffers buffers) { + LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer(); + if (singleBuffer != null) { + policy.cache(singleBuffer, Priority.HIGH); + return; + } + for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) { + policy.cache(buffer, Priority.HIGH); + } + } + + private <T extends LlapBufferOrBuffers> boolean lockOldVal(Object key, T newVal, T oldVal) { + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when metadata is already cached for" + + " {}; old {}, new {}", key, oldVal, newVal); + } + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal); + } + if (lockBuffer(oldVal, true)) { + // We found an old, valid block for this key in the cache. 
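+ // The value this caller created (if any) lost the race; it is unlocked below and, having never been added to the policy, is deallocated on the last decref.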
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}", + newVal, oldVal); + } + + if (newVal != null) { + unlockBuffer(newVal, false); + } + return true; + } + return false; + } + + @Override + public void decRefBuffer(MemoryBufferOrBuffers buffer) { + if (!(buffer instanceof LlapBufferOrBuffers)) { + throw new AssertionError(buffer.getClass()); + } + unlockBuffer((LlapBufferOrBuffers)buffer, true); + } + + private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, + Object fileKey, int length, InputStream stream) throws IOException { + if (result != null) return result; + int maxAlloc = allocator.getMaxAllocation(); + LlapFileMetadataBuffer[] largeBuffers = null; + if (maxAlloc < length) { + largeBuffers = new LlapFileMetadataBuffer[length / maxAlloc]; + for (int i = 0; i < largeBuffers.length; ++i) { + largeBuffers[i] = new LlapFileMetadataBuffer(fileKey); + } + allocator.allocateMultiple(largeBuffers, maxAlloc, null); + for (int i = 0; i < largeBuffers.length; ++i) { + readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]); + } + } + int smallSize = length % maxAlloc; + if (smallSize == 0) { + return new LlapFileMetadataBuffers(largeBuffers); + } else { + LlapFileMetadataBuffer[] smallBuffer = new LlapFileMetadataBuffer[1]; + smallBuffer[0] = new LlapFileMetadataBuffer(fileKey); + allocator.allocateMultiple(smallBuffer, length, null); + readIntoCacheBuffer(stream, smallSize, smallBuffer[0]); + if (largeBuffers == null) { + return smallBuffer[0]; + } else { + LlapFileMetadataBuffer[] cacheData = new LlapFileMetadataBuffer[largeBuffers.length + 1]; + System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length); + cacheData[largeBuffers.length] = smallBuffer[0]; + return new LlapFileMetadataBuffers(largeBuffers); + } + } + } + + private static void readIntoCacheBuffer( + InputStream stream, int length, MemoryBuffer dest) throws IOException { + ByteBuffer bb = dest.getByteBufferRaw(); + int pos = bb.position(); + bb.limit(pos + length); + // TODO: SeekableInputStream.readFully eventually calls a Hadoop method that used to be + // buggy in 2.7 and also anyway just does a copy for a direct buffer. Do a copy here. + // ((SeekableInputStream)stream).readFully(bb); + FileUtils.readFully(stream, length, bb); + bb.position(pos); + } + + + private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) { + LlapAllocatorBuffer buffer = buffers.getSingleLlapBuffer(); + if (buffer != null) { + return lockOneBuffer(buffer, doNotifyPolicy); + } + LlapAllocatorBuffer[] bufferArray = buffers.getMultipleLlapBuffers(); + for (int i = 0; i < bufferArray.length; ++i) { + if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue; + for (int j = 0; j < i; ++j) { + unlockSingleBuffer(buffer, true); + } + discardMultiBuffer(buffers); + return false; + } + return true; + } + + private void discardMultiBuffer(LlapBufferOrBuffers removed) { + long memoryFreed = 0; + for (LlapAllocatorBuffer buf : removed.getMultipleLlapBuffers()) { + long memUsage = buf.getMemoryUsage(); + // We cannot just deallocate the buffer, as it can hypothetically have users. + int result = buf.invalidate(); + switch (result) { + case LlapAllocatorBuffer.INVALIDATE_ALREADY_INVALID: continue; // Nothing to do. + case LlapAllocatorBuffer.INVALIDATE_FAILED: { + // Someone is using this buffer; eventually, it will be evicted. 
+ continue; + } + case LlapAllocatorBuffer.INVALIDATE_OK: { + memoryFreed += memUsage; + allocator.deallocateEvicted(buf); + break; + } + default: throw new AssertionError(result); + } + } + memoryManager.releaseMemory(memoryFreed); + } + + private boolean lockOneBuffer(LlapAllocatorBuffer buffer, boolean doNotifyPolicy) { + int rc = buffer.incRef(); + if (rc > 0) { + metrics.incrCacheNumLockedBuffers(); + } + if (doNotifyPolicy && rc == 1) { + // We have just locked a buffer that wasn't previously locked. + policy.notifyLock(buffer); + } + return rc > 0; + } + + private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) { + LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer(); + if (singleBuffer != null) { + unlockSingleBuffer(singleBuffer, isCached); + return; + } + for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) { + unlockSingleBuffer(buffer, isCached); + } + } + + private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) { + boolean isLastDecref = (buffer.decRef() == 0); + if (isLastDecref) { + if (isCached) { + policy.notifyUnlock(buffer); + } else { + allocator.deallocate(buffer); + } + } + metrics.decrCacheNumLockedBuffers(); + } + + + public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers { + LlapAllocatorBuffer getSingleLlapBuffer(); + LlapAllocatorBuffer[] getMultipleLlapBuffers(); + } + + public final static class LlapFileMetadataBuffer + extends LlapAllocatorBuffer implements LlapBufferOrBuffers { + private final Object fileKey; + + public LlapFileMetadataBuffer(Object fileKey) { + this.fileKey = fileKey; + } + + @Override + public void notifyEvicted(EvictionDispatcher evictionDispatcher) { + evictionDispatcher.notifyEvicted(this); + } + + public Object getFileKey() { + return fileKey; + } + + @Override + public LlapAllocatorBuffer getSingleLlapBuffer() { + return this; + } + + @Override + public LlapAllocatorBuffer[] getMultipleLlapBuffers() { + return null; + } + + @Override + public MemoryBuffer getSingleBuffer() { + return this; + } + + @Override + public MemoryBuffer[] getMultipleBuffers() { + return null; + } + } + + public final static class LlapFileMetadataBuffers implements LlapBufferOrBuffers { + private final LlapFileMetadataBuffer[] buffers; + + public LlapFileMetadataBuffers(LlapFileMetadataBuffer[] buffers) { + this.buffers = buffers; + } + + @Override + public LlapAllocatorBuffer getSingleLlapBuffer() { + return null; + } + + @Override + public LlapAllocatorBuffer[] getMultipleLlapBuffers() { + return buffers; + } + + @Override + public MemoryBuffer getSingleBuffer() { + return null; + } + + @Override + public MemoryBuffer[] getMultipleBuffers() { + return buffers; + } + } + + @Override + public String debugDumpForOom() { + // TODO: nothing, will be merged with ORC cache + return null; + } + + @Override + public void debugDumpShort(StringBuilder sb) { + // TODO: nothing, will be merged with ORC cache + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java new file mode 100644 index 0000000..5c1eed3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -0,0 +1,422 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk; +import org.apache.hadoop.util.Progressable; +import org.apache.orc.impl.RecordReaderUtils; + +/** + * This is currently only used by Parquet; however, a generally applicable approach is used - + * you pass in a set of offset pairs for a file, and the file is cached with these boundaries. + * Don't add anything format specific here. + */ +public class LlapCacheAwareFs extends FileSystem { + public static final String SCHEME = "llapcache"; + private static AtomicLong currentSplitId = new AtomicLong(-1); + private URI uri; + + // We store the chunk indices by split file; that way if several callers are reading + // the same file they can separately store and remove the relevant parts of the index. + private static final ConcurrentHashMap<Long, CacheAwareInputStream> files = + new ConcurrentHashMap<>(); + + public static Path registerFile(DataCache cache, Path path, Object fileKey, + TreeMap<Long, Long> index, Configuration conf) throws IOException { + long splitId = currentSplitId.incrementAndGet(); + CacheAwareInputStream stream = new CacheAwareInputStream( + cache, conf, index, path, fileKey, -1); + if (files.putIfAbsent(splitId, stream) != null) { + throw new IOException("Record already exists for " + splitId); + } + conf.set("fs." 
+ LlapCacheAwareFs.SCHEME + ".impl", LlapCacheAwareFs.class.getCanonicalName()); + return new Path(SCHEME + "://" + SCHEME + "/" + splitId); + } + + public static void unregisterFile(Path cachePath) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistering " + cachePath); + } + files.remove(extractSplitId(cachePath)); + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + this.uri = URI.create(SCHEME + "://" + SCHEME); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + return new FSDataInputStream(getCtx(path).cloneWithBufferSize(bufferSize)); + } + + private LlapCacheAwareFs.CacheAwareInputStream getCtx(Path path) { + return files.get(extractSplitId(path)); + } + + private static long extractSplitId(Path path) { + String pathOnly = path.toUri().getPath(); + if (pathOnly.startsWith("/")) { + pathOnly = pathOnly.substring(1); + } + return Long.parseLong(pathOnly); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWorkingDirectory(Path arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().append(ctx.path, arg1, arg2); + } + + @Override + public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3, + short arg4, long arg5, Progressable arg6) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().create(ctx.path, arg1, arg2, arg3, arg4, arg5, arg6); + } + + @Override + public boolean delete(Path arg0, boolean arg1) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().delete(ctx.path, arg1); + + } + + @Override + public FileStatus getFileStatus(Path arg0) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().getFileStatus(ctx.path); + } + + @Override + public FileStatus[] listStatus(Path arg0) throws FileNotFoundException, IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().listStatus(ctx.path); + } + + @Override + public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0); + return ctx.getFs().mkdirs(ctx.path, arg1); + } + + @Override + public boolean rename(Path arg0, Path arg1) throws IOException { + LlapCacheAwareFs.CacheAwareInputStream ctx1 = getCtx(arg0), ctx2 = getCtx(arg1); + return ctx1.getFs().rename(ctx1.path, ctx2.path); + } + + private static class CacheAwareInputStream extends InputStream + implements Seekable, PositionedReadable { + private final TreeMap<Long, Long> chunkIndex; + private final Path path; + private final Object fileKey; + private final Configuration conf; + private final DataCache cache; + private final int bufferSize; + private long position = 0; + + public CacheAwareInputStream(DataCache cache, Configuration conf, + TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize) { + this.cache = cache; + this.fileKey = fileKey; + this.chunkIndex = chunkIndex; + this.path = path; + this.conf = conf; + this.bufferSize = bufferSize; + } + + public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int 
bufferSize) { + return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize); + } + + @Override + public int read() throws IOException { + // This is not called by ConsecutiveChunk stuff in Parquet. + // If this were used, it might make sense to make it faster. + byte[] theByte = new byte[1]; + int result = read(theByte, 0, 1); + if (result != 1) throw new EOFException(); + return theByte[0] & 0xFF; + } + + @Override + public int read(byte[] array, final int arrayOffset, final int len) throws IOException { + long readStartPos = position; + DiskRangeList drl = new DiskRangeList(readStartPos, readStartPos + len); + DataCache.BooleanRef gotAllData = new DataCache.BooleanRef(); + drl = cache.getFileData(fileKey, drl, 0, new DataCache.DiskRangeListFactory() { + @Override + public DiskRangeList createCacheChunk( + MemoryBuffer buffer, long startOffset, long endOffset) { + CacheChunk result = new CacheChunk(); // TODO: pool? + result.init(buffer, startOffset, endOffset); + return result; + } + }, gotAllData); + if (LOG.isInfoEnabled()) { + LOG.info("Buffers after cache " + RecordReaderUtils.stringifyDiskRanges(drl)); + } + if (gotAllData.value) { + long sizeRead = 0; + while (drl != null) { + assert drl.hasData(); + long from = drl.getOffset(), to = drl.getEnd(); + int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from); + ByteBuffer data = drl.getData().duplicate(); + data.get(array, arrayOffset + offsetFromReadStart, candidateSize); + sizeRead += candidateSize; + drl = drl.next; + } + validateAndUpdatePosition(len, sizeRead); + return len; + } + int maxAlloc = cache.getAllocator().getMaxAllocation(); + // We have some disk data. Separate it by column chunk and put into cache. + + // We started with a single DRL, so we assume there will be no consecutive missing blocks + // after the cache has inserted cache data. We also assume all the missing parts will + // represent one or several column chunks, since we always cache on column chunk boundaries. + DiskRangeList current = drl; + FileSystem fs = path.getFileSystem(conf); + FSDataInputStream is = fs.open(path, bufferSize); + Allocator allocator = cache.getAllocator(); + long sizeRead = 0; + while (current != null) { + DiskRangeList candidate = current; + current = current.next; + long from = candidate.getOffset(), to = candidate.getEnd(); + // The offset in the destination array for the beginning of this missing range. + int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from); + if (candidate.hasData()) { + ByteBuffer data = candidate.getData().duplicate(); + data.get(array, arrayOffset + offsetFromReadStart, candidateSize); + sizeRead += candidateSize; + continue; + } + // The data is not in cache. + + // Account for potential partial chunks. + SortedMap<Long, Long> chunksInThisRead = getAndValidateMissingChunks(maxAlloc, from, to); + + is.seek(from); + is.readFully(array, arrayOffset + offsetFromReadStart, candidateSize); + sizeRead += candidateSize; + // Now copy missing chunks (and parts of chunks) into cache buffers. + if (fileKey == null || cache == null) continue; + int extraDiskDataOffset = 0; + // TODO: should we try to make a giant array for one cache call to avoid overhead? 
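+ // Each missing chunk below is split into allocator-sized pieces: e.g. with a 4 MB maxAlloc and a 9 MB chunk, largeBufCount = 2, smallSize = 1 MB, and chunkPartCount = 3 cache ranges get written back.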
+ for (Map.Entry<Long, Long> missingChunk : chunksInThisRead.entrySet()) { + long chunkFrom = Math.max(from, missingChunk.getKey()), + chunkTo = Math.min(to, missingChunk.getValue()), + chunkLength = chunkTo - chunkFrom; + MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null; + try { + int largeBufCount = (int) (chunkLength / maxAlloc); + int smallSize = (int) (chunkLength % maxAlloc); + int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0); + DiskRange[] cacheRanges = new DiskRange[chunkPartCount]; + int extraOffsetInChunk = 0; + if (maxAlloc < chunkLength) { + largeBuffers = new MemoryBuffer[largeBufCount]; + allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory()); + for (int i = 0; i < largeBuffers.length; ++i) { + // By definition here we copy up to the limit of the buffer. + ByteBuffer bb = largeBuffers[i].getByteBufferRaw(); + int remaining = bb.remaining(); + assert remaining == maxAlloc; + copyDiskDataToCacheBuffer(array, + arrayOffset + offsetFromReadStart + extraDiskDataOffset, + remaining, bb, cacheRanges, i, chunkFrom + extraOffsetInChunk); + extraDiskDataOffset += remaining; + extraOffsetInChunk += remaining; + } + } + newCacheData = largeBuffers; + largeBuffers = null; + if (smallSize > 0) { + smallBuffer = new MemoryBuffer[1]; + allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory()); + ByteBuffer bb = smallBuffer[0].getByteBufferRaw(); + copyDiskDataToCacheBuffer(array, + arrayOffset + offsetFromReadStart + extraDiskDataOffset, + smallSize, bb, cacheRanges, largeBufCount, chunkFrom + extraOffsetInChunk); + extraDiskDataOffset += smallSize; + extraOffsetInChunk += smallSize; // Not strictly necessary, noone will look at it. + if (newCacheData == null) { + newCacheData = smallBuffer; + } else { + // TODO: add allocate overload with an offset and length + MemoryBuffer[] combinedCacheData = new MemoryBuffer[largeBufCount + 1]; + System.arraycopy(newCacheData, 0, combinedCacheData, 0, largeBufCount); + newCacheData = combinedCacheData; + newCacheData[largeBufCount] = smallBuffer[0]; + } + smallBuffer = null; + } + cache.putFileData(fileKey, cacheRanges, newCacheData, 0); + } finally { + // We do not use the new cache buffers for the actual read, given the way read() API is. + // Therefore, we don't need to handle cache collisions - just decref all the buffers. + if (newCacheData != null) { + for (MemoryBuffer buffer : newCacheData) { + if (buffer == null) continue; + cache.releaseBuffer(buffer); + } + } + // If we have failed before building newCacheData, deallocate other the allocated. 
+ if (largeBuffers != null) { + for (MemoryBuffer buffer : largeBuffers) { + if (buffer == null) continue; + allocator.deallocate(buffer); + } + } + if (smallBuffer != null && smallBuffer[0] != null) { + allocator.deallocate(smallBuffer[0]); + } + } + } + } + validateAndUpdatePosition(len, sizeRead); + return len; + } + + private void validateAndUpdatePosition(int len, long sizeRead) { + if (sizeRead != len) { + throw new AssertionError("Reading at " + position + " for " + len + ": " + + sizeRead + " bytes copied"); + } + position += len; + } + + private void copyDiskDataToCacheBuffer(byte[] diskData, int offsetInDiskData, int sizeToCopy, + ByteBuffer cacheBuffer, DiskRange[] cacheRanges, int cacheRangeIx, long cacheRangeStart) { + int bbPos = cacheBuffer.position(); + long cacheRangeEnd = cacheRangeStart + sizeToCopy; + if (LOG.isTraceEnabled()) { + LOG.trace("Caching [" + cacheRangeStart + ", " + cacheRangeEnd + ")"); + } + cacheRanges[cacheRangeIx] = new DiskRange(cacheRangeStart, cacheRangeEnd); + cacheBuffer.put(diskData, offsetInDiskData, sizeToCopy); + cacheBuffer.position(bbPos); + } + + private SortedMap<Long, Long> getAndValidateMissingChunks( + int maxAlloc, long from, long to) { + Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from); + if (firstMissing == null) { + throw new AssertionError("No lower bound for offset " + from); + } + if (firstMissing.getValue() <= from + || ((from - firstMissing.getKey()) % maxAlloc) != 0) { + // The data does not belong to a recognized chunk, or is split wrong. + throw new AssertionError("Lower bound for offset " + from + " is [" + + firstMissing.getKey() + ", " + firstMissing.getValue() + ")"); + } + SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to); + if (missingChunks.isEmpty()) { + throw new AssertionError("No chunks for [" + from + ", " + to + ")"); + } + long lastMissingOffset = missingChunks.lastKey(), + lastMissingEnd = missingChunks.get(lastMissingOffset); + if (lastMissingEnd < to + || (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) { + // The data does not belong to a recognized chunk, or is split wrong. 
+ throw new AssertionError("Lower bound for offset " + to + " is [" + + lastMissingOffset + ", " + lastMissingEnd + ")"); + } + return missingChunks; + } + + public FileSystem getFs() throws IOException { + return path.getFileSystem(conf); + } + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public void seek(long position) throws IOException { + this.position = position; + } + + @Override + @Private + public boolean seekToNewSource(long arg0) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int read(long arg0, byte[] arg1, int arg2, int arg3) throws IOException { + seek(arg0); + return read(arg1, arg2, arg3); + } + + @Override + public void readFully(long arg0, byte[] arg1) throws IOException { + read(arg0, arg1, 0, arg1.length); + } + + @Override + public void readFully(long arg0, byte[] arg1, int arg2, int arg3) throws IOException { + read(arg0, arg1, 0, arg1.length); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java index fa7f59d..102150a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java @@ -49,7 +49,8 @@ public class HdfsUtils { if (fileSystem instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fileSystem; if ((!checkDefaultFs) || isDefaultFs(dfs)) { - return SHIMS.getFileId(dfs, path.toUri().getPath()); + Object result = SHIMS.getFileId(dfs, path.toUri().getPath()); + if (result != null) return result; } } if (!allowSynthetic) { http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 442c921..5c9d289 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -214,6 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } String ifName = inputFormat.getClass().getCanonicalName(); boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface; + boolean isCacheOnly = inputFormat instanceof LlapCacheOnlyInputFormatInterface; boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf); if (!isVectorized) { // Pretend it's vectorized if the non-vector wrapped is enabled. @@ -224,33 +225,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> if (isVectorized && !isSupported && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED)) { // See if we can use re-encoding to read the format thru IO elevator. - String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS); - if (LOG.isDebugEnabled()) { - LOG.debug("Checking " + ifName + " against " + formatList); - } - String[] formats = StringUtils.getStrings(formatList); - if (formats != null) { - for (String format : formats) { - // TODO: should we check isAssignableFrom? 
- if (ifName.equals(format)) { - if (LOG.isInfoEnabled()) { - LOG.info("Using SerDe-based LLAP reader for " + ifName); - } - isSupported = isSerdeBased = true; - break; - } - } - } + isSupported = isSerdeBased = checkInputFormatForLlapEncode(conf, ifName); } - if (!isSupported || !isVectorized) { + if ((!isSupported || !isVectorized) && !isCacheOnly) { if (LOG.isInfoEnabled()) { LOG.info("Not using llap for " + ifName + ": supported = " - + isSupported + ", vectorized = " + isVectorized); + + isSupported + ", vectorized = " + isVectorized + ", cache only = " + isCacheOnly); } return inputFormat; } if (LOG.isDebugEnabled()) { - LOG.debug("Wrapping " + ifName); + LOG.debug("Processing " + ifName); } @SuppressWarnings("unchecked") @@ -264,43 +249,85 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> Deserializer serde = null; if (isSerdeBased) { if (part == null) { - if (LOG.isInfoEnabled()) { + if (isCacheOnly) { + LOG.info("Using cache only because there's no partition spec for SerDe-based IF"); + injectLlapCaches(inputFormat, llapIo); + } else { LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF"); } return inputFormat; } - VectorPartitionDesc vpart = part.getVectorPartitionDesc(); - if (vpart != null) { - VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType(); - if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) { + serde = findSerDeForLlapSerDeIf(conf, part); + } + if (isSupported && isVectorized) { + InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde); + // null means we cannot wrap; the cause is logged inside. + if (wrappedIf != null) { + return castInputFormat(wrappedIf); + } + } + if (isCacheOnly) { + injectLlapCaches(inputFormat, llapIo); + } + return inputFormat; + } + + private static boolean checkInputFormatForLlapEncode(Configuration conf, String ifName) { + String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking " + ifName + " against " + formatList); + } + String[] formats = StringUtils.getStrings(formatList); + if (formats != null) { + for (String format : formats) { + // TODO: should we check isAssignableFrom? 
+ if (ifName.equals(format)) { if (LOG.isInfoEnabled()) { - LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition " - + part.getTableName() + " " + part.getPartSpec()); + LOG.info("Using SerDe-based LLAP reader for " + ifName); } - vpart.setVectorMapOperatorReadType( - VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT); + return true; } } - try { - serde = part.getDeserializer(conf); - } catch (Exception e) { - throw new HiveException("Error creating SerDe for LLAP IO", e); + } + return false; + } + + private static Deserializer findSerDeForLlapSerDeIf( + Configuration conf, PartitionDesc part) throws HiveException { + VectorPartitionDesc vpart = part.getVectorPartitionDesc(); + if (vpart != null) { + VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType(); + if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) { + if (LOG.isInfoEnabled()) { + LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition " + + part.getTableName() + " " + part.getPartSpec()); + } + vpart.setVectorMapOperatorReadType( + VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT); } } - InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde); - if (wrappedIf == null) { - return inputFormat; // We cannot wrap; the cause is logged inside. + try { + return part.getDeserializer(conf); + } catch (Exception e) { + throw new HiveException("Error creating SerDe for LLAP IO", e); } - return castInputFormat(wrappedIf); } - + public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat, + LlapIo<VectorizedRowBatch> llapIo) { + LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName()); + llapIo.initCacheOnlyInputFormat(inputFormat); + } public static boolean canWrapForLlap(Class<? extends InputFormat> clazz, boolean checkVector) { return LlapWrappableInputFormatInterface.class.isAssignableFrom(clazz) && (!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz)); } + public static boolean canInjectCaches(Class<? extends InputFormat> clazz) { + return LlapCacheOnlyInputFormatInterface.class.isAssignableFrom(clazz); + } + @SuppressWarnings("unchecked") private static <T, U, V, W> InputFormat<T, U> castInputFormat(InputFormat<V, W> from) { // This is ugly in two ways... http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java new file mode 100644 index 0000000..13d594c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.FileMetadataCache; + +/** Marker interface for LLAP IO. */ +public interface LlapCacheOnlyInputFormatInterface { + void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf); +} http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index f4fadbb..38aaeed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -15,7 +15,11 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -37,7 +41,7 @@ import org.apache.parquet.hadoop.ParquetInputFormat; * are not currently supported. Removing the interface turns off vectorization. 
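+ * It also implements LlapCacheOnlyInputFormatInterface: when LLAP IO is available,
+ * HiveInputFormat injects the LLAP file metadata and data caches via injectCaches(),
+ * and they are passed through to the vectorized Parquet reader.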
*/ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable> - implements VectorizedInputFormatInterface { + implements VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface { private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class); @@ -78,4 +82,10 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra throw new RuntimeException("Cannot create a RecordReaderWrapper", e); } } + + @Override + public void injectCaches( + FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) { + vectorizedSelf.injectCaches(metadataCache, dataCache, cacheConf); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 322178a..0cc580a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -15,6 +15,10 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.FileMetadataCache; +import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -28,7 +32,12 @@ import org.apache.hadoop.mapred.Reporter; * Vectorized input format for Parquet files */ public class VectorizedParquetInputFormat - extends FileInputFormat<NullWritable, VectorizedRowBatch> { + extends FileInputFormat<NullWritable, VectorizedRowBatch> + implements LlapCacheOnlyInputFormatInterface { + + private FileMetadataCache metadataCache = null; + private DataCache dataCache = null; + private Configuration cacheConf = null; public VectorizedParquetInputFormat() { } @@ -38,6 +47,15 @@ public class VectorizedParquetInputFormat InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - return new VectorizedParquetRecordReader(inputSplit, jobConf); + return new VectorizedParquetRecordReader( + inputSplit, jobConf, metadataCache, dataCache, cacheConf); + } + + @Override + public void injectCaches( + FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) { + this.metadataCache = metadataCache; + this.dataCache = dataCache; + this.cacheConf = cacheConf; } } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java new file mode 100644 index 0000000..2a6e052 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java @@ -0,0 +1,196 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +/** + * The Parquet InputFile implementation that allows the reader to + * read the footer from cache without being aware of the latter. + * This implements both InputFile and the InputStream that the reader gets from InputFile. + */ +final class ParquetFooterInputFromCache + extends SeekableInputStream implements InputFile { + final static int FOOTER_LENGTH_SIZE = 4; // For the file size check. + private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE; + private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length; + private final int length, footerLength; + private int position = 0, bufferIx = 0, bufferPos = 0; + private final MemoryBuffer[] cacheData; + private final int[] positions; + + public ParquetFooterInputFromCache(MemoryBufferOrBuffers footerData) { + MemoryBuffer oneBuffer = footerData.getSingleBuffer(); + if (oneBuffer != null) { + cacheData = new MemoryBuffer[2]; + cacheData[0] = oneBuffer; + } else { + MemoryBuffer[] bufs = footerData.getMultipleBuffers(); + cacheData = new MemoryBuffer[bufs.length + 1]; + System.arraycopy(bufs, 0, cacheData, 0, bufs.length); + } + int footerLength = 0; + positions = new int[cacheData.length]; + for (int i = 0; i < cacheData.length - 1; ++i) { + positions[i] = footerLength; + int dataLen = cacheData[i].getByteBufferRaw().remaining(); + assert dataLen > 0; + footerLength += dataLen; + } + positions[cacheData.length - 1] = footerLength; + cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength); + this.footerLength = footerLength; + this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH; + } + + @Override + public long getLength() throws IOException { + return length; + } + + @Override + public SeekableInputStream newStream() throws IOException { + // Note: this doesn't maintain proper newStream semantics (if any). + // We could either clone this instead or enforce that this is only called once. + return this; + } + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public void seek(long targetPos) throws IOException { + this.position = (int)targetPos; + targetPos -= FAKE_PREFIX_LENGTH; + // Not efficient, but we don't expect this to be called frequently. + for (int i = 1; i <= positions.length; ++i) { + int endPos = (i == positions.length) ? 
(length - FAKE_PREFIX_LENGTH) : positions[i]; + if (endPos > targetPos) { + bufferIx = i - 1; + bufferPos = (int) (targetPos - positions[i - 1]); + return; + } + } + throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength + + Arrays.toString(positions)); + } + + @Override + public void readFully(byte[] b, int offset, int len) throws IOException { + if (readInternal(b, offset, len) == len) return; + throw new EOFException(); + } + + public int readInternal(byte[] b, int offset, int len) { + if (position >= length) return -1; + int argPos = offset, argEnd = offset + len; + while (argPos < argEnd) { + if (bufferIx == cacheData.length) return (argPos - offset); + ByteBuffer data = cacheData[bufferIx].getByteBufferDup(); + int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos); + data.position(data.position() + bufferPos); + data.get(b, argPos, toConsume); + if (data.remaining() == 0) { + ++bufferIx; + bufferPos = 0; + } else { + bufferPos += toConsume; + } + argPos += toConsume; + } + return len; + } + + @Override + public int read() throws IOException { + if (position >= length) return -1; + ++position; + ByteBuffer data = cacheData[bufferIx].getByteBufferRaw(); + int bp = bufferPos; + ++bufferPos; + if (bufferPos == data.remaining()) { + ++bufferIx; // The first line check should handle the OOB. + bufferPos = 0; + } + return data.get(data.position() + bp) & 0xFF; + } + + @Override + public int read(ByteBuffer bb) throws IOException { + // Simple implementation for now - currently Parquet uses heap buffers. + int result = -1; + if (bb.hasArray()) { + result = readInternal(bb.array(), bb.arrayOffset(), bb.remaining()); + if (result > 0) { + bb.position(bb.position() + result); + } + } else { + byte[] b = new byte[bb.remaining()]; + result = readInternal(b, 0, bb.remaining()); + bb.put(b, 0, result); + } + return result; + } + + @Override + public void readFully(byte[] arg0) throws IOException { + readFully(arg0, 0, arg0.length); + } + + @Override + public void readFully(ByteBuffer arg0) throws IOException { + read(arg0); + } + + /** + * The fake buffer that emulates end of file, with footer length and magic. Given that these + * can be generated based on the footer buffer itself, we don't cache them.
+ */ + private final static class FooterEndBuffer implements MemoryBuffer { + private final ByteBuffer bb; + public FooterEndBuffer(int footerLength) { + byte[] b = new byte[8]; + b[0] = (byte) ((footerLength >>> 0) & 0xFF); + b[1] = (byte) ((footerLength >>> 8) & 0xFF); + b[2] = (byte) ((footerLength >>> 16) & 0xFF); + b[3] = (byte) ((footerLength >>> 24) & 0xFF); + for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) { + b[4 + i] = ParquetFileWriter.MAGIC[i]; + } + bb = ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getByteBufferRaw() { + return bb; + } + + @Override + public ByteBuffer getByteBufferDup() { + return bb.duplicate(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 6a7a219..0977759 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -14,9 +14,22 @@ package org.apache.hadoop.hive.ql.io.parquet.vector; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.FileMetadataCache; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapCacheAwareFs; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; @@ -31,13 +44,21 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.parquet.ParquetRuntimeException; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; 
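For reference (this sketch is not part of the patch): the FooterEndBuffer above synthesizes the standard Parquet file tail, i.e. a 4-byte little-endian footer length followed by the 4-byte "PAR1" magic (ParquetFileWriter.MAGIC). ParquetFooterInputFromCache exposes a fake 4-byte header prefix, the cached footer bytes, and this synthesized tail, so the stock footer parser sees what looks like the end of a real file. A minimal JDK-only illustration of that tail layout; the names ParquetTailSketch, buildTail and footerStart are hypothetical helpers, not patch code:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.StandardCharsets;

    // Sketch of the 8-byte Parquet tail that FooterEndBuffer emulates; names are hypothetical.
    final class ParquetTailSketch {
      private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

      // Builds the tail: 4-byte little-endian footer length, then the magic.
      static byte[] buildTail(int footerLength) {
        return ByteBuffer.allocate(4 + MAGIC.length)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putInt(footerLength)
            .put(MAGIC)
            .array();
      }

      // Given the file length and its last 8 bytes, computes where the serialized footer starts.
      static long footerStart(long fileLength, byte[] lastEightBytes) {
        int footerLength = ByteBuffer.wrap(lastEightBytes)
            .order(ByteOrder.LITTLE_ENDIAN)
            .getInt();
        return fileLength - 4 - MAGIC.length - footerLength;
      }
    }

This is the same arithmetic readSplitFooter() in the reader below performs (seek to length - 8, read the little-endian footer length, seek back by that length) before streaming the footer bytes into the metadata cache.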
@@ -50,11 +71,11 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.TreeMap; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; -import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; /** @@ -74,6 +95,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase private VectorizedRowBatchCtx rbCtx; private List<Integer> indexColumnsWanted; private Object[] partitionValues; + private Path cacheFsPath; /** * For each request column, the reader to read this column. This is NULL if this column @@ -114,9 +136,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase } public VectorizedParquetRecordReader( - org.apache.hadoop.mapred.InputSplit oldInputSplit, - JobConf conf) { + org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) { + this(oldInputSplit, conf, null, null, null); + } + + public VectorizedParquetRecordReader( + org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf, + FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) { try { + this.metadataCache = metadataCache; + this.cache = dataCache; + this.cacheConf = cacheConf; serDeStats = new SerDeStats(); projectionPusher = new ProjectionPusher(); ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf); @@ -132,16 +162,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase } } - private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException { - int partitionColumnCount = rbCtx.getPartitionColumnCount(); - if (partitionColumnCount > 0) { - partitionValues = new Object[partitionColumnCount]; - rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues); - } else { - partitionValues = null; - } - } + private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException { + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues); + } else { + partitionValues = null; + } + } + @SuppressWarnings("deprecation") public void initialize( InputSplit oldSplit, JobConf configuration) throws IOException, InterruptedException { @@ -164,16 +195,33 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); // if task.side.metadata is set, rowGroupOffsets is null + Object cacheKey = null; + // TODO: also support fileKey in splits, like OrcSplit does + if (metadataCache != null) { + cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file, + HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), + HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); + } + if (cacheKey != null) { + // If we are going to use cache, change the path to depend on file ID for extra consistency. 
+ FileSystem fs = file.getFileSystem(configuration); + if (cacheKey instanceof Long && HiveConf.getBoolVar( + cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) { + file = HdfsUtils.getFileIdPath(fs, file, (long)cacheKey); + } + } + if (rowGroupOffsets == null) { //TODO check whether rowGroupOffSets can be null // then we need to apply the predicate push down filter - footer = readFooter(configuration, file, range(split.getStart(), split.getEnd())); + footer = readSplitFooter( + configuration, file, cacheKey, range(split.getStart(), split.getEnd())); MessageType fileSchema = footer.getFileMetaData().getSchema(); FilterCompat.Filter filter = getFilter(configuration); blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); } else { // otherwise we find the row groups that were selected on the client - footer = readFooter(configuration, file, NO_FILTER); + footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER); Set<Long> offsets = new HashSet<>(); for (long offset : rowGroupOffsets) { offsets.add(offset); @@ -230,10 +278,98 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase requestedSchema = fileSchema; } + Path path = wrapPathForCache(file, cacheKey, configuration, blocks); this.reader = new ParquetFileReader( - configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); + configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns()); + } + + private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration, + List<BlockMetaData> blocks) throws IOException { + if (fileKey == null || cache == null) { + return path; + } + HashSet<ColumnPath> includedCols = new HashSet<>(); + for (ColumnDescriptor col : requestedSchema.getColumns()) { + includedCols.add(ColumnPath.get(col.getPath())); + } + // We could make some assumptions given how the reader currently does the work (consecutive + // chunks, etc.; blocks and columns stored in offset order in the lists), but we won't - + // just save all the chunk boundaries and lengths for now. + TreeMap<Long, Long> chunkIndex = new TreeMap<>(); + for (BlockMetaData block : blocks) { + for (ColumnChunkMetaData mc : block.getColumns()) { + if (!includedCols.contains(mc.getPath())) continue; + chunkIndex.put(mc.getStartingPos(), mc.getStartingPos() + mc.getTotalSize()); + } + } + // Register the cache-aware path so that Parquet reader would go thru it. + configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl", + LlapCacheAwareFs.class.getCanonicalName()); + path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration); + this.cacheFsPath = path; + return path; + } + + private ParquetMetadata readSplitFooter( + JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException { + MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null + : metadataCache.getFileMetadata(cacheKey); + if (footerData != null) { + if (LOG.isInfoEnabled()) { + LOG.info("Found the footer in cache for " + cacheKey); + } + try { + return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); + } finally { + metadataCache.decRefBuffer(footerData); + } + } + final FileSystem fs = file.getFileSystem(configuration); + final FileStatus stat = fs.getFileStatus(file); + if (cacheKey == null || metadataCache == null) { + return readFooterFromFile(file, fs, stat, filter); + } + + // To avoid reading the footer twice, we will cache it first and then read from cache. 
+ // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact. + try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) { + long footerLengthIndex = stat.getLen() + - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length; + stream.seek(footerLengthIndex); + int footerLength = BytesUtils.readIntLittleEndian(stream); + stream.seek(footerLengthIndex - footerLength); + if (LOG.isInfoEnabled()) { + LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey); + } + footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream); + try { + return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); + } finally { + metadataCache.decRefBuffer(footerData); + } + } } + private ParquetMetadata readFooterFromFile(final Path file, final FileSystem fs, + final FileStatus stat, MetadataFilter filter) throws IOException { + InputFile inputFile = new InputFile() { + @Override + public SeekableInputStream newStream() throws IOException { + return HadoopStreams.wrap(fs.open(file)); + } + @Override + public long getLength() throws IOException { + return stat.getLen(); + } + }; + return ParquetFileReader.readFooter(inputFile, filter); + } + + + private FileMetadataCache metadataCache; + private DataCache cache; + private Configuration cacheConf; + @Override public boolean next( NullWritable nullWritable, @@ -259,6 +395,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase @Override public void close() throws IOException { + if (cacheFsPath != null) { + LlapCacheAwareFs.unregisterFile(cacheFsPath); + } if (reader != null) { reader.close(); } http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 8b99ae0..2e63260 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -258,7 +258,7 @@ public class MapWork extends BaseWork { } public void deriveLlap(Configuration conf, boolean isExecDriver) { - boolean hasLlap = false, hasNonLlap = false, hasAcid = false; + boolean hasLlap = false, hasNonLlap = false, hasAcid = false, hasCacheOnly = false; // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here. 
boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode); boolean canWrapAny = false, doCheckIfs = false; @@ -272,10 +272,9 @@ public class MapWork extends BaseWork { } } boolean hasPathToPartInfo = (pathToPartitionInfo != null && !pathToPartitionInfo.isEmpty()); - if (canWrapAny && hasPathToPartInfo) { - assert isLlapOn; + if (hasPathToPartInfo) { for (PartitionDesc part : pathToPartitionInfo.values()) { - boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap( + boolean isUsingLlapIo = canWrapAny && HiveInputFormat.canWrapForLlap( part.getInputFileFormatClass(), doCheckIfs); if (isUsingLlapIo) { if (part.getTableDesc() != null && @@ -284,28 +283,31 @@ public class MapWork extends BaseWork { } else { hasLlap = true; } + } else if (isLlapOn && HiveInputFormat.canInjectCaches(part.getInputFileFormatClass())) { + hasCacheOnly = true; } else { hasNonLlap = true; } } } - // check if the column types that are read are supported by LLAP IO - if (hasLlap) { - // TODO: no need for now hasLlap = checkVectorizerSupportedTypes(); - } - llapIoDesc = deriveLlapIoDescString( - isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid); + isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly); } private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, - boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) { + boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid, + boolean hasCacheOnly) { if (!isLlapOn) return null; // LLAP IO is off, don't output. - if (!canWrapAny) return "no inputs"; // Cannot use with input formats. + if (!canWrapAny && !hasCacheOnly) return "no inputs"; // Cannot use with input formats. if (!hasPathToPartInfo) return "unknown"; // No information to judge. + int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0) + + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0); + if (varieties > 1) return "some inputs"; // Will probably never actually happen. if (hasAcid) return "may be used (ACID table)"; - return (hasLlap ? (hasNonLlap ? 
"some inputs" : "all inputs") : "no inputs"); + if (hasLlap) return "all inputs"; + if (hasCacheOnly) return "all inputs (cache only)"; + return "no inputs"; } public void internTable(Interner<TableDesc> interner) { http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q index dfca486..ff883db 100644 --- a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q +++ b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q @@ -1,6 +1,7 @@ SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; SET mapred.min.split.size=1000; SET mapred.max.split.size=5000; +set hive.llap.cache.allow.synthetic.fileid=true; create table newtypestbl(c char(10), v varchar(10), d decimal(5,3), da date) stored as parquet; http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q index a38cdbe..0e8dd04 100644 --- a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q +++ b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q @@ -1,5 +1,6 @@ set hive.mapred.mode=nonstrict; SET hive.optimize.ppd=true; +set hive.llap.cache.allow.synthetic.fileid=true; -- SORT_QUERY_RESULTS CREATE TABLE tbl_pred(t tinyint,

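Usage note on the new hook (this sketch is not part of the patch): LlapCacheOnlyInputFormatInterface is how HiveInputFormat.injectLlapCaches() pushes the LLAP metadata and data caches into a plain input format, as MapredParquetInputFormat and VectorizedParquetInputFormat do above. The class name CacheOnlyFormatSketch and its cachesAvailable() helper below are hypothetical; only the interface, its injectCaches() signature, and the cache types come from the patch.

    package org.example.llap;  // hypothetical package for the sketch

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.common.io.FileMetadataCache;
    import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;

    /** Hypothetical format that uses only the LLAP caches, without the full LLAP IO elevator. */
    public class CacheOnlyFormatSketch implements LlapCacheOnlyInputFormatInterface {
      private FileMetadataCache metadataCache;
      private DataCache dataCache;
      private Configuration cacheConf;

      @Override
      public void injectCaches(
          FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
        // Called by HiveInputFormat.injectLlapCaches() when LLAP IO is available;
        // the fields stay null otherwise, so readers must treat them as optional.
        this.metadataCache = metadataCache;
        this.dataCache = dataCache;
        this.cacheConf = cacheConf;
      }

      /** Readers created later receive the caches, mirroring VectorizedParquetInputFormat. */
      public boolean cachesAvailable() {
        return metadataCache != null && dataCache != null && cacheConf != null;
      }
    }

In the patch itself, MapredParquetInputFormat simply forwards the injected caches to its vectorized counterpart, which passes them to each VectorizedParquetRecordReader it creates.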