Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
 Mon Feb  2 22:28:15 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.common.typ
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -60,6 +61,10 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream.Kind;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -112,13 +117,9 @@ public class RecordReaderImpl implements
   List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
-  private final SearchArgument sarg;
-  // the leaf predicates for the sarg
-  private final List<PredicateLeaf> sargLeaves;
+  private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
-  // an array the same length as the sargLeaves that map them to column ids
-  private final int[] filterColumns;
   private final Configuration conf;
 
   private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
@@ -259,13 +260,12 @@ public class RecordReaderImpl implements
     this.bufferSize = bufferSize;
     this.included = options.getInclude();
     this.conf = conf;
-    this.sarg = options.getSearchArgument();
-    if (sarg != null) {
-      sargLeaves = sarg.getLeaves();
-      filterColumns = mapSargColumns(sargLeaves, options.getColumnNames(), 0);
+    this.rowIndexStride = strideRate;
+    SearchArgument sarg = options.getSearchArgument();
+    if (sarg != null && strideRate != 0) {
+      sargApp = new SargApplier(sarg, options.getColumnNames(), strideRate);
     } else {
-      sargLeaves = null;
-      filterColumns = null;
+      sargApp = null;
     }
     long rows = 0;
     long skippedRows = 0;
@@ -297,7 +297,6 @@ public class RecordReaderImpl implements
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included, conf);
     indexes = new OrcProto.RowIndex[types.size()];
-    rowIndexStride = strideRate;
     advanceToNextRow(reader, 0L, true);
   }
 
@@ -2583,6 +2582,66 @@ public class RecordReaderImpl implements
     }
     return statsObj;
   }
+  
+  public static class SargApplier {
+    private final SearchArgument sarg;
+    private final List<PredicateLeaf> sargLeaves;
+    private final int[] filterColumns;
+    private final long rowIndexStride;
+
+    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride) {
+      this.sarg = sarg;
+      sargLeaves = sarg.getLeaves();
+      filterColumns = mapSargColumns(sargLeaves, columnNames, 0);
+      this.rowIndexStride = rowIndexStride;
+    }
+
+    /**
+     * Pick the row groups that we need to load from the current stripe.
+     * @return an array with a boolean for each row group or null if all of the
+     *    row groups must be read.
+     * @throws IOException
+     */
+    public boolean[] pickRowGroups(
+        StripeInformation stripe, OrcProto.RowIndex[] indexes) throws IOException {
+      long rowsInStripe = stripe.getNumberOfRows();
+      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+        for(int pred=0; pred < leafValues.length; ++pred) {
+          if (filterColumns[pred] != -1) {
+            OrcProto.ColumnStatistics stats =
+                indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+            leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Stats = " + stats);
+              LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+                  leafValues[pred]);
+            }
+          } else {
+            // the column is a virtual column
+            leafValues[pred] = TruthValue.YES_NO_NULL;
+          }
+        }
+        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+              (rowIndexStride * (rowGroup+1) - 1) + " is " +
+              (result[rowGroup] ? "" : "not ") + "included.");
+        }
+      }
+
+      // if we found something to skip, use the array. otherwise, return null.
+      for (boolean b: result) {
+        if (!b) {
+          return result;
+        }
+      }
+      return null;
+    }
+
+  }
 
   /**
    * Pick the row groups that we need to load from the current stripe.
@@ -2592,46 +2651,16 @@ public class RecordReaderImpl implements
    */
   protected boolean[] pickRowGroups() throws IOException {
     // if we don't have a sarg or indexes, we read everything
-    if (sarg == null || rowIndexStride == 0) {
+    if (sargApp == null) {
       return null;
     }
     readRowIndex(currentStripe);
-    long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
-    int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
-        rowIndexStride);
-    boolean[] result = new boolean[groupsInStripe];
-    TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
-    for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
-      for(int pred=0; pred < leafValues.length; ++pred) {
-        if (filterColumns[pred] != -1) {
-          OrcProto.ColumnStatistics stats =
-              indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
-          leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Stats = " + stats);
-            LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
-                leafValues[pred]);
-          }
-        } else {
-          // the column is a virtual column
-          leafValues[pred] = TruthValue.YES_NO_NULL;
-        }
-      }
-      result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
-            (rowIndexStride * (rowGroup+1) - 1) + " is " +
-            (result[rowGroup] ? "" : "not ") + "included.");
-      }
-    }
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes);
+  }
 
