http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d7a8c2f..070950c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -53,11 +53,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.orc.ColumnStatistics;
-import org.apache.orc.FileMetaInfo;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -553,6 +553,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
       int numThreads = HiveConf.getIntVar(conf,
           ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+      boolean useSoftReference = HiveConf.getBoolVar(conf,
+          ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);
 
       cacheStripeDetails = (cacheStripeDetailsSize > 0);
 
@@ -574,7 +576,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           boolean useExternalCache = HiveConf.getBoolVar(
               conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED);
           if (localCache == null) {
-            localCache = new LocalCache(numThreads, cacheStripeDetailsSize);
+            localCache = new LocalCache(numThreads, cacheStripeDetailsSize, useSoftReference);
           }
           if (useExternalCache) {
             if (metaCache == null) {
@@ -648,20 +650,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final Context context;
     private final FileSystem fs;
     private final HdfsFileStatusWithId fileWithId;
-    private final FileInfo fileInfo;
+    private final OrcTail orcTail;
     private final boolean isOriginal;
     private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
     private final ByteBuffer ppdResult;
 
-    SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
+    SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail,
        boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir,
         boolean[] covered, ByteBuffer ppdResult) throws IOException {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fs = fs;
       this.fileWithId = fileWithId;
-      this.fileInfo = fileInfo;
+      this.orcTail = orcTail;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
@@ -669,11 +671,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     @VisibleForTesting
-    public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, FileInfo fileInfo,
+    public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, OrcTail orcTail,
        boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
         boolean[] covered) throws IOException {
       this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
-          fileInfo, isOriginal, deltas, hasBase, dir, covered, null);
+          orcTail, isOriginal, deltas, hasBase, dir, covered, null);
     }
   }
 
@@ -728,13 +730,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      FooterCache cache = context.cacheStripeDetails ? ((deltas == null || deltas.isEmpty())
          ? context.footerCache : Context.localCache) : null;
       if (cache != null) {
-        FileInfo[] infos = new FileInfo[files.size()];
+        OrcTail[] orcTails = new OrcTail[files.size()];
         ByteBuffer[] ppdResults = null;
         if (cache.hasPpd()) {
           ppdResults = new ByteBuffer[files.size()];
         }
         try {
-          cache.getAndValidate(files, isOriginal, infos, ppdResults);
+          cache.getAndValidate(files, isOriginal, orcTails, ppdResults);
         } catch (HiveException e) {
           throw new IOException(e);
         }
@@ -745,16 +747,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             dir = dirs.get(++dirIx);
             filesInDirCount = dir.fileCount;
           }
-          FileInfo info = infos[i];
+          OrcTail orcTail = orcTails[i];
           ByteBuffer ppdResult = ppdResults == null ? null : ppdResults[i];
           HdfsFileStatusWithId file = files.get(i);
-          if (info != null) {
+          if (orcTail != null) {
             // Cached copy is valid
             context.cacheHitCounter.incrementAndGet();
           }
           // Ignore files eliminated by PPD, or of 0 length.
          if (ppdResult != FooterCache.NO_SPLIT_AFTER_PPD && file.getFileStatus().getLen() > 0) {
-            result.add(new SplitInfo(context, dir.fs, file, info,
+            result.add(new SplitInfo(context, dir.fs, file, orcTail,
                 isOriginal, deltas, true, dir.dir, covered, ppdResult));
           }
         }
@@ -921,7 +923,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
            OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
                entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1);
+                deltas, -1, fileStatus.getLen());
             splits.add(orcSplit);
           }
         }
@@ -963,7 +965,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       if (!deltas.isEmpty()) {
         for (int b = 0; b < numBuckets; ++b) {
           if (!covered[b]) {
-            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1));
+            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
           }
         }
       }
@@ -1054,9 +1056,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final Long fsFileId;
     private final long blockSize;
     private final TreeMap<Long, BlockLocation> locations;
-    private final FileInfo fileInfo;
+    private OrcTail orcTail;
     private List<StripeInformation> stripes;
-    private FileMetaInfo fileMetaInfo;
     private List<StripeStatistics> stripeStats;
     private List<OrcProto.Type> types;
     private boolean[] includedCols;
@@ -1078,7 +1079,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.file = splitInfo.fileWithId.getFileStatus();
       this.fsFileId = splitInfo.fileWithId.getFileId();
       this.blockSize = this.file.getBlockSize();
-      this.fileInfo = splitInfo.fileInfo;
+      this.orcTail = splitInfo.orcTail;
       // TODO: potential DFS call
       this.locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = splitInfo.isOriginal;
@@ -1129,11 +1130,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      * are written with large block sizes.
      * @param offset the start of the split
      * @param length the length of the split
-     * @param fileMetaInfo file metadata from footer and postscript
+     * @param orcTail orc tail
      * @throws IOException
      */
-    OrcSplit createSplit(long offset, long length,
-                     FileMetaInfo fileMetaInfo) throws IOException {
+    OrcSplit createSplit(long offset, long length, OrcTail orcTail) throws IOException {
       String[] hosts;
       Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
       BlockLocation start = startEntry.getValue();
@@ -1196,7 +1196,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         fileKey = new SyntheticFileId(file);
       }
       return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
-          fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
+          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
     }
 
     private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1271,7 +1271,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         int index = si.getIndex();
         if (lastIdx >= 0 && lastIdx + 1 != index && current.offset != -1) {
           // Create split for the previous unfinished stripe.
-          splits.add(createSplit(current.offset, current.length, null));
+          splits.add(createSplit(current.offset, current.length, orcTail));
           current.offset = -1;
         }
         lastIdx = index;
@@ -1305,16 +1305,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         if (!includeStripe[idx]) {
           // create split for the previous unfinished stripe
           if (current.offset != -1) {
-            splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+            splits.add(createSplit(current.offset, current.length, orcTail));
             current.offset = -1;
           }
           continue;
         }
 
         current = generateOrUpdateSplit(
-            splits, current, stripe.getOffset(), stripe.getLength(), fileMetaInfo);
+            splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
       }
-      generateLastSplit(splits, current, fileMetaInfo);
+      generateLastSplit(splits, current, orcTail);
 
       // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
@@ -1323,12 +1323,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     private OffsetAndLength generateOrUpdateSplit(
         List<OrcSplit> splits, OffsetAndLength current, long offset,
-        long length, FileMetaInfo fileMetaInfo) throws IOException {
+        long length, OrcTail orcTail) throws IOException {
       // if we are working on a stripe, over the min stripe size, and
       // crossed a block boundary, cut the input split here.
       if (current.offset != -1 && current.length > context.minSize &&
           (current.offset / blockSize != offset / blockSize)) {
-        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        splits.add(createSplit(current.offset, current.length, orcTail));
         current.offset = -1;
       }
       // if we aren't building a split, start a new one.
@@ -1339,59 +1339,46 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         current.length = (offset + length) - current.offset;
       }
       if (current.length >= context.maxSize) {
-        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        splits.add(createSplit(current.offset, current.length, orcTail));
         current.offset = -1;
       }
       return current;
     }
 
     private void generateLastSplit(List<OrcSplit> splits, OffsetAndLength current,
-        FileMetaInfo fileMetaInfo) throws IOException {
+        OrcTail orcTail) throws IOException {
       if (current.offset == -1) return;
-      splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+      splits.add(createSplit(current.offset, current.length, orcTail));
     }
 
     private void populateAndCacheStripeDetails() throws IOException {
-      // Only create OrcReader if we are missing some information.
-      List<OrcProto.ColumnStatistics> colStatsLocal;
-      List<OrcProto.Type> typesLocal;
-      if (fileInfo != null) {
-        stripes = fileInfo.stripeInfos;
-        stripeStats = fileInfo.stripeStats;
-        fileMetaInfo = fileInfo.fileMetaInfo;
-        typesLocal = types = fileInfo.types;
-        colStatsLocal = fileInfo.fileStats;
-        writerVersion = fileInfo.writerVersion;
-        // For multiple runs, in case sendSplitsInFooter changes
-        if (fileMetaInfo == null && context.footerInSplits) {
-          Reader orcReader = createOrcReader();
-          fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
-          assert fileInfo.stripeStats != null && fileInfo.types != null
-              && fileInfo.writerVersion != null;
-          // We assume that if we needed to create a reader, we need to cache it to meta cache.
-          // This will also needlessly overwrite it in local cache for now.
-          context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
-        }
-      } else {
-        Reader orcReader = createOrcReader();
-        stripes = orcReader.getStripes();
-        typesLocal = types = orcReader.getTypes();
-        colStatsLocal = orcReader.getOrcProtoFileStatistics();
-        writerVersion = orcReader.getWriterVersion();
-        stripeStats = orcReader.getStripeStatistics();
-        fileMetaInfo = context.footerInSplits ?
-            ((ReaderImpl) orcReader).getFileMetaInfo() : null;
+      // When reading the file for first time we get the orc tail from the orc reader and cache it
+      // in the footer cache. Subsequent requests will get the orc tail from the cache (if file
+      // length and modification time is not changed) and populate the split info. If the split info
+      // object contains the orc tail from the cache then we can skip creating orc reader avoiding
+      // filesystem calls.
+      if (orcTail == null) {
+        Reader orcReader = OrcFile.createReader(file.getPath(),
+            OrcFile.readerOptions(context.conf)
+                .filesystem(fs)
+                .maxLength(file.getLen()));
+        orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
+            file.getModificationTime());
         if (context.cacheStripeDetails) {
-          context.footerCache.put(fsFileId, file, fileMetaInfo, orcReader);
+          context.footerCache.put(new FooterCacheKey(fsFileId, file.getPath()), orcTail);
         }
       }
+      stripes = orcTail.getStripes();
+      stripeStats = orcTail.getStripeStatistics();
+      types = orcTail.getTypes();
+      writerVersion = orcTail.getWriterVersion();
       includedCols = genIncludedColumns(types, context.conf, isOriginal);
-      projColsUncompressedSize = computeProjectionSize(typesLocal, colStatsLocal, includedCols, isOriginal);
-    }
-
-    private Reader createOrcReader() throws IOException {
-      return OrcFile.createReader(file.getPath(),
-          OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen()));
+      List<OrcProto.ColumnStatistics> fileColStats = orcTail.getFooter().getStatisticsList();
+      projColsUncompressedSize = computeProjectionSize(types, fileColStats, includedCols,
+          isOriginal);
+      if (!context.footerInSplits) {
+        orcTail = null;
+      }
     }
 
     private long computeProjectionSize(List<OrcProto.Type> types,
@@ -1595,40 +1582,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return result.toArray(new InputSplit[result.size()]);
   }
 
-  /**
-   * FileInfo.
-   *
-   * Stores information relevant to split generation for an ORC File.
-   *
-   */
-  static class FileInfo {
-    final long modificationTime;
-    final long size;
-    final Long fileId;
-    private final List<StripeInformation> stripeInfos;
-    private FileMetaInfo fileMetaInfo;
-    private final List<StripeStatistics> stripeStats;
-    private final List<OrcProto.ColumnStatistics> fileStats;
-    private final List<OrcProto.Type> types;
-    private final OrcFile.WriterVersion writerVersion;
-
-
-    FileInfo(long modificationTime, long size, List<StripeInformation> stripeInfos,
-             List<StripeStatistics> stripeStats, List<OrcProto.Type> types,
-             List<OrcProto.ColumnStatistics> fileStats, FileMetaInfo fileMetaInfo,
-             OrcFile.WriterVersion writerVersion, Long fileId) {
-      this.modificationTime = modificationTime;
-      this.size = size;
-      this.fileId = fileId;
-      this.stripeInfos = stripeInfos;
-      this.fileMetaInfo = fileMetaInfo;
-      this.stripeStats = stripeStats;
-      this.types = types;
-      this.fileStats = fileStats;
-      this.writerVersion = writerVersion;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   private org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
     createVectorizedReader(InputSplit split, JobConf conf, Reporter reporter
@@ -1643,18 +1596,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                   Reporter reporter) throws IOException {
     boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
     boolean isAcidRead = isAcidRead(conf, inputSplit);
-
     if (!isAcidRead) {
       if (vectorMode) {
         return createVectorizedReader(inputSplit, conf, reporter);
       } else {
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+        if (inputSplit instanceof OrcSplit) {
+          OrcSplit split = (OrcSplit) inputSplit;
+          readerOptions.maxLength(split.getFileLength()).orcTail(split.getOrcTail());
+        }
         return new OrcRecordReader(OrcFile.createReader(
             ((FileSplit) inputSplit).getPath(),
-            OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
+            readerOptions),
+            conf, (FileSplit) inputSplit);
       }
     }
 
-    OrcSplit split = (OrcSplit) inputSplit;
     reporter.setStatus(inputSplit.toString());
 
     Options options = new Options(conf).reporter(reporter);
@@ -1759,7 +1716,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     if (split.hasBase()) {
       bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
           .getBucket();
-      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
+          .maxLength(split.getFileLength());
+      if (split.hasFooter()) {
+        readerOptions.orcTail(split.getOrcTail());
+      }
+      reader = OrcFile.createReader(path, readerOptions);
     } else {
       bucket = (int) split.getStart();
       reader = null;
@@ -1982,16 +1944,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * Represents footer cache.
    */
   public interface FooterCache {
-    static final ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
+    ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
 
     void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
-        FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
+        OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
     boolean hasPpd();
     boolean isBlocking();
-    void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
-        throws IOException;
+    void put(FooterCacheKey cacheKey, OrcTail orcTail) throws IOException;
   }
 
+  public static class FooterCacheKey {
+    Long fileId; // used by external cache
+    Path path; // used by local cache
+
+    FooterCacheKey(Long fileId, Path path) {
+      this.fileId = fileId;
+      this.path = path;
+    }
+
+    public Long getFileId() {
+      return fileId;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+  }
   /**
   * Convert a Hive type property string that contains separated type names into a list of
    * TypeDescription objects.

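The comment added to populateAndCacheStripeDetails() above describes the new caching flow. A
minimal sketch of that flow, with the file, conf, fs, fileId and footerCache variables standing in
for the surrounding context (they are illustrative, not part of the patch):

    // First read of a file: build the OrcTail from a reader and cache it.
    Reader orcReader = OrcFile.createReader(file.getPath(),
        OrcFile.readerOptions(conf).filesystem(fs).maxLength(file.getLen()));
    OrcTail tail = new OrcTail(orcReader.getFileTail(),
        orcReader.getSerializedFileFooter(), file.getModificationTime());
    footerCache.put(new FooterCacheKey(fileId, file.getPath()), tail);

    // Later split generation for the same, unchanged file reads stripe and type
    // metadata straight from the cached tail, skipping the reader and its
    // filesystem calls.
    List<StripeInformation> stripes = tail.getStripes();
    List<OrcProto.Type> types = tail.getTypes();
    OrcFile.WriterVersion version = tail.getWriterVersion();
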
http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index 2e63aba..0c85827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -20,27 +20,25 @@ package org.apache.hadoop.hive.ql.io.orc;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.orc.FileMetaInfo;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
 
 /**
  * OrcFileSplit. Holds file meta info
  *
  */
 public class OrcNewSplit extends FileSplit {
-  private FileMetaInfo fileMetaInfo;
+  private OrcTail orcTail;
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
-  private OrcFile.WriterVersion writerVersion;
 
   protected OrcNewSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -52,7 +50,7 @@ public class OrcNewSplit extends FileSplit {
   public OrcNewSplit(OrcSplit inner) throws IOException {
     super(inner.getPath(), inner.getStart(), inner.getLength(),
           inner.getLocations());
-    this.fileMetaInfo = inner.getFileMetaInfo();
+    this.orcTail = inner.getOrcTail();
     this.hasFooter = inner.hasFooter();
     this.isOriginal = inner.isOriginal();
     this.hasBase = inner.hasBase();
@@ -73,20 +71,11 @@ public class OrcNewSplit extends FileSplit {
       delta.write(out);
     }
     if (hasFooter) {
-      // serialize FileMetaInfo fields
-      Text.writeString(out, fileMetaInfo.compressionType);
-      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
-
-      // serialize FileMetaInfo field footer
-      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-      footerBuff.reset();
-
-      // write length of buffer
-      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-      out.write(footerBuff.array(), footerBuff.position(),
-          footerBuff.limit() - footerBuff.position());
-      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
+      OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
+      byte[] tailBuffer = fileTail.toByteArray();
+      int tailLen = tailBuffer.length;
+      WritableUtils.writeVInt(out, tailLen);
+      out.write(tailBuffer);
     }
   }
 
@@ -108,25 +97,16 @@ public class OrcNewSplit extends FileSplit {
       deltas.add(dmd);
     }
     if (hasFooter) {
-      // deserialize FileMetaInfo fields
-      String compressionType = Text.readString(in);
-      int bufferSize = WritableUtils.readVInt(in);
-      int metadataSize = WritableUtils.readVInt(in);
-
-      // deserialize FileMetaInfo field footer
-      int footerBuffSize = WritableUtils.readVInt(in);
-      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-      in.readFully(footerBuff.array(), 0, footerBuffSize);
-      OrcFile.WriterVersion writerVersion =
-          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
-
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff, writerVersion);
+      int tailLen = WritableUtils.readVInt(in);
+      byte[] tailBuffer = new byte[tailLen];
+      in.readFully(tailBuffer);
+      OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer);
+      orcTail = new OrcTail(fileTail, null);
     }
   }
 
-  FileMetaInfo getFileMetaInfo(){
-    return fileMetaInfo;
+  public OrcTail getOrcTail() {
+    return orcTail;
   }
 
   public boolean hasFooter() {

http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 407fd62..969e70e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -18,24 +18,26 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.orc.FileMetaInfo;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
-
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -43,13 +45,15 @@ import org.apache.hadoop.mapred.FileSplit;
  *
  */
 public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit {
-  private FileMetaInfo fileMetaInfo;
+  private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
+  private OrcTail orcTail;
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private long projColsUncompressedSize;
   private transient Object fileKey;
+  private long fileLen;
 
   static final int HAS_SYNTHETIC_FILEID_FLAG = 16;
   static final int HAS_LONG_FILEID_FLAG = 8;
@@ -65,25 +69,40 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   }
 
   public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
-      FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
-      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
+      OrcTail orcTail, boolean isOriginal, boolean hasBase,
+      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen) {
     super(path, offset, length, hosts);
     // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
     // path. However, that breaks bunch of stuff because Hive later looks up things by split path.
     this.fileKey = fileId;
-    this.fileMetaInfo = fileMetaInfo;
-    hasFooter = this.fileMetaInfo != null;
+    this.orcTail = orcTail;
+    hasFooter = this.orcTail != null;
     this.isOriginal = isOriginal;
     this.hasBase = hasBase;
     this.deltas.addAll(deltas);
     this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
+    // setting file length to Long.MAX_VALUE will let orc reader read file length from file system
+    this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    //serialize path, offset, length using FileSplit
-    super.write(out);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    // serialize path, offset, length using FileSplit
+    super.write(dos);
+    int required = bos.size();
+
+    // write addition payload required for orc
+    writeAdditionalPayload(dos);
+    int additional = bos.size() - required;
+
+    out.write(bos.toByteArray());
+    LOG.info("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", additional,
+        required);
+  }
 
+  private void writeAdditionalPayload(final DataOutputStream out) throws IOException {
     boolean isFileIdLong = fileKey instanceof Long, isFileIdWritable = fileKey instanceof Writable;
     int flags = (hasBase ? BASE_FLAG : 0) |
         (isOriginal ? ORIGINAL_FLAG : 0) |
@@ -96,26 +115,18 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       delta.write(out);
     }
     if (hasFooter) {
-      // serialize FileMetaInfo fields
-      Text.writeString(out, fileMetaInfo.compressionType);
-      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
-
-      // serialize FileMetaInfo field footer
-      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-      footerBuff.reset();
-
-      // write length of buffer
-      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-      out.write(footerBuff.array(), footerBuff.position(),
-          footerBuff.limit() - footerBuff.position());
-      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
+      OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
+      byte[] tailBuffer = fileTail.toByteArray();
+      int tailLen = tailBuffer.length;
+      WritableUtils.writeVInt(out, tailLen);
+      out.write(tailBuffer);
     }
     if (isFileIdLong) {
       out.writeLong(((Long)fileKey).longValue());
     } else if (isFileIdWritable) {
       ((Writable)fileKey).write(out);
     }
+    out.writeLong(fileLen);
   }
 
   @Override
@@ -141,20 +152,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       deltas.add(dmd);
     }
     if (hasFooter) {
-      // deserialize FileMetaInfo fields
-      String compressionType = Text.readString(in);
-      int bufferSize = WritableUtils.readVInt(in);
-      int metadataSize = WritableUtils.readVInt(in);
-
-      // deserialize FileMetaInfo field footer
-      int footerBuffSize = WritableUtils.readVInt(in);
-      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-      in.readFully(footerBuff.array(), 0, footerBuffSize);
-      OrcFile.WriterVersion writerVersion =
-          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
-
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff, writerVersion);
+      int tailLen = WritableUtils.readVInt(in);
+      byte[] tailBuffer = new byte[tailLen];
+      in.readFully(tailBuffer);
+      OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer);
+      orcTail = new OrcTail(fileTail, null);
     }
     if (hasLongFileId) {
       fileKey = in.readLong();
@@ -163,10 +165,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       fileId.readFields(in);
       this.fileKey = fileId;
     }
+    fileLen = in.readLong();
   }
 
-  FileMetaInfo getFileMetaInfo(){
-    return fileMetaInfo;
+  public OrcTail getOrcTail() {
+    return orcTail;
   }
 
   public boolean hasFooter() {
@@ -185,6 +188,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     return deltas;
   }
 
+  public long getFileLength() {
+    return fileLen;
+  }
+
   /**
    * If this method returns true, then for sure it is ACID.
    * However, if it returns false.. it could be ACID or non-ACID.
@@ -215,7 +222,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   @Override
   public String toString() {
     return "OrcSplit [" + getPath() + ", start=" + getStart() + ", length=" + getLength()
-        + ", isOriginal=" + isOriginal + ", hasBase=" + hasBase + ", deltas="
-        + (deltas == null ? 0 : deltas.size()) + "]";
+        + ", isOriginal=" + isOriginal + ", fileLength=" + fileLen + ", hasFooter=" + hasFooter +
+        ", hasBase=" + hasBase + ", deltas=" + (deltas == null ? 0 : deltas.size()) + "]";
   }
 }

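The split serialization above replaces the old FileMetaInfo fields with a single length-prefixed
protobuf. Roughly, the write/readFields round trip looks like this (out and in are the split's
DataOutput and DataInput; variable names are illustrative):

    // write(): shrink the cached tail to the minimal FileTail message and
    // length-prefix it.
    OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
    byte[] tailBuffer = fileTail.toByteArray();
    WritableUtils.writeVInt(out, tailBuffer.length);
    out.write(tailBuffer);

    // readFields(): parse it back; the serialized footer buffer is not shipped
    // with the split, so the second OrcTail argument is null.
    int tailLen = WritableUtils.readVInt(in);
    byte[] tailBytes = new byte[tailLen];
    in.readFully(tailBytes);
    OrcTail restored = new OrcTail(OrcProto.FileTail.parseFrom(tailBytes), null);
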
http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 0b40fef..9bcdb39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -20,47 +20,22 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
-import org.apache.orc.impl.BufferChunk;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.FileMetaInfo;
-import org.apache.orc.FileMetadata;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.orc.OrcProto;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.CodedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ReaderImpl extends org.apache.orc.impl.ReaderImpl
                         implements Reader {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
 
-  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
-
   private final ObjectInspector inspector;
 
-  //serialized footer - Keeping this around for use by getFileMetaInfo()
-  // will help avoid cpu cycles spend in deserializing at cost of increased
-  // memory footprint.
-  private ByteBuffer footerByteBuffer;
-  // Same for metastore cache - maintains the same background buffer, but includes postscript.
-  // This will only be set if the file footer/metadata was read from disk.
-  private ByteBuffer footerMetaAndPsBuffer;
-
   @Override
   public ObjectInspector getObjectInspector() {
     return inspector;
@@ -83,268 +58,14 @@ public class ReaderImpl extends org.apache.orc.impl.ReaderImpl
    * @param options options for reading
    * @throws IOException
    */
-  public ReaderImpl(Path path,
-                    OrcFile.ReaderOptions options) throws IOException {
+  public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
     super(path, options);
-    FileMetadata fileMetadata = options.getFileMetadata();
-    if (fileMetadata != null) {
-      this.inspector =  OrcStruct.createObjectInspector(0, fileMetadata.getTypes());
-    } else {
-      FileMetaInfo footerMetaData;
-      if (options.getFileMetaInfo() != null) {
-        footerMetaData = options.getFileMetaInfo();
-      } else {
-        footerMetaData = extractMetaInfoFromFooter(fileSystem, path,
-            options.getMaxLength());
-      }
-      this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
-      MetaInfoObjExtractor rInfo =
-          new MetaInfoObjExtractor(footerMetaData.compressionType,
-                                   footerMetaData.bufferSize,
-                                   footerMetaData.metadataSize,
-                                   footerMetaData.footerBuffer
-                                   );
-      this.footerByteBuffer = footerMetaData.footerBuffer;
-      this.inspector = rInfo.inspector;
-    }
-  }
-
-  /** Extracts the necessary metadata from an externally store buffer (fullFooterBuffer). */
-  public static FooterInfo extractMetaInfoFromFooter(
-      ByteBuffer bb, Path srcPath) throws IOException {
-    // Read the PostScript. Be very careful as some parts of this historically use bb position
-    // and some use absolute offsets that have to take position into account.
-    int baseOffset = bb.position();
-    int lastByteAbsPos = baseOffset + bb.remaining() - 1;
-    int psLen = bb.get(lastByteAbsPos) & 0xff;
-    int psAbsPos = lastByteAbsPos - psLen;
-    OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
-    assert baseOffset == bb.position();
-
-    // Extract PS information.
-    int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
-        footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
-    String compressionType = ps.getCompression().toString();
-    CompressionCodec codec =
-        WriterImpl.createCodec(org.apache.orc.CompressionKind.valueOf
-            (compressionType));
-    int bufferSize = (int)ps.getCompressionBlockSize();
-    bb.position(metadataAbsPos);
-    bb.mark();
-
-    // Extract metadata and footer.
-    OrcProto.Metadata metadata = extractMetadata(
-        bb, metadataAbsPos, metadataSize, codec, bufferSize);
-    List<StripeStatistics> stats = new ArrayList<>(metadata.getStripeStatsCount());
-    for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
-      stats.add(new StripeStatistics(ss.getColStatsList()));
-    }
-    OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
-    bb.position(metadataAbsPos);
-    bb.limit(psAbsPos);
-    // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
-    FileMetaInfo fmi = new FileMetaInfo(
-        compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
-    return new FooterInfo(stats, footer, fmi);
-  }
-
-  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
-      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
-    bb.position(footerAbsPos);
-    bb.limit(footerAbsPos + footerSize);
-    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
-        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
-  }
-
-  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
-      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
-    bb.position(metadataAbsPos);
-    bb.limit(metadataAbsPos + metadataSize);
-    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
-        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
-  }
-
-  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
-      int psLen, int psAbsOffset) throws IOException {
-    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
-    assert bb.hasArray();
-    CodedInputStream in = CodedInputStream.newInstance(
-        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
-    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-    checkOrcVersion(LOG, path, ps.getVersionList());
-
-    // Check compression codec.
-    switch (ps.getCompression()) {
-      case NONE:
-        break;
-      case ZLIB:
-        break;
-      case SNAPPY:
-        break;
-      case LZO:
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown compression");
-    }
-    return ps;
-  }
-
-  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
-                                                        Path path,
-                                                        long maxFileLength
-                                                        ) throws IOException {
-    FSDataInputStream file = fs.open(path);
-    ByteBuffer buffer = null, fullFooterBuffer = null;
-    OrcProto.PostScript ps = null;
-    OrcFile.WriterVersion writerVersion = null;
-    try {
-      // figure out the size of the file using the option or filesystem
-      long size;
-      if (maxFileLength == Long.MAX_VALUE) {
-        size = fs.getFileStatus(path).getLen();
-      } else {
-        size = maxFileLength;
-      }
-
-      //read last bytes into buffer to get PostScript
-      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
-      buffer = ByteBuffer.allocate(readSize);
-      assert buffer.position() == 0;
-      file.readFully((size - readSize),
-          buffer.array(), buffer.arrayOffset(), readSize);
-      buffer.position(0);
-
-      //read the PostScript
-      //get length of PostScript
-      int psLen = buffer.get(readSize - 1) & 0xff;
-      ensureOrcFooter(file, path, psLen, buffer);
-      int psOffset = readSize - 1 - psLen;
-      ps = extractPostScript(buffer, path, psLen, psOffset);
-
-      int footerSize = (int) ps.getFooterLength();
-      int metadataSize = (int) ps.getMetadataLength();
-      writerVersion = extractWriterVersion(ps);
-
-      //check if extra bytes need to be read
-      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
-      if (extra > 0) {
-        //more bytes need to be read, seek back to the right place and read extra bytes
-        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
-        file.readFully((size - readSize - extra), extraBuf.array(),
-            extraBuf.arrayOffset() + extraBuf.position(), extra);
-        extraBuf.position(extra);
-        //append with already read bytes
-        extraBuf.put(buffer);
-        buffer = extraBuf;
-        buffer.position(0);
-        fullFooterBuffer = buffer.slice();
-        buffer.limit(footerSize + metadataSize);
-      } else {
-        //footer is already in the bytes in buffer, just adjust position, length
-        buffer.position(psOffset - footerSize - metadataSize);
-        fullFooterBuffer = buffer.slice();
-        buffer.limit(psOffset);
-      }
-
-      // remember position for later TODO: what later? this comment is useless
-      buffer.mark();
-    } finally {
-      try {
-        file.close();
-      } catch (IOException ex) {
-        LOG.error("Failed to close the file after another error", ex);
-      }
-    }
-
-    return new FileMetaInfo(
-        ps.getCompression().toString(),
-        (int) ps.getCompressionBlockSize(),
-        (int) ps.getMetadataLength(),
-        buffer,
-        ps.getVersionList(),
-        writerVersion,
-        fullFooterBuffer
-        );
-  }
-
-  /**
-   * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
-   *  from serialized fields.
-   * As the fields are final, the fields need to be initialized in the constructor and
-   *  can't be done in some helper function. So this helper class is used instead.
-   *
-   */
-  private static class MetaInfoObjExtractor{
-    final org.apache.orc.CompressionKind compressionKind;
-    final CompressionCodec codec;
-    final int bufferSize;
-    final int metadataSize;
-    final OrcProto.Metadata metadata;
-    final OrcProto.Footer footer;
-    final ObjectInspector inspector;
-
-    MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize, ByteBuffer footerBuffer) throws IOException {
-
-      this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr.toUpperCase());
-      this.bufferSize = bufferSize;
-      this.codec = WriterImpl.createCodec(compressionKind);
-      this.metadataSize = metadataSize;
-
-      int position = footerBuffer.position();
-      int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
-
-      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
-      this.footer = extractFooter(
-          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
-
-      footerBuffer.position(position);
-      this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
-    }
-  }
-
-  public FileMetaInfo getFileMetaInfo() {
-    return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        getMetadataSize(), footerByteBuffer, getVersionList(),
-        getWriterVersion(), footerMetaAndPsBuffer);
-  }
-
-  /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
-   * and so we don't just add fields to it, it's already messy and confusing. */
-  public static final class FooterInfo {
-    private final OrcProto.Footer footer;
-    private final List<StripeStatistics> metadata;
-    private final List<StripeInformation> stripes;
-    private final FileMetaInfo fileMetaInfo;
-
-    private FooterInfo(
-        List<StripeStatistics> metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
-      this.metadata = metadata;
-      this.footer = footer;
-      this.fileMetaInfo = fileMetaInfo;
-      this.stripes = convertProtoStripesToStripes(footer.getStripesList());
-    }
-
-    public OrcProto.Footer getFooter() {
-      return footer;
-    }
-
-    public List<StripeStatistics> getMetadata() {
-      return metadata;
-    }
-
-    public FileMetaInfo getFileMetaInfo() {
-      return fileMetaInfo;
-    }
-
-    public List<StripeInformation> getStripes() {
-      return stripes;
-    }
+    this.inspector = OrcStruct.createObjectInspector(0, types);
   }
 
   @Override
   public ByteBuffer getSerializedFileFooter() {
-    return footerMetaAndPsBuffer;
+    return tail.getSerializedTail();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index e4d2e6e..32ac34e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -182,8 +182,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     if(fSplit instanceof OrcSplit){
       OrcSplit orcSplit = (OrcSplit) fSplit;
       if (orcSplit.hasFooter()) {
-        opts.fileMetaInfo(orcSplit.getFileMetaInfo());
+        opts.orcTail(orcSplit.getOrcTail());
       }
+      opts.maxLength(orcSplit.getFileLength());
     }
     Reader reader = OrcFile.createReader(path, opts);
     return new VectorizedOrcRecordReader(reader, conf, fSplit);

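On the read side, the tail and the file length carried by the split feed directly into the reader
options, so opening the reader needs no extra filesystem metadata calls. A minimal sketch, assuming
split is an OrcSplit obtained from the input format:

    OrcFile.ReaderOptions opts = OrcFile.readerOptions(conf)
        .maxLength(split.getFileLength()); // Long.MAX_VALUE falls back to a filesystem lookup
    if (split.hasFooter()) {
      opts.orcTail(split.getOrcTail());    // reuse the footer serialized into the split
    }
    Reader reader = OrcFile.createReader(split.getPath(), opts);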