Repository: hive
Updated Branches:
  refs/heads/master 72684f10d -> 78d5572f8
HIVE-18452 : work around HADOOP-15171 (Sergey Shelukhin, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/78d5572f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/78d5572f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/78d5572f

Branch: refs/heads/master
Commit: 78d5572f8e165a907c63e1ef8579f591b8c34563
Parents: 72684f1
Author: sergey <ser...@apache.org>
Authored: Tue Jan 16 16:43:03 2018 -0800
Committer: sergey <ser...@apache.org>
Committed: Tue Jan 16 16:43:03 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../llap/io/encoded/OrcEncodedDataReader.java   | 20 +++--
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 83 ++++++++++++++++++--
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  2 +-
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |  4 +-
 5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 854bbdf..f2e927f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1382,6 +1382,8 @@ public class HiveConf extends Configuration {
         "while writing a table with ORC file format, enabling this config will do stripe-level\n" +
         "fast merge for small ORC files. Note that enabling this config will not honor the\n" +
         "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
+    HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", true,
+        "Whether to use codec pool in ORC. Disable if there are bugs with codec reuse."),
     HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
         "If this is set the header for RCFiles will simply be RCF. If this is not\n" +
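The new hive.use.orc.codec.pool flag defaults to true and exists purely as an escape
hatch for codec-reuse bugs. A minimal sketch of flipping and reading it, using only
the HiveConf API this patch itself calls (the class and variable names in the sketch
are illustrative, not part of the patch):

    // Sketch: disable the ORC codec pool when codec reuse is suspect.
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class CodecPoolFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Same effect as "set hive.use.orc.codec.pool=false;" in a session.
        conf.setBoolVar(ConfVars.HIVE_ORC_CODEC_POOL, false);
        boolean useCodecPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_CODEC_POOL);
        System.out.println("ORC codec pool enabled: " + useCodecPool);
      }
    }
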
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 1e0eccf..68bb168 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -161,6 +161,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
   private final SchemaEvolution evolution;
+  private final boolean useCodecPool;
 
   // Read state.
   private int stripeIxFrom;
@@ -173,6 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private CompressionCodec codec;
   private Object fileKey;
   private FileSystem fs;
+
   /**
    * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
    * Contains only stripes that are read, and only columns included. null => read all RGs.
@@ -212,6 +214,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL);
 
     // moved this part of code from performDataRead as LlapInputFormat need to know the file schema
     // to decide if schema evolution is supported or not.
@@ -443,7 +446,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     // Reader creation updates HDFS counters, don't do it here.
     DataWrapperForOrc dw = new DataWrapperForOrc();
-    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
+    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace,
+        HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL));
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
   }
 
@@ -739,18 +743,24 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
       assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
       if (hasCache) {
-        LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(stripeKey, footerRange.getData());
+        LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
+            stripeKey, footerRange.getData().duplicate());
         metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
       }
-      ByteBuffer bb = footerRange.getData();
+      ByteBuffer bb = footerRange.getData().duplicate();
       CompressionKind kind = orcReader.getCompressionKind();
-      CompressionCodec codec = OrcCodecPool.getCodec(kind);
+      boolean isPool = useCodecPool;
+      CompressionCodec codec = isPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
       try {
         return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
            bb.remaining(), codec, orcReader.getCompressionSize());
       } finally {
-        OrcCodecPool.returnCodec(kind, codec);
+        if (isPool) {
+          OrcCodecPool.returnCodec(kind, codec);
+        } else {
+          codec.close();
+        }
       }
     }
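The change above establishes the acquire/release discipline used throughout this
patch: a pooled codec must go back to OrcCodecPool, while a freshly created one
must be closed so its native resources are released. A standalone sketch of that
pattern, assuming the same OrcCodecPool and WriterImpl entry points the patch
calls (the helper class itself is illustrative):

    // Sketch: pair each codec acquisition with the matching release.
    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;
    import org.apache.orc.impl.WriterImpl;

    final class CodecLeaseSketch {
      static void withCodec(CompressionKind kind, boolean useCodecPool) throws Exception {
        CompressionCodec codec =
            useCodecPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
        try {
          // ... decompress or decode with the codec ...
        } finally {
          if (useCodecPool) {
            OrcCodecPool.returnCodec(kind, codec); // hand it back for reuse
          } else {
            codec.close(); // one-shot codec: free native resources now
          }
        }
      }
    }
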
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 627e617..555bda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcConf;
@@ -54,6 +55,7 @@ import org.apache.orc.impl.OutStream;
 import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.StreamName.Area;
+import org.apache.orc.impl.WriterImpl;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc;
@@ -126,6 +128,7 @@ class EncodedReaderImpl implements EncodedReader {
   private final DataReader dataReader;
   private boolean isDataReaderOpen = false;
   private final CompressionCodec codec;
+  private final boolean isCodecFromPool;
   private final boolean isCompressed;
   private final org.apache.orc.CompressionKind compressionKind;
   private final int bufferSize;
@@ -140,11 +143,12 @@ class EncodedReaderImpl implements EncodedReader {
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
-    this.codec = OrcCodecPool.getCodec(kind);
+    this.isCodecFromPool = useCodecPool;
+    this.codec = useCodecPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
     this.types = types;
     this.fileSchema = fileSchema; // Note: this is redundant with types
     this.version = version;
@@ -672,7 +676,11 @@ class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void close() throws IOException {
-    OrcCodecPool.returnCodec(compressionKind, codec);
+    if (isCodecFromPool) {
+      OrcCodecPool.returnCodec(compressionKind, codec);
+    } else {
+      codec.close();
+    }
     dataReader.close();
   }
 
@@ -1229,14 +1237,61 @@
   private static void decompressChunk(
       ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
     int startPos = dest.position(), startLim = dest.limit();
+    int startSrcPos = src.position(), startSrcLim = src.limit();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Decompressing " + src.remaining() + " bytes to dest buffer pos "
+          + dest.position() + ", limit " + dest.limit());
+    }
     codec.decompress(src, dest);
-    // Codec resets the position to 0 and limit to correct limit.
     dest.position(startPos);
     int newLim = dest.limit();
     if (newLim > startLim) {
       throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
          + ") became [" + dest.position() + ", " + newLim + ")");
     }
+    if (dest.remaining() > 0) return;
+
+    // There's a bug in native decompressor. See HADOOP-15171
+    dest.limit(startLim);
+    src.position(startSrcPos);
+    src.limit(startSrcLim);
+    LOG.warn("The codec has produced 0 bytes for " + src.remaining() + " bytes at pos "
+        + src.position() + ", data hash " + src.hashCode() + ": [" + logSomeBytes(src));
+    ByteBuffer srcHeap = ByteBuffer.allocate(src.remaining()),
+        destHeap = ByteBuffer.allocate(dest.remaining());
+    int destHeapPos = destHeap.position();
+    srcHeap.put(src);
+    srcHeap.position(startSrcPos);
+    codec.decompress(srcHeap, destHeap);
+    destHeap.position(destHeapPos);
+    int newLen = destHeap.remaining();
+    LOG.warn("Fell back to JDK decompressor with memcopy; got " + newLen + " bytes");
+    dest.put(destHeap);
+    dest.position(startPos);
+    dest.limit(startPos + newLen);
+  }
+
+  private static String logSomeBytes(ByteBuffer src) {
+    final int max = 500;
+    StringBuilder sb = new StringBuilder();
+    int base = src.position(), end = base + Math.min(max, src.remaining());
+    for (int i = base; i < end; ++i) {
+      if (i != base) {
+        sb.append(' ');
+      }
+      int b = src.get(i) & 0xff;
+      if (b <= 0xf) {
+        sb.append('0');
+      }
+      sb.append(Integer.toHexString(b));
+    }
+    int rem = src.remaining() - max;
+    if (rem > 0) {
+      sb.append(" ... (").append(rem).append(" bytes)]");
+    } else {
+      sb.append("]");
+    }
+    return sb.toString();
   }
 
   private void ponderReleaseInitialRefcount(
@@ -1865,12 +1920,19 @@ class EncodedReaderImpl implements EncodedReader {
       if (lastCached != null) {
         iter = lastCached;
       }
+      if (isTracingEnabled) {
+        traceLogBuffersUsedToParse(csd);
+      }
       CodedInputStream cis = CodedInputStream.newInstance(
           new IndexStream(csd.getCacheBuffers(), sctx.length));
       cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
       switch (sctx.kind) {
         case ROW_INDEX:
-          index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
+          OrcProto.RowIndex tmp = index.getRowGroupIndex()[colIx]
+              = OrcProto.RowIndex.parseFrom(cis);
+          if (isTracingEnabled) {
+            LOG.trace("Index is " + tmp.toString().replace('\n', ' '));
+          }
           break;
         case BLOOM_FILTER:
         case BLOOM_FILTER_UTF8:
@@ -1911,6 +1973,17 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
+  private void traceLogBuffersUsedToParse(ColumnStreamData csd) {
+    String s = "Buffers ";
+    if (csd.getCacheBuffers() != null) {
+      for (MemoryBuffer buf : csd.getCacheBuffers()) {
+        ByteBuffer bb = buf.getByteBufferDup();
+        s += "{" + buf + ", " + bb.remaining() + /* " => " + bb.hashCode() + */ "}, ";
+      }
+    }
+    LOG.trace(s);
+  }
+
   private DiskRangeList preReadUncompressedStreams(long stripeOffset, ReadContext[] colCtxs,
       MutateHelper toRead, IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
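decompressChunk now detects the HADOOP-15171 symptom, a native direct-buffer
decompressor reporting zero output bytes, rewinds both buffers, and retries
through heap ByteBuffers, which route the codec onto the pure-Java path. A
condensed sketch of that rewind-and-retry shape; the Decompress interface is a
stand-in for CompressionCodec.decompress, and the sketch assumes the contract
the patch relies on (decompress leaves dest's limit at the end of the output):

    // Sketch of the HADOOP-15171 fallback: zero bytes out of a
    // direct-buffer decompress => restore buffers, retry on the heap.
    import java.nio.ByteBuffer;

    final class HeapFallbackSketch {
      interface Decompress { void run(ByteBuffer src, ByteBuffer dest) throws Exception; }

      static void decompressWithFallback(
          ByteBuffer src, ByteBuffer dest, Decompress codec) throws Exception {
        int srcPos = src.position(), srcLim = src.limit();
        int destPos = dest.position(), destLim = dest.limit();
        codec.run(src, dest);
        dest.position(destPos);
        if (dest.remaining() > 0) return; // normal case: output was produced

        // Zero bytes produced: put both buffers back the way they were.
        dest.limit(destLim);
        src.position(srcPos);
        src.limit(srcLim);
        // Heap buffers have no native address, so the codec cannot take
        // the buggy native path; copy in, decompress, copy out.
        ByteBuffer srcHeap = ByteBuffer.allocate(src.remaining());
        ByteBuffer destHeap = ByteBuffer.allocate(dest.remaining());
        srcHeap.put(src);
        srcHeap.position(0);
        codec.run(srcHeap, destHeap);
        destHeap.position(0); // decompress left the limit at end of output
        int produced = destHeap.remaining();
        dest.put(destHeap);
        dest.position(destPos);
        dest.limit(destPos + produced);
      }
    }
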
(").append(rem).append(" bytes)]"); + } else { + sb.append("]"); + } + return sb.toString(); } private void ponderReleaseInitialRefcount( @@ -1865,12 +1920,19 @@ class EncodedReaderImpl implements EncodedReader { if (lastCached != null) { iter = lastCached; } + if (isTracingEnabled) { + traceLogBuffersUsedToParse(csd); + } CodedInputStream cis = CodedInputStream.newInstance( new IndexStream(csd.getCacheBuffers(), sctx.length)); cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT); switch (sctx.kind) { case ROW_INDEX: - index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis); + OrcProto.RowIndex tmp = index.getRowGroupIndex()[colIx] + = OrcProto.RowIndex.parseFrom(cis); + if (isTracingEnabled) { + LOG.trace("Index is " + tmp.toString().replace('\n', ' ')); + } break; case BLOOM_FILTER: case BLOOM_FILTER_UTF8: @@ -1911,6 +1973,17 @@ class EncodedReaderImpl implements EncodedReader { } } + private void traceLogBuffersUsedToParse(ColumnStreamData csd) { + String s = "Buffers "; + if (csd.getCacheBuffers() != null) { + for (MemoryBuffer buf : csd.getCacheBuffers()) { + ByteBuffer bb = buf.getByteBufferDup(); + s += "{" + buf + ", " + bb.remaining() + /* " => " + bb.hashCode() + */"}, "; + } + } + LOG.trace(s); + } + private DiskRangeList preReadUncompressedStreams(long stripeOffset, ReadContext[] colCtxs, MutateHelper toRead, IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index df536ea..7986827 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader { * @return The reader. */ EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace) throws IOException; + PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException; /** The factory that can create (or return) the pools used by encoded reader. */ public interface PoolFactory { http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java index 203ef69..4a5ccaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java @@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements @Override public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace) throws IOException { + PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException { return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(), - bufferSize, rowIndexStride, dataCache, dataReader, pf, trace); + bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool); } }