-    // if we found something to skip, use the array. otherwise, return null.
-    for(boolean b: result) {
-      if (!b) {
-        return result;
-      }
-    }
-    return null;
+  @Override
+  public List<ColumnEncoding> getCurrentColumnEncodings() throws IOException {
+    return stripeFooter.getColumnsList();
   }
 
   private void clearStreams() throws IOException {
@@ -2657,22 +2686,8 @@ public class RecordReaderImpl implements
    * @throws IOException
    */
   private void readStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    clearStreams();
-    // setup the position in the stripe
-    rowCountInStripe = stripe.getNumberOfRows();
-    rowInStripe = 0;
-    rowBaseInStripe = 0;
-    for(int i=0; i < currentStripe; ++i) {
-      rowBaseInStripe += stripes.get(i).getNumberOfRows();
-    }
-    // reset all of the indexes
-    for(int i=0; i < indexes.length; ++i) {
-      indexes[i] = null;
-    }
+    StripeInformation stripe = beginReadStripe();
     includedRowGroups = pickRowGroups();
-
     // move forward to the first unskipped row
     if (includedRowGroups != null) {
       while (rowInStripe < rowCountInStripe &&
@@ -2680,7 +2695,6 @@ public class RecordReaderImpl implements
         rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
       }
     }
-
     // if we haven't skipped the whole stripe, read the data
     if (rowInStripe < rowCountInStripe) {
       // if we aren't projecting columns or filtering rows, just read it all
@@ -2697,6 +2711,24 @@ public class RecordReaderImpl implements
     }
   }
 
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for(int i=0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for(int i=0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
   private void readAllDataStreams(StripeInformation stripe
                                   ) throws IOException {
     long start = stripe.getIndexLength();
@@ -2756,7 +2788,7 @@ public class RecordReaderImpl implements
   }
 
   public static class CacheChunk extends DiskRange {
-    public final LlapMemoryBuffer buffer;
+    public LlapMemoryBuffer buffer;
 
     public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
       super(offset, end);
@@ -2890,56 +2922,20 @@ public class RecordReaderImpl implements
     LinkedList<DiskRange> result = new LinkedList<DiskRange>();
     long offset = 0;
     // figure out which columns have a present stream
-    boolean[] hasNull = new boolean[types.size()];
-    for(OrcProto.Stream stream: streamList) {
-      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
-        hasNull[stream.getColumn()] = true;
-      }
-    }
-    for(OrcProto.Stream stream: streamList) {
+    boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+    DiskRange lastRange = null;
+    for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int column = stream.getColumn();
       OrcProto.Stream.Kind streamKind = stream.getKind();
-      if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
-          includedColumns[column]) {
+      if (StreamName.getArea(streamKind) == StreamName.Area.DATA && includedColumns[column]) {
         // if we aren't filtering or it is a dictionary, load it.
-        if (includedRowGroups == null ||
-            isDictionary(streamKind, encodings.get(column))) {
-          result.add(new DiskRange(offset, offset + length));
-        } else {
-          DiskRange lastRange = null;
-          for(int group=0; group < includedRowGroups.length; ++group) {
-            if (includedRowGroups[group]) {
-              int posn = getIndexPosition(encodings.get(column).getKind(),
-                  types.get(column).getKind(), stream.getKind(), isCompressed,
-                  hasNull[column]);
-              long start = indexes[column].getEntry(group).getPositions(posn);
-              final long nextGroupOffset;
-              if (group < includedRowGroups.length - 1) {
-                nextGroupOffset = indexes[column].getEntry(group + 1).getPositions(posn);
-              } else {
-                nextGroupOffset = length;
-              }
-              // figure out the worst case last location
-
-              // if adjacent groups have the same compressed block offset then stretch the slop
-              // by factor of 2 to safely accommodate the next compression block.
-              // One for the current compression block and another for the next compression block.
-              final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
-                  : WORST_UNCOMPRESSED_SLOP;
-              long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
-                  nextGroupOffset + slop);
-              start += offset;
-              end += offset;
-              if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
-                lastRange.offset = Math.min(lastRange.offset, start);
-                lastRange.end = Math.max(lastRange.end, end);
-              } else {
-                lastRange = new DiskRange(start, end);
-                result.add(lastRange);
-              }
-            }
-          }
+        if (includedRowGroups == null || isDictionary(streamKind, encodings.get(column))) {
+          lastRange = addEntireStreamToResult(offset, length, lastRange, result);
+        } else {
+          lastRange = addRgFilteredStreamToResult(stream, includedRowGroups,
+              isCompressed, indexes[column], encodings.get(column), types.get(column),
+              compressionSize, hasNull[column], offset, length, lastRange, result);
         }
       }
       offset += length;
@@ -2947,6 +2943,68 @@ public class RecordReaderImpl implements
     return result;
   }
 
+  private static DiskRange addEntireStreamToResult(long offset, long length,
+      DiskRange lastRange, LinkedList<DiskRange> result) {
+    long end = offset + length;
+    if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset, end)) {
+      lastRange.offset = Math.min(lastRange.offset, offset);
+      lastRange.end = Math.max(lastRange.end, end);
+    } else {
+      lastRange = new DiskRange(offset, end);
+      result.add(lastRange);
+    }
+    return lastRange;
+  }
+
+  private static boolean[] findPresentStreamsByColumn(List<OrcProto.Stream> streamList,
+      List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  private static DiskRange addRgFilteredStreamToResult(OrcProto.Stream stream,
+      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+      long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
+    for (int group = 0; group < includedRowGroups.length; ++group) {
+      if (!includedRowGroups[group]) continue;
+      // TODO#: this code is relevant
+      int posn = getIndexPosition(
+          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      long start = index.getEntry(group).getPositions(posn);
+      final long nextGroupOffset;
+      if (group < includedRowGroups.length - 1) {
+        nextGroupOffset = index.getEntry(group + 1).getPositions(posn);
+      } else {
+        nextGroupOffset = length;
+      }
+      // figure out the worst case last location
+
+      // if adjacent groups have the same compressed block offset then stretch the slop
+      // by factor of 2 to safely accommodate the next compression block.
+      // One for the current compression block and another for the next 
compression block.
+      final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
+          : WORST_UNCOMPRESSED_SLOP;
+      long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
+          nextGroupOffset + slop);
+      start += offset;
+      end += offset;
+      if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
+        lastRange.offset = Math.min(lastRange.offset, start);
+        lastRange.end = Math.max(lastRange.end, end);
+      } else {
+        lastRange = new DiskRange(start, end);
+        result.add(lastRange);
+      }
+    }
+    return lastRange;
+  }
+
   /**
    * Update the disk ranges to collapse adjacent or overlapping ranges. It
    * assumes that the ranges are sorted.
@@ -2985,15 +3043,20 @@ public class RecordReaderImpl implements
     while (rangeIter.hasNext()) {
       DiskRange range = rangeIter.next();
       if (range.hasData()) continue;
-      rangeIter.remove();
-      rangeIter.previous(); // TODO: is this valid on single-element list?
       int len = (int) (range.end - range.offset);
       long off = range.offset;
       file.seek(base + off);
-      if(zcr != null) {
-        while(len > 0) {
+      if (zcr != null) {
+        boolean hasReplaced = false;
+        while (len > 0) {
           ByteBuffer partial = zcr.readBuffer(len, false);
-          ranges.add(new BufferChunk(partial, off));
+          BufferChunk bc = new BufferChunk(partial, off);
+          if (!hasReplaced) {
+            rangeIter.set(bc);
+            hasReplaced = true;
+          } else {
+            rangeIter.add(bc);
+          }
           int read = partial.remaining();
           len -= read;
           off += read;
@@ -3001,7 +3064,7 @@ public class RecordReaderImpl implements
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
       }
     }
   }
@@ -3047,52 +3110,57 @@ public class RecordReaderImpl implements
                             Map<StreamName, InStream> streams,
                             LowLevelCache cache) throws IOException {
     long streamOffset = 0;
-    for(OrcProto.Stream streamDesc: streamDescriptions) {
+    for (OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
       if ((includeColumn != null && !includeColumn[column]) ||
           StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA) {
         streamOffset += streamDesc.getLength();
         continue;
       }
-      long streamEnd = streamOffset + streamDesc.getLength();
-      // TODO: This assumes sorted ranges (as do many other parts of ORC code.
-      ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-      boolean inRange = false;
-      for (int i = 0; i < ranges.size(); ++i) {
-        DiskRange range = ranges.get(i);
-        boolean isLast = range.end >= streamEnd;
-        if (!inRange) {
-          if (range.end >= streamOffset) continue; // Skip until we are in range.
-          inRange = true;
-          if (range.offset < streamOffset) {
-            // Partial first buffer, add a slice of it.
-            buffers.add(range.slice(Math.max(range.offset, streamOffset),
-                Math.min(streamEnd, range.end)));
-            if (isLast) break; // Partial first buffer is also partial last buffer.
-            continue;
-          }
-        }
-        if (range.end > streamEnd) {
-          // Partial last buffer (may also be the first buffer), add a slice of it.
-          buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
-          break;
-        }
-        buffers.add(range); // Normal buffer.
-      }
+      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc);
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(fileName, name.toString(), buffers,
-          streamEnd - streamOffset, codec, bufferSize, cache));
+          streamDesc.getLength(), codec, bufferSize, cache));
       streamOffset += streamDesc.getLength();
     }
   }
 
+  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges,
+      long streamOffset, OrcProto.Stream streamDesc) {
+    // This assumes sorted ranges (as do many other parts of ORC code).
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    long streamEnd = streamOffset + streamDesc.getLength();
+    boolean inRange = false;
+    for (int i = 0; i < ranges.size(); ++i) {
+      DiskRange range = ranges.get(i);
+      boolean isLast = range.end >= streamEnd;
+      if (!inRange) {
+        if (range.end >= streamOffset) continue; // Skip until we are in range.
+        inRange = true;
+        if (range.offset < streamOffset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.slice(Math.max(range.offset, streamOffset),
+              Math.min(streamEnd, range.end)));
+          if (isLast) break; // Partial first buffer is also partial last buffer.
+          continue;
+        }
+      }
+      if (range.end > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+        break;
+      }
+      buffers.add(range); // Normal buffer.
+    }
+    return buffers;
+  }
+
   private LowLevelCache cache = null;
   public void setCache(LowLevelCache cache) {
     this.cache = cache;
   }
 
-  private void readPartialDataStreams(StripeInformation stripe
-                                      ) throws IOException {
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
     LinkedList<DiskRange> rangesToRead =
         planReadPartialDataStreams(streamList,
@@ -3283,6 +3351,17 @@ public class RecordReaderImpl implements
     throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
+  @Override
+  public OrcProto.RowIndex[] getCurrentRowIndexEntries() throws IOException {
+    return readRowIndex(currentStripe);
+  }
+
+  @Override
+  public void setRowIndex(OrcProto.RowIndex[] rowIndex) {
+    assert rowIndex.length == indexes.length;
+    System.arraycopy(rowIndex, 0, indexes, 0, rowIndex.length);
+  }
+
  protected OrcProto.RowIndex[] readRowIndex(int stripeIndex) throws IOException {
     long offset = stripes.get(stripeIndex).getOffset();
     OrcProto.StripeFooter stripeFooter;
@@ -3342,16 +3421,178 @@ public class RecordReaderImpl implements
     }
     readRowIndex(currentStripe);
 
-    // if we aren't to the right row yet, advanance in the stripe.
+    // if we aren't to the right row yet, advance in the stripe.
     advanceToNextRow(reader, rowNumber, true);
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
+  public void prepareEncodedColumnRead() throws IOException {
+    assert currentStripe < 1 : "Reader is supposed to be per stripe";
+    if (currentStripe == 0) return;
+    ++currentStripe;
+    beginReadStripe();
+  }
+
+  private static class ColumnReadContext {
+    public ColumnReadContext(long offset, int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+      this.encoding = encoding;
+      this.rowIndex = rowIndex;
+      this.colIx = colIx;
+    }
+    public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
+    final long[] streamOffsets = new long[MAX_STREAMS];
+    final OrcProto.Stream[] streams = new OrcProto.Stream[MAX_STREAMS];
+    List<DiskRange>[] buffers;
+    ListIterator<DiskRange>[] bufferIters;
+    StreamBuffer[] stripeLevelStreams;
+    final ColumnEncoding encoding;
+    final OrcProto.RowIndex rowIndex;
+    final int colIx;
+    int streamCount = 0;
+
+    public void addStream(long offset, OrcProto.Stream stream) {
+      streams[streamCount] = stream;
+      streamOffsets[streamCount] = offset;
+      ++streamCount;
+    }
+  }
+
+  @Override
+  public void readEncodedColumns(int stripeIx, boolean[][] colRgs, LowLevelCache cache,
+      Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException {
+    // Note: for now we don't have to setError here, caller will setError if we throw.
+    // We are also not supposed to call setDone, since we are only part of the operation.
+    StripeInformation stripe = stripes.get(currentStripe);
+    // TODO## GET FROM METADATA? same for indexes, remove set... method
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    List<ColumnEncoding> encodings = stripeFooter.getColumnsList();
+    LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
+    long offset = 0;
+    // Figure out which columns have a present stream
+    boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+    DiskRange lastRange = null;
+    // We assume stream list is sorted by column and that non-data
+    // streams do not interleave data streams for the same column.
+    // With that in mind, determine disk ranges to read/get from cache (not by stream).
+    int colRgIx = -1, lastColIx = -1;
+    ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
+    boolean[] includedRgs = null;
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int colIx = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
+        offset += length;
+        continue;
+      }
+      if (lastColIx != colIx) {
+        ++colRgIx;
+        lastColIx = colIx;
+        includedRgs = colRgs[colRgIx];
+        colCtxs[colRgIx] = new ColumnReadContext(
+            offset, colIx, encodings.get(colIx), indexes[colIx]);
+      }
+      colCtxs[colRgIx].addStream(offset, stream);
+      if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
+        lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
+      } else {
+        lastRange = addRgFilteredStreamToResult(stream, includedRgs,
+            codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
+            bufferSize, hasNull[colIx], offset, length, lastRange, rangesToRead);
+      }
+      offset += length;
+    }
+
+    // Now, read all of these from cache or disk.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
+    }
+    mergeDiskRanges(rangesToRead);
+    if (this.cache != null) {
+      cache.getFileData(fileName, rangesToRead);
+    }
+    readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+
+    // Separate buffers for each stream from the data we have.
+    // TODO: given how we read, we could potentially get rid of this step?
+    for (ColumnReadContext colCtx : colCtxs) {
+      colCtx.buffers = new List[colCtx.streamCount];
+      for (int i = 0; i < colCtx.streamCount; ++i) {
+        colCtx.buffers[i] = getStreamBuffers(
+            rangesToRead, colCtx.streamOffsets[i], colCtx.streams[i]);
+        colCtx.bufferIters[i] = colCtx.buffers[i].listIterator();
+      }
+    }
+
+    // Finally, decompress data, map per RG, and return to caller.
+    // We go by RG and not by column because that is how data is processed.
+    // TODO# We could build RG x all cols batches here cheaper and avoid building them on higher
+    //       level (except for HL cache data that higher level would add). Esp. useful before we
+    //       implement high-level cache. We could even alloc one return object and not per column!
+    int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
+    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+      boolean isLastRg = rgIx == rgCount - 1;
+      OrcBatchKey bk = new OrcBatchKey(fileName, stripeIx, rgIx);
+      for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
+        if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
+        ColumnReadContext ctx = colCtxs[colIxMod];
+        EncodedColumn<OrcBatchKey> encodedColumn = new EncodedColumn<OrcBatchKey>(
+            bk, ctx.colIx, ctx.streamCount);
+        RowIndexEntry index = indexes[ctx.colIx].getEntry(rgIx);
+        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+          OrcProto.Stream stream = ctx.streams[streamIx];
+          if (isStripeLevelStream(stream.getKind(), ctx.encoding.getKind())) {
+            // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
+            if (ctx.stripeLevelStreams == null) {
+              ctx.stripeLevelStreams = new StreamBuffer[ctx.streamCount];
+            }
+            StreamBuffer cb = ctx.stripeLevelStreams[streamIx];
+            if (cb == null) {
+              long streamOffset = ctx.streamOffsets[streamIx];
+              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer(0, -1);
+              // We will be using this for each RG while also sending RGs to processing.
+              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+              // it when building the last RG, so each RG processing will decref once, and the
+              // last one will unlock the buffers. Cheaper than locking the buffers 500 times.
+              cb.incRef();
+              InStream.uncompressStream(fileName, ctx.bufferIters[streamIx],
+                  codec, bufferSize, cache, -1, -1, cb);
+              ctx.buffers[streamIx] = null;
+            }
+            if (!isLastRg) {
+              cb.incRef();
+            }
+            encodedColumn.streamData[streamIx] = cb;
+          } else {
+            // This stream can be separated by RG using index. Let's do that.
+            // TODO#: determine start offset, end offset from index; nexts can be end of stream.
+            long cOffset = 0, nextCOffset = 0, nextNextCOffset = 0;
+            int ucOffset = 0, nextUcOffset = 0;
+            StreamBuffer cb = new StreamBuffer(0, -1);
+            cb.incRef();
+            cb.firstOffset = ucOffset; // We go by compression block, so this is always true.
+            long startCOffset = cOffset;
+            long endCOffset = (nextUcOffset == 0) ? nextCOffset : nextNextCOffset;
+            // TODO#: HERE
+            InStream.uncompressStream(fileName,
+                ctx.bufferIters[streamIx], codec, bufferSize, cache, startCOffset, endCOffset, cb);
+          }
+        }
+        consumer.consumeData(encodedColumn);
+      }
+    }
+
     throw new UnsupportedOperationException("not implemented");
   }
 
+  private boolean isStripeLevelStream(Kind kind, ColumnEncoding.Kind encoding) {
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA
+        || kind == OrcProto.Stream.Kind.DICTIONARY_COUNT
+        || (kind == OrcProto.Stream.Kind.LENGTH 
+        && (encoding == ColumnEncoding.Kind.DICTIONARY
+        || encoding == ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
  /* Old prototype code to read stripes one column at a time, with limited output space.
   /**
   * Iterator-like context to read ORC as a sequence of column x stripe "cells".

Modified: 
hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- 
hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
 (original)
+++ 
hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
 Mon Feb  2 22:28:15 2015
@@ -151,6 +151,12 @@ public final class ColumnProjectionUtils
     return result;
   }
 
+  public static String[] getReadColumnNames(Configuration conf) {
+    String names = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (names == null) return null;
+    return names.split(",");
+  }
+
   private static void setReadColumnIDConf(Configuration conf, String id) {
     if (id.trim().isEmpty()) {
       conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);


Reply via email to