Repository: hive
Updated Branches:
  refs/heads/master 53249a357 -> eead54c98
HIVE-13241 : LLAP: Incremental Caching marks some small chunks as "incomplete CB" (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eead54c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eead54c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eead54c9

Branch: refs/heads/master
Commit: eead54c981984d946ef0d9f932667ea48650a2c3
Parents: 53249a3
Author: Sergey Shelukhin <[email protected]>
Authored: Mon Apr 25 18:20:53 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Mon Apr 25 18:20:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../hive/llap/cache/EvictionDispatcher.java     |   5 +
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   4 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  16 ++-
 .../llap/io/metadata/OrcFileEstimateErrors.java | 121 +++++++++++++++++++
 .../hive/llap/io/metadata/OrcMetadataCache.java |  58 ++++++++-
 .../hive/llap/cache/TestOrcMetadataCache.java   |   2 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  98 ++++++++-------
 8 files changed, 256 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ccc262..bae3999 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2603,6 +2603,10 @@ public class HiveConf extends Configuration {
         "modification time, which is almost certain to identify file uniquely. However, if you\n" +
         "use a FS without file IDs and rewrite files a lot (or are paranoid), you might want\n" +
         "to avoid this setting."),
+    LLAP_CACHE_ENABLE_ORC_GAP_CACHE("hive.llap.orc.gap.cache", true,
+        "Whether LLAP cache for ORC should remember gaps in ORC RG read estimates, to avoid\n" +
+        "re-reading the data that was read once and discarded because it is unneeded. This is\n" +
+        "only necessary for ORC files written before HIVE-9660 (Hive 2.1?)."),
     LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
        "Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
        "cases of file overwrites. 
This is supported on HDFS."), http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java index bae571e..91932e2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.cache; +import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; @@ -49,4 +50,8 @@ public final class EvictionDispatcher implements EvictionListener { public void notifyEvicted(OrcStripeMetadata buffer) { metadataCache.notifyEvicted(buffer); } + + public void notifyEvicted(OrcFileEstimateErrors buffer) { + metadataCache.notifyEvicted(buffer); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index d43ff15..6a72b4c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; @@ -117,7 +118,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics); this.allocator = allocator; orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true); - metadataCache = new OrcMetadataCache(memManager, cachePolicy); + boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); + metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache); // And finally cache policy uses cache to notify it of eviction. The cycle is complete! 
cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache)); cachePolicy.setParentDebugDumper(orcCache); http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index eb953c7..406f8f6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -817,15 +817,23 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { - return (lowLevelCache == null) ? range : lowLevelCache.getFileData( - fileKey, range, baseOffset, factory, counters, gotAllData); + DiskRangeList result = (lowLevelCache == null) ? range + : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData); + if (gotAllData.value) return result; + return (metadataCache == null) ? range + : metadataCache.getIncompleteCbs(fileKey, range, baseOffset, factory, gotAllData); } @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset) { - return (lowLevelCache == null) ? null : lowLevelCache.putFileData( - fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); + if (data != null) { + return (lowLevelCache == null) ? null : lowLevelCache.putFileData( + fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); + } else if (metadataCache != null) { + metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); + } + return null; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java new file mode 100644 index 0000000..ad88b98 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.io.metadata; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; +import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; +import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; +import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator; +import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator; +import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; +import org.apache.hadoop.hive.ql.io.SyntheticFileId; + +public class OrcFileEstimateErrors extends LlapCacheableBuffer { + private final Object fileKey; + private int estimatedMemUsage; + private final ConcurrentHashMap<Long, Integer> cache = new ConcurrentHashMap<>(); + + private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS; + private final static ObjectEstimator SIZE_ESTIMATOR; + static { + SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(createDummy( + new SyntheticFileId(new Path("/"), 0, 0))); + SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileEstimateErrors.class); + } + + public OrcFileEstimateErrors(Object fileKey) { + this.fileKey = fileKey; + } + + public void addError(long offset, int length, long baseOffset) { + Long key = Long.valueOf(offset + baseOffset); + Integer existingLength = cache.get(key); + if (existingLength != null && existingLength >= length) return; + Integer value = Integer.valueOf(length); + while (true) { + existingLength = cache.putIfAbsent(key, value); + if (existingLength == null || existingLength >= length) return; + cache.remove(key, existingLength); + } + } + + public DiskRangeList getIncompleteCbs(DiskRangeList ranges, long baseOffset, + DiskRangeListFactory factory, BooleanRef gotAllData) { + DiskRangeList prev = ranges.prev; + if (prev == null) { + prev = new MutateHelper(ranges); + } + DiskRangeList current = ranges; + while (current != null) { + // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance. + DiskRangeList check = current; + current = current.next; + if (check.hasData()) continue; + Integer badLength = cache.get(Long.valueOf(check.getOffset() + baseOffset)); + if (badLength == null || badLength < check.getLength()) { + gotAllData.value = false; + continue; + } + check.removeSelf(); + } + return prev.next; + } + + public Object getFileKey() { + return fileKey; + } + + public long estimateMemoryUsage() { + // Since we won't be able to update this as we add, for now, estimate 10x usage. + // This shouldn't be much and this cache should be remove later anyway. 
+ estimatedMemUsage = 10 * SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS); + return estimatedMemUsage; + } + + private static OrcFileEstimateErrors createDummy(Object fileKey) { + OrcFileEstimateErrors dummy = new OrcFileEstimateErrors(fileKey); + dummy.addError(0L, 0, 0L); + return dummy; + } + + @Override + protected boolean invalidate() { + return true; + } + + @Override + public long getMemoryUsage() { + return estimatedMemUsage; + } + + @Override + public void notifyEvicted(EvictionDispatcher evictionDispatcher) { + evictionDispatcher.notifyEvicted(this); + } + + @Override + protected boolean isLocked() { + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java index e970137..66713d3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java @@ -21,22 +21,29 @@ package org.apache.hadoop.hive.llap.io.metadata; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; +import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; import org.apache.hadoop.hive.llap.cache.MemoryManager; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; public class OrcMetadataCache { - private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = - new ConcurrentHashMap<Object, OrcFileMetadata>(); + private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>(); private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata = - new ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata>(); + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors; private final MemoryManager memoryManager; private final LowLevelCachePolicy policy; - public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy) { + public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy, + boolean useEstimateCache) { this.memoryManager = memoryManager; this.policy = policy; + this.estimateErrors = useEstimateCache + ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null; } public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) { @@ -71,6 +78,37 @@ public class OrcMetadataCache { return val; } + public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { + if (estimateErrors == null) return; + OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); + boolean isNew = false; + // We should technically update memory usage if updating the old object, but we don't do it + // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts. 
+ if (errorData == null) { + errorData = new OrcFileEstimateErrors(fileKey); + for (DiskRange range : ranges) { + errorData.addError(range.getOffset(), range.getLength(), baseOffset); + } + long memUsage = errorData.estimateMemoryUsage(); + memoryManager.reserveMemory(memUsage, false); + OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); + if (old != null) { + errorData = old; + memoryManager.releaseMemory(memUsage); + policy.notifyLock(errorData); + } else { + isNew = true; + policy.cache(errorData, Priority.NORMAL); + } + } + if (!isNew) { + for (DiskRange range : ranges) { + errorData.addError(range.getOffset(), range.getLength(), baseOffset); + } + } + policy.notifyUnlock(errorData); + } + public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException { return stripeMetadata.get(stripeKey); } @@ -79,6 +117,14 @@ public class OrcMetadataCache { return metadata.get(fileKey); } + public DiskRangeList getIncompleteCbs(Object fileKey, DiskRangeList ranges, long baseOffset, + DiskRangeListFactory factory, BooleanRef gotAllData) { + if (estimateErrors == null) return ranges; + OrcFileEstimateErrors errors = estimateErrors.get(fileKey); + if (errors == null) return ranges; + return errors.getIncompleteCbs(ranges, baseOffset, factory, gotAllData); + } + public void notifyEvicted(OrcFileMetadata buffer) { metadata.remove(buffer.getFileKey()); // See OrcFileMetadata - we don't clear the object, it will be GCed when released by users. @@ -88,4 +134,8 @@ public class OrcMetadataCache { stripeMetadata.remove(buffer.getKey()); // See OrcStripeMetadata - we don't clear the object, it will be GCed when released by users. } + + public void notifyEvicted(OrcFileEstimateErrors buffer) { + estimateErrors.remove(buffer.getFileKey()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index 901e58a..3f2e750 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -88,7 +88,7 @@ public class TestOrcMetadataCache { public void testGetPut() throws Exception { DummyMemoryManager mm = new DummyMemoryManager(); DummyCachePolicy cp = new DummyCachePolicy(); - OrcMetadataCache cache = new OrcMetadataCache(mm, cp); + OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false); OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2); assertSame(ofm1, cache.putFileMetadata(ofm1)); assertEquals(1, mm.allocs); http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 8ccedb7..40cc86f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -101,18 +101,18 @@ class EncodedReaderImpl implements EncodedReader { private final int bufferSize; private 
final List<OrcProto.Type> types; private final long rowIndexStride; - private final DataCache cache; + private final DataCache cacheWrapper; private boolean isTracingEnabled; public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec, - int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf) - throws IOException { + int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader, + PoolFactory pf) throws IOException { this.fileKey = fileKey; this.codec = codec; this.types = types; this.bufferSize = bufferSize; this.rowIndexStride = strideRate; - this.cache = cache; + this.cacheWrapper = cacheWrapper; this.dataReader = dataReader; if (POOLS != null) return; if (pf == null) { @@ -291,7 +291,7 @@ class EncodedReaderImpl implements EncodedReader { } BooleanRef isAllInCache = new BooleanRef(); if (hasFileId) { - cache.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache); + cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache); if (isTracingEnabled && LOG.isInfoEnabled()) { LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); @@ -303,7 +303,7 @@ class EncodedReaderImpl implements EncodedReader { this.dataReader.open(); isDataReaderOpen = true; } - dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc()); + dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc()); } // 3. For uncompressed case, we need some special processing before read. @@ -452,7 +452,7 @@ class EncodedReaderImpl implements EncodedReader { CacheChunk cc = (CacheChunk)toFree; if (cc.getBuffer() == null) continue; MemoryBuffer buffer = cc.getBuffer(); - cache.releaseBuffer(buffer); + cacheWrapper.releaseBuffer(buffer); cc.setBuffer(null); } } @@ -501,11 +501,11 @@ class EncodedReaderImpl implements EncodedReader { } @Override - public void handleCacheCollision(DataCache cache, + public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) { assert cacheBuffers == null; // This is done at pre-read stage where there's nothing special w/refcounts. Just release. - cache.getAllocator().deallocate(getBuffer()); + cacheWrapper.getAllocator().deallocate(getBuffer()); // Replace the buffer in our big range list, as well as in current results. this.setBuffer(replacementBuffer); } @@ -544,14 +544,14 @@ class EncodedReaderImpl implements EncodedReader { } @Override - public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer, + public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) { assert originalCbIndex >= 0; // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put, // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer // is not in cache. - cache.getAllocator().deallocate(getBuffer()); - cache.reuseBuffer(replacementBuffer); + cacheWrapper.getAllocator().deallocate(getBuffer()); + cacheWrapper.reuseBuffer(replacementBuffer); // Replace the buffer in our big range list, as well as in current results. 
this.buffer = replacementBuffer; cacheBuffers.set(originalCbIndex, replacementBuffer); @@ -594,9 +594,11 @@ class EncodedReaderImpl implements EncodedReader { boolean isCompressed = codec != null; List<ProcCacheChunk> toDecompress = null; List<ByteBuffer> toRelease = null; + List<IncompleteCb> badEstimates = null; if (isCompressed) { toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>(); - toDecompress = new ArrayList<ProcCacheChunk>(); + toDecompress = new ArrayList<>(); + badEstimates = new ArrayList<>(); } // 1. Find our bearings in the stream. Normally, iter will already point either to where we @@ -612,17 +614,25 @@ class EncodedReaderImpl implements EncodedReader { // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below). lastUncompressed = isCompressed ? prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, - unlockUntilCOffset, current, csd, toRelease, toDecompress) + unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates) : prepareRangesForUncompressedRead( cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd); + // 2.5. Remember the bad estimates for future reference. + if (badEstimates != null && !badEstimates.isEmpty()) { + // Relies on the fact that cache does not actually store these. + DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]); + long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset); + assert result == null; // We don't expect conflicts from bad estimates. + } + + if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do. + // 3. Allocate the buffers, prepare cache keys. // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache // data and some unallocated membufs for decompression. toDecompress contains all the work we // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter // has also been adjusted to point to these buffers instead of compressed data for the ranges. - if (toDecompress == null) return lastUncompressed; // Nothing to decompress. - MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()]; DiskRange[] cacheKeys = new DiskRange[toDecompress.size()]; int ix = 0; @@ -631,7 +641,7 @@ class EncodedReaderImpl implements EncodedReader { targetBuffers[ix] = chunk.getBuffer(); ++ix; } - cache.getAllocator().allocateMultiple(targetBuffers, bufferSize); + cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize); // 4. Now decompress (or copy) the data into cache buffers. for (ProcCacheChunk chunk : toDecompress) { @@ -646,7 +656,7 @@ class EncodedReaderImpl implements EncodedReader { if (isTracingEnabled) { LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); } - cache.reuseBuffer(chunk.getBuffer()); + cacheWrapper.reuseBuffer(chunk.getBuffer()); } // 5. Release original compressed buffers to zero-copy reader if needed. @@ -658,8 +668,9 @@ class EncodedReaderImpl implements EncodedReader { } // 6. Finally, put uncompressed data to cache. 
+ if (fileKey != null) { - long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); + long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers()); } @@ -674,7 +685,8 @@ class EncodedReaderImpl implements EncodedReader { private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData, - List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException { + List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress, + List<IncompleteCb> badEstimates) throws IOException { if (cOffset > current.getOffset()) { // Target compression block is in the middle of the range; slice the range in two. current = current.split(cOffset).next; @@ -689,7 +701,7 @@ class EncodedReaderImpl implements EncodedReader { if (isTracingEnabled) { LOG.trace("Locking " + cc.getBuffer() + " due to reuse"); } - cache.reuseBuffer(cc.getBuffer()); + cacheWrapper.reuseBuffer(cc.getBuffer()); columnStreamData.getCacheBuffers().add(cc.getBuffer()); currentOffset = cc.getEnd(); if (isTracingEnabled) { @@ -710,7 +722,7 @@ class EncodedReaderImpl implements EncodedReader { // several disk ranges, so we might need to combine them. BufferChunk bc = (BufferChunk)current; ProcCacheChunk newCached = addOneCompressionBuffer( - bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease); + bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates); lastUncompressed = (newCached == null) ? lastUncompressed : newCached; next = (newCached != null) ? newCached.next : null; currentOffset = (next != null) ? next.getOffset() : -1; @@ -737,7 +749,7 @@ class EncodedReaderImpl implements EncodedReader { if (isTracingEnabled) { LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse"); } - cache.reuseBuffer(lastUncompressed.getBuffer()); + cacheWrapper.reuseBuffer(lastUncompressed.getBuffer()); if (isFirst) { columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset)); isFirst = false; @@ -818,7 +830,7 @@ class EncodedReaderImpl implements EncodedReader { if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) { // We are missing a section at the end of the part... copy the start to non-cached. lastUncompressed = copyAndReplaceCandidateToNonCached( - candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc); + candidateCached, partOffset, hasEntirePartTo, cacheWrapper, singleAlloc); candidateCached = null; } current = next; @@ -850,10 +862,10 @@ class EncodedReaderImpl implements EncodedReader { if (candidateCached != null) { assert hadEntirePartTo != -1; copyAndReplaceCandidateToNonCached( - candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc); + candidateCached, partOffset, hadEntirePartTo, cacheWrapper, singleAlloc); candidateCached = null; } - lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc); + lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cacheWrapper, singleAlloc); next = lastUncompressed.next; // There may be more data after the gap. } else { // So far we have all the data from the beginning of the part. @@ -885,7 +897,7 @@ class EncodedReaderImpl implements EncodedReader { cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. 
++ix; } - cache.getAllocator().allocateMultiple( + cacheWrapper.getAllocator().allocateMultiple( targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); // 4. Now copy the data into cache buffers. @@ -909,7 +921,7 @@ class EncodedReaderImpl implements EncodedReader { // 6. Finally, put uncompressed data to cache. if (fileKey != null) { - long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); + long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); processCacheCollisions(collisionMask, toCache, targetBuffers, null); } @@ -922,7 +934,7 @@ class EncodedReaderImpl implements EncodedReader { // of the prevalent ORC compression buffer (the default), or maximum allocation (since we // cannot allocate bigger chunks), whichever is less. long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue(); - int maxAllocSize = cache.getAllocator().getMaxAllocation(); + int maxAllocSize = cacheWrapper.getAllocator().getMaxAllocation(); return (int)Math.min(maxAllocSize, orcCbSizeDefault); } @@ -942,14 +954,14 @@ class EncodedReaderImpl implements EncodedReader { private static CacheChunk copyAndReplaceCandidateToNonCached( UncompressedCacheChunk candidateCached, long partOffset, - long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) { + long candidateEnd, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) { // We thought we had the entire part to cache, but we don't; convert start to // non-cached. Since we are at the first gap, the previous stuff must be contiguous. singleAlloc[0] = null; - cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); + cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); MemoryBuffer buffer = singleAlloc[0]; - cache.reuseBuffer(buffer); + cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); CacheChunk tcc = POOLS.tccPool.take(); tcc.init(buffer, partOffset, candidateEnd); @@ -958,11 +970,11 @@ class EncodedReaderImpl implements EncodedReader { } private static CacheChunk copyAndReplaceUncompressedToNonCached( - BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) { + BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) { singleAlloc[0] = null; - cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength()); + cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength()); MemoryBuffer buffer = singleAlloc[0]; - cache.reuseBuffer(buffer); + cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); CacheChunk tcc = POOLS.tccPool.take(); tcc.init(buffer, bc.getOffset(), bc.getEnd()); @@ -1056,7 +1068,7 @@ class EncodedReaderImpl implements EncodedReader { LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread" + (isBacktracking ? 
"; backtracking" : "")); } - cache.releaseBuffer(cc.getBuffer()); + cacheWrapper.releaseBuffer(cc.getBuffer()); cc.setBuffer(null); } @@ -1081,7 +1093,7 @@ class EncodedReaderImpl implements EncodedReader { } assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results " + "even though mask is [" + Long.toBinaryString(maskVal) + "]"; - replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers); + replacedChunk.handleCacheCollision(cacheWrapper, replacementBuffer, cacheBuffers); } maskVal >>= 1; } @@ -1131,11 +1143,12 @@ class EncodedReaderImpl implements EncodedReader { * @param toDecompress The list of work to decompress - pairs of compressed buffers and the * target buffers (same as the ones added to cacheBuffers). * @param toRelease The list of buffers to release to zcr because they are no longer in use. + * @param badEstimates The list of bad estimates that cannot be decompressed. * @return The resulting cache chunk. */ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress, - List<ByteBuffer> toRelease) throws IOException { + List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException { ByteBuffer slice = null; ByteBuffer compressed = current.getChunk(); long cbStartOffset = current.getOffset(); @@ -1166,7 +1179,7 @@ class EncodedReaderImpl implements EncodedReader { return cc; } if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) { - addIncompleteCompressionBuffer(cbStartOffset, current, 0); + badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0)); return null; // This is impossible to read from this chunk. } @@ -1221,13 +1234,13 @@ class EncodedReaderImpl implements EncodedReader { } tmp.removeSelf(); } else { - addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount); + badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount)); return null; // This is impossible to read from this chunk. } } } - private void addIncompleteCompressionBuffer( + private IncompleteCb addIncompleteCompressionBuffer( long cbStartOffset, DiskRangeList target, int extraChunkCount) { IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd()); if (isTracingEnabled) { @@ -1235,6 +1248,7 @@ class EncodedReaderImpl implements EncodedReader { + icb + " in the buffers"); } target.replaceSelfWith(icb); + return icb; } /** @@ -1253,7 +1267,7 @@ class EncodedReaderImpl implements EncodedReader { boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength, BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) { // Prepare future cache buffer. - MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated(); + MemoryBuffer futureAlloc = cacheWrapper.getAllocator().createUnallocated(); // Add it to result in order we are processing. cacheBuffers.add(futureAlloc); // Add it to the list of work to decompress.

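----------------------------------------------------------------------
A note on the new flag in the HiveConf.java hunk above: hive.llap.orc.gap.cache (ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE) defaults to true and is read once in LlapIoImpl to decide whether OrcMetadataCache keeps the estimate-error ("gap") map at all. Below is a minimal sketch of checking and overriding the flag programmatically; the class name GapCacheConfigExample is made up for illustration, and in a real deployment the key would normally be set in hive-site.xml rather than in code.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class GapCacheConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Defaults to true, per the HiveConf change in this commit.
    System.out.println("hive.llap.orc.gap.cache default: "
        + HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE));
    // Turning it off makes OrcMetadataCache skip the gap map entirely, which may be
    // reasonable when all ORC files were written after HIVE-9660.
    conf.setBoolVar(ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE, false);
    System.out.println("after override: "
        + HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE));
  }
}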
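----------------------------------------------------------------------
The OrcEncodedDataReader hunk above changes the DataCache callbacks so that getFileData falls back to OrcMetadataCache.getIncompleteCbs when the low-level cache could not supply all of the requested data, and so that putFileData with a null buffer array records the ranges as bad estimates instead of caching buffers. The sketch below restates only that dispatch logic against deliberately simplified, hypothetical interfaces (DataStore, GapStore and Range are stand-ins, not Hive types), so the control flow can be read in isolation.

import java.util.List;

public class GapAwareDataCacheSketch {
  // Hypothetical stand-ins for the LLAP low-level data cache and the ORC metadata cache.
  interface DataStore {
    /** Attaches cached data to the ranges; sets gotAllData[0] to whether everything was found. */
    List<Range> getFileData(Object fileKey, List<Range> ranges, long baseOffset, boolean[] gotAllData);
    void putFileData(Object fileKey, List<Range> ranges, byte[][] data, long baseOffset);
  }
  interface GapStore {
    /** Drops ranges known to be unreadable "incomplete CB" gaps; may clear gotAllData[0]. */
    List<Range> getIncompleteCbs(Object fileKey, List<Range> ranges, long baseOffset, boolean[] gotAllData);
    void putIncompleteCbs(Object fileKey, List<Range> ranges, long baseOffset);
  }
  static final class Range {
    final long offset; final int length;
    Range(long offset, int length) { this.offset = offset; this.length = length; }
  }

  private final DataStore lowLevelCache;  // may be null if the data cache is disabled
  private final GapStore metadataCache;   // may be null if hive.llap.orc.gap.cache=false

  GapAwareDataCacheSketch(DataStore lowLevelCache, GapStore metadataCache) {
    this.lowLevelCache = lowLevelCache;
    this.metadataCache = metadataCache;
  }

  /** Mirrors getFileData: consult the data cache first, then fall back to the gap map. */
  List<Range> getFileData(Object fileKey, List<Range> ranges, long baseOffset, boolean[] gotAllData) {
    List<Range> result = (lowLevelCache == null)
        ? ranges : lowLevelCache.getFileData(fileKey, ranges, baseOffset, gotAllData);
    if (gotAllData[0]) return result;
    // As in the committed code, the gap lookup is handed the original range list.
    return (metadataCache == null)
        ? ranges : metadataCache.getIncompleteCbs(fileKey, ranges, baseOffset, gotAllData);
  }

  /** Mirrors putFileData: a null data array means "remember these ranges as bad estimates". */
  void putFileData(Object fileKey, List<Range> ranges, byte[][] data, long baseOffset) {
    if (data != null) {
      if (lowLevelCache != null) lowLevelCache.putFileData(fileKey, ranges, data, baseOffset);
    } else if (metadataCache != null) {
      metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
    }
  }
}

The null data array is the only signal separating "record a gap" from "cache decompressed buffers", which is why EncodedReaderImpl can reuse the same putFileData callback for the badEstimates list in its step 2.5.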
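----------------------------------------------------------------------
OrcFileEstimateErrors.addError (new file above) keeps, per file offset, the largest "incomplete CB" length seen so far without taking a lock: read, putIfAbsent, and if a smaller value is already present, conditionally remove it and retry. Below is a self-contained sketch of that retain-the-maximum idiom on a plain ConcurrentHashMap; the class and method names are illustrative only.

import java.util.concurrent.ConcurrentHashMap;

public class RetainMaxSketch {
  private final ConcurrentHashMap<Long, Integer> lengths = new ConcurrentHashMap<>();

  /** Records length for key, keeping only the largest value ever seen per key. */
  public void record(long key, int length) {
    Integer existing = lengths.get(key);
    if (existing != null && existing >= length) return; // a larger value is already stored
    Integer value = length;
    while (true) {
      existing = lengths.putIfAbsent(key, value);
      if (existing == null || existing >= length) return; // we won, or lost to a larger value
      // A smaller value slipped in; remove it only if it is still that exact value, then retry.
      lengths.remove(key, existing);
    }
  }

  public Integer get(long key) {
    return lengths.get(key);
  }

  public static void main(String[] args) throws InterruptedException {
    RetainMaxSketch s = new RetainMaxSketch();
    Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) s.record(42L, i); });
    Thread b = new Thread(() -> { for (int i = 1000; i > 0; i--) s.record(42L, i); });
    a.start(); b.start(); a.join(); b.join();
    System.out.println(s.get(42L)); // prints 1000: the maximum survives concurrent smaller updates
  }
}

The conditional remove(key, existing) only deletes the exact smaller value that was observed, so a larger value written concurrently is never discarded.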
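----------------------------------------------------------------------
OrcMetadataCache.putIncompleteCbs above reserves memory for a freshly built per-file error object before publishing it; if another thread's object wins the putIfAbsent race, it switches to the winner, releases its own reservation, and notifies the cache policy only for the entry that actually went in. A stripped-down sketch of that reserve / publish / release-on-loss sequence follows, using hypothetical MemoryManager and Policy interfaces in place of the real LLAP classes (sizes and the error payload are elided).

import java.util.concurrent.ConcurrentHashMap;

public class ReservePublishSketch {
  // Hypothetical stand-ins for the LLAP MemoryManager and LowLevelCachePolicy.
  interface MemoryManager { void reserve(long bytes); void release(long bytes); }
  interface Policy<T> { void cache(T entry); void notifyLock(T entry); void notifyUnlock(T entry); }

  static final class Entry {
    final Object fileKey;
    Entry(Object fileKey) { this.fileKey = fileKey; }
    long estimateMemoryUsage() { return 64; }            // placeholder size estimate
    void add(long offset, int length) { /* record the bad estimate; elided */ }
  }

  private final ConcurrentHashMap<Object, Entry> entries = new ConcurrentHashMap<>();
  private final MemoryManager memoryManager;
  private final Policy<Entry> policy;

  ReservePublishSketch(MemoryManager memoryManager, Policy<Entry> policy) {
    this.memoryManager = memoryManager;
    this.policy = policy;
  }

  void put(Object fileKey, long offset, int length) {
    Entry entry = entries.get(fileKey);
    boolean isNew = false;
    if (entry == null) {
      entry = new Entry(fileKey);
      entry.add(offset, length);
      long memUsage = entry.estimateMemoryUsage();
      memoryManager.reserve(memUsage);                   // reserve before publishing
      Entry old = entries.putIfAbsent(fileKey, entry);
      if (old != null) {
        entry = old;                                     // lost the race: reuse the winner...
        memoryManager.release(memUsage);                 // ...and hand the reservation back
        policy.notifyLock(entry);
      } else {
        isNew = true;
        policy.cache(entry);                             // only the winner is registered
      }
    }
    if (!isNew) entry.add(offset, length);               // update the pre-existing or winning entry
    policy.notifyUnlock(entry);
  }
}

As the comment in the hunk notes, memory usage is not re-estimated when an existing entry grows; the sketch keeps that simplification.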