Repository: hive
Updated Branches:
  refs/heads/master 72684f10d -> 78d5572f8


HIVE-18452 : work around HADOOP-15171 (Sergey Shelukhin, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/78d5572f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/78d5572f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/78d5572f

Branch: refs/heads/master
Commit: 78d5572f8e165a907c63e1ef8579f591b8c34563
Parents: 72684f1
Author: sergey <ser...@apache.org>
Authored: Tue Jan 16 16:43:03 2018 -0800
Committer: sergey <ser...@apache.org>
Committed: Tue Jan 16 16:43:03 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../llap/io/encoded/OrcEncodedDataReader.java   | 20 +++--
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 83 ++++++++++++++++++--
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  2 +-
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |  4 +-
 5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 854bbdf..f2e927f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1382,6 +1382,8 @@ public class HiveConf extends Configuration {
         "while writing a table with ORC file format, enabling this config will 
do stripe-level\n" +
         "fast merge for small ORC files. Note that enabling this config will 
not honor the\n" +
         "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
+    HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", true,
+        "Whether to use codec pool in ORC. Disable if there are bugs with 
codec reuse."),
 
     HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
         "If this is set the header for RCFiles will simply be RCF.  If this is 
not\n" +

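For reference, consumers read the new toggle like any other boolean ConfVar. A minimal sketch, assuming a Configuration instance named conf (the variable name is illustrative; the patch itself reads the flag from the LLAP daemon conf, as the next file shows):

    // Hedged sketch: query the new flag the same way the patch does below.
    // Setting hive.use.orc.codec.pool=false bypasses codec reuse entirely.
    boolean useCodecPool = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_CODEC_POOL);
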
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 1e0eccf..68bb168 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -161,6 +161,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
   private final SchemaEvolution evolution;
+  private final boolean useCodecPool;
 
   // Read state.
   private int stripeIxFrom;
@@ -173,6 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private CompressionCodec codec;
   private Object fileKey;
   private FileSystem fs;
+
   /**
   * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
   * Contains only stripes that are read, and only columns included. null => read all RGs.
@@ -212,6 +214,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL);
 
    // moved this part of code from performDataRead as LlapInputFormat needs to know the file schema
     // to decide if schema evolution is supported or not.
@@ -443,7 +446,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     // Reader creation updates HDFS counters, don't do it here.
     DataWrapperForOrc dw = new DataWrapperForOrc();
-    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
+    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace,
+        HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL));
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
   }
 
@@ -739,18 +743,24 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
     assert footerRange.next == null; // Can only happen w/zcr for a single input buffer.
     if (hasCache) {
-      LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(stripeKey, footerRange.getData());
+      LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
+          stripeKey, footerRange.getData().duplicate());
       metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
     }
-    ByteBuffer bb = footerRange.getData();
+    ByteBuffer bb = footerRange.getData().duplicate();
 
     CompressionKind kind = orcReader.getCompressionKind();
-    CompressionCodec codec = OrcCodecPool.getCodec(kind);
+    boolean isPool = useCodecPool;
+    CompressionCodec codec = isPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
     try {
       return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
           bb.remaining(), codec, orcReader.getCompressionSize());
     } finally {
-      OrcCodecPool.returnCodec(kind, codec);
+      if (isPool) {
+        OrcCodecPool.returnCodec(kind, codec);
+      } else {
+        codec.close();
+      }
     }
   }
 

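The hunk above shows the acquire/release discipline the patch applies wherever a codec is needed: take one from OrcCodecPool when pooling is enabled, otherwise create a throwaway codec and close it when done. A condensed sketch (the method body is illustrative, not verbatim from the diff):

    CompressionCodec codec = useCodecPool
        ? OrcCodecPool.getCodec(kind)      // reuse a pooled codec
        : WriterImpl.createCodec(kind);    // or create a one-shot codec
    try {
      // ... decompress the stripe footer with the codec ...
    } finally {
      if (useCodecPool) {
        OrcCodecPool.returnCodec(kind, codec); // hand it back for reuse
      } else {
        codec.close(); // discard it, sidestepping any codec-reuse bugs
      }
    }
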
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 627e617..555bda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcConf;
@@ -54,6 +55,7 @@ import org.apache.orc.impl.OutStream;
 import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.StreamName.Area;
+import org.apache.orc.impl.WriterImpl;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc;
@@ -126,6 +128,7 @@ class EncodedReaderImpl implements EncodedReader {
   private final DataReader dataReader;
   private boolean isDataReaderOpen = false;
   private final CompressionCodec codec;
+  private final boolean isCodecFromPool;
   private final boolean isCompressed;
   private final org.apache.orc.CompressionKind compressionKind;
   private final int bufferSize;
@@ -140,11 +143,12 @@ class EncodedReaderImpl implements EncodedReader {
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
      TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
      int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
-    this.codec = OrcCodecPool.getCodec(kind);
+    this.isCodecFromPool = useCodecPool;
+    this.codec = useCodecPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
     this.types = types;
     this.fileSchema = fileSchema; // Note: this is redundant with types
     this.version = version;
@@ -672,7 +676,11 @@ class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void close() throws IOException {
-    OrcCodecPool.returnCodec(compressionKind, codec);
+    if (isCodecFromPool) {
+      OrcCodecPool.returnCodec(compressionKind, codec);
+    } else {
+      codec.close();
+    }
     dataReader.close();
   }
 
@@ -1229,14 +1237,61 @@ class EncodedReaderImpl implements EncodedReader {
   private static void decompressChunk(
      ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
     int startPos = dest.position(), startLim = dest.limit();
+    int startSrcPos = src.position(), startSrcLim = src.limit();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Decompressing " + src.remaining() + " bytes to dest buffer 
pos "
+          + dest.position() + ", limit " + dest.limit());
+    }
     codec.decompress(src, dest);
-    // Codec resets the position to 0 and limit to correct limit.
     dest.position(startPos);
     int newLim = dest.limit();
     if (newLim > startLim) {
      throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
           + ") became [" + dest.position() + ", " + newLim + ")");
     }
+    if (dest.remaining() > 0) return;
+
+    // There's a bug in native decompressor. See HADOOP-15171
+    dest.limit(startLim);
+    src.position(startSrcPos);
+    src.limit(startSrcLim);
+    LOG.warn("The codec has produced 0 bytes for " + src.remaining() + " bytes 
at pos "
+        + src.position() + ", data hash " + src.hashCode() + ": [" +  
logSomeBytes(src));
+    ByteBuffer srcHeap = ByteBuffer.allocate(src.remaining()),
+        destHeap = ByteBuffer.allocate(dest.remaining());
+    int destHeapPos = destHeap.position();
+    srcHeap.put(src);
+    srcHeap.position(startSrcPos);
+    codec.decompress(srcHeap, destHeap);
+    destHeap.position(destHeapPos);
+    int newLen = destHeap.remaining();
+    LOG.warn("Fell back to JDK decompressor with memcopy; got " + newLen + " 
bytes");
+    dest.put(destHeap);
+    dest.position(startPos);
+    dest.limit(startPos + newLen);
+  }
+
+  private static String logSomeBytes(ByteBuffer src) {
+    final int max = 500;
+    StringBuilder sb = new StringBuilder();
+    int base = src.position(), end = base + Math.min(max, src.remaining());
+    for (int i = base; i < end; ++i) {
+      if (i != base) {
+        sb.append(' ');
+      }
+      int b = src.get(i) & 0xff;
+      if (b <= 0xf) {
+        sb.append('0');
+      }
+      sb.append(Integer.toHexString(b));
+    }
+    int rem = src.remaining() - max;
+    if (rem > 0) {
+      sb.append(" ... (").append(rem).append(" bytes)]");
+    } else {
+      sb.append("]");
+    }
+    return sb.toString();
   }
 
   private void ponderReleaseInitialRefcount(
@@ -1865,12 +1920,19 @@ class EncodedReaderImpl implements EncodedReader {
             if (lastCached != null) {
               iter = lastCached;
             }
+            if (isTracingEnabled) {
+              traceLogBuffersUsedToParse(csd);
+            }
             CodedInputStream cis = CodedInputStream.newInstance(
                 new IndexStream(csd.getCacheBuffers(), sctx.length));
             cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
             switch (sctx.kind) {
               case ROW_INDEX:
-                index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
+                OrcProto.RowIndex tmp = index.getRowGroupIndex()[colIx]
+                    = OrcProto.RowIndex.parseFrom(cis);
+                if (isTracingEnabled) {
+                  LOG.trace("Index is " + tmp.toString().replace('\n', ' '));
+                }
                 break;
               case BLOOM_FILTER:
               case BLOOM_FILTER_UTF8:
@@ -1911,6 +1973,17 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
+  private void traceLogBuffersUsedToParse(ColumnStreamData csd) {
+    String s = "Buffers ";
+    if (csd.getCacheBuffers() != null) {
+      for (MemoryBuffer buf : csd.getCacheBuffers()) {
+        ByteBuffer bb = buf.getByteBufferDup();
+        s += "{" + buf + ", " + bb.remaining() + /* " => " + bb.hashCode() + 
*/"}, ";
+      }
+    }
+    LOG.trace(s);
+  }
+
 
  private DiskRangeList preReadUncompressedStreams(long stripeOffset, ReadContext[] colCtxs,
      MutateHelper toRead, IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {

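The heart of the workaround is decompressChunk above: when the codec reports zero output bytes for non-empty input (the HADOOP-15171 symptom), the saved buffer positions and limits are restored and the data is memcopied into heap buffers for a retry. Judging by the "Fell back to JDK decompressor" message, the retry works because the affected native path is only taken for direct (off-heap) buffers, while byte[]-backed heap buffers go through the pure-Java decompressor; that dispatch is an assumption about the codec's internals, not stated in this diff. A tiny illustration of the two buffer kinds:

    // Direct buffer: off-heap, eligible for the native decompressor path
    // where HADOOP-15171 can produce 0 bytes.
    ByteBuffer direct = ByteBuffer.allocateDirect(4096);
    // Heap buffer: byte[]-backed, handled by the JDK (pure-Java) decompressor.
    ByteBuffer heap = ByteBuffer.allocate(4096);
    assert direct.isDirect() && !heap.isDirect();
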
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index df536ea..7986827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
    * @return The reader.
    */
  EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace) throws IOException;
+      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException;
 
  /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {

http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index 203ef69..4a5ccaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
 
   @Override
  public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
    return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
-        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace);
+        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool);
   }
 }

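With the interface change above, every caller now states the pooling decision explicitly. A sketch of a call site after this patch, mirroring the LLAP usage shown earlier in the diff (orcReader, dw, POOL_FACTORY, trace, and daemonConf are the names from OrcEncodedDataReader):

    EncodedReader stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace,
        HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL));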