http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java new file mode 100644 index 0000000..2da590e --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -0,0 +1,758 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.orc.OrcFile; +import org.apache.orc.OrcUtils; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.CompressionCodec; +import org.apache.orc.FileFormatException; +import org.apache.orc.FileMetaInfo; +import org.apache.orc.FileMetadata; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.io.Text; +import org.apache.orc.OrcProto; + +import com.google.common.collect.Lists; +import com.google.protobuf.CodedInputStream; + +public class ReaderImpl implements Reader { + + private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class); + + private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; + + protected final FileSystem fileSystem; + private final long maxLength; + protected final Path path; + protected final org.apache.orc.CompressionKind compressionKind; + protected final CompressionCodec codec; + protected final int bufferSize; + private final List<OrcProto.StripeStatistics> stripeStats; + private final int metadataSize; + protected final List<OrcProto.Type> types; + private final TypeDescription schema; + private final List<OrcProto.UserMetadataItem> userMetadata; + private final List<OrcProto.ColumnStatistics> fileStats; + private final List<StripeInformation> stripes; + protected final int rowIndexStride; + private final long contentLength, numberOfRows; + + + private long deserializedSize = -1; + protected final Configuration conf; + private final List<Integer> versionList; + private final OrcFile.WriterVersion writerVersion; + + // Same for metastore cache - 
maintains the same background buffer, but includes postscript. + // This will only be set if the file footer/metadata was read from disk. + private final ByteBuffer footerMetaAndPsBuffer; + + public static class StripeInformationImpl + implements StripeInformation { + private final OrcProto.StripeInformation stripe; + + public StripeInformationImpl(OrcProto.StripeInformation stripe) { + this.stripe = stripe; + } + + @Override + public long getOffset() { + return stripe.getOffset(); + } + + @Override + public long getLength() { + return stripe.getDataLength() + getIndexLength() + getFooterLength(); + } + + @Override + public long getDataLength() { + return stripe.getDataLength(); + } + + @Override + public long getFooterLength() { + return stripe.getFooterLength(); + } + + @Override + public long getIndexLength() { + return stripe.getIndexLength(); + } + + @Override + public long getNumberOfRows() { + return stripe.getNumberOfRows(); + } + + @Override + public String toString() { + return "offset: " + getOffset() + " data: " + getDataLength() + + " rows: " + getNumberOfRows() + " tail: " + getFooterLength() + + " index: " + getIndexLength(); + } + } + + @Override + public long getNumberOfRows() { + return numberOfRows; + } + + @Override + public List<String> getMetadataKeys() { + List<String> result = new ArrayList<String>(); + for(OrcProto.UserMetadataItem item: userMetadata) { + result.add(item.getName()); + } + return result; + } + + @Override + public ByteBuffer getMetadataValue(String key) { + for(OrcProto.UserMetadataItem item: userMetadata) { + if (item.hasName() && item.getName().equals(key)) { + return item.getValue().asReadOnlyByteBuffer(); + } + } + throw new IllegalArgumentException("Can't find user metadata " + key); + } + + public boolean hasMetadataValue(String key) { + for(OrcProto.UserMetadataItem item: userMetadata) { + if (item.hasName() && item.getName().equals(key)) { + return true; + } + } + return false; + } + + @Override + public org.apache.orc.CompressionKind getCompressionKind() { + return compressionKind; + } + + @Override + public int getCompressionSize() { + return bufferSize; + } + + @Override + public List<StripeInformation> getStripes() { + return stripes; + } + + @Override + public long getContentLength() { + return contentLength; + } + + @Override + public List<OrcProto.Type> getTypes() { + return types; + } + + @Override + public OrcFile.Version getFileVersion() { + for (OrcFile.Version version: OrcFile.Version.values()) { + if ((versionList != null && !versionList.isEmpty()) && + version.getMajor() == versionList.get(0) && + version.getMinor() == versionList.get(1)) { + return version; + } + } + return OrcFile.Version.V_0_11; + } + + @Override + public OrcFile.WriterVersion getWriterVersion() { + return writerVersion; + } + + @Override + public int getRowIndexStride() { + return rowIndexStride; + } + + @Override + public ColumnStatistics[] getStatistics() { + ColumnStatistics[] result = new ColumnStatistics[types.size()]; + for(int i=0; i < result.length; ++i) { + result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i)); + } + return result; + } + + @Override + public TypeDescription getSchema() { + return schema; + } + + /** + * Ensure this is an ORC file to prevent users from trying to read text + * files or RC files as ORC files. 
+ * @param in the file being read + * @param path the filename for error messages + * @param psLen the postscript length + * @param buffer the tail of the file + * @throws IOException + */ + protected static void ensureOrcFooter(FSDataInputStream in, + Path path, + int psLen, + ByteBuffer buffer) throws IOException { + int magicLength = OrcFile.MAGIC.length(); + int fullLength = magicLength + 1; + if (psLen < fullLength || buffer.remaining() < fullLength) { + throw new FileFormatException("Malformed ORC file " + path + + ". Invalid postscript length " + psLen); + } + int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength; + byte[] array = buffer.array(); + // now look for the magic string at the end of the postscript. + if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) { + // If it isn't there, this may be the 0.11.0 version of ORC. + // Read the first 3 bytes of the file to check for the header + byte[] header = new byte[magicLength]; + in.readFully(0, header, 0, magicLength); + // if it isn't there, this isn't an ORC file + if (!Text.decode(header, 0 , magicLength).equals(OrcFile.MAGIC)) { + throw new FileFormatException("Malformed ORC file " + path + + ". Invalid postscript."); + } + } + } + + /** + * Build a version string out of an array. + * @param version the version number as a list + * @return the human readable form of the version string + */ + private static String versionString(List<Integer> version) { + StringBuilder buffer = new StringBuilder(); + for(int i=0; i < version.size(); ++i) { + if (i != 0) { + buffer.append('.'); + } + buffer.append(version.get(i)); + } + return buffer.toString(); + } + + /** + * Check to see if this ORC file is from a future version and if so, + * warn the user that we may not be able to read all of the column encodings. + * @param log the logger to write any error message to + * @param path the data source path for error messages + * @param version the version of hive that wrote the file. + */ + protected static void checkOrcVersion(Logger log, Path path, + List<Integer> version) { + if (version.size() >= 1) { + int major = version.get(0); + int minor = 0; + if (version.size() >= 2) { + minor = version.get(1); + } + if (major > OrcFile.Version.CURRENT.getMajor() || + (major == OrcFile.Version.CURRENT.getMajor() && + minor > OrcFile.Version.CURRENT.getMinor())) { + log.warn(path + " was written by a future Hive version " + + versionString(version) + + ". This file may not be readable by this version of Hive."); + } + } + } + + /** + * Constructor that let's the user specify additional options. 
+ * @param path pathname for file + * @param options options for reading + * @throws IOException + */ + public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException { + FileSystem fs = options.getFilesystem(); + if (fs == null) { + fs = path.getFileSystem(options.getConfiguration()); + } + this.fileSystem = fs; + this.path = path; + this.conf = options.getConfiguration(); + this.maxLength = options.getMaxLength(); + + FileMetadata fileMetadata = options.getFileMetadata(); + if (fileMetadata != null) { + this.compressionKind = fileMetadata.getCompressionKind(); + this.bufferSize = fileMetadata.getCompressionBufferSize(); + this.codec = WriterImpl.createCodec(compressionKind); + this.metadataSize = fileMetadata.getMetadataSize(); + this.stripeStats = fileMetadata.getStripeStats(); + this.versionList = fileMetadata.getVersionList(); + this.writerVersion = + OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum()); + this.types = fileMetadata.getTypes(); + this.rowIndexStride = fileMetadata.getRowIndexStride(); + this.contentLength = fileMetadata.getContentLength(); + this.numberOfRows = fileMetadata.getNumberOfRows(); + this.fileStats = fileMetadata.getFileStats(); + this.stripes = fileMetadata.getStripes(); + this.userMetadata = null; // not cached and not needed here + this.footerMetaAndPsBuffer = null; + } else { + FileMetaInfo footerMetaData; + if (options.getFileMetaInfo() != null) { + footerMetaData = options.getFileMetaInfo(); + this.footerMetaAndPsBuffer = null; + } else { + footerMetaData = extractMetaInfoFromFooter(fs, path, + options.getMaxLength()); + this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; + } + MetaInfoObjExtractor rInfo = + new MetaInfoObjExtractor(footerMetaData.compressionType, + footerMetaData.bufferSize, + footerMetaData.metadataSize, + footerMetaData.footerBuffer + ); + this.compressionKind = rInfo.compressionKind; + this.codec = rInfo.codec; + this.bufferSize = rInfo.bufferSize; + this.metadataSize = rInfo.metadataSize; + this.stripeStats = rInfo.metadata.getStripeStatsList(); + this.types = rInfo.footer.getTypesList(); + this.rowIndexStride = rInfo.footer.getRowIndexStride(); + this.contentLength = rInfo.footer.getContentLength(); + this.numberOfRows = rInfo.footer.getNumberOfRows(); + this.userMetadata = rInfo.footer.getMetadataList(); + this.fileStats = rInfo.footer.getStatisticsList(); + this.versionList = footerMetaData.versionList; + this.writerVersion = footerMetaData.writerVersion; + this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList()); + } + this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0); + } + + /** + * Get the WriterVersion based on the ORC file postscript. 
+ * @param writerVersion the integer writer version + * @return the version of the software that produced the file + */ + public static OrcFile.WriterVersion getWriterVersion(int writerVersion) { + for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) { + if (version.getId() == writerVersion) { + return version; + } + } + return OrcFile.WriterVersion.FUTURE; + } + + private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos, + int footerSize, CompressionCodec codec, int bufferSize) throws IOException { + bb.position(footerAbsPos); + bb.limit(footerAbsPos + footerSize); + return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize)); + } + + private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos, + int metadataSize, CompressionCodec codec, int bufferSize) throws IOException { + bb.position(metadataAbsPos); + bb.limit(metadataAbsPos + metadataSize); + return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize)); + } + + private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path, + int psLen, int psAbsOffset) throws IOException { + // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here. + assert bb.hasArray(); + CodedInputStream in = CodedInputStream.newInstance( + bb.array(), bb.arrayOffset() + psAbsOffset, psLen); + OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in); + checkOrcVersion(LOG, path, ps.getVersionList()); + + // Check compression codec. + switch (ps.getCompression()) { + case NONE: + break; + case ZLIB: + break; + case SNAPPY: + break; + case LZO: + break; + default: + throw new IllegalArgumentException("Unknown compression"); + } + return ps; + } + + private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs, + Path path, + long maxFileLength + ) throws IOException { + FSDataInputStream file = fs.open(path); + ByteBuffer buffer = null, fullFooterBuffer = null; + OrcProto.PostScript ps = null; + OrcFile.WriterVersion writerVersion = null; + try { + // figure out the size of the file using the option or filesystem + long size; + if (maxFileLength == Long.MAX_VALUE) { + size = fs.getFileStatus(path).getLen(); + } else { + size = maxFileLength; + } + + //read last bytes into buffer to get PostScript + int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS); + buffer = ByteBuffer.allocate(readSize); + assert buffer.position() == 0; + file.readFully((size - readSize), + buffer.array(), buffer.arrayOffset(), readSize); + buffer.position(0); + + //read the PostScript + //get length of PostScript + int psLen = buffer.get(readSize - 1) & 0xff; + ensureOrcFooter(file, path, psLen, buffer); + int psOffset = readSize - 1 - psLen; + ps = extractPostScript(buffer, path, psLen, psOffset); + + int footerSize = (int) ps.getFooterLength(); + int metadataSize = (int) ps.getMetadataLength(); + writerVersion = extractWriterVersion(ps); + + //check if extra bytes need to be read + int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize); + if (extra > 0) { + //more bytes need to be read, seek back to the right place and read extra bytes + ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize); + file.readFully((size - readSize - extra), extraBuf.array(), + extraBuf.arrayOffset() + extraBuf.position(), extra); + extraBuf.position(extra); + 
//append with already read bytes + extraBuf.put(buffer); + buffer = extraBuf; + buffer.position(0); + fullFooterBuffer = buffer.slice(); + buffer.limit(footerSize + metadataSize); + } else { + //footer is already in the bytes in buffer, just adjust position, length + buffer.position(psOffset - footerSize - metadataSize); + fullFooterBuffer = buffer.slice(); + buffer.limit(psOffset); + } + + // remember position for later TODO: what later? this comment is useless + buffer.mark(); + } finally { + try { + file.close(); + } catch (IOException ex) { + LOG.error("Failed to close the file after another error", ex); + } + } + + return new FileMetaInfo( + ps.getCompression().toString(), + (int) ps.getCompressionBlockSize(), + (int) ps.getMetadataLength(), + buffer, + ps.getVersionList(), + writerVersion, + fullFooterBuffer + ); + } + + protected static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) { + return (ps.hasWriterVersion() + ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL); + } + + protected static List<StripeInformation> convertProtoStripesToStripes( + List<OrcProto.StripeInformation> stripes) { + List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size()); + for (OrcProto.StripeInformation info : stripes) { + result.add(new StripeInformationImpl(info)); + } + return result; + } + + /** + * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl + * from serialized fields. + * As the fields are final, the fields need to be initialized in the constructor and + * can't be done in some helper function. So this helper class is used instead. + * + */ + private static class MetaInfoObjExtractor{ + final org.apache.orc.CompressionKind compressionKind; + final CompressionCodec codec; + final int bufferSize; + final int metadataSize; + final OrcProto.Metadata metadata; + final OrcProto.Footer footer; + + MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize, + ByteBuffer footerBuffer) throws IOException { + + this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr.toUpperCase()); + this.bufferSize = bufferSize; + this.codec = WriterImpl.createCodec(compressionKind); + this.metadataSize = metadataSize; + + int position = footerBuffer.position(); + int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize; + + this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize); + this.footer = extractFooter( + footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize); + + footerBuffer.position(position); + } + } + + @Override + public ByteBuffer getSerializedFileFooter() { + return footerMetaAndPsBuffer; + } + + @Override + public RecordReader rows() throws IOException { + return rows(new Options()); + } + + @Override + public RecordReader rows(Options options) throws IOException { + LOG.info("Reading ORC rows from " + path + " with " + options); + boolean[] include = options.getInclude(); + // if included columns is null, then include all columns + if (include == null) { + include = new boolean[types.size()]; + Arrays.fill(include, true); + options.include(include); + } + return new RecordReaderImpl(this, options); + } + + + @Override + public long getRawDataSize() { + // if the deserializedSize is not computed, then compute it, else + // return the already computed size. 
since we are reading from the footer + // we don't have to compute deserialized size repeatedly + if (deserializedSize == -1) { + List<Integer> indices = Lists.newArrayList(); + for (int i = 0; i < fileStats.size(); ++i) { + indices.add(i); + } + deserializedSize = getRawDataSizeFromColIndices(indices); + } + return deserializedSize; + } + + @Override + public long getRawDataSizeFromColIndices(List<Integer> colIndices) { + return getRawDataSizeFromColIndices(colIndices, types, fileStats); + } + + public static long getRawDataSizeFromColIndices( + List<Integer> colIndices, List<OrcProto.Type> types, + List<OrcProto.ColumnStatistics> stats) { + long result = 0; + for (int colIdx : colIndices) { + result += getRawDataSizeOfColumn(colIdx, types, stats); + } + return result; + } + + private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types, + List<OrcProto.ColumnStatistics> stats) { + OrcProto.ColumnStatistics colStat = stats.get(colIdx); + long numVals = colStat.getNumberOfValues(); + OrcProto.Type type = types.get(colIdx); + + switch (type.getKind()) { + case BINARY: + // old orc format doesn't support binary statistics. checking for binary + // statistics is not required as protocol buffers takes care of it. + return colStat.getBinaryStatistics().getSum(); + case STRING: + case CHAR: + case VARCHAR: + // old orc format doesn't support sum for string statistics. checking for + // existence is not required as protocol buffers takes care of it. + + // ORC strings are deserialized to java strings. so use java data model's + // string size + numVals = numVals == 0 ? 1 : numVals; + int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals); + return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen); + case TIMESTAMP: + return numVals * JavaDataModel.get().lengthOfTimestamp(); + case DATE: + return numVals * JavaDataModel.get().lengthOfDate(); + case DECIMAL: + return numVals * JavaDataModel.get().lengthOfDecimal(); + case DOUBLE: + case LONG: + return numVals * JavaDataModel.get().primitive2(); + case FLOAT: + case INT: + case SHORT: + case BOOLEAN: + case BYTE: + return numVals * JavaDataModel.get().primitive1(); + default: + LOG.debug("Unknown primitive category: " + type.getKind()); + break; + } + + return 0; + } + + @Override + public long getRawDataSizeOfColumns(List<String> colNames) { + List<Integer> colIndices = getColumnIndicesFromNames(colNames); + return getRawDataSizeFromColIndices(colIndices); + } + + private List<Integer> getColumnIndicesFromNames(List<String> colNames) { + // top level struct + OrcProto.Type type = types.get(0); + List<Integer> colIndices = Lists.newArrayList(); + List<String> fieldNames = type.getFieldNamesList(); + int fieldIdx; + for (String colName : colNames) { + if (fieldNames.contains(colName)) { + fieldIdx = fieldNames.indexOf(colName); + } else { + String s = "Cannot find field for: " + colName + " in "; + for (String fn : fieldNames) { + s += fn + ", "; + } + LOG.warn(s); + continue; + } + + // a single field may span multiple columns. 
find start and end column + // index for the requested field + int idxStart = type.getSubtypes(fieldIdx); + + int idxEnd; + + // if the specified is the last field and then end index will be last + // column index + if (fieldIdx + 1 > fieldNames.size() - 1) { + idxEnd = getLastIdx() + 1; + } else { + idxEnd = type.getSubtypes(fieldIdx + 1); + } + + // if start index and end index are same then the field is a primitive + // field else complex field (like map, list, struct, union) + if (idxStart == idxEnd) { + // simple field + colIndices.add(idxStart); + } else { + // complex fields spans multiple columns + for (int i = idxStart; i < idxEnd; i++) { + colIndices.add(i); + } + } + } + return colIndices; + } + + private int getLastIdx() { + Set<Integer> indices = new HashSet<>(); + for (OrcProto.Type type : types) { + indices.addAll(type.getSubtypesList()); + } + return Collections.max(indices); + } + + @Override + public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() { + return stripeStats; + } + + @Override + public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() { + return fileStats; + } + + @Override + public List<StripeStatistics> getStripeStatistics() { + List<StripeStatistics> result = new ArrayList<>(); + for (OrcProto.StripeStatistics ss : stripeStats) { + result.add(new StripeStatistics(ss.getColStatsList())); + } + return result; + } + + public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() { + return userMetadata; + } + + @Override + public List<Integer> getVersionList() { + return versionList; + } + + @Override + public int getMetadataSize() { + return metadataSize; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("ORC Reader("); + buffer.append(path); + if (maxLength != -1) { + buffer.append(", "); + buffer.append(maxLength); + } + buffer.append(")"); + return buffer.toString(); + } +}
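The extractMetaInfoFromFooter method above implements ORC's tail-read protocol: read the last DIRECTORY_SIZE_GUESS (16 KB) bytes in one call, take the final byte as the unsigned PostScript length, verify the magic, parse the PostScript to learn the footer and metadata lengths, and issue a second read only if the first guess fell short. Below is a minimal standalone sketch of that sequence, assuming a local file and skipping the protobuf parsing; OrcTailSketch and its use of RandomAccessFile are illustrative only and not part of this patch.

import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch of the tail-read sequence in
// extractMetaInfoFromFooter above. A real reader goes through the
// Hadoop FileSystem API and parses the PostScript protobuf.
public class OrcTailSketch {
  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
  private static final String MAGIC = "ORC";

  public static void main(String[] args) throws IOException {
    try (RandomAccessFile file = new RandomAccessFile(args[0], "r")) {
      long size = file.length();
      // 1. Read the last bytes of the file in a single call.
      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
      byte[] tail = new byte[readSize];
      file.seek(size - readSize);
      file.readFully(tail);
      // 2. The final byte is the unsigned PostScript length.
      int psLen = tail[readSize - 1] & 0xff;
      // 3. The PostScript ends with the magic string "ORC";
      //    ensureOrcFooter checks this before trusting psLen.
      String magic = new String(tail, readSize - 1 - MAGIC.length(),
          MAGIC.length(), "UTF-8");
      System.out.println("psLen=" + psLen + " magic=" + magic);
      // 4. A real reader now parses the PostScript bytes at
      //    tail[readSize - 1 - psLen .. readSize - 1) to learn the footer
      //    and metadata lengths, then re-reads if readSize was too small:
      //    extra = max(0, psLen + 1 + footerLen + metadataLen - readSize).
    }
  }
}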
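getRawDataSizeOfColumn above estimates deserialized (in-memory) size purely from file-level column statistics: fixed-width types multiply the value count by a JavaDataModel primitive size, while strings derive an average length from the statistics' character sum. A hedged walk-through with made-up statistics follows; the per-value costs are assumptions standing in for JavaDataModel.get() results, which depend on the JVM.

// Hedged walk-through of the raw-size arithmetic in
// getRawDataSizeOfColumn above, with invented numbers.
public class RawSizeSketch {
  public static void main(String[] args) {
    // LONG/DOUBLE column: numVals * primitive2() (8 bytes assumed here).
    long longVals = 1_000_000L;
    long primitive2 = 8;
    System.out.println("long column ~ " + longVals * primitive2);

    // STRING column: statistics carry the sum of lengths, so the reader
    // derives an average length and charges the Java string cost for it.
    long strVals = 500_000L;
    long sumOfLengths = 12_500_000L;        // from string statistics
    strVals = strVals == 0 ? 1 : strVals;   // same divide-by-zero guard as the patch
    int avgStrLen = (int) (sumOfLengths / strVals);
    long perString = 56 + 2L * avgStrLen;   // assumed object + char[] cost
    System.out.println("string column ~ " + strVals * perString);
  }
}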
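getColumnIndicesFromNames above relies on the ORC type tree being flattened in pre-order: field i of the root struct owns the contiguous column range [subtypes(i), subtypes(i+1)), and the last field runs through the maximum column id. A small illustration with a made-up schema; the column ids below are invented for the example.

// Hypothetical illustration of the subtype arithmetic in
// getColumnIndicesFromNames above.
public class SubtypeSketch {
  public static void main(String[] args) {
    // root struct<a:int, b:map<string,int>, c:string> flattens to:
    // 0=struct, 1=a, 2=b(map), 3=map key, 4=map value, 5=c
    int[] rootSubtypes = {1, 2, 5};
    int lastColumnId = 5;
    int fieldIdx = 1;                                 // field "b"
    int idxStart = rootSubtypes[fieldIdx];            // 2
    int idxEnd = (fieldIdx + 1 < rootSubtypes.length)
        ? rootSubtypes[fieldIdx + 1]                  // 5
        : lastColumnId + 1;                           // last field case
    for (int col = idxStart; col < idxEnd; col++) {
      System.out.println("field b includes column " + col);  // 2, 3, 4
    }
  }
}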
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java new file mode 100644 index 0000000..36a802e --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -0,0 +1,1215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.orc.BooleanColumnStatistics; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.CompressionCodec; +import org.apache.orc.DataReader; +import org.apache.orc.DateColumnStatistics; +import org.apache.orc.DecimalColumnStatistics; +import org.apache.orc.DoubleColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; +import org.apache.orc.OrcConf; +import org.apache.orc.StringColumnStatistics; +import org.apache.orc.StripeInformation; +import org.apache.orc.TimestampColumnStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.BloomFilterIO; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.ql.util.TimestampUtils; +import org.apache.hadoop.io.Text; +import org.apache.orc.OrcProto; + +public class RecordReaderImpl implements RecordReader { + static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class); + private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); + private static final Object UNKNOWN_VALUE = new Object(); + protected final Path path; + private final long firstRow; + private final List<StripeInformation> stripes = + new ArrayList<StripeInformation>(); + 
private OrcProto.StripeFooter stripeFooter; + private final long totalRowCount; + private final CompressionCodec codec; + protected final TypeDescription schema; + private final List<OrcProto.Type> types; + private final int bufferSize; + private final boolean[] included; + private final long rowIndexStride; + private long rowInStripe = 0; + private int currentStripe = -1; + private long rowBaseInStripe = 0; + private long rowCountInStripe = 0; + private final Map<StreamName, InStream> streams = + new HashMap<StreamName, InStream>(); + DiskRangeList bufferChunks = null; + private final TreeReaderFactory.TreeReader reader; + private final OrcProto.RowIndex[] indexes; + private final OrcProto.BloomFilterIndex[] bloomFilterIndices; + private final SargApplier sargApp; + // an array about which row groups aren't skipped + private boolean[] includedRowGroups = null; + private final DataReader dataReader; + + /** + * Given a list of column names, find the given column and return the index. + * + * @param columnNames the list of potential column names + * @param columnName the column name to look for + * @param rootColumn offset the result with the rootColumn + * @return the column number or -1 if the column wasn't found + */ + static int findColumns(String[] columnNames, + String columnName, + int rootColumn) { + for(int i=0; i < columnNames.length; ++i) { + if (columnName.equals(columnNames[i])) { + return i + rootColumn; + } + } + return -1; + } + + /** + * Find the mapping from predicate leaves to columns. + * @param sargLeaves the search argument that we need to map + * @param columnNames the names of the columns + * @param rootColumn the offset of the top level row, which offsets the + * result + * @return an array mapping the sarg leaves to concrete column numbers + */ + public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves, + String[] columnNames, + int rootColumn) { + int[] result = new int[sargLeaves.size()]; + Arrays.fill(result, -1); + for(int i=0; i < result.length; ++i) { + String colName = sargLeaves.get(i).getColumnName(); + result[i] = findColumns(columnNames, colName, rootColumn); + } + return result; + } + + protected RecordReaderImpl(ReaderImpl fileReader, + Reader.Options options) throws IOException { + SchemaEvolution treeReaderSchema; + this.included = options.getInclude(); + included[0] = true; + if (options.getSchema() == null) { + if (LOG.isInfoEnabled()) { + LOG.info("Schema on read not provided -- using file schema " + + fileReader.getSchema()); + } + treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included); + } else { + + // Now that we are creating a record reader for a file, validate that the schema to read + // is compatible with the file schema. 
+ // + treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), + options.getSchema(),included); + } + this.schema = treeReaderSchema.getReaderSchema(); + this.path = fileReader.path; + this.codec = fileReader.codec; + this.types = fileReader.types; + this.bufferSize = fileReader.bufferSize; + this.rowIndexStride = fileReader.rowIndexStride; + SearchArgument sarg = options.getSearchArgument(); + if (sarg != null && rowIndexStride != 0) { + sargApp = new SargApplier( + sarg, options.getColumnNames(), rowIndexStride, types, + included.length); + } else { + sargApp = null; + } + long rows = 0; + long skippedRows = 0; + long offset = options.getOffset(); + long maxOffset = options.getMaxOffset(); + for(StripeInformation stripe: fileReader.getStripes()) { + long stripeStart = stripe.getOffset(); + if (offset > stripeStart) { + skippedRows += stripe.getNumberOfRows(); + } else if (stripeStart < maxOffset) { + this.stripes.add(stripe); + rows += stripe.getNumberOfRows(); + } + } + + Boolean zeroCopy = options.getUseZeroCopy(); + if (zeroCopy == null) { + zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf); + } + if (options.getDataReader() != null) { + this.dataReader = options.getDataReader(); + } else { + this.dataReader = RecordReaderUtils.createDefaultDataReader( + DataReaderProperties.builder() + .withBufferSize(bufferSize) + .withCompression(fileReader.compressionKind) + .withFileSystem(fileReader.fileSystem) + .withPath(fileReader.path) + .withTypeCount(types.size()) + .withZeroCopy(zeroCopy) + .build()); + } + this.dataReader.open(); + + firstRow = skippedRows; + totalRowCount = rows; + Boolean skipCorrupt = options.getSkipCorruptRecords(); + if (skipCorrupt == null) { + skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf); + } + + reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(), + treeReaderSchema, included, skipCorrupt); + indexes = new OrcProto.RowIndex[types.size()]; + bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; + advanceToNextRow(reader, 0L, true); + } + + public static final class PositionProviderImpl implements PositionProvider { + private final OrcProto.RowIndexEntry entry; + private int index; + + public PositionProviderImpl(OrcProto.RowIndexEntry entry) { + this(entry, 0); + } + + public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) { + this.entry = entry; + this.index = startPos; + } + + @Override + public long getNext() { + return entry.getPositions(index++); + } + } + + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe + ) throws IOException { + return dataReader.readStripeFooter(stripe); + } + + enum Location { + BEFORE, MIN, MIDDLE, MAX, AFTER + } + + /** + * Given a point and min and max, determine if the point is before, at the + * min, in the middle, at the max, or after the range. + * @param point the point to test + * @param min the minimum point + * @param max the maximum point + * @param <T> the type of the comparision + * @return the location of the point + */ + static <T> Location compareToRange(Comparable<T> point, T min, T max) { + int minCompare = point.compareTo(min); + if (minCompare < 0) { + return Location.BEFORE; + } else if (minCompare == 0) { + return Location.MIN; + } + int maxCompare = point.compareTo(max); + if (maxCompare > 0) { + return Location.AFTER; + } else if (maxCompare == 0) { + return Location.MAX; + } + return Location.MIDDLE; + } + + /** + * Get the maximum value out of an index entry. 
+ * @param index + * the index entry + * @return the object for the maximum value or null if there isn't one + */ + static Object getMax(ColumnStatistics index) { + if (index instanceof IntegerColumnStatistics) { + return ((IntegerColumnStatistics) index).getMaximum(); + } else if (index instanceof DoubleColumnStatistics) { + return ((DoubleColumnStatistics) index).getMaximum(); + } else if (index instanceof StringColumnStatistics) { + return ((StringColumnStatistics) index).getMaximum(); + } else if (index instanceof DateColumnStatistics) { + return ((DateColumnStatistics) index).getMaximum(); + } else if (index instanceof DecimalColumnStatistics) { + return ((DecimalColumnStatistics) index).getMaximum(); + } else if (index instanceof TimestampColumnStatistics) { + return ((TimestampColumnStatistics) index).getMaximum(); + } else if (index instanceof BooleanColumnStatistics) { + if (((BooleanColumnStatistics)index).getTrueCount()!=0) { + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + } else { + return null; + } + } + + /** + * Get the minimum value out of an index entry. + * @param index + * the index entry + * @return the object for the minimum value or null if there isn't one + */ + static Object getMin(ColumnStatistics index) { + if (index instanceof IntegerColumnStatistics) { + return ((IntegerColumnStatistics) index).getMinimum(); + } else if (index instanceof DoubleColumnStatistics) { + return ((DoubleColumnStatistics) index).getMinimum(); + } else if (index instanceof StringColumnStatistics) { + return ((StringColumnStatistics) index).getMinimum(); + } else if (index instanceof DateColumnStatistics) { + return ((DateColumnStatistics) index).getMinimum(); + } else if (index instanceof DecimalColumnStatistics) { + return ((DecimalColumnStatistics) index).getMinimum(); + } else if (index instanceof TimestampColumnStatistics) { + return ((TimestampColumnStatistics) index).getMinimum(); + } else if (index instanceof BooleanColumnStatistics) { + if (((BooleanColumnStatistics)index).getFalseCount()!=0) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } + } else { + return UNKNOWN_VALUE; // null is not safe here + } + } + + /** + * Evaluate a predicate with respect to the statistics from the column + * that is referenced in the predicate. + * @param statsProto the statistics for the column mentioned in the predicate + * @param predicate the leaf predicate we need to evaluation + * @param bloomFilter + * @return the set of truth values that may be returned for the given + * predicate. + */ + static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, + PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) { + ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto); + Object minValue = getMin(cs); + Object maxValue = getMax(cs); + BloomFilterIO bf = null; + if (bloomFilter != null) { + bf = new BloomFilterIO(bloomFilter); + } + return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf); + } + + /** + * Evaluate a predicate with respect to the statistics from the column + * that is referenced in the predicate. + * @param stats the statistics for the column mentioned in the predicate + * @param predicate the leaf predicate we need to evaluation + * @return the set of truth values that may be returned for the given + * predicate. 
+ */ + public static TruthValue evaluatePredicate(ColumnStatistics stats, + PredicateLeaf predicate, + BloomFilterIO bloomFilter) { + Object minValue = getMin(stats); + Object maxValue = getMax(stats); + return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter); + } + + static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, + Object max, boolean hasNull, BloomFilterIO bloomFilter) { + // if we didn't have any values, everything must have been null + if (min == null) { + if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) { + return TruthValue.YES; + } else { + return TruthValue.NULL; + } + } else if (min == UNKNOWN_VALUE) { + return TruthValue.YES_NO_NULL; + } + + TruthValue result; + Object baseObj = predicate.getLiteral(); + try { + // Predicate object and stats objects are converted to the type of the predicate object. + Object minValue = getBaseObjectForComparison(predicate.getType(), min); + Object maxValue = getBaseObjectForComparison(predicate.getType(), max); + Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj); + + result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull); + if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) { + result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull); + } + // in case failed conversion, return the default YES_NO_NULL truth value + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + final String statsType = min == null ? + (max == null ? "null" : max.getClass().getSimpleName()) : + min.getClass().getSimpleName(); + final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName(); + final String reason = e.getClass().getSimpleName() + " when evaluating predicate." + + " Skipping ORC PPD." + + " Exception: " + e.getMessage() + + " StatsType: " + statsType + + " PredicateType: " + predicateType; + LOG.warn(reason); + LOG.debug(reason, e); + } + if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) { + result = TruthValue.YES_NO; + } else { + result = TruthValue.YES_NO_NULL; + } + } + return result; + } + + private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate, + TruthValue result, BloomFilterIO bloomFilter) { + // evaluate bloom filter only when + // 1) Bloom filter is available + // 2) Min/Max evaluation yield YES or MAYBE + // 3) Predicate is EQUALS or IN list + if (bloomFilter != null + && result != TruthValue.NO_NULL && result != TruthValue.NO + && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS) + || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) + || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) { + return true; + } + return false; + } + + private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj, + Object minValue, + Object maxValue, + boolean hasNull) { + Location loc; + + switch (predicate.getOperator()) { + case NULL_SAFE_EQUALS: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.AFTER) { + return TruthValue.NO; + } else { + return TruthValue.YES_NO; + } + case EQUALS: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (minValue.equals(maxValue) && loc == Location.MIN) { + return hasNull ? TruthValue.YES_NULL : TruthValue.YES; + } else if (loc == Location.BEFORE || loc == Location.AFTER) { + return hasNull ? 
TruthValue.NO_NULL : TruthValue.NO; + } else { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + case LESS_THAN: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.AFTER) { + return hasNull ? TruthValue.YES_NULL : TruthValue.YES; + } else if (loc == Location.BEFORE || loc == Location.MIN) { + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } else { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + case LESS_THAN_EQUALS: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.AFTER || loc == Location.MAX) { + return hasNull ? TruthValue.YES_NULL : TruthValue.YES; + } else if (loc == Location.BEFORE) { + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } else { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + case IN: + if (minValue.equals(maxValue)) { + // for a single value, look through to see if that value is in the + // set + for (Object arg : predicate.getLiteralList()) { + predObj = getBaseObjectForComparison(predicate.getType(), arg); + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.MIN) { + return hasNull ? TruthValue.YES_NULL : TruthValue.YES; + } + } + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } else { + // are all of the values outside of the range? + for (Object arg : predicate.getLiteralList()) { + predObj = getBaseObjectForComparison(predicate.getType(), arg); + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.MIN || loc == Location.MIDDLE || + loc == Location.MAX) { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + } + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } + case BETWEEN: + List<Object> args = predicate.getLiteralList(); + Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0)); + + loc = compareToRange((Comparable) predObj1, minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.MIN) { + Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1)); + + Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue); + if (loc2 == Location.AFTER || loc2 == Location.MAX) { + return hasNull ? TruthValue.YES_NULL : TruthValue.YES; + } else if (loc2 == Location.BEFORE) { + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } else { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + } else if (loc == Location.AFTER) { + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + } else { + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + case IS_NULL: + // min = null condition above handles the all-nulls YES case + return hasNull ? TruthValue.YES_NO : TruthValue.NO; + default: + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + } + + private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate, + final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) { + switch (predicate.getOperator()) { + case NULL_SAFE_EQUALS: + // null safe equals does not return *_NULL variant. 
So set hasNull to false + return checkInBloomFilter(bloomFilter, predObj, false); + case EQUALS: + return checkInBloomFilter(bloomFilter, predObj, hasNull); + case IN: + for (Object arg : predicate.getLiteralList()) { + // if atleast one value in IN list exist in bloom filter, qualify the row group/stripe + Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg); + TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull); + if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) { + return result; + } + } + return hasNull ? TruthValue.NO_NULL : TruthValue.NO; + default: + return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; + } + } + + private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) { + TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO; + + if (predObj instanceof Long) { + if (bf.testLong(((Long) predObj).longValue())) { + result = TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Double) { + if (bf.testDouble(((Double) predObj).doubleValue())) { + result = TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof String || predObj instanceof Text || + predObj instanceof HiveDecimalWritable || + predObj instanceof BigDecimal) { + if (bf.testString(predObj.toString())) { + result = TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Timestamp) { + if (bf.testLong(((Timestamp) predObj).getTime())) { + result = TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Date) { + if (bf.testLong(DateWritable.dateToDays((Date) predObj))) { + result = TruthValue.YES_NO_NULL; + } + } else { + // if the predicate object is null and if hasNull says there are no nulls then return NO + if (predObj == null && !hasNull) { + result = TruthValue.NO; + } else { + result = TruthValue.YES_NO_NULL; + } + } + + if (result == TruthValue.YES_NO_NULL && !hasNull) { + result = TruthValue.YES_NO; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Bloom filter evaluation: " + result.toString()); + } + + return result; + } + + private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) { + if (obj == null) { + return null; + } + switch (type) { + case BOOLEAN: + if (obj instanceof Boolean) { + return obj; + } else { + // will only be true if the string conversion yields "true", all other values are + // considered false + return Boolean.valueOf(obj.toString()); + } + case DATE: + if (obj instanceof Date) { + return obj; + } else if (obj instanceof String) { + return Date.valueOf((String) obj); + } else if (obj instanceof Timestamp) { + return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L); + } + // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?) + break; + case DECIMAL: + if (obj instanceof Boolean) { + return new HiveDecimalWritable(((Boolean) obj).booleanValue() ? 
+ HiveDecimal.ONE : HiveDecimal.ZERO); + } else if (obj instanceof Integer) { + return new HiveDecimalWritable(((Integer) obj).intValue()); + } else if (obj instanceof Long) { + return new HiveDecimalWritable(((Long) obj)); + } else if (obj instanceof Float || obj instanceof Double || + obj instanceof String) { + return new HiveDecimalWritable(obj.toString()); + } else if (obj instanceof BigDecimal) { + return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj)); + } else if (obj instanceof HiveDecimal) { + return new HiveDecimalWritable((HiveDecimal) obj); + } else if (obj instanceof HiveDecimalWritable) { + return obj; + } else if (obj instanceof Timestamp) { + return new HiveDecimalWritable(Double.toString( + TimestampUtils.getDouble((Timestamp) obj))); + } + break; + case FLOAT: + if (obj instanceof Number) { + // widening conversion + return ((Number) obj).doubleValue(); + } else if (obj instanceof HiveDecimal) { + return ((HiveDecimal) obj).doubleValue(); + } else if (obj instanceof String) { + return Double.valueOf(obj.toString()); + } else if (obj instanceof Timestamp) { + return TimestampUtils.getDouble((Timestamp) obj); + } else if (obj instanceof HiveDecimal) { + return ((HiveDecimal) obj).doubleValue(); + } else if (obj instanceof BigDecimal) { + return ((BigDecimal) obj).doubleValue(); + } + break; + case LONG: + if (obj instanceof Number) { + // widening conversion + return ((Number) obj).longValue(); + } else if (obj instanceof HiveDecimal) { + return ((HiveDecimal) obj).longValue(); + } else if (obj instanceof String) { + return Long.valueOf(obj.toString()); + } + break; + case STRING: + if (obj != null) { + return (obj.toString()); + } + break; + case TIMESTAMP: + if (obj instanceof Timestamp) { + return obj; + } else if (obj instanceof Integer) { + return new Timestamp(((Number) obj).longValue()); + } else if (obj instanceof Float) { + return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue()); + } else if (obj instanceof Double) { + return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue()); + } else if (obj instanceof HiveDecimal) { + return TimestampUtils.decimalToTimestamp((HiveDecimal) obj); + } else if (obj instanceof HiveDecimalWritable) { + return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal()); + } else if (obj instanceof Date) { + return new Timestamp(((Date) obj).getTime()); + } + // float/double conversion to timestamp is interpreted as seconds whereas integer conversion + // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting + // is also config driven. The filter operator changes its promotion based on config: + // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases. + break; + default: + break; + } + + throw new IllegalArgumentException(String.format( + "ORC SARGS could not convert from %s to %s", obj == null ? 
"(null)" : obj.getClass() + .getSimpleName(), type)); + } + + public static class SargApplier { + public final static boolean[] READ_ALL_RGS = null; + public final static boolean[] READ_NO_RGS = new boolean[0]; + + private final SearchArgument sarg; + private final List<PredicateLeaf> sargLeaves; + private final int[] filterColumns; + private final long rowIndexStride; + // same as the above array, but indices are set to true + private final boolean[] sargColumns; + + public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride, + List<OrcProto.Type> types, int includedCount) { + this.sarg = sarg; + sargLeaves = sarg.getLeaves(); + filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0); + this.rowIndexStride = rowIndexStride; + // included will not be null, row options will fill the array with trues if null + sargColumns = new boolean[includedCount]; + for (int i : filterColumns) { + // filter columns may have -1 as index which could be partition column in SARG. + if (i > 0) { + sargColumns[i] = true; + } + } + } + + /** + * Pick the row groups that we need to load from the current stripe. + * + * @return an array with a boolean for each row group or null if all of the + * row groups must be read. + * @throws IOException + */ + public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes, + OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException { + long rowsInStripe = stripe.getNumberOfRows(); + int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); + boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc? + TruthValue[] leafValues = new TruthValue[sargLeaves.size()]; + boolean hasSelected = false, hasSkipped = false; + for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) { + for (int pred = 0; pred < leafValues.length; ++pred) { + int columnIx = filterColumns[pred]; + if (columnIx != -1) { + if (indexes[columnIx] == null) { + throw new AssertionError("Index is not populated for " + columnIx); + } + OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup); + if (entry == null) { + throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup); + } + OrcProto.ColumnStatistics stats = entry.getStatistics(); + OrcProto.BloomFilter bf = null; + if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) { + bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup); + } + leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf); + if (LOG.isTraceEnabled()) { + LOG.trace("Stats = " + stats); + LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]); + } + } else { + // the column is a virtual column + leafValues[pred] = TruthValue.YES_NO_NULL; + } + } + result[rowGroup] = sarg.evaluate(leafValues).isNeeded(); + hasSelected = hasSelected || result[rowGroup]; + hasSkipped = hasSkipped || (!result[rowGroup]); + if (LOG.isDebugEnabled()) { + LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " + + (rowIndexStride * (rowGroup + 1) - 1) + " is " + + (result[rowGroup] ? "" : "not ") + "included."); + } + } + + return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS; + } + } + + /** + * Pick the row groups that we need to load from the current stripe. + * + * @return an array with a boolean for each row group or null if all of the + * row groups must be read. 
+ * @throws IOException + */ + protected boolean[] pickRowGroups() throws IOException { + // if we don't have a sarg or indexes, we read everything + if (sargApp == null) { + return null; + } + readRowIndex(currentStripe, included, sargApp.sargColumns); + return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false); + } + + private void clearStreams() { + // explicit close of all streams to de-ref ByteBuffers + for (InStream is : streams.values()) { + is.close(); + } + if (bufferChunks != null) { + if (dataReader.isTrackingDiskRanges()) { + for (DiskRangeList range = bufferChunks; range != null; range = range.next) { + if (!(range instanceof BufferChunk)) { + continue; + } + dataReader.releaseBuffer(((BufferChunk) range).getChunk()); + } + } + } + bufferChunks = null; + streams.clear(); + } + + /** + * Read the current stripe into memory. + * + * @throws IOException + */ + private void readStripe() throws IOException { + StripeInformation stripe = beginReadStripe(); + includedRowGroups = pickRowGroups(); + + // move forward to the first unskipped row + if (includedRowGroups != null) { + while (rowInStripe < rowCountInStripe && + !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) { + rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride); + } + } + + // if we haven't skipped the whole stripe, read the data + if (rowInStripe < rowCountInStripe) { + // if we aren't projecting columns or filtering rows, just read it all + if (included == null && includedRowGroups == null) { + readAllDataStreams(stripe); + } else { + readPartialDataStreams(stripe); + } + reader.startStripe(streams, stripeFooter); + // if we skipped the first row group, move the pointers forward + if (rowInStripe != 0) { + seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride)); + } + } + } + + private StripeInformation beginReadStripe() throws IOException { + StripeInformation stripe = stripes.get(currentStripe); + stripeFooter = readStripeFooter(stripe); + clearStreams(); + // setup the position in the stripe + rowCountInStripe = stripe.getNumberOfRows(); + rowInStripe = 0; + rowBaseInStripe = 0; + for (int i = 0; i < currentStripe; ++i) { + rowBaseInStripe += stripes.get(i).getNumberOfRows(); + } + // reset all of the indexes + for (int i = 0; i < indexes.length; ++i) { + indexes[i] = null; + } + return stripe; + } + + private void readAllDataStreams(StripeInformation stripe) throws IOException { + long start = stripe.getIndexLength(); + long end = start + stripe.getDataLength(); + // explicitly trigger 1 big read + DiskRangeList toRead = new DiskRangeList(start, end); + bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); + List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList(); + createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams); + } + + /** + * Plan the ranges of the file that we need to read given the list of + * columns and row groups. 
+ * + * @param streamList the list of streams available + * @param indexes the indexes that have been loaded + * @param includedColumns which columns are needed + * @param includedRowGroups which row groups are needed + * @param isCompressed does the file have generic compression + * @param encodings the encodings for each column + * @param types the types of the columns + * @param compressionSize the compression block size + * @return the list of disk ranges that will be loaded + */ + static DiskRangeList planReadPartialDataStreams + (List<OrcProto.Stream> streamList, + OrcProto.RowIndex[] indexes, + boolean[] includedColumns, + boolean[] includedRowGroups, + boolean isCompressed, + List<OrcProto.ColumnEncoding> encodings, + List<OrcProto.Type> types, + int compressionSize, + boolean doMergeBuffers) { + long offset = 0; + // figure out which columns have a present stream + boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); + CreateHelper list = new CreateHelper(); + for (OrcProto.Stream stream : streamList) { + long length = stream.getLength(); + int column = stream.getColumn(); + OrcProto.Stream.Kind streamKind = stream.getKind(); + // since stream kind is optional, first check if it exists + if (stream.hasKind() && + (StreamName.getArea(streamKind) == StreamName.Area.DATA) && + (column < includedColumns.length && includedColumns[column])) { + // if we aren't filtering or it is a dictionary, load it. + if (includedRowGroups == null + || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) { + RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers); + } else { + RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups, + isCompressed, indexes[column], encodings.get(column), types.get(column), + compressionSize, hasNull[column], offset, length, list, doMergeBuffers); + } + } + offset += length; + } + return list.extract(); + } + + void createStreams(List<OrcProto.Stream> streamDescriptions, + DiskRangeList ranges, + boolean[] includeColumn, + CompressionCodec codec, + int bufferSize, + Map<StreamName, InStream> streams) throws IOException { + long streamOffset = 0; + for (OrcProto.Stream streamDesc : streamDescriptions) { + int column = streamDesc.getColumn(); + if ((includeColumn != null && + (column < included.length && !includeColumn[column])) || + streamDesc.hasKind() && + (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) { + streamOffset += streamDesc.getLength(); + continue; + } + List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers( + ranges, streamOffset, streamDesc.getLength()); + StreamName name = new StreamName(column, streamDesc.getKind()); + streams.put(name, InStream.create(name.toString(), buffers, + streamDesc.getLength(), codec, bufferSize)); + streamOffset += streamDesc.getLength(); + } + } + + private void readPartialDataStreams(StripeInformation stripe) throws IOException { + List<OrcProto.Stream> streamList = stripeFooter.getStreamsList(); + DiskRangeList toRead = planReadPartialDataStreams(streamList, + indexes, included, includedRowGroups, codec != null, + stripeFooter.getColumnsList(), types, bufferSize, true); + if (LOG.isDebugEnabled()) { + LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead)); + } + bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); + if (LOG.isDebugEnabled()) { + LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks)); + } + + createStreams(streamList, bufferChunks, 
+        included, codec, bufferSize, streams);
+  }
+
+  /**
+   * Read stripes until we find a row that we don't skip.
+   *
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
+      currentStripe += 1;
+      readStripe();
+    }
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   *
+   * @param reader the tree reader to position
+   * @param nextRow the row we want to go to
+   * @param canAdvanceStripe whether this call may move on to a following stripe
+   * @throws IOException
+   */
+  private boolean advanceToNextRow(
+      TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
+      throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+    // check for row skipping
+    if (rowIndexStride != 0 &&
+        includedRowGroups != null &&
+        nextRowInStripe < rowCountInStripe) {
+      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+      if (!includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
+          rowGroup += 1;
+        }
+        if (rowGroup >= includedRowGroups.length) {
+          if (canAdvanceStripe) {
+            advanceStripe();
+          }
+          return canAdvanceStripe;
+        }
+        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+      }
+    }
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
+      }
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+      } else {
+        reader.skipRows(nextRowInStripe - rowInStripe);
+      }
+      rowInStripe = nextRowInStripe;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    try {
+      if (rowInStripe >= rowCountInStripe) {
+        currentStripe += 1;
+        if (currentStripe >= stripes.size()) {
+          batch.size = 0;
+          return false;
+        }
+        readStripe();
+      }
+
+      int batchSize = computeBatchSize(batch.getMaxSize());
+
+      rowInStripe += batchSize;
+      reader.setVectorColumnCount(batch.getDataColumnCount());
+      reader.nextBatch(batch, batchSize);
+      batch.selectedInUse = false;
+      batch.size = batchSize;
+      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+      return batch.size != 0;
+    } catch (IOException e) {
+      // Rethrow the exception with the file name in the message
+      throw new IOException("Error reading file: " + path, e);
+    }
+  }
+
+  private int computeBatchSize(long targetBatchSize) {
+    final int batchSize;
+    // In case of PPD, the batch size should be aware of row group boundaries. If only a subset
+    // of row groups is selected, the marker position is set to the end of that range (the
+    // selected row groups within the stripe). Computing the batch size from the marker position
+    // keeps the batch aligned with the row group boundary and avoids reading past it.
+    // An illustration of this case is in https://issues.apache.org/jira/browse/HIVE-6287.
+    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+      int startRowGroup = (int) (rowInStripe / rowIndexStride);
+      if (!includedRowGroups[startRowGroup]) {
+        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+          startRowGroup += 1;
+        }
+      }
+
+      int endRowGroup = startRowGroup;
+      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+        endRowGroup += 1;
+      }
+
+      final long markerPosition =
+          (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+              : rowCountInStripe;
+      batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
+
+      if (isLogDebugEnabled && batchSize < targetBatchSize) {
+        LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+      }
+    } else {
+      batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
+    }
+    return batchSize;
+  }
+
+  @Override
+  public void close() throws IOException {
+    clearStreams();
+    dataReader.close();
+  }
+
+  @Override
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  /**
+   * Return the fraction of rows that have been read from the selected
+   * section of the file.
+   *
+   * @return fraction between 0.0 and 1.0 of rows consumed
+   */
+  @Override
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    for (int i = 0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               boolean[] sargColumns) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null, sargColumns);
+  }
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               OrcProto.RowIndex[] indexes,
+                               OrcProto.BloomFilterIndex[] bloomFilterIndex,
+                               boolean[] sargColumns) throws IOException {
+    StripeInformation stripe = stripes.get(stripeIndex);
+    OrcProto.StripeFooter stripeFooter = null;
+    // if this is the current stripe, use the cached objects.
+    if (stripeIndex == currentStripe) {
+      stripeFooter = this.stripeFooter;
+      indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+      sargColumns = sargColumns == null ?
+          (sargApp == null ?
+              null : sargApp.sargColumns) : sargColumns;
+    }
+    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
+        bloomFilterIndex);
+  }
+
+  private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
+      throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    reader.seek(index);
+  }
+
+  @Override
+  public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of the slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
+
+    // if we aren't at the right row yet, advance within the stripe.
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  private static final String TRANSLATED_SARG_SEPARATOR = "_";
+
+  public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
+    return rootColumn + TRANSLATED_SARG_SEPARATOR +
+        ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+  }
+
+  public static int[] mapTranslatedSargColumns(
+      List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
+    int[] result = new int[sargLeaves.size()];
+    OrcProto.Type lastRoot = null; // the root type is currently the same for all leaves
+    String lastRootStr = null;
+    for (int i = 0; i < result.length; ++i) {
+      String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
+      assert rootAndIndex.length == 2;
+      String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
+      int index = Integer.parseInt(indexStr);
+      // First, check if the column even maps to anything.
+      if (index == -1) {
+        result[i] = -1;
+        continue;
+      }
+      assert index >= 0;
+      // Then, find the root type if needed.
+      if (!rootStr.equals(lastRootStr)) {
+        lastRoot = types.get(Integer.parseInt(rootStr));
+        lastRootStr = rootStr;
+      }
+      // Subtypes of the root type correspond, in order, to the columns in the table schema
+      // (disregarding schema evolution, which is not yet supported). Get the index of the
+      // corresponding subtype.
+      result[i] = lastRoot.getSubtypes(index);
+    }
+    return result;
+  }
+}
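The batching and seeking logic above is normally driven through the public
org.apache.orc API rather than invoked directly. The following is a minimal
sketch of such a read loop; the path /tmp/example.orc is a placeholder, and
createRowBatch() sizes the batch to its default of 1024 rows.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReadLoopSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(conf));
    // rows() returns the RecordReader implemented above; nextBatch() fills
    // at most batch.getMaxSize() rows per call and returns false once every
    // stripe has been consumed.
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    long seen = 0;
    while (rows.nextBatch(batch)) {
      seen += batch.size;   // batch.size is set by nextBatch()
    }
    // seekToRow() repositions through findStripe() and readRowIndex() as above.
    rows.seekToRow(0);
    rows.close();
    System.out.println("rows read: " + seen);
  }
}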

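The translated-SARG helpers at the end of the class pack a root column id and
a source-table column index into a single predicate-leaf column name,
separated by "_". A small sketch of the encoding side, assuming the enclosing
class is org.apache.orc.impl.RecordReaderImpl as in the ORC tree:

import org.apache.orc.impl.RecordReaderImpl;

public class SargEncodingSketch {
  public static void main(String[] args) {
    // Root type id 5, third column (index 2) of the source table: prints "5_2"
    System.out.println(RecordReaderImpl.encodeTranslatedSargColumn(5, 2));
    // A leaf that maps to no source column encodes the index as -1: prints
    // "5_-1", and mapTranslatedSargColumns() stores -1 for such leaves.
    System.out.println(RecordReaderImpl.encodeTranslatedSargColumn(5, null));
  }
}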