http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
new file mode 100644
index 0000000..1067957
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.orc.StripeInformation;
+
+/**
+ * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
+ */
+public class RecordReaderUtils {
+  private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+
+  private static class DefaultDataReader implements DataReader {
+    private FSDataInputStream file = null;
+    private final ByteBufferAllocatorPool pool;
+    private HadoopShims.ZeroCopyReaderShim zcr = null;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    private final int bufferSize;
+    private final int typeCount;
+
+    private DefaultDataReader(DefaultDataReader other) {
+      this.pool = other.pool;
+      this.bufferSize = other.bufferSize;
+      this.typeCount = other.typeCount;
+      this.fs = other.fs;
+      this.path = other.path;
+      this.useZeroCopy = other.useZeroCopy;
+      this.codec = other.codec;
+    }
+
+    private DefaultDataReader(DataReaderProperties properties) {
+      this.fs = properties.getFileSystem();
+      this.path = properties.getPath();
+      this.useZeroCopy = properties.getZeroCopy();
+      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.bufferSize = properties.getBufferSize();
+      this.typeCount = properties.getTypeCount();
+      if (useZeroCopy) {
+        this.pool = new ByteBufferAllocatorPool();
+      } else {
+        this.pool = null;
+      }
+    }
+
+    @Override
+    public void open() throws IOException {
+      this.file = fs.open(path);
+      if (useZeroCopy) {
+        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+      } else {
+        zcr = null;
+      }
+    }
+
+    @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      if (file == null) {
+        open();
+      }
+      if (footer == null) {
+        footer = readStripeFooter(stripe);
+      }
+      if (indexes == null) {
+        indexes = new OrcProto.RowIndex[typeCount];
+      }
+      if (bloomFilterIndices == null) {
+        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+      }
+      long offset = stripe.getOffset();
+      List<OrcProto.Stream> streams = footer.getStreamsList();
+      for (int i = 0; i < streams.size(); i++) {
+        OrcProto.Stream stream = streams.get(i);
+        OrcProto.Stream nextStream = null;
+        if (i < streams.size() - 1) {
+          nextStream = streams.get(i+1);
+        }
+        int col = stream.getColumn();
+        int len = (int) stream.getLength();
+        // row index stream and bloom filter are interleaved; check if the sarg column contains a
+        // bloom filter and combine the I/O to read the row index and bloom filter for that column together
+        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+          boolean readBloomFilter = false;
+          if (sargColumns != null && sargColumns[col] &&
+              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+            len += nextStream.getLength();
+            i += 1;
+            readBloomFilter = true;
+          }
+          if ((included == null || included[col]) && indexes[col] == null) {
+            byte[] buffer = new byte[len];
+            file.readFully(offset, buffer, 0, buffer.length);
+            ByteBuffer bb = ByteBuffer.wrap(buffer);
+            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+                codec, bufferSize));
+            if (readBloomFilter) {
+              bb.position((int) stream.getLength());
+              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                  nextStream.getLength(), codec, bufferSize));
+            }
+          }
+        }
+        offset += len;
+      }
+
+      OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
+      return index;
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      if (file == null) {
+        open();
+      }
+      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+      int tailLength = (int) stripe.getFooterLength();
+
+      // read the footer
+      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+          tailLength, codec, bufferSize));
+    }
+
+    @Override
+    public DiskRangeList readFileData(
+        DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (pool != null) {
+        pool.clear();
+      }
+      // close both zcr and file
+      try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
+        if (file != null) {
+          file.close();
+        }
+      }
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return zcr != null;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer buffer) {
+      zcr.releaseBuffer(buffer);
+    }
+
+    @Override
+    public DataReader clone() {
+      return new DefaultDataReader(this);
+    }
+
+  }
+
+  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
+    return new DefaultDataReader(properties);
+  }
+
+  public static boolean[] findPresentStreamsByColumn(
+      List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
+
+  public static void addEntireStreamToRanges(
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
+  }
+
+  public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
+      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    for (int group = 0; group < includedRowGroups.length; ++group) {
+      if (!includedRowGroups[group]) continue;
+      int posn = getIndexPosition(
+          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      long start = index.getEntry(group).getPositions(posn);
+      final long nextGroupOffset;
+      boolean isLast = group == (includedRowGroups.length - 1);
+      nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
+
+      start += offset;
+      long end = offset + estimateRgEndOffset(
+          isCompressed, isLast, nextGroupOffset, length, compressionSize);
+      list.addOrMerge(start, end, doMergeBuffers, true);
+    }
+  }
+
+  public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+      long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
+
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset in the index positions for the column at which the given
+   * stream starts.
+   * @param columnEncoding the encoding of the column
+   * @param columnType the type of the column
+   * @param streamType the kind of the stream
+   * @param isCompressed is the file compressed
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the offset of that stream's positions within the column's index entry
+   */
+  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+                              OrcProto.Type.Kind columnType,
+                              OrcProto.Stream.Kind streamType,
+                              boolean isCompressed,
+                              boolean hasNulls) {
+    if (streamType == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    int compressionValue = isCompressed ? 1 : 0;
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (columnType) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        } else {
+          if (streamType == OrcProto.Stream.Kind.DATA) {
+            return base;
+          } else {
+            return base + BYTE_STREAM_POSITIONS + compressionValue;
+          }
+        }
+      case BINARY:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case DECIMAL:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + columnType);
+    }
+  }
+
+  // for uncompressed streams, what is the most overlap with the following set
+  // of rows (long vint literal group).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @return is this part of a dictionary?
+   */
+  public static boolean isDictionary(OrcProto.Stream.Kind kind,
+                              OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+      (kind == OrcProto.Stream.Kind.LENGTH &&
+       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param range ranges to stringify
+   * @return the resulting string
+   */
+  public static String stringifyDiskRanges(DiskRangeList range) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    boolean isFirst = true;
+    while (range != null) {
+      if (!isFirst) {
+        buffer.append(", {");
+      } else {
+        buffer.append("{");
+      }
+      isFirst = false;
+      buffer.append(range.toString());
+      buffer.append("}");
+      range = range.next;
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  /**
+   * Read the list of ranges from the file.
+   * @param file the file to read
+   * @param base the base of the stripe
+   * @param range the disk ranges within the stripe to read
+   * @return the bytes read for each disk range, which is the same length as
+   *    ranges
+   * @throws IOException
+   */
+  static DiskRangeList readDiskRanges(FSDataInputStream file,
+                                      HadoopShims.ZeroCopyReaderShim zcr,
+                                      long base,
+                                      DiskRangeList range,
+                                      boolean doForceDirect) throws IOException {
+    if (range == null) return null;
+    DiskRangeList prev = range.prev;
+    if (prev == null) {
+      prev = new MutateHelper(range);
+    }
+    while (range != null) {
+      if (range.hasData()) {
+        range = range.next;
+        continue;
+      }
+      int len = (int) (range.getEnd() - range.getOffset());
+      long off = range.getOffset();
+      if (zcr != null) {
+        file.seek(base + off);
+        boolean hasReplaced = false;
+        while (len > 0) {
+          ByteBuffer partial = zcr.readBuffer(len, false);
+          BufferChunk bc = new BufferChunk(partial, off);
+          if (!hasReplaced) {
+            range.replaceSelfWith(bc);
+            hasReplaced = true;
+          } else {
+            range.insertAfter(bc);
+          }
+          range = bc;
+          int read = partial.remaining();
+          len -= read;
+          off += read;
+        }
+      } else {
+        // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
+        byte[] buffer = new byte[len];
+        file.readFully((base + off), buffer, 0, buffer.length);
+        ByteBuffer bb = null;
+        if (doForceDirect) {
+          bb = ByteBuffer.allocateDirect(len);
+          bb.put(buffer);
+          bb.position(0);
+          bb.limit(len);
+        } else {
+          bb = ByteBuffer.wrap(buffer);
+        }
+        range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+      }
+      range = range.next;
+    }
+    return prev.next;
+  }
+
+
+  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+    // This assumes sorted ranges (as do many other parts of ORC code).
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    if (length == 0) return buffers;
+    long streamEnd = offset + length;
+    boolean inRange = false;
+    while (range != null) {
+      if (!inRange) {
+        if (range.getEnd() <= offset) {
+          range = range.next;
+          continue; // Skip until we are in range.
+        }
+        inRange = true;
+        if (range.getOffset() < offset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          range = range.next;
+          continue;
+        }
+      } else if (range.getOffset() >= streamEnd) {
+        break;
+      }
+      if (range.getEnd() > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
+        break;
+      }
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) break;
+      range = range.next;
+    }
+    return buffers;
+  }
+
+  static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
+      CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
+    if ((codec == null || ((codec instanceof DirectDecompressionCodec)
+            && ((DirectDecompressionCodec) codec).isAvailable()))) {
+      /* codec is null or is available */
+      return SHIMS.getZeroCopyReader(file, pool);
+    }
+    return null;
+  }
+
+  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+  // which lacks a clear()/clean() operation
+  public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim {
+    private static final class Key implements Comparable<Key> {
+      private final int capacity;
+      private final long insertionGeneration;
+
+      Key(int capacity, long insertionGeneration) {
+        this.capacity = capacity;
+        this.insertionGeneration = insertionGeneration;
+      }
+
+      @Override
+      public int compareTo(Key other) {
+        return ComparisonChain.start().compare(capacity, other.capacity)
+            .compare(insertionGeneration, other.insertionGeneration).result();
+      }
+
+      @Override
+      public boolean equals(Object rhs) {
+        if (rhs == null) {
+          return false;
+        }
+        try {
+          Key o = (Key) rhs;
+          return (compareTo(o) == 0);
+        } catch (ClassCastException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
+            .toHashCode();
+      }
+    }
+
+    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+    private long currentGeneration = 0;
+
+    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+      return direct ? directBuffers : buffers;
+    }
+
+    public void clear() {
+      buffers.clear();
+      directBuffers.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+      if (entry == null) {
+        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+            .allocate(length);
+      }
+      tree.remove(entry.getKey());
+      return entry.getValue();
+    }
+
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+      while (true) {
+        Key key = new Key(buffer.capacity(), currentGeneration++);
+        if (!tree.containsKey(key)) {
+          tree.put(key, buffer);
+          return;
+        }
+        // Buffers are indexed by (capacity, generation).
+        // If our key is not unique on the first try, we try again.
+      }
+    }
+  }
+}
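
The ByteBufferAllocatorPool above keys each cached buffer by (capacity, insertionGeneration), so getBuffer can use TreeMap.ceilingEntry to find the smallest pooled buffer whose capacity is at least the requested length, while the generation component keeps keys unique when capacities collide. A minimal standalone sketch of that lookup idea, packing the key into a single long for brevity (PoolSketch and its names are illustrative, not the ORC class):

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.TreeMap;

    public class PoolSketch {
      // Sketch only: pack (capacity, generation) into one comparable long,
      // capacity in the high 32 bits so ordering is by capacity first.
      private static final TreeMap<Long, ByteBuffer> pool = new TreeMap<>();
      private static long generation = 0;

      static long key(int capacity, long gen) {
        return ((long) capacity << 32) | (gen & 0xFFFFFFFFL);
      }

      static ByteBuffer get(int length) {
        // smallest pooled buffer with capacity >= length, like getBuffer above
        Map.Entry<Long, ByteBuffer> e = pool.ceilingEntry(key(length, 0));
        if (e == null) {
          return ByteBuffer.allocate(length);   // nothing big enough; allocate
        }
        pool.remove(e.getKey());
        return e.getValue();
      }

      static void put(ByteBuffer buffer) {
        // generation keeps the key unique when two buffers share a capacity
        pool.put(key(buffer.capacity(), generation++), buffer);
      }

      public static void main(String[] args) {
        put(ByteBuffer.allocate(1024));
        put(ByteBuffer.allocate(4096));
        System.out.println(get(2000).capacity());  // 4096: smallest fit wins
        System.out.println(get(2000).capacity());  // 2000: pool had no fit
      }
    }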

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RedBlackTree.java b/java/core/src/java/org/apache/orc/impl/RedBlackTree.java
new file mode 100644
index 0000000..41aa4b9
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RedBlackTree.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.impl.DynamicIntArray;
+
+/**
+ * A memory-efficient red-black tree that does not allocate any objects per
+ * element. This class is abstract and assumes that the child class
+ * handles the key and comparisons with the key.
+ */
+abstract class RedBlackTree {
+  public static final int NULL = -1;
+
+  // Various values controlling the offset of the data within the array.
+  private static final int LEFT_OFFSET = 0;
+  private static final int RIGHT_OFFSET = 1;
+  private static final int ELEMENT_SIZE = 2;
+
+  protected int size = 0;
+  private final DynamicIntArray data;
+  protected int root = NULL;
+  protected int lastAdd = 0;
+  private boolean wasAdd = false;
+
+  /**
+   * Create a set with the given initial capacity.
+   */
+  public RedBlackTree(int initialCapacity) {
+    data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
+  }
+
+  /**
+   * Insert a new node into the data array, growing the array as necessary.
+   *
+   * @return Returns the position of the new node.
+   */
+  private int insert(int left, int right, boolean isRed) {
+    int position = size;
+    size += 1;
+    setLeft(position, left, isRed);
+    setRight(position, right);
+    return position;
+  }
+
+  /**
+   * Compare the value at the given position to the new value.
+   * @return 0 if the values are the same, -1 if the new value is smaller and
+   *         1 if the new value is larger.
+   */
+  protected abstract int compareValue(int position);
+
+  /**
+   * Is the given node red as opposed to black? To prevent having an extra word
+   * in the data array, we just use the low bit of the left child index.
+   */
+  protected boolean isRed(int position) {
+    return position != NULL &&
+        (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
+  }
+
+  /**
+   * Set the red bit true or false.
+   */
+  private void setRed(int position, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    if (isRed) {
+      data.set(offset, data.get(offset) | 1);
+    } else {
+      data.set(offset, data.get(offset) & ~1);
+    }
+  }
+
+  /**
+   * Get the left field of the given position.
+   */
+  protected int getLeft(int position) {
+    return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
+  }
+
+  /**
+   * Get the right field of the given position.
+   */
+  protected int getRight(int position) {
+    return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (data.get(offset) & 1));
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (isRed ? 1 : 0));
+  }
+
+  /**
+   * Set the right field of the given position.
+   */
+  private void setRight(int position, int right) {
+    data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
+  }
+
+  /**
+   * Insert or find a given key in the tree and rebalance the tree correctly.
+   * Rebalancing restores the red-black aspect of the tree to maintain the
+   * invariants:
+   *   1. If a node is red, both of its children are black.
+   *   2. Each child of a node has the same black height (the number of black
+   *      nodes between it and the leaves of the tree).
+   *
+   * Inserted nodes are at the leaves and are red, therefore there is at most a
+   * violation of rule 1 at the node we just put in. Instead of always keeping
+   * the parents, this routine passes the context down.
+   *
+   * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are
+   * left-right mirror images of each other). See Algorithms by Cormen,
+   * Leiserson, and Rivest for the explanation of the subcases.
+   *
+   * @param node The node that we are fixing right now.
+   * @param fromLeft Did we come down from the left?
+   * @param parent Node's parent
+   * @param grandparent Parent's parent
+   * @param greatGrandparent Grandparent's parent
+   * @return Does parent also need to be checked and/or fixed?
+   */
+  private boolean add(int node, boolean fromLeft, int parent,
+                      int grandparent, int greatGrandparent) {
+    if (node == NULL) {
+      if (root == NULL) {
+        lastAdd = insert(NULL, NULL, false);
+        root = lastAdd;
+        wasAdd = true;
+        return false;
+      } else {
+        lastAdd = insert(NULL, NULL, true);
+        node = lastAdd;
+        wasAdd = true;
+        // connect the new node into the tree
+        if (fromLeft) {
+          setLeft(parent, node);
+        } else {
+          setRight(parent, node);
+        }
+      }
+    } else {
+      int compare = compareValue(node);
+      boolean keepGoing;
+
+      // Recurse down to find where the node needs to be added
+      if (compare < 0) {
+        keepGoing = add(getLeft(node), true, node, parent, grandparent);
+      } else if (compare > 0) {
+        keepGoing = add(getRight(node), false, node, parent, grandparent);
+      } else {
+        lastAdd = node;
+        wasAdd = false;
+        return false;
+      }
+
+      // we don't need to fix the root (because it is always set to black)
+      if (node == root || !keepGoing) {
+        return false;
+      }
+    }
+
+
+    // Do we need to fix this node? Only if there are two reds right under each
+    // other.
+    if (isRed(node) && isRed(parent)) {
+      if (parent == getLeft(grandparent)) {
+        int uncle = getRight(grandparent);
+        if (isRed(uncle)) {
+          // case 1.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getRight(parent)) {
+            // case 1.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // left-rotate on node
+            setLeft(grandparent, parent);
+            setRight(node, getLeft(parent));
+            setLeft(parent, node);
+          }
+
+          // case 1.2 and 1.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+
+          // right-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getLeft(greatGrandparent) == grandparent) {
+            setLeft(greatGrandparent, parent);
+          } else {
+            setRight(greatGrandparent, parent);
+          }
+          setLeft(grandparent, getRight(parent));
+          setRight(parent, grandparent);
+          return false;
+        }
+      } else {
+        int uncle = getLeft(grandparent);
+        if (isRed(uncle)) {
+          // case 2.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getLeft(parent)) {
+            // case 2.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // right-rotate on node
+            setRight(grandparent, parent);
+            setLeft(node, getRight(parent));
+            setRight(parent, node);
+          }
+          // case 2.2 and 2.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+          // left-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getRight(greatGrandparent) == grandparent) {
+            setRight(greatGrandparent, parent);
+          } else {
+            setLeft(greatGrandparent, parent);
+          }
+          setRight(grandparent, getLeft(parent));
+          setLeft(parent, grandparent);
+          return false;
+        }
+      }
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * Add the new key to the tree.
+   * @return true if the element is a new one.
+   */
+  protected boolean add() {
+    add(root, false, NULL, NULL, NULL);
+    if (wasAdd) {
+      setRed(root, false);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Get the number of elements in the set.
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  public void clear() {
+    root = NULL;
+    size = 0;
+    data.clear();
+  }
+
+  /**
+   * Get the buffer size in bytes.
+   */
+  public long getSizeInBytes() {
+    return data.getSizeInBytes();
+  }
+}
+
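
RedBlackTree above stores each node as two ints (the left child index carrying the color bit, plus the right child index) in a DynamicIntArray, and defers key storage and comparison to a subclass: during add(), compareValue(position) compares the key being inserted against the key held by the existing node at that position. A hedged sketch of a concrete subclass in the style of ORC's dictionary trees; IntRedBlackTree and its parallel key array are hypothetical:

    package org.apache.orc.impl;  // needed to see the package-private base class

    class IntRedBlackTree extends RedBlackTree {
      private final DynamicIntArray keys = new DynamicIntArray();
      private int newKey;  // the key currently being added; read by compareValue

      IntRedBlackTree(int initialCapacity) {
        super(initialCapacity);
      }

      @Override
      protected int compareValue(int position) {
        int other = keys.get(position);
        return newKey == other ? 0 : (newKey < other ? -1 : 1);
      }

      /** Add a key; returns true if it was not already in the tree. */
      public boolean add(int value) {
        newKey = value;
        boolean added = add();  // RedBlackTree.add() sets lastAdd/wasAdd
        if (added) {
          keys.add(value);      // new node's position equals keys' old size
        }
        return added;
      }
    }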

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
new file mode 100644
index 0000000..24bd051
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * A reader that reads a sequence of bytes. A control byte is read before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the
+ * byte is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteReader {
+  private InStream input;
+  private final byte[] literals =
+    new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int used = 0;
+  private boolean repeat = false;
+
+  public RunLengthByteReader(InStream input) throws IOException {
+    this.input = input;
+  }
+
+  public void setInStream(InStream input) {
+    this.input = input;
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    used = 0;
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of buffer RLE byte from " + 
input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      repeat = true;
+      numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
+      int val = input.read();
+      if (val == -1) {
+        throw new EOFException("Reading RLE byte got EOF");
+      }
+      literals[0] = (byte) val;
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      int bytes = 0;
+      while (bytes < numLiterals) {
+        int result = input.read(literals, bytes, numLiterals - bytes);
+        if (result == -1) {
+          throw new EOFException("Reading RLE byte literal got EOF in " + 
this);
+        }
+        bytes += result;
+      }
+    }
+  }
+
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  public byte next() throws IOException {
+    byte result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0];
+    } else {
+      result = literals[used];
+    }
+    ++used;
+    return result;
+  }
+
+  public void nextVector(ColumnVector previous, long[] data, long size)
+      throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < size; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int types in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && ((data[0] != data[i]) ||
+              (previous.isNull[0] != previous.isNull[i]))) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  /**
+   * Read the next size bytes into the data array, skipping over any slots
+   * where isNull is true.
+   * @param isNull if non-null, skip any rows where isNull[r] is true
+   * @param data the array to read into
+   * @param size the number of elements to read
+   * @throws IOException
+   */
+  public void nextVector(boolean[] isNull, int[] data,
+                         long size) throws IOException {
+    if (isNull == null) {
+      for(int i=0; i < size; ++i) {
+        data[i] = next();
+      }
+    } else {
+      for(int i=0; i < size; ++i) {
+        if (!isNull[i]) {
+          data[i] = next();
+        }
+      }
+    }
+  }
+
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void skip(long items) throws IOException {
+    while (items > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(items, numLiterals - used);
+      used += consume;
+      items -= consume;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+        used + "/" + numLiterals + " from " + input;
+  }
+}
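
The control-byte rule in the javadoc above can be decoded by hand: a control byte below 0x80 means (control + 3) copies of the next byte, where 3 is MIN_REPEAT_SIZE, and a control byte of 0x80 or above (-1 to -128 as a signed byte) means (256 - control) literal bytes follow. A self-contained decode sketch under those rules (illustrative only; the reader above additionally supports seek, skip, and vectorized reads):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class ByteRleDecodeSketch {
      public static void main(String[] args) throws IOException {
        // 0x02 -> repeat next byte 2 + 3 = 5 times; 0xFE (-2) -> 2 literals
        InputStream in = new ByteArrayInputStream(
            new byte[]{0x02, 0x41, (byte) 0xFE, 0x42, 0x43});
        StringBuilder out = new StringBuilder();
        int control;
        while ((control = in.read()) != -1) {
          if (control < 0x80) {              // repeat run
            int val = in.read();
            for (int i = 0; i < control + 3; i++) {
              out.append((char) val);
            }
          } else {                           // literal run
            int n = 0x100 - control;
            for (int i = 0; i < n; i++) {
              out.append((char) in.read());
            }
          }
        }
        System.out.println(out);             // prints AAAAABC
      }
    }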

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..09108b2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that writes a sequence of bytes. A control byte is written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the
+ * byte is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_LITERAL_SIZE = 128;
+  static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+
+  public RunLengthByteWriter(PositionedOutputStream output) {
+    this.output = output;
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write(literals, 0, 1);
+      } else {
+        output.write(-numLiterals);
+        output.write(literals, 0, numLiterals);
+      }
+      repeat = false;
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  public void write(byte value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          writeValues();
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}
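
write() above detects runs incrementally: literals accumulate until three identical tail bytes appear (MIN_REPEAT_SIZE), at which point any pending literals are flushed and the writer switches to repeat mode, flushing a repeat run once it reaches 130 bytes (MAX_REPEAT_SIZE). A batch sketch that produces the same byte format, assuming the constants above; it is not the incremental writer itself:

    import java.io.ByteArrayOutputStream;

    public class ByteRleEncodeSketch {
      public static byte[] encode(byte[] values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int i = 0;
        while (i < values.length) {
          // measure the run starting at i, capped at MAX_REPEAT_SIZE (130)
          int run = 1;
          while (i + run < values.length && run < 130
              && values[i + run] == values[i]) {
            run++;
          }
          if (run >= 3) {                    // MIN_REPEAT_SIZE
            out.write(run - 3);              // control byte 0..127
            out.write(values[i]);
            i += run;
          } else {
            // gather literals until a run of 3 starts or 128 accumulate
            int start = i;
            while (i < values.length && i - start < 128) {
              if (i + 2 < values.length && values[i] == values[i + 1]
                  && values[i] == values[i + 2]) {
                break;
              }
              i++;
            }
            out.write(-(i - start));         // negative control byte
            out.write(values, start, i - start);
          }
        }
        return out.toByteArray();
      }

      public static void main(String[] args) {
        byte[] enc = encode(new byte[]{65, 65, 65, 65, 65, 66, 67});
        for (byte b : enc) {                 // prints 02 41 FE 42 43
          System.out.printf("%02X ", b);
        }
        System.out.println();
      }
    }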

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
new file mode 100644
index 0000000..b91a263
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * A reader that reads a sequence of integers.
+ */
+public class RunLengthIntegerReader implements IntegerReader {
+  private InStream input;
+  private final boolean signed;
+  private final long[] literals =
+    new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int delta = 0;
+  private int used = 0;
+  private boolean repeat = false;
+  private SerializationUtils utils;
+
+  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
+      used = 0;
+      repeat = true;
+      delta = input.read();
+      if (delta == -1) {
+        throw new EOFException("End of stream in RLE Integer from " + input);
+      }
+      // convert from 0 to 255 to -128 to 127 by converting to a signed byte
+      delta = (byte) (0 + delta);
+      if (signed) {
+        literals[0] = utils.readVslong(input);
+      } else {
+        literals[0] = utils.readVulong(input);
+      }
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      used = 0;
+      for(int i=0; i < numLiterals; ++i) {
+        if (signed) {
+          literals[i] = utils.readVslong(input);
+        } else {
+          literals[i] = utils.readVulong(input);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0] + (used++) * delta;
+    } else {
+      result = literals[used++];
+    }
+    return result;
+  }
+
+  @Override
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}
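
In this v1 format a repeat run is a control byte holding (count - 3), a signed single-byte delta, and the base value as a little-endian base-128 varint; value i of the run is base + i * delta. A minimal decode sketch of one unsigned repeat run under those assumptions (readVulong here mirrors what SerializationUtils.readVulong parses):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class IntRleV1Sketch {
      // Little-endian base-128 varint; high bit set means another byte follows.
      static long readVulong(InputStream in) throws IOException {
        long result = 0;
        int offset = 0;
        int b;
        do {
          b = in.read();              // sketch: no EOF handling
          result |= (long) (b & 0x7f) << offset;
          offset += 7;
        } while (b >= 0x80);
        return result;
      }

      public static void main(String[] args) throws IOException {
        // control 0x02 -> 2 + 3 = 5 values; delta = +1; base varint = 7
        InputStream in = new ByteArrayInputStream(new byte[]{0x02, 0x01, 0x07});
        int control = in.read();
        int n = control + 3;          // MIN_REPEAT_SIZE
        int delta = (byte) in.read(); // single signed byte, -128..127
        long base = readVulong(in);
        for (int i = 0; i < n; i++) {
          System.out.print((base + (long) i * delta) + " ");  // 7 8 9 10 11
        }
        System.out.println();
      }
    }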

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..610d9b5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reader that reads a sequence of lightweight compressed integers. Refer to
+ * {@link RunLengthIntegerWriterV2} for a description of the various lightweight
+ * compression techniques.
+ */
+public class RunLengthIntegerReaderV2 implements IntegerReader {
+  public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class);
+
+  private InStream input;
+  private final boolean signed;
+  private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+  private boolean isRepeating = false;
+  private int numLiterals = 0;
+  private int used = 0;
+  private final boolean skipCorrupt;
+  private final SerializationUtils utils;
+  private RunLengthIntegerWriterV2.EncodingType currentEncoding;
+
+  public RunLengthIntegerReaderV2(InStream input, boolean signed,
+      boolean skipCorrupt) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.skipCorrupt = skipCorrupt;
+    this.utils = new SerializationUtils();
+  }
+
+  private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values();
+  private void readValues(boolean ignoreEof) throws IOException {
+    // read the first 2 bits and determine the encoding type
+    isRepeating = false;
+    int firstByte = input.read();
+    if (firstByte < 0) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    }
+    currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+    switch (currentEncoding) {
+    case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+    case DIRECT: readDirectValues(firstByte); break;
+    case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+    case DELTA: readDeltaValues(firstByte); break;
+    default: throw new IOException("Unknown encoding " + currentEncoding);
+    }
+  }
+
+  private void readDeltaValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fb = (firstByte >>> 1) & 0x1f;
+    if (fb != 0) {
+      fb = utils.decodeBitWidth(fb);
+    }
+
+    // extract the blob run length
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+
+    // read the first value stored as vint
+    long firstVal = 0;
+    if (signed) {
+      firstVal = utils.readVslong(input);
+    } else {
+      firstVal = utils.readVulong(input);
+    }
+
+    // store first value to result buffer
+    long prevVal = firstVal;
+    literals[numLiterals++] = firstVal;
+
+    // if fixed bits is 0 then all values have fixed delta
+    if (fb == 0) {
+      // read the fixed delta value stored as vint (deltas can be negative even
+      // if all number are positive)
+      long fd = utils.readVslong(input);
+      if (fd == 0) {
+        isRepeating = true;
+        assert numLiterals == 1;
+        Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+        numLiterals += len;
+      } else {
+        // add fixed deltas to adjacent values
+        for(int i = 0; i < len; i++) {
+          literals[numLiterals++] = literals[numLiterals - 2] + fd;
+        }
+      }
+    } else {
+      long deltaBase = utils.readVslong(input);
+      // add delta base and first value
+      literals[numLiterals++] = firstVal + deltaBase;
+      prevVal = literals[numLiterals - 1];
+      len -= 1;
+
+      // write the unpacked values, add it to previous value and store final
+      // value to result buffer. if the delta base value is negative then it
+      // is a decreasing sequence else an increasing sequence
+      utils.readInts(literals, numLiterals, len, fb, input);
+      while (len > 0) {
+        if (deltaBase < 0) {
+          literals[numLiterals] = prevVal - literals[numLiterals];
+        } else {
+          literals[numLiterals] = prevVal + literals[numLiterals];
+        }
+        prevVal = literals[numLiterals];
+        len--;
+        numLiterals++;
+      }
+    }
+  }
+
+  private void readPatchedBaseValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = utils.decodeBitWidth(fbo);
+
+    // extract the run length of data blob
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+    // runs are always one off
+    len += 1;
+
+    // extract the number of bytes occupied by base
+    int thirdByte = input.read();
+    int bw = (thirdByte >>> 5) & 0x07;
+    // base width is one off
+    bw += 1;
+
+    // extract patch width
+    int pwo = thirdByte & 0x1f;
+    int pw = utils.decodeBitWidth(pwo);
+
+    // read fourth byte and extract patch gap width
+    int fourthByte = input.read();
+    int pgw = (fourthByte >>> 5) & 0x07;
+    // patch gap width is one off
+    pgw += 1;
+
+    // extract the length of the patch list
+    int pl = fourthByte & 0x1f;
+
+    // read the next base width number of bytes to extract base value
+    long base = utils.bytesToLongBE(input, bw);
+    long mask = (1L << ((bw * 8) - 1));
+    // if MSB of base value is 1 then base is negative value else positive
+    if ((base & mask) != 0) {
+      base = base & ~mask;
+      base = -base;
+    }
+
+    // unpack the data blob
+    long[] unpacked = new long[len];
+    utils.readInts(unpacked, 0, len, fb, input);
+
+    // unpack the patch blob
+    long[] unpackedPatch = new long[pl];
+
+    if ((pw + pgw) > 64 && !skipCorrupt) {
+      throw new IOException("Corruption in ORC data encountered. To skip" +
+          " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" +
+          " true");
+    }
+    int bitSize = utils.getClosestFixedBits(pw + pgw);
+    utils.readInts(unpackedPatch, 0, pl, bitSize, input);
+
+    // apply the patch directly when decoding the packed data
+    int patchIdx = 0;
+    long currGap = 0;
+    long currPatch = 0;
+    long patchMask = ((1L << pw) - 1);
+    currGap = unpackedPatch[patchIdx] >>> pw;
+    currPatch = unpackedPatch[patchIdx] & patchMask;
+    long actualGap = 0;
+
+    // special case: gap is >255 then patch value will be 0.
+    // if gap is <=255 then patch value cannot be 0
+    while (currGap == 255 && currPatch == 0) {
+      actualGap += 255;
+      patchIdx++;
+      currGap = unpackedPatch[patchIdx] >>> pw;
+      currPatch = unpackedPatch[patchIdx] & patchMask;
+    }
+    // add the left over gap
+    actualGap += currGap;
+
+    // unpack data blob, patch it (if required), add base to get final result
+    for(int i = 0; i < unpacked.length; i++) {
+      if (i == actualGap) {
+        // extract the patch value
+        long patchedVal = unpacked[i] | (currPatch << fb);
+
+        // add base to patched value
+        literals[numLiterals++] = base + patchedVal;
+
+        // increment the patch to point to next entry in patch list
+        patchIdx++;
+
+        if (patchIdx < pl) {
+          // read the next gap and patch
+          currGap = unpackedPatch[patchIdx] >>> pw;
+          currPatch = unpackedPatch[patchIdx] & patchMask;
+          actualGap = 0;
+
+          // special case: gap is >255 then patch will be 0. if gap is
+          // <=255 then patch cannot be 0
+          while (currGap == 255 && currPatch == 0) {
+            actualGap += 255;
+            patchIdx++;
+            currGap = unpackedPatch[patchIdx] >>> pw;
+            currPatch = unpackedPatch[patchIdx] & patchMask;
+          }
+          // add the left over gap
+          actualGap += currGap;
+
+          // next gap is relative to the current gap
+          actualGap += i;
+        }
+      } else {
+        // no patching required. add base to unpacked value to get final value
+        literals[numLiterals++] = base + unpacked[i];
+      }
+    }
+
+  }
+
+  private void readDirectValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = utils.decodeBitWidth(fbo);
+
+    // extract the run length
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+    // runs are one off
+    len += 1;
+
+    // write the unpacked values and zigzag decode to result buffer
+    utils.readInts(literals, numLiterals, len, fb, input);
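+    // zigzag decoding maps the unsigned value n to (n >>> 1) ^ -(n & 1),
+    // i.e. 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...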
+    if (signed) {
+      for(int i = 0; i < len; i++) {
+        literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
+        numLiterals++;
+      }
+    } else {
+      numLiterals += len;
+    }
+  }
+
+  private void readShortRepeatValues(int firstByte) throws IOException {
+
+    // read the number of bytes occupied by the value
+    int size = (firstByte >>> 3) & 0x07;
+    // #bytes are one off
+    size += 1;
+
+    // read the run length
+    int len = firstByte & 0x07;
+    // the stored length is biased by MIN_REPEAT, since shorter runs are never encoded
+    len += RunLengthIntegerWriterV2.MIN_REPEAT;
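+    // with MIN_REPEAT = 3, the 3-bit length field encodes runs of 3 to 10 values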
+
+    // read the repeated value, which is stored using fixed bytes
+    long val = utils.bytesToLongBE(input, size);
+
+    if (signed) {
+      val = utils.zigzagDecode(val);
+    }
+
+    if (numLiterals != 0) {
+      // Currently this always holds, which makes peekNextAvailLength simpler.
+      // If this changes, peekNextAvailLength should be adjusted accordingly.
+      throw new AssertionError("readValues called with existing values 
present");
+    }
+    // repeat the value for length times
+    isRepeating = true;
+    // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
+    for(int i = 0; i < len; i++) {
+      literals[i] = val;
+    }
+    numLiterals = len;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      numLiterals = 0;
+      used = 0;
+      readValues(false);
+    }
+    result = literals[used++];
+    return result;
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
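+    // the remaining position value records how many values into the
+    // current run the indexed row falls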
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two
+      // parts
+      while (consumed > 0) {
+        numLiterals = 0;
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        numLiterals = 0;
+        used = 0;
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value for nulls for int types in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in vectorization for int types is 1,
+      // and given that a non-null value can also be 1, we need to check
+      // isNull as well when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (data[0] != data[i] ||
+          previous.isNull[0] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+}
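
For reference, a minimal standalone sketch of the SHORT_REPEAT header decoding
above (the class and method names are illustrative, not part of this patch):

  // Decodes a v2 SHORT_REPEAT header byte into the value byte count and run length.
  // Layout: 2 sub-encoding bits (00), 3 bits for (size - 1), 3 bits for (length - 3).
  public class ShortRepeatHeaderSketch {
    static int[] decode(int firstByte) {
      int size = ((firstByte >>> 3) & 0x07) + 1; // 1..8 bytes hold the value
      int len = (firstByte & 0x07) + 3;          // MIN_REPEAT = 3, so 3..10
      return new int[]{size, len};
    }

    public static void main(String[] args) {
      // 0x0A = 0b00_001_010: size field 1 -> 2 value bytes, length field 2 -> run of 5
      int[] h = decode(0x0A);
      System.out.println("value bytes = " + h[0] + ", run length = " + h[1]);
    }
  }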

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..3e5f2e2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of integers. A control byte is written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
+ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
+ * literal vint values follow.
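+ * For example, 100 copies of the value 7 are written as the control byte 97
+ * (100 - 3), a delta byte of 0, and the vint 7.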
+ */
+public class RunLengthIntegerWriter implements IntegerWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_DELTA = 127;
+  static final int MIN_DELTA = -128;
+  static final int MAX_LITERAL_SIZE = 128;
+  private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final boolean signed;
+  private final long[] literals = new long[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private long delta = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+  private SerializationUtils utils;
+
+  public RunLengthIntegerWriter(PositionedOutputStream output,
+                         boolean signed) {
+    this.output = output;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write((byte) delta);
+        if (signed) {
+          utils.writeVslong(output, literals[0]);
+        } else {
+          utils.writeVulong(output, literals[0]);
+        }
+      } else {
+        output.write(-numLiterals);
+        for(int i=0; i < numLiterals; ++i) {
+          if (signed) {
+            utils.writeVslong(output, literals[i]);
+          } else {
+            utils.writeVulong(output, literals[i]);
+          }
+        }
+      }
+      repeat = false;
+      numLiterals = 0;
+      tailRunLength = 0;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  @Override
+  public void write(long value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
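+      // extend the run while the value continues the sequence literals[0] + i * delta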
+      if (value == literals[0] + delta * numLiterals) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (tailRunLength == 1) {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      } else if (value == literals[numLiterals - 1] + delta) {
+        tailRunLength += 1;
+      } else {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
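+        // the last MIN_REPEAT_SIZE values form a run: flush any earlier
+        // literals and switch to repeat mode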
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          long base = literals[numLiterals];
+          writeValues();
+          literals[0] = base;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+
+}
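
As a quick sanity check of the control-byte scheme, here is a small hedged
sketch (an illustration, not the real writer; writeVulong mirrors the usual
ORC vint encoding) of what writeValues emits for a repeated run versus a
literal run:

  import java.io.ByteArrayOutputStream;

  // Illustrative only: mirrors the v1 control-byte layout described above.
  public class RleV1Sketch {
    // A repeat run: control byte (count - 3), signed delta byte, then the base value.
    static void emitRepeat(ByteArrayOutputStream out, int count, int delta, long base) {
      out.write(count - 3);      // 0..127 means 3..130 repetitions
      out.write((byte) delta);   // per-repetition delta, -128..127
      writeVulong(out, base);    // base value as an unsigned vint
    }

    // A literal run: control byte -count (as a signed byte), then count vints.
    static void emitLiterals(ByteArrayOutputStream out, long[] values) {
      out.write(-values.length); // -1..-128 means 1..128 literals
      for (long v : values) {
        writeVulong(out, v);
      }
    }

    // Unsigned variable-length encoding: 7 bits per byte, high bit set on all but the last.
    static void writeVulong(ByteArrayOutputStream out, long value) {
      while ((value & ~0x7fL) != 0) {
        out.write((int) (0x80 | (value & 0x7f)));
        value >>>= 7;
      }
      out.write((int) value);
    }

    public static void main(String[] args) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      emitRepeat(out, 100, 0, 7); // 100 copies of 7 -> bytes 97, 0, 7
      byte[] bytes = out.toByteArray();
      System.out.printf("control=%d delta=%d value=%d%n", bytes[0], bytes[1], bytes[2]);
    }
  }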
