Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Feb 2 22:28:15 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.common.typ
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -60,6 +61,10 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream.Kind;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -112,13 +117,9 @@ public class RecordReaderImpl implements
   List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
-  private final SearchArgument sarg;
-  // the leaf predicates for the sarg
-  private final List<PredicateLeaf> sargLeaves;
+  private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
-  // an array the same length as the sargLeaves that map them to column ids
-  private final int[] filterColumns;
   private final Configuration conf;
   private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
@@ -259,13 +260,12 @@ public class RecordReaderImpl implements
     this.bufferSize = bufferSize;
     this.included = options.getInclude();
     this.conf = conf;
-    this.sarg = options.getSearchArgument();
-    if (sarg != null) {
-      sargLeaves = sarg.getLeaves();
-      filterColumns = mapSargColumns(sargLeaves, options.getColumnNames(), 0);
+    this.rowIndexStride = strideRate;
+    SearchArgument sarg = options.getSearchArgument();
+    if (sarg != null && strideRate != 0) {
+      sargApp = new SargApplier(sarg, options.getColumnNames(), strideRate);
     } else {
-      sargLeaves = null;
-      filterColumns = null;
+      sargApp = null;
     }
     long rows = 0;
     long skippedRows = 0;
@@ -297,7 +297,6 @@ public class RecordReaderImpl implements
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included, conf);
     indexes = new OrcProto.RowIndex[types.size()];
-    rowIndexStride = strideRate;
     advanceToNextRow(reader, 0L, true);
   }
@@ -2583,6 +2582,66 @@ public class RecordReaderImpl implements
     }
     return statsObj;
   }
+
+  public static class SargApplier {
+    private final SearchArgument sarg;
+    private final List<PredicateLeaf> sargLeaves;
+    private final int[] filterColumns;
+    private final long rowIndexStride;
+
+    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride) {
+      this.sarg = sarg;
+      sargLeaves = sarg.getLeaves();
+      filterColumns = mapSargColumns(sargLeaves, columnNames, 0);
+      this.rowIndexStride = rowIndexStride;
+    }
+
+    /**
+     * Pick the row groups that we need to load from the current stripe.
+     * @return an array with a boolean for each row group or null if all of the
+     *     row groups must be read.
+     * @throws IOException
+     */
+    public boolean[] pickRowGroups(
+        StripeInformation stripe, OrcProto.RowIndex[] indexes) throws IOException {
+      long rowsInStripe = stripe.getNumberOfRows();
+      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+        for(int pred=0; pred < leafValues.length; ++pred) {
+          if (filterColumns[pred] != -1) {
+            OrcProto.ColumnStatistics stats =
+                indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+            leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Stats = " + stats);
+              LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+                  leafValues[pred]);
+            }
+          } else {
+            // the column is a virtual column
+            leafValues[pred] = TruthValue.YES_NO_NULL;
+          }
+        }
+        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+              (rowIndexStride * (rowGroup+1) - 1) + " is " +
+              (result[rowGroup] ? "" : "not ") + "included.");
+        }
+      }
+
+      // if we found something to skip, use the array. otherwise, return null.
+      for (boolean b: result) {
+        if (!b) {
+          return result;
+        }
+      }
+      return null;
+    }
+
+  }
 
   /**
    * Pick the row groups that we need to load from the current stripe.
   * @return an array with a boolean for each row group or null if all of the
   * row groups must be read.
   * @throws IOException
   */
@@ -2592,46 +2651,16 @@ public class RecordReaderImpl implements
   protected boolean[] pickRowGroups() throws IOException {
     // if we don't have a sarg or indexes, we read everything
-    if (sarg == null || rowIndexStride == 0) {
+    if (sargApp == null) {
       return null;
     }
     readRowIndex(currentStripe);
-    long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
-    int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
-        rowIndexStride);
-    boolean[] result = new boolean[groupsInStripe];
-    TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
-    for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
-      for(int pred=0; pred < leafValues.length; ++pred) {
-        if (filterColumns[pred] != -1) {
-          OrcProto.ColumnStatistics stats =
-              indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
-          leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Stats = " + stats);
-            LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
-                leafValues[pred]);
-          }
-        } else {
-          // the column is a virtual column
-          leafValues[pred] = TruthValue.YES_NO_NULL;
-        }
-      }
-      result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
-            (rowIndexStride * (rowGroup+1) - 1) + " is " +
-            (result[rowGroup] ? "" : "not ") + "included.");
-      }
-    }
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes);
+  }
-    // if we found something to skip, use the array. otherwise, return null.
-    for(boolean b: result) {
-      if (!b) {
-        return result;
-      }
-    }
-    return null;
+  @Override
+  public List<ColumnEncoding> getCurrentColumnEncodings() throws IOException {
+    return stripeFooter.getColumnsList();
   }
 
   private void clearStreams() throws IOException {
@@ -2657,22 +2686,8 @@ public class RecordReaderImpl implements
    * @throws IOException
    */
   private void readStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    clearStreams();
-    // setup the position in the stripe
-    rowCountInStripe = stripe.getNumberOfRows();
-    rowInStripe = 0;
-    rowBaseInStripe = 0;
-    for(int i=0; i < currentStripe; ++i) {
-      rowBaseInStripe += stripes.get(i).getNumberOfRows();
-    }
-    // reset all of the indexes
-    for(int i=0; i < indexes.length; ++i) {
-      indexes[i] = null;
-    }
+    StripeInformation stripe = beginReadStripe();
     includedRowGroups = pickRowGroups();
-
     // move forward to the first unskipped row
     if (includedRowGroups != null) {
       while (rowInStripe < rowCountInStripe &&
           !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
         rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
       }
     }
-
     // if we haven't skipped the whole stripe, read the data
     if (rowInStripe < rowCountInStripe) {
       // if we aren't projecting columns or filtering rows, just read it all
       if (included == null && includedRowGroups == null) {
         readAllDataStreams(stripe);
       } else {
         readPartialDataStreams(stripe);
       }
       reader.startStripe(streams, stripeFooter.getColumnsList());
       // if we skipped the first row group, move the pointers forward
       if (rowInStripe != 0) {
         seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
       }
     }
   }
 
@@ -2697,6 +2711,24 @@
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for(int i=0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for(int i=0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
   private void readAllDataStreams(StripeInformation stripe
                                   ) throws IOException {
     long start = stripe.getIndexLength();
@@ -2756,7 +2788,7 @@ public class RecordReaderImpl implements
   }
 
   public static class CacheChunk extends DiskRange {
-    public final LlapMemoryBuffer buffer;
+    public LlapMemoryBuffer buffer;
 
     public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
       super(offset, end);
@@ -2890,56 +2922,20 @@ public class RecordReaderImpl implements
     LinkedList<DiskRange> result = new LinkedList<DiskRange>();
     long offset = 0;
     // figure out which columns have a present stream
-    boolean[] hasNull = new boolean[types.size()];
-    for(OrcProto.Stream stream: streamList) {
-      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
-        hasNull[stream.getColumn()] = true;
-      }
-    }
-    for(OrcProto.Stream stream: streamList) {
+    boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+    DiskRange lastRange = null;
+    for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int column = stream.getColumn();
       OrcProto.Stream.Kind streamKind = stream.getKind();
-      if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
-          includedColumns[column]) {
+      if (StreamName.getArea(streamKind) == StreamName.Area.DATA && includedColumns[column]) {
         // if we aren't filtering or it is a dictionary, load it.
-        if (includedRowGroups == null ||
-            isDictionary(streamKind, encodings.get(column))) {
-          result.add(new DiskRange(offset, offset + length));
-        } else {
-          DiskRange lastRange = null;
-          for(int group=0; group < includedRowGroups.length; ++group) {
-            if (includedRowGroups[group]) {
-              int posn = getIndexPosition(encodings.get(column).getKind(),
-                  types.get(column).getKind(), stream.getKind(), isCompressed,
-                  hasNull[column]);
-              long start = indexes[column].getEntry(group).getPositions(posn);
-              final long nextGroupOffset;
-              if (group < includedRowGroups.length - 1) {
-                nextGroupOffset = indexes[column].getEntry(group + 1).getPositions(posn);
-              } else {
-                nextGroupOffset = length;
-              }
-              // figure out the worst case last location
-
-              // if adjacent groups have the same compressed block offset then stretch the slop
-              // by factor of 2 to safely accommodate the next compression block.
-              // One for the current compression block and another for the next compression block.
-              final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
-                  : WORST_UNCOMPRESSED_SLOP;
-              long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
-                  nextGroupOffset + slop);
-              start += offset;
-              end += offset;
-              if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
-                lastRange.offset = Math.min(lastRange.offset, start);
-                lastRange.end = Math.max(lastRange.end, end);
-              } else {
-                lastRange = new DiskRange(start, end);
-                result.add(lastRange);
-              }
-            }
-          }
+        if (includedRowGroups == null || isDictionary(streamKind, encodings.get(column))) {
+          lastRange = addEntireStreamToResult(offset, length, lastRange, result);
+        } else {
+          lastRange = addRgFilteredStreamToResult(stream, includedRowGroups,
+              isCompressed, indexes[column], encodings.get(column), types.get(column),
+              compressionSize, hasNull[column], offset, length, lastRange, result);
         }
       }
       offset += length;
     }
     return result;
   }
 
@@ -2947,6 +2943,68 @@
+  private static DiskRange addEntireStreamToResult(long offset, long length,
+      DiskRange lastRange, LinkedList<DiskRange> result) {
+    long end = offset + length;
+    if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset, end)) {
+      lastRange.offset = Math.min(lastRange.offset, offset);
+      lastRange.end = Math.max(lastRange.end, end);
+    } else {
+      lastRange = new DiskRange(offset, end);
+      result.add(lastRange);
+    }
+    return lastRange;
+  }
+
+  private static boolean[] findPresentStreamsByColumn(List<OrcProto.Stream> streamList,
+      List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  private static DiskRange addRgFilteredStreamToResult(OrcProto.Stream stream,
+      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+      long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
+    for (int group = 0; group < includedRowGroups.length; ++group) {
+      if (!includedRowGroups[group]) continue;
+      // TODO#: this code is relevant
+      int posn = getIndexPosition(
+          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      long start = index.getEntry(group).getPositions(posn);
+      final long nextGroupOffset;
+      if (group < includedRowGroups.length - 1) {
+        nextGroupOffset = index.getEntry(group + 1).getPositions(posn);
+      } else {
+        nextGroupOffset = length;
+      }
+      // figure out the worst case last location
+
+      // if adjacent groups have the same compressed block offset then stretch the slop
+      // by factor of 2 to safely accommodate the next compression block.
+      // One for the current compression block and another for the next compression block.
+      final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
+          : WORST_UNCOMPRESSED_SLOP;
+      long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
+          nextGroupOffset + slop);
+      start += offset;
+      end += offset;
+      if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
+        lastRange.offset = Math.min(lastRange.offset, start);
+        lastRange.end = Math.max(lastRange.end, end);
+      } else {
+        lastRange = new DiskRange(start, end);
+        result.add(lastRange);
+      }
+    }
+    return lastRange;
+  }
+
   /**
    * Update the disk ranges to collapse adjacent or overlapping ranges. It
    * assumes that the ranges are sorted.
@@ -2985,15 +3043,20 @@
     while (rangeIter.hasNext()) {
       DiskRange range = rangeIter.next();
       if (range.hasData()) continue;
-      rangeIter.remove();
-      rangeIter.previous(); // TODO: is this valid on single-element list?
       int len = (int) (range.end - range.offset);
       long off = range.offset;
       file.seek(base + off);
-      if(zcr != null) {
-        while(len > 0) {
+      if (zcr != null) {
+        boolean hasReplaced = false;
+        while (len > 0) {
           ByteBuffer partial = zcr.readBuffer(len, false);
-          ranges.add(new BufferChunk(partial, off));
+          BufferChunk bc = new BufferChunk(partial, off);
+          if (!hasReplaced) {
+            rangeIter.set(bc);
+            hasReplaced = true;
+          } else {
+            rangeIter.add(bc);
+          }
           int read = partial.remaining();
           len -= read;
           off += read;
         }
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
       }
     }
   }
@@ -3047,52 +3110,57 @@
       Map<StreamName, InStream> streams, LowLevelCache cache) throws IOException {
     long streamOffset = 0;
-    for(OrcProto.Stream streamDesc: streamDescriptions) {
+    for (OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
       if ((includeColumn != null && !includeColumn[column]) ||
           StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA) {
         streamOffset += streamDesc.getLength();
         continue;
       }
-      long streamEnd = streamOffset + streamDesc.getLength();
-      // TODO: This assumes sorted ranges (as do many other parts of ORC code.
-      ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-      boolean inRange = false;
-      for (int i = 0; i < ranges.size(); ++i) {
-        DiskRange range = ranges.get(i);
-        boolean isLast = range.end >= streamEnd;
-        if (!inRange) {
-          if (range.end >= streamOffset) continue; // Skip until we are in range.
-          inRange = true;
-          if (range.offset < streamOffset) {
-            // Partial first buffer, add a slice of it.
-            buffers.add(range.slice(Math.max(range.offset, streamOffset),
-                Math.min(streamEnd, range.end)));
-            if (isLast) break; // Partial first buffer is also partial last buffer.
-            continue;
-          }
-        }
-        if (range.end > streamEnd) {
-          // Partial last buffer (may also be the first buffer), add a slice of it.
-          buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
-          break;
-        }
-        buffers.add(range); // Normal buffer.
-      }
+      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc);
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(fileName, name.toString(), buffers,
-          streamEnd - streamOffset, codec, bufferSize, cache));
+          streamDesc.getLength(), codec, bufferSize, cache));
       streamOffset += streamDesc.getLength();
     }
   }
 
+  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges,
+      long streamOffset, OrcProto.Stream streamDesc) {
+    // This assumes sorted ranges (as do many other parts of ORC code).
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    long streamEnd = streamOffset + streamDesc.getLength();
+    boolean inRange = false;
+    for (int i = 0; i < ranges.size(); ++i) {
+      DiskRange range = ranges.get(i);
+      boolean isLast = range.end >= streamEnd;
+      if (!inRange) {
+        if (range.end >= streamOffset) continue; // Skip until we are in range.
+        inRange = true;
+        if (range.offset < streamOffset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.slice(Math.max(range.offset, streamOffset),
+              Math.min(streamEnd, range.end)));
+          if (isLast) break; // Partial first buffer is also partial last buffer.
+          continue;
+        }
+      }
+      if (range.end > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+        break;
+      }
+      buffers.add(range); // Normal buffer.
+    }
+    return buffers;
+  }
+
   private LowLevelCache cache = null;
 
   public void setCache(LowLevelCache cache) {
     this.cache = cache;
   }
 
-  private void readPartialDataStreams(StripeInformation stripe
-      ) throws IOException {
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
     LinkedList<DiskRange> rangesToRead = planReadPartialDataStreams(streamList,
@@ -3283,6 +3351,17 @@ public class RecordReaderImpl implements
     throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
+  @Override
+  public OrcProto.RowIndex[] getCurrentRowIndexEntries() throws IOException {
+    return readRowIndex(currentStripe);
+  }
+
+  @Override
+  public void setRowIndex(OrcProto.RowIndex[] rowIndex) {
+    assert rowIndex.length == indexes.length;
+    System.arraycopy(rowIndex, 0, indexes, 0, rowIndex.length);
+  }
+
   protected OrcProto.RowIndex[] readRowIndex(int stripeIndex) throws IOException {
     long offset = stripes.get(stripeIndex).getOffset();
     OrcProto.StripeFooter stripeFooter;
@@ -3342,16 +3421,178 @@ public class RecordReaderImpl implements
     }
     readRowIndex(currentStripe);
-    // if we aren't to the right row yet, advanance in the stripe.
+    // if we aren't to the right row yet, advance in the stripe.
     advanceToNextRow(reader, rowNumber, true);
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
+  public void prepareEncodedColumnRead() throws IOException {
+    assert currentStripe < 1 : "Reader is supposed to be per stripe";
+    if (currentStripe == 0) return;
+    ++currentStripe;
+    beginReadStripe();
+  }
+
+  private static class ColumnReadContext {
+    public ColumnReadContext(long offset, int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+      this.encoding = encoding;
+      this.rowIndex = rowIndex;
+      this.colIx = colIx;
+    }
+    public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
+    final long[] streamOffsets = new long[MAX_STREAMS];
+    final OrcProto.Stream[] streams = new OrcProto.Stream[MAX_STREAMS];
+    List<DiskRange>[] buffers;
+    ListIterator<DiskRange>[] bufferIters;
+    StreamBuffer[] stripeLevelStreams;
+    final ColumnEncoding encoding;
+    final OrcProto.RowIndex rowIndex;
+    final int colIx;
+    int streamCount = 0;
+
+    public void addStream(long offset, OrcProto.Stream stream) {
+      streams[streamCount] = stream;
+      streamOffsets[streamCount] = offset;
+      ++streamCount;
+    }
+  }
+
+  @Override
+  public void readEncodedColumns(int stripeIx, boolean[][] colRgs, LowLevelCache cache,
+      Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException {
+    // Note: for now we don't have to setError here, caller will setError if we throw.
+    // We are also not supposed to call setDone, since we are only part of the operation.
+    StripeInformation stripe = stripes.get(currentStripe);
+    // TODO## GET FROM METADATA? same for indexes, remove set... method
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    List<ColumnEncoding> encodings = stripeFooter.getColumnsList();
+    LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
+    long offset = 0;
+    // Figure out which columns have a present stream
+    boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+    DiskRange lastRange = null;
+    // We assume stream list is sorted by column and that non-data
+    // streams do not interleave data streams for the same column.
+    // With that in mind, determine disk ranges to read/get from cache (not by stream).
+    int colRgIx = -1, lastColIx = -1;
+    ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
+    boolean[] includedRgs = null;
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int colIx = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
+        offset += length;
+        continue;
+      }
+      if (lastColIx != colIx) {
+        ++colRgIx;
+        lastColIx = colIx;
+        includedRgs = colRgs[colRgIx];
+        colCtxs[colRgIx] = new ColumnReadContext(
+            offset, colIx, encodings.get(colIx), indexes[colIx]);
+      }
+      colCtxs[colRgIx].addStream(offset, stream);
+      if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
+        lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
+      } else {
+        lastRange = addRgFilteredStreamToResult(stream, includedRgs,
+            codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
+            bufferSize, hasNull[colIx], offset, length, lastRange, rangesToRead);
+      }
+      offset += length;
+    }
+
+    // Now, read all of these from cache or disk.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
+    }
+    mergeDiskRanges(rangesToRead);
+    if (this.cache != null) {
+      cache.getFileData(fileName, rangesToRead);
+    }
+    readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+
+    // Separate buffers for each stream from the data we have.
+    // TODO: given how we read, we could potentially get rid of this step?
+    for (ColumnReadContext colCtx : colCtxs) {
+      colCtx.buffers = new List[colCtx.streamCount];
+      for (int i = 0; i < colCtx.streamCount; ++i) {
+        colCtx.buffers[i] = getStreamBuffers(
+            rangesToRead, colCtx.streamOffsets[i], colCtx.streams[i]);
+        colCtx.bufferIters[i] = colCtx.buffers[i].listIterator();
+      }
+    }
+
+    // Finally, decompress data, map per RG, and return to caller.
+    // We go by RG and not by column because that is how data is processed.
+    // TODO# We could build RG x all cols batches here cheaper and avoid building them on higher
+    //       level (except for HL cache data that higher level would add). Esp. useful before we
+    //       implement high-level cache. We could even alloc one return object and not per column!
+    int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
+    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+      boolean isLastRg = rgIx == rgCount - 1;
+      OrcBatchKey bk = new OrcBatchKey(fileName, stripeIx, rgIx);
+      for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
+        if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
+        ColumnReadContext ctx = colCtxs[colIxMod];
+        EncodedColumn<OrcBatchKey> encodedColumn = new EncodedColumn<OrcBatchKey>(
+            bk, ctx.colIx, ctx.streamCount);
+        RowIndexEntry index = indexes[ctx.colIx].getEntry(rgIx);
+        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+          OrcProto.Stream stream = ctx.streams[streamIx];
+          if (isStripeLevelStream(stream.getKind(), ctx.encoding.getKind())) {
+            // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
+            if (ctx.stripeLevelStreams == null) {
+              ctx.stripeLevelStreams = new StreamBuffer[ctx.streamCount];
+            }
+            StreamBuffer cb = ctx.stripeLevelStreams[streamIx];
+            if (cb == null) {
+              long streamOffset = ctx.streamOffsets[streamIx];
+              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer(0, -1);
+              // We will be using this for each RG while also sending RGs to processing.
+              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+              // it when building the last RG, so each RG processing will decref once, and the
+              // last one will unlock the buffers. Cheaper than locking the buffers 500 times.
+              cb.incRef();
+              InStream.uncompressStream(fileName, ctx.bufferIters[streamIx],
                  codec, bufferSize, cache, -1, -1, cb);
+              ctx.buffers[streamIx] = null;
+            }
+            if (!isLastRg) {
+              cb.incRef();
+            }
+            encodedColumn.streamData[streamIx] = cb;
+          } else {
+            // This stream can be separated by RG using index. Let's do that.
+            // TODO#: determine start offset, end offset from index; nexts can be end of stream.
+            long cOffset = 0, nextCOffset = 0, nextNextCOffset = 0;
+            int ucOffset = 0, nextUcOffset = 0;
+            StreamBuffer cb = new StreamBuffer(0, -1);
+            cb.incRef();
+            cb.firstOffset = ucOffset; // We go by compression block, so this is always true.
+            long startCOffset = cOffset;
+            long endCOffset = (nextUcOffset == 0) ? nextCOffset : nextNextCOffset;
+            // TODO#: HERE
+            InStream.uncompressStream(fileName,
+                ctx.bufferIters[streamIx], codec, bufferSize, cache, startCOffset, endCOffset, cb);
+          }
+        }
+        consumer.consumeData(encodedColumn);
+      }
+    }
+
     throw new UnsupportedOperationException("not implemented");
   }
 
+  private boolean isStripeLevelStream(Kind kind, ColumnEncoding.Kind encoding) {
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA
+        || kind == OrcProto.Stream.Kind.DICTIONARY_COUNT
+        || (kind == OrcProto.Stream.Kind.LENGTH
+            && (encoding == ColumnEncoding.Kind.DICTIONARY
+                || encoding == ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
   /* Old prototype code to read stripes one column at a time, with limited output space.
   /**
    * Iterator-like context to read ORC as a sequence of column x stripe "cells".
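
[Reviewer note] The refcounting comment in readEncodedColumns above is the subtle part of this change: a stripe-level stream (e.g. dictionary data) is uncompressed once but handed to every row group's consumer, so the code takes one reference up front and skips the increment for the last row group. A minimal, self-contained sketch of that pattern, with a hypothetical SharedBuffer standing in for LLAP's StreamBuffer:

import java.util.concurrent.atomic.AtomicInteger;

class SharedBuffer {
  private final AtomicInteger refCount = new AtomicInteger(0);

  void incRef() { refCount.incrementAndGet(); }

  // Returns true when the last reference is released and the buffer may be unlocked.
  boolean decRef() { return refCount.decrementAndGet() == 0; }
}

class RefCountAheadDemo {
  // Share one stripe-level buffer across rgCount row-group consumers:
  // one reference taken at creation ("one ahead"), plus one per row group
  // except the last, yields exactly rgCount references. Each consumer calls
  // decRef() once when done, so whichever finishes last unlocks the buffer.
  static void shareAcrossRowGroups(SharedBuffer buf, int rgCount) {
    buf.incRef(); // taken when the buffer is first built
    for (int rg = 0; rg < rgCount; ++rg) {
      if (rg != rgCount - 1) {
        buf.incRef();
      }
      // ... hand buf to the consumer processing row group rg ...
    }
  }
}

This costs at most two refcount operations per row group instead of locking and unlocking the cached buffers once per row group, which is the repeated locking the in-code comment refers to.
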
Modified: hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original)
+++ hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Mon Feb 2 22:28:15 2015
@@ -151,6 +151,12 @@ public final class ColumnProjectionUtils
     return result;
   }
 
+  public static String[] getReadColumnNames(Configuration conf) {
+    String names = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (names == null) return null;
+    return names.split(",");
+  }
+
   private static void setReadColumnIDConf(Configuration conf, String id) {
     if (id.trim().isEmpty()) {
       conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
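
[Reviewer note] The new helper simply reads back a column list that a writer previously stored as a comma-separated string. A hypothetical round trip, assuming READ_COLUMN_NAMES_CONF_STR is accessible to the caller and using made-up column names:

Configuration conf = new Configuration();
// Store a comma-separated column list under the names key (illustrative values).
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "id,name,amount");
String[] cols = ColumnProjectionUtils.getReadColumnNames(conf); // ["id", "name", "amount"]

One edge case worth noting: only a missing key returns null; an empty string value yields a single-element array containing "", because "".split(",") in Java returns { "" }.
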
