http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d7a8c2f..070950c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -53,11 +53,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.orc.ColumnStatistics;
-import org.apache.orc.FileMetaInfo;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -553,6 +553,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
     int numThreads = HiveConf.getIntVar(conf,
         ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+    boolean useSoftReference = HiveConf.getBoolVar(conf,
+        ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);
 
     cacheStripeDetails = (cacheStripeDetailsSize > 0);
 
@@ -574,7 +576,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     boolean useExternalCache = HiveConf.getBoolVar(
         conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED);
     if (localCache == null) {
-      localCache = new LocalCache(numThreads, cacheStripeDetailsSize);
+      localCache = new LocalCache(numThreads, cacheStripeDetailsSize, useSoftReference);
     }
     if (useExternalCache) {
       if (metaCache == null) {
@@ -648,20 +650,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final Context context;
     private final FileSystem fs;
     private final HdfsFileStatusWithId fileWithId;
-    private final FileInfo fileInfo;
+    private final OrcTail orcTail;
     private final boolean isOriginal;
     private final List<DeltaMetaData> deltas;
    private final boolean hasBase;
     private final ByteBuffer ppdResult;
 
-    SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
+    SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail,
         boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir,
         boolean[] covered, ByteBuffer ppdResult) throws IOException {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fs = fs;
       this.fileWithId = fileWithId;
-      this.fileInfo = fileInfo;
+      this.orcTail = orcTail;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
@@ -669,11 +671,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     @VisibleForTesting
-    public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, FileInfo fileInfo,
+    public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, OrcTail orcTail,
         boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
         boolean[] covered) throws IOException {
       this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
-          fileInfo, isOriginal, deltas, hasBase, dir, covered, null);
+          orcTail, isOriginal, deltas, hasBase, dir, covered, null);
     }
   }
 
@@ -728,13 +730,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       FooterCache cache = context.cacheStripeDetails ?
           ((deltas == null || deltas.isEmpty()) ? context.footerCache : Context.localCache) : null;
       if (cache != null) {
-        FileInfo[] infos = new FileInfo[files.size()];
+        OrcTail[] orcTails = new OrcTail[files.size()];
         ByteBuffer[] ppdResults = null;
         if (cache.hasPpd()) {
           ppdResults = new ByteBuffer[files.size()];
         }
         try {
-          cache.getAndValidate(files, isOriginal, infos, ppdResults);
+          cache.getAndValidate(files, isOriginal, orcTails, ppdResults);
         } catch (HiveException e) {
           throw new IOException(e);
         }
@@ -745,16 +747,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             dir = dirs.get(++dirIx);
             filesInDirCount = dir.fileCount;
           }
-          FileInfo info = infos[i];
+          OrcTail orcTail = orcTails[i];
           ByteBuffer ppdResult = ppdResults == null ? null : ppdResults[i];
           HdfsFileStatusWithId file = files.get(i);
-          if (info != null) {
+          if (orcTail != null) {
             // Cached copy is valid
             context.cacheHitCounter.incrementAndGet();
           }
           // Ignore files eliminated by PPD, or of 0 length.
           if (ppdResult != FooterCache.NO_SPLIT_AFTER_PPD && file.getFileStatus().getLen() > 0) {
-            result.add(new SplitInfo(context, dir.fs, file, info,
+            result.add(new SplitInfo(context, dir.fs, file, orcTail,
                 isOriginal, deltas, true, dir.dir, covered, ppdResult));
           }
         }
@@ -921,7 +923,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
           OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
               entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-              deltas, -1);
+              deltas, -1, fileStatus.getLen());
           splits.add(orcSplit);
         }
       }
@@ -963,7 +965,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       if (!deltas.isEmpty()) {
         for (int b = 0; b < numBuckets; ++b) {
           if (!covered[b]) {
-            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1));
+            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
           }
         }
       }
@@ -1054,9 +1056,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final Long fsFileId;
     private final long blockSize;
     private final TreeMap<Long, BlockLocation> locations;
-    private final FileInfo fileInfo;
+    private OrcTail orcTail;
     private List<StripeInformation> stripes;
-    private FileMetaInfo fileMetaInfo;
     private List<StripeStatistics> stripeStats;
     private List<OrcProto.Type> types;
     private boolean[] includedCols;
@@ -1078,7 +1079,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.file = splitInfo.fileWithId.getFileStatus();
       this.fsFileId = splitInfo.fileWithId.getFileId();
       this.blockSize = this.file.getBlockSize();
-      this.fileInfo = splitInfo.fileInfo;
+      this.orcTail = splitInfo.orcTail;
       // TODO: potential DFS call
       this.locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = splitInfo.isOriginal;
@@ -1129,11 +1130,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      * are written with large block sizes.
      * @param offset the start of the split
      * @param length the length of the split
-     * @param fileMetaInfo file metadata from footer and postscript
+     * @param orcTail the ORC file tail (postscript and footer metadata)
      * @throws IOException
      */
-    OrcSplit createSplit(long offset, long length,
-        FileMetaInfo fileMetaInfo) throws IOException {
+    OrcSplit createSplit(long offset, long length, OrcTail orcTail) throws IOException {
       String[] hosts;
       Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
       BlockLocation start = startEntry.getValue();
@@ -1196,7 +1196,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         fileKey = new SyntheticFileId(file);
       }
       return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
-          fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
+          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
     }
 
     private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1271,7 +1271,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           int index = si.getIndex();
           if (lastIdx >= 0 && lastIdx + 1 != index && current.offset != -1) {
             // Create split for the previous unfinished stripe.
-            splits.add(createSplit(current.offset, current.length, null));
+            splits.add(createSplit(current.offset, current.length, orcTail));
             current.offset = -1;
           }
           lastIdx = index;
@@ -1305,16 +1305,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         if (!includeStripe[idx]) {
           // create split for the previous unfinished stripe
           if (current.offset != -1) {
-            splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+            splits.add(createSplit(current.offset, current.length, orcTail));
             current.offset = -1;
           }
           continue;
         }
         current = generateOrUpdateSplit(
-            splits, current, stripe.getOffset(), stripe.getLength(), fileMetaInfo);
+            splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
       }
-      generateLastSplit(splits, current, fileMetaInfo);
+      generateLastSplit(splits, current, orcTail);
 
       // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
@@ -1323,12 +1323,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     private OffsetAndLength generateOrUpdateSplit(
         List<OrcSplit> splits, OffsetAndLength current, long offset,
-        long length, FileMetaInfo fileMetaInfo) throws IOException {
+        long length, OrcTail orcTail) throws IOException {
       // if we are working on a stripe, over the min stripe size, and
       // crossed a block boundary, cut the input split here.
       if (current.offset != -1 && current.length > context.minSize
          && (current.offset / blockSize != offset / blockSize)) {
-        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        splits.add(createSplit(current.offset, current.length, orcTail));
         current.offset = -1;
       }
       // if we aren't building a split, start a new one.
@@ -1339,59 +1339,46 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         current.length = (offset + length) - current.offset;
       }
       if (current.length >= context.maxSize) {
-        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        splits.add(createSplit(current.offset, current.length, orcTail));
         current.offset = -1;
       }
       return current;
     }
 
     private void generateLastSplit(List<OrcSplit> splits, OffsetAndLength current,
-        FileMetaInfo fileMetaInfo) throws IOException {
+        OrcTail orcTail) throws IOException {
       if (current.offset == -1) return;
-      splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+      splits.add(createSplit(current.offset, current.length, orcTail));
     }
 
     private void populateAndCacheStripeDetails() throws IOException {
-      // Only create OrcReader if we are missing some information.
-      List<OrcProto.ColumnStatistics> colStatsLocal;
-      List<OrcProto.Type> typesLocal;
-      if (fileInfo != null) {
-        stripes = fileInfo.stripeInfos;
-        stripeStats = fileInfo.stripeStats;
-        fileMetaInfo = fileInfo.fileMetaInfo;
-        typesLocal = types = fileInfo.types;
-        colStatsLocal = fileInfo.fileStats;
-        writerVersion = fileInfo.writerVersion;
-        // For multiple runs, in case sendSplitsInFooter changes
-        if (fileMetaInfo == null && context.footerInSplits) {
-          Reader orcReader = createOrcReader();
-          fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
-          assert fileInfo.stripeStats != null && fileInfo.types != null
-              && fileInfo.writerVersion != null;
-          // We assume that if we needed to create a reader, we need to cache it to meta cache.
-          // This will also needlessly overwrite it in local cache for now.
-          context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
-        }
-      } else {
-        Reader orcReader = createOrcReader();
-        stripes = orcReader.getStripes();
-        typesLocal = types = orcReader.getTypes();
-        colStatsLocal = orcReader.getOrcProtoFileStatistics();
-        writerVersion = orcReader.getWriterVersion();
-        stripeStats = orcReader.getStripeStatistics();
-        fileMetaInfo = context.footerInSplits ?
-            ((ReaderImpl) orcReader).getFileMetaInfo() : null;
+      // When reading the file for the first time, we get the ORC tail from the ORC reader and
+      // cache it in the footer cache. Subsequent requests will get the ORC tail from the cache
+      // (as long as the file length and modification time are unchanged) and use it to populate
+      // the split info. If the split info object already carries a cached ORC tail, we can skip
+      // creating an ORC reader and so avoid the associated filesystem calls.
+      if (orcTail == null) {
+        Reader orcReader = OrcFile.createReader(file.getPath(),
+            OrcFile.readerOptions(context.conf)
+                .filesystem(fs)
+                .maxLength(file.getLen()));
+        orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
+            file.getModificationTime());
         if (context.cacheStripeDetails) {
-          context.footerCache.put(fsFileId, file, fileMetaInfo, orcReader);
+          context.footerCache.put(new FooterCacheKey(fsFileId, file.getPath()), orcTail);
         }
       }
+      stripes = orcTail.getStripes();
+      stripeStats = orcTail.getStripeStatistics();
+      types = orcTail.getTypes();
+      writerVersion = orcTail.getWriterVersion();
       includedCols = genIncludedColumns(types, context.conf, isOriginal);
-      projColsUncompressedSize = computeProjectionSize(typesLocal, colStatsLocal, includedCols, isOriginal);
-    }
-
-    private Reader createOrcReader() throws IOException {
-      return OrcFile.createReader(file.getPath(),
-          OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen()));
+      List<OrcProto.ColumnStatistics> fileColStats = orcTail.getFooter().getStatisticsList();
+      projColsUncompressedSize = computeProjectionSize(types, fileColStats, includedCols,
+          isOriginal);
+      if (!context.footerInSplits) {
+        orcTail = null;
+      }
     }
 
     private long computeProjectionSize(List<OrcProto.Type> types,
@@ -1595,40 +1582,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       return result.toArray(new InputSplit[result.size()]);
     }
 
-  /**
-   * FileInfo.
-   *
-   * Stores information relevant to split generation for an ORC File.
-   *
-   */
-  static class FileInfo {
-    final long modificationTime;
-    final long size;
-    final Long fileId;
-    private final List<StripeInformation> stripeInfos;
-    private FileMetaInfo fileMetaInfo;
-    private final List<StripeStatistics> stripeStats;
-    private final List<OrcProto.ColumnStatistics> fileStats;
-    private final List<OrcProto.Type> types;
-    private final OrcFile.WriterVersion writerVersion;
-
-
-    FileInfo(long modificationTime, long size, List<StripeInformation> stripeInfos,
-        List<StripeStatistics> stripeStats, List<OrcProto.Type> types,
-        List<OrcProto.ColumnStatistics> fileStats, FileMetaInfo fileMetaInfo,
-        OrcFile.WriterVersion writerVersion, Long fileId) {
-      this.modificationTime = modificationTime;
-      this.size = size;
-      this.fileId = fileId;
-      this.stripeInfos = stripeInfos;
-      this.fileMetaInfo = fileMetaInfo;
-      this.stripeStats = stripeStats;
-      this.types = types;
-      this.fileStats = fileStats;
-      this.writerVersion = writerVersion;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   private org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
     createVectorizedReader(InputSplit split, JobConf conf, Reporter reporter
@@ -1643,18 +1596,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       Reporter reporter) throws IOException {
     boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
     boolean isAcidRead = isAcidRead(conf, inputSplit);
-
     if (!isAcidRead) {
       if (vectorMode) {
         return createVectorizedReader(inputSplit, conf, reporter);
       } else {
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+        if (inputSplit instanceof OrcSplit) {
+          OrcSplit split = (OrcSplit) inputSplit;
+          readerOptions.maxLength(split.getFileLength()).orcTail(split.getOrcTail());
+        }
         return new OrcRecordReader(OrcFile.createReader(
             ((FileSplit) inputSplit).getPath(),
-            OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
+            readerOptions),
+            conf, (FileSplit) inputSplit);
       }
     }
-    OrcSplit split = (OrcSplit) inputSplit;
 
     reporter.setStatus(inputSplit.toString());
 
    Options options = new Options(conf).reporter(reporter);
@@ -1759,7 +1716,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     if (split.hasBase()) {
       bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
           .getBucket();
-      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
+          .maxLength(split.getFileLength());
+      if (split.hasFooter()) {
+        readerOptions.orcTail(split.getOrcTail());
+      }
+      reader = OrcFile.createReader(path, readerOptions);
     } else {
       bucket = (int) split.getStart();
       reader = null;
@@ -1982,16 +1944,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * Represents footer cache.
    */
   public interface FooterCache {
-    static final ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
+    ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
+
     void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
-        FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
+        OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
     boolean hasPpd();
     boolean isBlocking();
-    void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
-        throws IOException;
+    void put(FooterCacheKey cacheKey, OrcTail orcTail) throws IOException;
   }
 
+  public static class FooterCacheKey {
+    Long fileId; // used by external cache
+    Path path; // used by local cache
+
+    FooterCacheKey(Long fileId, Path path) {
+      this.fileId = fileId;
+      this.path = path;
+    }
+
+    public Long getFileId() {
+      return fileId;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+  }
+
   /**
    * Convert a Hive type property string that contains separated type names into a list of
   * TypeDescription objects.
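A note on the caching contract: populateAndCacheStripeDetails() above reuses a cached
tail only while the file's length and modification time are unchanged, but the cache
lookup itself (LocalCache.getAndValidate) is not part of this diff. A minimal sketch of
that validation, assuming OrcTail exposes the recorded file length and modification time
via getFileTail().getFileLength() and getFileModificationTime() (accessor names not
confirmed by this patch):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.orc.impl.OrcTail;

    // Illustrative only: serve a cached tail only while the file on disk
    // still matches what was observed when the tail was cached.
    static OrcTail validateCachedTail(FileStatus file, OrcTail cached) {
      if (cached == null) {
        return null; // miss: the caller creates a Reader and caches the new tail
      }
      boolean unchanged =
          cached.getFileTail().getFileLength() == file.getLen()
          && cached.getFileModificationTime() == file.getModificationTime();
      return unchanged ? cached : null; // a stale entry counts as a miss
    }

Under this scheme an overwritten file simply looks like a cache miss, so its tail is
re-read and the stale entry replaced rather than served.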
http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index 2e63aba..0c85827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -20,27 +20,25 @@ package org.apache.hadoop.hive.ql.io.orc;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.orc.FileMetaInfo;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
 
 /**
  * OrcFileSplit. Holds file meta info
  *
  */
 public class OrcNewSplit extends FileSplit {
-  private FileMetaInfo fileMetaInfo;
+  private OrcTail orcTail;
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
-  private OrcFile.WriterVersion writerVersion;
 
   protected OrcNewSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -52,7 +50,7 @@ public class OrcNewSplit extends FileSplit {
   public OrcNewSplit(OrcSplit inner) throws IOException {
     super(inner.getPath(), inner.getStart(), inner.getLength(),
         inner.getLocations());
-    this.fileMetaInfo = inner.getFileMetaInfo();
+    this.orcTail = inner.getOrcTail();
     this.hasFooter = inner.hasFooter();
     this.isOriginal = inner.isOriginal();
     this.hasBase = inner.hasBase();
@@ -73,20 +71,11 @@ public class OrcNewSplit extends FileSplit {
       delta.write(out);
     }
     if (hasFooter) {
-      // serialize FileMetaInfo fields
-      Text.writeString(out, fileMetaInfo.compressionType);
-      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
-
-      // serialize FileMetaInfo field footer
-      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-      footerBuff.reset();
-
-      // write length of buffer
-      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-      out.write(footerBuff.array(), footerBuff.position(),
-          footerBuff.limit() - footerBuff.position());
-      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
+      OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
+      byte[] tailBuffer = fileTail.toByteArray();
+      int tailLen = tailBuffer.length;
+      WritableUtils.writeVInt(out, tailLen);
+      out.write(tailBuffer);
     }
   }
 
@@ -108,25 +97,16 @@ public class OrcNewSplit extends FileSplit {
       deltas.add(dmd);
     }
     if (hasFooter) {
-      // deserialize FileMetaInfo fields
-      String compressionType = Text.readString(in);
-      int bufferSize = WritableUtils.readVInt(in);
-      int metadataSize = WritableUtils.readVInt(in);
-
-      // deserialize FileMetaInfo field footer
-      int footerBuffSize = WritableUtils.readVInt(in);
-      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-      in.readFully(footerBuff.array(), 0, footerBuffSize);
-      OrcFile.WriterVersion writerVersion =
-          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
-
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff, writerVersion);
+      int tailLen = WritableUtils.readVInt(in);
+      byte[] tailBuffer = new byte[tailLen];
+      in.readFully(tailBuffer);
+      OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer);
+      orcTail = new OrcTail(fileTail, null);
     }
   }
 
-  FileMetaInfo getFileMetaInfo(){
-    return fileMetaInfo;
+  public OrcTail getOrcTail() {
+    return orcTail;
   }
 
   public boolean hasFooter() {

http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 407fd62..969e70e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -18,24 +18,26 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.orc.FileMetaInfo;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
-
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
@@ -43,13 +45,15 @@ import org.apache.hadoop.mapred.FileSplit;
  *
  */
 public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit {
-  private FileMetaInfo fileMetaInfo;
+  private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
+  private OrcTail orcTail;
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private long projColsUncompressedSize;
   private transient Object fileKey;
+  private long fileLen;
 
   static final int HAS_SYNTHETIC_FILEID_FLAG = 16;
   static final int HAS_LONG_FILEID_FLAG = 8;
@@ -65,25 +69,40 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   }
 
   public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
-      FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
-      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
+      OrcTail orcTail, boolean isOriginal, boolean hasBase,
+      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen) {
     super(path, offset, length, hosts);
     // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
     // path. However, that breaks bunch of stuff because Hive later looks up things by split path.
     this.fileKey = fileId;
-    this.fileMetaInfo = fileMetaInfo;
-    hasFooter = this.fileMetaInfo != null;
+    this.orcTail = orcTail;
+    hasFooter = this.orcTail != null;
     this.isOriginal = isOriginal;
    this.hasBase = hasBase;
     this.deltas.addAll(deltas);
     this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
+    // Setting the file length to Long.MAX_VALUE lets the ORC reader fetch the length from the
+    // file system instead.
+    this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    //serialize path, offset, length using FileSplit
-    super.write(out);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    // serialize path, offset, length using FileSplit
+    super.write(dos);
+    int required = bos.size();
+
+    // write the additional payload required for ORC
+    writeAdditionalPayload(dos);
+    int additional = bos.size() - required;
+
+    out.write(bos.toByteArray());
+    LOG.info("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", additional,
+        required);
+  }
+
+  private void writeAdditionalPayload(final DataOutputStream out) throws IOException {
     boolean isFileIdLong = fileKey instanceof Long, isFileIdWritable = fileKey instanceof Writable;
     int flags = (hasBase ? BASE_FLAG : 0) |
         (isOriginal ? ORIGINAL_FLAG : 0) |
@@ -96,26 +115,18 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       delta.write(out);
     }
     if (hasFooter) {
-      // serialize FileMetaInfo fields
-      Text.writeString(out, fileMetaInfo.compressionType);
-      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
-
-      // serialize FileMetaInfo field footer
-      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-      footerBuff.reset();
-
-      // write length of buffer
-      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-      out.write(footerBuff.array(), footerBuff.position(),
-          footerBuff.limit() - footerBuff.position());
-      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
+      OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
+      byte[] tailBuffer = fileTail.toByteArray();
+      int tailLen = tailBuffer.length;
+      WritableUtils.writeVInt(out, tailLen);
+      out.write(tailBuffer);
     }
     if (isFileIdLong) {
       out.writeLong(((Long)fileKey).longValue());
     } else if (isFileIdWritable) {
       ((Writable)fileKey).write(out);
     }
+    out.writeLong(fileLen);
   }
 
   @Override
@@ -141,20 +152,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       deltas.add(dmd);
     }
     if (hasFooter) {
-      // deserialize FileMetaInfo fields
-      String compressionType = Text.readString(in);
-      int bufferSize = WritableUtils.readVInt(in);
-      int metadataSize = WritableUtils.readVInt(in);
-
-      // deserialize FileMetaInfo field footer
-      int footerBuffSize = WritableUtils.readVInt(in);
-      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-      in.readFully(footerBuff.array(), 0, footerBuffSize);
-      OrcFile.WriterVersion writerVersion =
-          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
-
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff, writerVersion);
+      int tailLen = WritableUtils.readVInt(in);
+      byte[] tailBuffer = new byte[tailLen];
+      in.readFully(tailBuffer);
+      OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer);
+      orcTail = new OrcTail(fileTail, null);
     }
     if (hasLongFileId) {
       fileKey = in.readLong();
@@ -163,10 +165,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       fileId.readFields(in);
       this.fileKey = fileId;
     }
+    fileLen = in.readLong();
   }
 
-  FileMetaInfo getFileMetaInfo(){
-    return fileMetaInfo;
+  public OrcTail getOrcTail() {
+    return orcTail;
   }
 
   public boolean hasFooter() {
@@ -185,6 +188,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     return deltas;
   }
 
+  public long getFileLength() {
+    return fileLen;
+  }
+
   /**
    * If this method returns true, then for sure it is ACID.
   * However, if it returns false.. it could be ACID or non-ACID.
@@ -215,7 +222,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   @Override
   public String toString() {
     return "OrcSplit [" + getPath() + ", start=" + getStart() + ", length=" + getLength()
-        + ", isOriginal=" + isOriginal + ", hasBase=" + hasBase + ", deltas="
-        + (deltas == null ? 0 : deltas.size()) + "]";
+        + ", isOriginal=" + isOriginal + ", fileLength=" + fileLen + ", hasFooter=" + hasFooter
+        + ", hasBase=" + hasBase + ", deltas=" + (deltas == null ? 0 : deltas.size()) + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 0b40fef..9bcdb39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -20,47 +20,22 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
-
-import org.apache.orc.impl.BufferChunk;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.FileMetaInfo;
-import org.apache.orc.FileMetadata;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.orc.OrcProto;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.CodedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ReaderImpl extends org.apache.orc.impl.ReaderImpl
     implements Reader {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
 
-  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
-
   private final ObjectInspector inspector;
-  //serialized footer - Keeping this around for use by getFileMetaInfo()
-  // will help avoid cpu cycles spend in deserializing at cost of increased
-  // memory footprint.
-  private ByteBuffer footerByteBuffer;
-  // Same for metastore cache - maintains the same background buffer, but includes postscript.
-  // This will only be set if the file footer/metadata was read from disk.
-  private ByteBuffer footerMetaAndPsBuffer;
-
   @Override
   public ObjectInspector getObjectInspector() {
     return inspector;
@@ -83,268 +58,14 @@ public class ReaderImpl extends org.apache.orc.impl.ReaderImpl
    * @param options options for reading
    * @throws IOException
    */
-  public ReaderImpl(Path path,
-                    OrcFile.ReaderOptions options) throws IOException {
+  public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
     super(path, options);
-    FileMetadata fileMetadata = options.getFileMetadata();
-    if (fileMetadata != null) {
-      this.inspector = OrcStruct.createObjectInspector(0, fileMetadata.getTypes());
-    } else {
-      FileMetaInfo footerMetaData;
-      if (options.getFileMetaInfo() != null) {
-        footerMetaData = options.getFileMetaInfo();
-      } else {
-        footerMetaData = extractMetaInfoFromFooter(fileSystem, path,
-            options.getMaxLength());
-      }
-      this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
-      MetaInfoObjExtractor rInfo =
-          new MetaInfoObjExtractor(footerMetaData.compressionType,
-              footerMetaData.bufferSize,
-              footerMetaData.metadataSize,
-              footerMetaData.footerBuffer
-              );
-      this.footerByteBuffer = footerMetaData.footerBuffer;
-      this.inspector = rInfo.inspector;
-    }
-  }
-
-  /** Extracts the necessary metadata from an externally store buffer (fullFooterBuffer). */
-  public static FooterInfo extractMetaInfoFromFooter(
-      ByteBuffer bb, Path srcPath) throws IOException {
-    // Read the PostScript. Be very careful as some parts of this historically use bb position
-    // and some use absolute offsets that have to take position into account.
-    int baseOffset = bb.position();
-    int lastByteAbsPos = baseOffset + bb.remaining() - 1;
-    int psLen = bb.get(lastByteAbsPos) & 0xff;
-    int psAbsPos = lastByteAbsPos - psLen;
-    OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
-    assert baseOffset == bb.position();
-
-    // Extract PS information.
-    int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
-        footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
-    String compressionType = ps.getCompression().toString();
-    CompressionCodec codec =
-        WriterImpl.createCodec(org.apache.orc.CompressionKind.valueOf
-            (compressionType));
-    int bufferSize = (int)ps.getCompressionBlockSize();
-    bb.position(metadataAbsPos);
-    bb.mark();
-
-    // Extract metadata and footer.
-    OrcProto.Metadata metadata = extractMetadata(
-        bb, metadataAbsPos, metadataSize, codec, bufferSize);
-    List<StripeStatistics> stats = new ArrayList<>(metadata.getStripeStatsCount());
-    for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
-      stats.add(new StripeStatistics(ss.getColStatsList()));
-    }
-    OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
-    bb.position(metadataAbsPos);
-    bb.limit(psAbsPos);
-    // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
-    FileMetaInfo fmi = new FileMetaInfo(
-        compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
-    return new FooterInfo(stats, footer, fmi);
-  }
-
-  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
-      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
-    bb.position(footerAbsPos);
-    bb.limit(footerAbsPos + footerSize);
-    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
-        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
-  }
-
-  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
-      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
-    bb.position(metadataAbsPos);
-    bb.limit(metadataAbsPos + metadataSize);
-    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
-        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
-  }
-
-  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
-      int psLen, int psAbsOffset) throws IOException {
-    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
-    assert bb.hasArray();
-    CodedInputStream in = CodedInputStream.newInstance(
-        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
-    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-    checkOrcVersion(LOG, path, ps.getVersionList());
-
-    // Check compression codec.
-    switch (ps.getCompression()) {
-      case NONE:
-        break;
-      case ZLIB:
-        break;
-      case SNAPPY:
-        break;
-      case LZO:
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown compression");
-    }
-    return ps;
-  }
-
-  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
-                                                        Path path,
-                                                        long maxFileLength
-                                                        ) throws IOException {
-    FSDataInputStream file = fs.open(path);
-    ByteBuffer buffer = null, fullFooterBuffer = null;
-    OrcProto.PostScript ps = null;
-    OrcFile.WriterVersion writerVersion = null;
-    try {
-      // figure out the size of the file using the option or filesystem
-      long size;
-      if (maxFileLength == Long.MAX_VALUE) {
-        size = fs.getFileStatus(path).getLen();
-      } else {
-        size = maxFileLength;
-      }
-
-      //read last bytes into buffer to get PostScript
-      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
-      buffer = ByteBuffer.allocate(readSize);
-      assert buffer.position() == 0;
-      file.readFully((size - readSize),
-          buffer.array(), buffer.arrayOffset(), readSize);
-      buffer.position(0);
-
-      //read the PostScript
-      //get length of PostScript
-      int psLen = buffer.get(readSize - 1) & 0xff;
-      ensureOrcFooter(file, path, psLen, buffer);
-      int psOffset = readSize - 1 - psLen;
-      ps = extractPostScript(buffer, path, psLen, psOffset);
-
-      int footerSize = (int) ps.getFooterLength();
-      int metadataSize = (int) ps.getMetadataLength();
-      writerVersion = extractWriterVersion(ps);
-
-      //check if extra bytes need to be read
-      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
-      if (extra > 0) {
-        //more bytes need to be read, seek back to the right place and read extra bytes
-        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
-        file.readFully((size - readSize - extra), extraBuf.array(),
-            extraBuf.arrayOffset() + extraBuf.position(), extra);
-        extraBuf.position(extra);
-        //append with already read bytes
-        extraBuf.put(buffer);
-        buffer = extraBuf;
-        buffer.position(0);
-        fullFooterBuffer = buffer.slice();
-        buffer.limit(footerSize + metadataSize);
-      } else {
-        //footer is already in the bytes in buffer, just adjust position, length
-        buffer.position(psOffset - footerSize - metadataSize);
-        fullFooterBuffer = buffer.slice();
-        buffer.limit(psOffset);
-      }
-
-      // remember position for later TODO: what later? this comment is useless
-      buffer.mark();
-    } finally {
-      try {
-        file.close();
-      } catch (IOException ex) {
-        LOG.error("Failed to close the file after another error", ex);
-      }
-    }
-
-    return new FileMetaInfo(
-        ps.getCompression().toString(),
-        (int) ps.getCompressionBlockSize(),
-        (int) ps.getMetadataLength(),
-        buffer,
-        ps.getVersionList(),
-        writerVersion,
-        fullFooterBuffer
-        );
-  }
-
-  /**
-   * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
-   * from serialized fields.
-   * As the fields are final, the fields need to be initialized in the constructor and
-   * can't be done in some helper function. So this helper class is used instead.
-   *
-   */
-  private static class MetaInfoObjExtractor{
-    final org.apache.orc.CompressionKind compressionKind;
-    final CompressionCodec codec;
-    final int bufferSize;
-    final int metadataSize;
-    final OrcProto.Metadata metadata;
-    final OrcProto.Footer footer;
-    final ObjectInspector inspector;
-
-    MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize,
-        ByteBuffer footerBuffer) throws IOException {
-
-      this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr.toUpperCase());
-      this.bufferSize = bufferSize;
-      this.codec = WriterImpl.createCodec(compressionKind);
-      this.metadataSize = metadataSize;
-
-      int position = footerBuffer.position();
-      int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
-
-      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
-      this.footer = extractFooter(
-          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
-
-      footerBuffer.position(position);
-      this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
-    }
-  }
-
-  public FileMetaInfo getFileMetaInfo() {
-    return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        getMetadataSize(), footerByteBuffer, getVersionList(),
-        getWriterVersion(), footerMetaAndPsBuffer);
-  }
-
-  /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
-   * and so we don't just add fields to it, it's already messy and confusing. */
-  public static final class FooterInfo {
-    private final OrcProto.Footer footer;
-    private final List<StripeStatistics> metadata;
-    private final List<StripeInformation> stripes;
-    private final FileMetaInfo fileMetaInfo;
-
-    private FooterInfo(
-        List<StripeStatistics> metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
-      this.metadata = metadata;
-      this.footer = footer;
-      this.fileMetaInfo = fileMetaInfo;
-      this.stripes = convertProtoStripesToStripes(footer.getStripesList());
-    }
-
-    public OrcProto.Footer getFooter() {
-      return footer;
-    }
-
-    public List<StripeStatistics> getMetadata() {
-      return metadata;
-    }
-
-    public FileMetaInfo getFileMetaInfo() {
-      return fileMetaInfo;
-    }
-
-    public List<StripeInformation> getStripes() {
-      return stripes;
+    this.inspector = OrcStruct.createObjectInspector(0, types);
   }
 
   @Override
   public ByteBuffer getSerializedFileFooter() {
-    return footerMetaAndPsBuffer;
+    return tail.getSerializedTail();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c168af26/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index e4d2e6e..32ac34e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -182,8 +182,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     if(fSplit instanceof OrcSplit){
       OrcSplit orcSplit = (OrcSplit) fSplit;
       if (orcSplit.hasFooter()) {
-        opts.fileMetaInfo(orcSplit.getFileMetaInfo());
+        opts.orcTail(orcSplit.getOrcTail());
      }
+      opts.maxLength(orcSplit.getFileLength());
     }
     Reader reader = OrcFile.createReader(path, opts);
     return new VectorizedOrcRecordReader(reader, conf, fSplit);
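Across OrcInputFormat.getRecordReader(), the ACID base-file path, and
VectorizedOrcInputFormat, the task-side pattern this patch applies is the same: the
split now carries the file length and, when footer shipping is enabled
(context.footerInSplits above), the serialized tail, so opening a reader needs no
extra NameNode calls. A condensed sketch using only the methods introduced by the
patch (the helper itself is illustrative; Reader, OrcFile and OrcSplit are the
ql.io.orc classes):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    // Open a reader for an OrcSplit without re-reading the file tail
    // when the split already carries it.
    static Reader openReader(OrcSplit split, Configuration conf) throws IOException {
      OrcFile.ReaderOptions opts = OrcFile.readerOptions(conf)
          .maxLength(split.getFileLength()); // Long.MAX_VALUE falls back to a filesystem lookup
      if (split.hasFooter()) {
        opts.orcTail(split.getOrcTail()); // tail already deserialized from the split payload
      }
      return OrcFile.createReader(split.getPath(), opts);
    }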