http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..7237b2e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
@@ -0,0 +1,832 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - encoded as varint</li>
+ * <li>Delta base - encoded as varint</li>
+ * <li>Delta blob - only positive values. monotonicity and orderness are 
decided
+ * based on the sign of the base value and delta base</li>
+ * </ul>
+ * </p>
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+  public enum EncodingType {
+    SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+  }
+
+  static final int MAX_SCOPE = 512;
+  static final int MIN_REPEAT = 3;
+  private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+  private long prevDelta = 0;
+  private int fixedRunLength = 0;
+  private int variableRunLength = 0;
+  private final long[] literals = new long[MAX_SCOPE];
+  private final PositionedOutputStream output;
+  private final boolean signed;
+  private EncodingType encoding;
+  private int numLiterals;
+  private final long[] zigzagLiterals = new long[MAX_SCOPE];
+  private final long[] baseRedLiterals = new long[MAX_SCOPE];
+  private final long[] adjDeltas = new long[MAX_SCOPE];
+  private long fixedDelta;
+  private int zzBits90p;
+  private int zzBits100p;
+  private int brBits95p;
+  private int brBits100p;
+  private int bitsDeltaMax;
+  private int patchWidth;
+  private int patchGapWidth;
+  private int patchLength;
+  private long[] gapVsPatchList;
+  private long min;
+  private boolean isFixedDelta;
+  private SerializationUtils utils;
+  private boolean alignedBitpacking;
+
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+    this(output, signed, true);
+  }
+
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+      boolean alignedBitpacking) {
+    this.output = output;
+    this.signed = signed;
+    this.alignedBitpacking = alignedBitpacking;
+    this.utils = new SerializationUtils();
+    clear();
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+
+      if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+        writeShortRepeatValues();
+      } else if (encoding.equals(EncodingType.DIRECT)) {
+        writeDirectValues();
+      } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+        writePatchedBaseValues();
+      } else {
+        writeDeltaValues();
+      }
+
+      // clear all the variables
+      clear();
+    }
+  }
+
+  private void writeDeltaValues() throws IOException {
+    int len = 0;
+    int fb = bitsDeltaMax;
+    int efb = 0;
+
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
+    if (isFixedDelta) {
+      // if fixed run length is greater than threshold then it will be fixed
+      // delta sequence with delta value 0 else fixed delta sequence with
+      // non-zero delta value
+      if (fixedRunLength > MIN_REPEAT) {
+        // ex. sequence: 2 2 2 2 2 2 2 2
+        len = fixedRunLength - 1;
+        fixedRunLength = 0;
+      } else {
+        // ex. sequence: 4 6 8 10 12 14 16
+        len = variableRunLength - 1;
+        variableRunLength = 0;
+      }
+    } else {
+      // fixed width 0 is used for long repeating values.
+      // sequences that require only 1 bit to encode will have an additional 
bit
+      if (fb == 1) {
+        fb = 2;
+      }
+      efb = utils.encodeBitWidth(fb);
+      efb = efb << 1;
+      len = variableRunLength - 1;
+      variableRunLength = 0;
+    }
+
+    // extract the 9th bit of run length
+    final int tailBits = (len & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = len & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // store the first value from zigzag literal array
+    if (signed) {
+      utils.writeVslong(output, literals[0]);
+    } else {
+      utils.writeVulong(output, literals[0]);
+    }
+
+    if (isFixedDelta) {
+      // if delta is fixed then we don't need to store delta blob
+      utils.writeVslong(output, fixedDelta);
+    } else {
+      // store the first value as delta value using zigzag encoding
+      utils.writeVslong(output, adjDeltas[0]);
+
+      // adjacent delta values are bit packed. The length of adjDeltas array is
+      // always one less than the number of literals (delta difference for n
+      // elements is n-1). We have already written one element, write the
+      // remaining numLiterals - 2 elements here
+      utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
+    }
+  }
+
+  private void writePatchedBaseValues() throws IOException {
+
+    // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+    // because patch is applied to MSB bits. For example: If fixed bit width of
+    // base value is 7 bits and if patch is 3 bits, the actual value is
+    // constructed by shifting the patch to left by 7 positions.
+    // actual_value = patch << 7 | base_value
+    // So, if we align base_value then actual_value can not be reconstructed.
+
+    // write the number of fixed bits required in next 5 bits
+    final int fb = brBits95p;
+    final int efb = utils.encodeBitWidth(fb) << 1;
+
+    // adjust variable run length, they are one off
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = variableRunLength & 0xff;
+
+    // if the min value is negative toggle the sign
+    final boolean isNegative = min < 0 ? true : false;
+    if (isNegative) {
+      min = -min;
+    }
+
+    // find the number of bytes required for base and shift it by 5 bits
+    // to accommodate patch width. The additional bit is used to store the sign
+    // of the base value.
+    final int baseWidth = utils.findClosestNumBits(min) + 1;
+    final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) 
+ 1;
+    final int bb = (baseBytes - 1) << 5;
+
+    // if the base value is negative then set MSB to 1
+    if (isNegative) {
+      min |= (1L << ((baseBytes * 8) - 1));
+    }
+
+    // third byte contains 3 bits for number of bytes occupied by base
+    // and 5 bits for patchWidth
+    final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
+
+    // fourth byte contains 3 bits for page gap width and 5 bits for
+    // patch length
+    final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+    output.write(headerThirdByte);
+    output.write(headerFourthByte);
+
+    // write the base value using fixed bytes in big endian order
+    for(int i = baseBytes - 1; i >= 0; i--) {
+      byte b = (byte) ((min >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    // base reduced literals are bit packed
+    int closestFixedBits = utils.getClosestFixedBits(fb);
+
+    utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
+        output);
+
+    // write patch list
+    closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+    utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+        output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  /**
+   * Store the opcode in 2 MSB bits
+   * @return opcode
+   */
+  private int getOpcode() {
+    return encoding.ordinal() << 6;
+  }
+
+  private void writeDirectValues() throws IOException {
+
+    // write the number of fixed bits required in next 5 bits
+    int fb = zzBits100p;
+
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
+    final int efb = utils.encodeBitWidth(fb) << 1;
+
+    // adjust variable run length
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = variableRunLength & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // bit packing the zigzag encoded literals
+    utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  private void writeShortRepeatValues() throws IOException {
+    // get the value that is repeating, compute the bits and bytes required
+    long repeatVal = 0;
+    if (signed) {
+      repeatVal = utils.zigzagEncode(literals[0]);
+    } else {
+      repeatVal = literals[0];
+    }
+
+    final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+    final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal 
>>> 3
+        : (numBitsRepeatVal >>> 3) + 1;
+
+    // write encoding type in top 2 bits
+    int header = getOpcode();
+
+    // write the number of bytes required for the value
+    header |= ((numBytesRepeatVal - 1) << 3);
+
+    // write the run length
+    fixedRunLength -= MIN_REPEAT;
+    header |= fixedRunLength;
+
+    // write the header
+    output.write(header);
+
+    // write the repeating value in big endian byte order
+    for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+      int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    fixedRunLength = 0;
+  }
+
+  private void determineEncoding() {
+
+    // we need to compute zigzag values for DIRECT encoding if we decide to
+    // break early for delta overflows or for shorter runs
+    computeZigZagLiterals();
+
+    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+
+    // not a big win for shorter runs to determine encoding
+    if (numLiterals <= MIN_REPEAT) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+
+    // DELTA encoding check
+
+    // for identifying monotonic sequences
+    boolean isIncreasing = true;
+    boolean isDecreasing = true;
+    this.isFixedDelta = true;
+
+    this.min = literals[0];
+    long max = literals[0];
+    final long initialDelta = literals[1] - literals[0];
+    long currDelta = initialDelta;
+    long deltaMax = initialDelta;
+    this.adjDeltas[0] = initialDelta;
+
+    for (int i = 1; i < numLiterals; i++) {
+      final long l1 = literals[i];
+      final long l0 = literals[i - 1];
+      currDelta = l1 - l0;
+      min = Math.min(min, l1);
+      max = Math.max(max, l1);
+
+      isIncreasing &= (l0 <= l1);
+      isDecreasing &= (l0 >= l1);
+
+      isFixedDelta &= (currDelta == initialDelta);
+      if (i > 1) {
+        adjDeltas[i - 1] = Math.abs(currDelta);
+        deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+      }
+    }
+
+    // its faster to exit under delta overflow condition without checking for
+    // PATCHED_BASE condition as encoding using DIRECT is faster and has less
+    // overhead than PATCHED_BASE
+    if (!utils.isSafeSubtract(max, min)) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+
+    // invariant - subtracting any number from any other in the literals after
+    // this point won't overflow
+
+    // if initialDelta is 0 then we cannot delta encode as we cannot identify
+    // the sign of deltas (increasing or decreasing)
+    if (initialDelta != 0) {
+
+      // if min is equal to max then the delta is 0, this condition happens for
+      // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+      if (min == max) {
+        assert isFixedDelta : min + "==" + max +
+            ", isFixedDelta cannot be false";
+        assert currDelta == 0 : min + "==" + max + ", currDelta should be 
zero";
+        fixedDelta = 0;
+        encoding = EncodingType.DELTA;
+        return;
+      }
+
+      if (isFixedDelta) {
+        assert currDelta == initialDelta
+            : "currDelta should be equal to initialDelta for fixed delta 
encoding";
+        encoding = EncodingType.DELTA;
+        fixedDelta = currDelta;
+        return;
+      }
+
+      // stores the number of bits required for packing delta blob in
+      // delta encoding
+      bitsDeltaMax = utils.findClosestNumBits(deltaMax);
+
+      // monotonic condition
+      if (isIncreasing || isDecreasing) {
+        encoding = EncodingType.DELTA;
+        return;
+      }
+    }
+
+    // PATCHED_BASE encoding check
+
+    // percentile values are computed for the zigzag encoded values. if the
+    // number of bit requirement between 90th and 100th percentile varies
+    // beyond a threshold then we need to patch the values. if the variation
+    // is not significant then we can use direct encoding
+
+    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
+    int diffBitsLH = zzBits100p - zzBits90p;
+
+    // if the difference between 90th percentile and 100th percentile fixed
+    // bits is > 1 then we need patch the values
+    if (diffBitsLH > 1) {
+
+      // patching is done only on base reduced values.
+      // remove base from literals
+      for (int i = 0; i < numLiterals; i++) {
+        baseRedLiterals[i] = literals[i] - min;
+      }
+
+      // 95th percentile width is used to determine max allowed value
+      // after which patching will be done
+      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+
+      // 100th percentile is used to compute the max patch width
+      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
+
+      // after base reducing the values, if the difference in bits between
+      // 95th percentile and 100th percentile value is zero then there
+      // is no point in patching the values, in which case we will
+      // fallback to DIRECT encoding.
+      // The decision to use patched base was based on zigzag values, but the
+      // actual patching is done on base reduced literals.
+      if ((brBits100p - brBits95p) != 0) {
+        encoding = EncodingType.PATCHED_BASE;
+        preparePatchedBlob();
+        return;
+      } else {
+        encoding = EncodingType.DIRECT;
+        return;
+      }
+    } else {
+      // if difference in bits between 95th percentile and 100th percentile is
+      // 0, then patch length will become 0. Hence we will fallback to direct
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+  }
+
+  private void computeZigZagLiterals() {
+    // populate zigzag encoded literals
+    long zzEncVal = 0;
+    for (int i = 0; i < numLiterals; i++) {
+      if (signed) {
+        zzEncVal = utils.zigzagEncode(literals[i]);
+      } else {
+        zzEncVal = literals[i];
+      }
+      zigzagLiterals[i] = zzEncVal;
+    }
+  }
+
+  private void preparePatchedBlob() {
+    // mask will be max value beyond which patch will be generated
+    long mask = (1L << brBits95p) - 1;
+
+    // since we are considering only 95 percentile, the size of gap and
+    // patch array can contain only be 5% values
+    patchLength = (int) Math.ceil((numLiterals * 0.05));
+
+    int[] gapList = new int[patchLength];
+    long[] patchList = new long[patchLength];
+
+    // #bit for patch
+    patchWidth = brBits100p - brBits95p;
+    patchWidth = utils.getClosestFixedBits(patchWidth);
+
+    // if patch bit requirement is 64 then it will not possible to pack
+    // gap and patch together in a long. To make sure gap and patch can be
+    // packed together adjust the patch width
+    if (patchWidth == 64) {
+      patchWidth = 56;
+      brBits95p = 8;
+      mask = (1L << brBits95p) - 1;
+    }
+
+    int gapIdx = 0;
+    int patchIdx = 0;
+    int prev = 0;
+    int gap = 0;
+    int maxGap = 0;
+
+    for(int i = 0; i < numLiterals; i++) {
+      // if value is above mask then create the patch and record the gap
+      if (baseRedLiterals[i] > mask) {
+        gap = i - prev;
+        if (gap > maxGap) {
+          maxGap = gap;
+        }
+
+        // gaps are relative, so store the previous patched value index
+        prev = i;
+        gapList[gapIdx++] = gap;
+
+        // extract the most significant bits that are over mask bits
+        long patch = baseRedLiterals[i] >>> brBits95p;
+        patchList[patchIdx++] = patch;
+
+        // strip off the MSB to enable safe bit packing
+        baseRedLiterals[i] &= mask;
+      }
+    }
+
+    // adjust the patch length to number of entries in gap list
+    patchLength = gapIdx;
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+    if (maxGap == 0 && patchLength != 0) {
+      patchGapWidth = 1;
+    } else {
+      patchGapWidth = utils.findClosestNumBits(maxGap);
+    }
+
+    // special case: if the patch gap width is greater than 256, then
+    // we need 9 bits to encode the gap width. But we only have 3 bits in
+    // header to record the gap width. To deal with this case, we will save
+    // two entries in patch list in the following way
+    // 256 gap width => 0 for patch value
+    // actual gap - 256 => actual patch value
+    // We will do the same for gap width = 511. If the element to be patched is
+    // the last element in the scope then gap width will be 511. In this case 
we
+    // will have 3 entries in the patch list in the following way
+    // 255 gap width => 0 for patch value
+    // 255 gap width => 0 for patch value
+    // 1 gap width => actual patch value
+    if (patchGapWidth > 8) {
+      patchGapWidth = 8;
+      // for gap = 511, we need two additional entries in patch list
+      if (maxGap == 511) {
+        patchLength += 2;
+      } else {
+        patchLength += 1;
+      }
+    }
+
+    // create gap vs patch list
+    gapIdx = 0;
+    patchIdx = 0;
+    gapVsPatchList = new long[patchLength];
+    for(int i = 0; i < patchLength; i++) {
+      long g = gapList[gapIdx++];
+      long p = patchList[patchIdx++];
+      while (g > 255) {
+        gapVsPatchList[i++] = (255L << patchWidth);
+        g -= 255;
+      }
+
+      // store patch value in LSBs and gap in MSBs
+      gapVsPatchList[i] = (g << patchWidth) | p;
+    }
+  }
+
+  /**
+   * clears all the variables
+   */
+  private void clear() {
+    numLiterals = 0;
+    encoding = null;
+    prevDelta = 0;
+    fixedDelta = 0;
+    zzBits90p = 0;
+    zzBits100p = 0;
+    brBits95p = 0;
+    brBits100p = 0;
+    bitsDeltaMax = 0;
+    patchGapWidth = 0;
+    patchLength = 0;
+    patchWidth = 0;
+    gapVsPatchList = null;
+    min = 0;
+    isFixedDelta = true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (numLiterals != 0) {
+      if (variableRunLength != 0) {
+        determineEncoding();
+        writeValues();
+      } else if (fixedRunLength != 0) {
+        if (fixedRunLength < MIN_REPEAT) {
+          variableRunLength = fixedRunLength;
+          fixedRunLength = 0;
+          determineEncoding();
+          writeValues();
+        } else if (fixedRunLength >= MIN_REPEAT
+            && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+          encoding = EncodingType.SHORT_REPEAT;
+          writeValues();
+        } else {
+          encoding = EncodingType.DELTA;
+          isFixedDelta = true;
+          writeValues();
+        }
+      }
+    }
+    output.flush();
+  }
+
+  @Override
+  public void write(long val) throws IOException {
+    if (numLiterals == 0) {
+      initializeLiterals(val);
+    } else {
+      if (numLiterals == 1) {
+        prevDelta = val - literals[0];
+        literals[numLiterals++] = val;
+        // if both values are same count as fixed run else variable run
+        if (val == literals[0]) {
+          fixedRunLength = 2;
+          variableRunLength = 0;
+        } else {
+          fixedRunLength = 0;
+          variableRunLength = 2;
+        }
+      } else {
+        long currentDelta = val - literals[numLiterals - 1];
+        if (prevDelta == 0 && currentDelta == 0) {
+          // fixed delta run
+
+          literals[numLiterals++] = val;
+
+          // if variable run is non-zero then we are seeing repeating
+          // values at the end of variable run in which case keep
+          // updating variable and fixed runs
+          if (variableRunLength > 0) {
+            fixedRunLength = 2;
+          }
+          fixedRunLength += 1;
+
+          // if fixed run met the minimum condition and if variable
+          // run is non-zero then flush the variable run and shift the
+          // tail fixed runs to start of the buffer
+          if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+            numLiterals -= MIN_REPEAT;
+            variableRunLength -= MIN_REPEAT - 1;
+            // copy the tail fixed runs
+            long[] tailVals = new long[MIN_REPEAT];
+            System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+            // determine variable encoding and flush values
+            determineEncoding();
+            writeValues();
+
+            // shift tail fixed runs to beginning of the buffer
+            for(long l : tailVals) {
+              literals[numLiterals++] = l;
+            }
+          }
+
+          // if fixed runs reached max repeat length then write values
+          if (fixedRunLength == MAX_SCOPE) {
+            determineEncoding();
+            writeValues();
+          }
+        } else {
+          // variable delta run
+
+          // if fixed run length is non-zero and if it satisfies the
+          // short repeat conditions then write the values as short repeats
+          // else use delta encoding
+          if (fixedRunLength >= MIN_REPEAT) {
+            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+              encoding = EncodingType.SHORT_REPEAT;
+              writeValues();
+            } else {
+              encoding = EncodingType.DELTA;
+              isFixedDelta = true;
+              writeValues();
+            }
+          }
+
+          // if fixed run length is <MIN_REPEAT and current value is
+          // different from previous then treat it as variable run
+          if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+            if (val != literals[numLiterals - 1]) {
+              variableRunLength = fixedRunLength;
+              fixedRunLength = 0;
+            }
+          }
+
+          // after writing values re-initialize the variables
+          if (numLiterals == 0) {
+            initializeLiterals(val);
+          } else {
+            // keep updating variable run lengths
+            prevDelta = val - literals[numLiterals - 1];
+            literals[numLiterals++] = val;
+            variableRunLength += 1;
+
+            // if variable run length reach the max scope, write it
+            if (variableRunLength == MAX_SCOPE) {
+              determineEncoding();
+              writeValues();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void initializeLiterals(long val) {
+    literals[numLiterals++] = val;
+    fixedRunLength = 1;
+    variableRunLength = 1;
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java
new file mode 100644
index 0000000..53687b7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+
+final class SerializationUtils {
+
+  private final static int BUFFER_SIZE = 64;
+  private final byte[] readBuffer;
+  private final byte[] writeBuffer;
+
+  public SerializationUtils() {
+    this.readBuffer = new byte[BUFFER_SIZE];
+    this.writeBuffer = new byte[BUFFER_SIZE];
+  }
+
+  void writeVulong(OutputStream output, long value) throws IOException {
+    while (true) {
+      if ((value & ~0x7f) == 0) {
+        output.write((byte) value);
+        return;
+      } else {
+        output.write((byte) (0x80 | (value & 0x7f)));
+        value >>>= 7;
+      }
+    }
+  }
+
+  void writeVslong(OutputStream output, long value) throws IOException {
+    writeVulong(output, (value << 1) ^ (value >> 63));
+  }
+
+
+  long readVulong(InputStream in) throws IOException {
+    long result = 0;
+    long b;
+    int offset = 0;
+    do {
+      b = in.read();
+      if (b == -1) {
+        throw new EOFException("Reading Vulong past EOF");
+      }
+      result |= (0x7f & b) << offset;
+      offset += 7;
+    } while (b >= 0x80);
+    return result;
+  }
+
+  long readVslong(InputStream in) throws IOException {
+    long result = readVulong(in);
+    return (result >>> 1) ^ -(result & 1);
+  }
+
+  float readFloat(InputStream in) throws IOException {
+    int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
+      (in.read() << 24);
+    return Float.intBitsToFloat(ser);
+  }
+
+  void writeFloat(OutputStream output, float value) throws IOException {
+    int ser = Float.floatToIntBits(value);
+    output.write(ser & 0xff);
+    output.write((ser >> 8) & 0xff);
+    output.write((ser >> 16) & 0xff);
+    output.write((ser >> 24) & 0xff);
+  }
+
+  double readDouble(InputStream in) throws IOException {
+    return Double.longBitsToDouble(readLongLE(in));
+  }
+
+  long readLongLE(InputStream in) throws IOException {
+    in.read(readBuffer, 0, 8);
+    return (((readBuffer[0] & 0xff) << 0)
+        + ((readBuffer[1] & 0xff) << 8)
+        + ((readBuffer[2] & 0xff) << 16)
+        + ((long) (readBuffer[3] & 0xff) << 24)
+        + ((long) (readBuffer[4] & 0xff) << 32)
+        + ((long) (readBuffer[5] & 0xff) << 40)
+        + ((long) (readBuffer[6] & 0xff) << 48)
+        + ((long) (readBuffer[7] & 0xff) << 56));
+  }
+
+  void writeDouble(OutputStream output, double value) throws IOException {
+    writeLongLE(output, Double.doubleToLongBits(value));
+  }
+
+  private void writeLongLE(OutputStream output, long value) throws IOException 
{
+    writeBuffer[0] = (byte) ((value >> 0)  & 0xff);
+    writeBuffer[1] = (byte) ((value >> 8)  & 0xff);
+    writeBuffer[2] = (byte) ((value >> 16) & 0xff);
+    writeBuffer[3] = (byte) ((value >> 24) & 0xff);
+    writeBuffer[4] = (byte) ((value >> 32) & 0xff);
+    writeBuffer[5] = (byte) ((value >> 40) & 0xff);
+    writeBuffer[6] = (byte) ((value >> 48) & 0xff);
+    writeBuffer[7] = (byte) ((value >> 56) & 0xff);
+    output.write(writeBuffer, 0, 8);
+  }
+
+  /**
+   * Write the arbitrarily sized signed BigInteger in vint format.
+   *
+   * Signed integers are encoded using the low bit as the sign bit using zigzag
+   * encoding.
+   *
+   * Each byte uses the low 7 bits for data and the high bit for stop/continue.
+   *
+   * Bytes are stored LSB first.
+   * @param output the stream to write to
+   * @param value the value to output
+   * @throws IOException
+   */
+  static void writeBigInteger(OutputStream output,
+                              BigInteger value) throws IOException {
+    // encode the signed number as a positive integer
+    value = value.shiftLeft(1);
+    int sign = value.signum();
+    if (sign < 0) {
+      value = value.negate();
+      value = value.subtract(BigInteger.ONE);
+    }
+    int length = value.bitLength();
+    while (true) {
+      long lowBits = value.longValue() & 0x7fffffffffffffffL;
+      length -= 63;
+      // write out the next 63 bits worth of data
+      for(int i=0; i < 9; ++i) {
+        // if this is the last byte, leave the high bit off
+        if (length <= 0 && (lowBits & ~0x7f) == 0) {
+          output.write((byte) lowBits);
+          return;
+        } else {
+          output.write((byte) (0x80 | (lowBits & 0x7f)));
+          lowBits >>>= 7;
+        }
+      }
+      value = value.shiftRight(63);
+    }
+  }
+
+  /**
+   * Read the signed arbitrary sized BigInteger BigInteger in vint format
+   * @param input the stream to read from
+   * @return the read BigInteger
+   * @throws IOException
+   */
+  static BigInteger readBigInteger(InputStream input) throws IOException {
+    BigInteger result = BigInteger.ZERO;
+    long work = 0;
+    int offset = 0;
+    long b;
+    do {
+      b = input.read();
+      if (b == -1) {
+        throw new EOFException("Reading BigInteger past EOF from " + input);
+      }
+      work |= (0x7f & b) << (offset % 63);
+      offset += 7;
+      // if we've read 63 bits, roll them into the result
+      if (offset == 63) {
+        result = BigInteger.valueOf(work);
+        work = 0;
+      } else if (offset % 63 == 0) {
+        result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+        work = 0;
+      }
+    } while (b >= 0x80);
+    if (work != 0) {
+      result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+    }
+    // convert back to a signed number
+    boolean isNegative = result.testBit(0);
+    if (isNegative) {
+      result = result.add(BigInteger.ONE);
+      result = result.negate();
+    }
+    result = result.shiftRight(1);
+    return result;
+  }
+
+  enum FixedBitSizes {
+    ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+    THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+    TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+    TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+  }
+
+  /**
+   * Count the number of bits required to encode the given value
+   * @param value
+   * @return bits required to store value
+   */
+  int findClosestNumBits(long value) {
+    int count = 0;
+    while (value != 0) {
+      count++;
+      value = value >>> 1;
+    }
+    return getClosestFixedBits(count);
+  }
+
+  /**
+   * zigzag encode the given value
+   * @param val
+   * @return zigzag encoded value
+   */
+  long zigzagEncode(long val) {
+    return (val << 1) ^ (val >> 63);
+  }
+
+  /**
+   * zigzag decode the given value
+   * @param val
+   * @return zizag decoded value
+   */
+  long zigzagDecode(long val) {
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Compute the bits required to represent pth percentile value
+   * @param data - array
+   * @param p - percentile value (>=0.0 to <=1.0)
+   * @return pth percentile bits
+   */
+  int percentileBits(long[] data, int offset, int length, double p) {
+    if ((p > 1.0) || (p <= 0.0)) {
+      return -1;
+    }
+
+    // histogram that store the encoded bit requirement for each values.
+    // maximum number of bits that can encoded is 32 (refer FixedBitSizes)
+    int[] hist = new int[32];
+
+    // compute the histogram
+    for(int i = offset; i < (offset + length); i++) {
+      int idx = encodeBitWidth(findClosestNumBits(data[i]));
+      hist[idx] += 1;
+    }
+
+    int perLen = (int) (length * (1.0 - p));
+
+    // return the bits required by pth percentile length
+    for(int i = hist.length - 1; i >= 0; i--) {
+      perLen -= hist[i];
+      if (perLen < 0) {
+        return decodeBitWidth(i);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Calculate the number of bytes required
+   * @param n - number of values
+   * @param numBits - bit width
+   * @return number of bytes required
+   */
+  int getTotalBytesRequired(int n, int numBits) {
+    return (n * numBits + 7) / 8;
+  }
+
+  /**
+   * For a given fixed bit this function will return the closest available 
fixed
+   * bit
+   * @param n
+   * @return closest valid fixed bit
+   */
+  int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+
+    if (n >= 1 && n <= 24) {
+      return n;
+    } else if (n > 24 && n <= 26) {
+      return 26;
+    } else if (n > 26 && n <= 28) {
+      return 28;
+    } else if (n > 28 && n <= 30) {
+      return 30;
+    } else if (n > 30 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  public int getClosestAlignedFixedBits(int n) {
+    if (n == 0 ||  n == 1) {
+      return 1;
+    } else if (n > 1 && n <= 2) {
+      return 2;
+    } else if (n > 2 && n <= 4) {
+      return 4;
+    } else if (n > 4 && n <= 8) {
+      return 8;
+    } else if (n > 8 && n <= 16) {
+      return 16;
+    } else if (n > 16 && n <= 24) {
+      return 24;
+    } else if (n > 24 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Finds the closest available fixed bit width match and returns its encoded
+   * value (ordinal)
+   * @param n - fixed bit width to encode
+   * @return encoded fixed bit width
+   */
+  int encodeBitWidth(int n) {
+    n = getClosestFixedBits(n);
+
+    if (n >= 1 && n <= 24) {
+      return n - 1;
+    } else if (n > 24 && n <= 26) {
+      return FixedBitSizes.TWENTYSIX.ordinal();
+    } else if (n > 26 && n <= 28) {
+      return FixedBitSizes.TWENTYEIGHT.ordinal();
+    } else if (n > 28 && n <= 30) {
+      return FixedBitSizes.THIRTY.ordinal();
+    } else if (n > 30 && n <= 32) {
+      return FixedBitSizes.THIRTYTWO.ordinal();
+    } else if (n > 32 && n <= 40) {
+      return FixedBitSizes.FORTY.ordinal();
+    } else if (n > 40 && n <= 48) {
+      return FixedBitSizes.FORTYEIGHT.ordinal();
+    } else if (n > 48 && n <= 56) {
+      return FixedBitSizes.FIFTYSIX.ordinal();
+    } else {
+      return FixedBitSizes.SIXTYFOUR.ordinal();
+    }
+  }
+
+  /**
+   * Decodes the ordinal fixed bit value to actual fixed bit width value
+   * @param n - encoded fixed bit width
+   * @return decoded fixed bit width
+   */
+  int decodeBitWidth(int n) {
+    if (n >= FixedBitSizes.ONE.ordinal()
+        && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+      return n + 1;
+    } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+      return 26;
+    } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+      return 28;
+    } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+      return 30;
+    } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+      return 32;
+    } else if (n == FixedBitSizes.FORTY.ordinal()) {
+      return 40;
+    } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+      return 48;
+    } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Bitpack and write the input values to underlying output stream
+   * @param input - values to write
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param output - output stream
+   * @throws IOException
+   */
+  void writeInts(long[] input, int offset, int len, int bitSize,
+                        OutputStream output) throws IOException {
+    if (input == null || input.length < 1 || offset < 0 || len < 1
+        || bitSize < 1) {
+      return;
+    }
+
+    switch (bitSize) {
+    case 1:
+      unrolledBitPack1(input, offset, len, output);
+      return;
+    case 2:
+      unrolledBitPack2(input, offset, len, output);
+      return;
+    case 4:
+      unrolledBitPack4(input, offset, len, output);
+      return;
+    case 8:
+      unrolledBitPack8(input, offset, len, output);
+      return;
+    case 16:
+      unrolledBitPack16(input, offset, len, output);
+      return;
+    case 24:
+      unrolledBitPack24(input, offset, len, output);
+      return;
+    case 32:
+      unrolledBitPack32(input, offset, len, output);
+      return;
+    case 40:
+      unrolledBitPack40(input, offset, len, output);
+      return;
+    case 48:
+      unrolledBitPack48(input, offset, len, output);
+      return;
+    case 56:
+      unrolledBitPack56(input, offset, len, output);
+      return;
+    case 64:
+      unrolledBitPack64(input, offset, len, output);
+      return;
+    default:
+      break;
+    }
+
+    int bitsLeft = 8;
+    byte current = 0;
+    for(int i = offset; i < (offset + len); i++) {
+      long value = input[i];
+      int bitsToWrite = bitSize;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1L << bitsToWrite) - 1;
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    // flush
+    if (bitsLeft != 8) {
+      output.write(current);
+      current = 0;
+      bitsLeft = 8;
+    }
+  }
+
+  private void unrolledBitPack1(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 1) << 7)
+          | ((input[i + 1] & 1) << 6)
+          | ((input[i + 2] & 1) << 5)
+          | ((input[i + 3] & 1) << 4)
+          | ((input[i + 4] & 1) << 3)
+          | ((input[i + 5] & 1) << 2)
+          | ((input[i + 6] & 1) << 1)
+          | (input[i + 7]) & 1);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 7;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 1) << startShift);
+        startShift -= 1;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack2(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 4;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 3) << 6)
+          | ((input[i + 1] & 3) << 4)
+          | ((input[i + 2] & 3) << 2)
+          | (input[i + 3]) & 3);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 6;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 3) << startShift);
+        startShift -= 2;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack4(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 2;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 4;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 15) << startShift);
+        startShift -= 4;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack8(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 1);
+  }
+
+  private void unrolledBitPack16(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 2);
+  }
+
+  private void unrolledBitPack24(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 3);
+  }
+
+  private void unrolledBitPack32(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 4);
+  }
+
+  private void unrolledBitPack40(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 5);
+  }
+
+  private void unrolledBitPack48(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 6);
+  }
+
+  private void unrolledBitPack56(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 7);
+  }
+
+  private void unrolledBitPack64(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 8);
+  }
+
+  private void unrolledBitPackBytes(long[] input, int offset, int len, 
OutputStream output, int numBytes) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int i = offset;
+    for (; i < endUnroll; i = i + numHops) {
+      writeLongBE(output, input, i, numHops, numBytes);
+    }
+
+    if (remainder > 0) {
+      writeRemainingLongs(output, i, input, remainder, numBytes);
+    }
+  }
+
+  private void writeRemainingLongs(OutputStream output, int offset, long[] 
input, int remainder,
+      int numBytes) throws IOException {
+    final int numHops = remainder;
+
+    int idx = 0;
+    switch (numBytes) {
+    case 1:
+      while (remainder > 0) {
+        writeBuffer[idx] = (byte) (input[offset + idx] & 255);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 2:
+      while (remainder > 0) {
+        writeLongBE2(output, input[offset + idx], idx * 2);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 3:
+      while (remainder > 0) {
+        writeLongBE3(output, input[offset + idx], idx * 3);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 4:
+      while (remainder > 0) {
+        writeLongBE4(output, input[offset + idx], idx * 4);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 5:
+      while (remainder > 0) {
+        writeLongBE5(output, input[offset + idx], idx * 5);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 6:
+      while (remainder > 0) {
+        writeLongBE6(output, input[offset + idx], idx * 6);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 7:
+      while (remainder > 0) {
+        writeLongBE7(output, input[offset + idx], idx * 7);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 8:
+      while (remainder > 0) {
+        writeLongBE8(output, input[offset + idx], idx * 8);
+        remainder--;
+        idx++;
+      }
+      break;
+    default:
+      break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  private void writeLongBE(OutputStream output, long[] input, int offset, int 
numHops, int numBytes) throws IOException {
+
+    switch (numBytes) {
+    case 1:
+      writeBuffer[0] = (byte) (input[offset + 0] & 255);
+      writeBuffer[1] = (byte) (input[offset + 1] & 255);
+      writeBuffer[2] = (byte) (input[offset + 2] & 255);
+      writeBuffer[3] = (byte) (input[offset + 3] & 255);
+      writeBuffer[4] = (byte) (input[offset + 4] & 255);
+      writeBuffer[5] = (byte) (input[offset + 5] & 255);
+      writeBuffer[6] = (byte) (input[offset + 6] & 255);
+      writeBuffer[7] = (byte) (input[offset + 7] & 255);
+      break;
+    case 2:
+      writeLongBE2(output, input[offset + 0], 0);
+      writeLongBE2(output, input[offset + 1], 2);
+      writeLongBE2(output, input[offset + 2], 4);
+      writeLongBE2(output, input[offset + 3], 6);
+      writeLongBE2(output, input[offset + 4], 8);
+      writeLongBE2(output, input[offset + 5], 10);
+      writeLongBE2(output, input[offset + 6], 12);
+      writeLongBE2(output, input[offset + 7], 14);
+      break;
+    case 3:
+      writeLongBE3(output, input[offset + 0], 0);
+      writeLongBE3(output, input[offset + 1], 3);
+      writeLongBE3(output, input[offset + 2], 6);
+      writeLongBE3(output, input[offset + 3], 9);
+      writeLongBE3(output, input[offset + 4], 12);
+      writeLongBE3(output, input[offset + 5], 15);
+      writeLongBE3(output, input[offset + 6], 18);
+      writeLongBE3(output, input[offset + 7], 21);
+      break;
+    case 4:
+      writeLongBE4(output, input[offset + 0], 0);
+      writeLongBE4(output, input[offset + 1], 4);
+      writeLongBE4(output, input[offset + 2], 8);
+      writeLongBE4(output, input[offset + 3], 12);
+      writeLongBE4(output, input[offset + 4], 16);
+      writeLongBE4(output, input[offset + 5], 20);
+      writeLongBE4(output, input[offset + 6], 24);
+      writeLongBE4(output, input[offset + 7], 28);
+      break;
+    case 5:
+      writeLongBE5(output, input[offset + 0], 0);
+      writeLongBE5(output, input[offset + 1], 5);
+      writeLongBE5(output, input[offset + 2], 10);
+      writeLongBE5(output, input[offset + 3], 15);
+      writeLongBE5(output, input[offset + 4], 20);
+      writeLongBE5(output, input[offset + 5], 25);
+      writeLongBE5(output, input[offset + 6], 30);
+      writeLongBE5(output, input[offset + 7], 35);
+      break;
+    case 6:
+      writeLongBE6(output, input[offset + 0], 0);
+      writeLongBE6(output, input[offset + 1], 6);
+      writeLongBE6(output, input[offset + 2], 12);
+      writeLongBE6(output, input[offset + 3], 18);
+      writeLongBE6(output, input[offset + 4], 24);
+      writeLongBE6(output, input[offset + 5], 30);
+      writeLongBE6(output, input[offset + 6], 36);
+      writeLongBE6(output, input[offset + 7], 42);
+      break;
+    case 7:
+      writeLongBE7(output, input[offset + 0], 0);
+      writeLongBE7(output, input[offset + 1], 7);
+      writeLongBE7(output, input[offset + 2], 14);
+      writeLongBE7(output, input[offset + 3], 21);
+      writeLongBE7(output, input[offset + 4], 28);
+      writeLongBE7(output, input[offset + 5], 35);
+      writeLongBE7(output, input[offset + 6], 42);
+      writeLongBE7(output, input[offset + 7], 49);
+      break;
+    case 8:
+      writeLongBE8(output, input[offset + 0], 0);
+      writeLongBE8(output, input[offset + 1], 8);
+      writeLongBE8(output, input[offset + 2], 16);
+      writeLongBE8(output, input[offset + 3], 24);
+      writeLongBE8(output, input[offset + 4], 32);
+      writeLongBE8(output, input[offset + 5], 40);
+      writeLongBE8(output, input[offset + 6], 48);
+      writeLongBE8(output, input[offset + 7], 56);
+      break;
+      default:
+        break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  private void writeLongBE2(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE3(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE4(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE5(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE6(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE7(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE8(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 56);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 7] =  (byte) (val >>> 0);
+  }
+
+  // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
+  // ArithmeticException in case of overflow
+  public boolean isSafeSubtract(long left, long right) {
+    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
new file mode 100644
index 0000000..285a32a
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.iq80.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+
+  Boolean direct = null;
+
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // I should work on a patch for Snappy to support an overflow buffer
+    // to prevent the extra buffer copy.
+    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+    int outBytes =
+        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+          compressed, 0);
+    if (outBytes < inBytes) {
+      int remaining = out.remaining();
+      if (remaining >= outBytes) {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), outBytes);
+        out.position(out.position() + outBytes);
+      } else {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+        System.arraycopy(compressed, remaining, overflow.array(),
+            overflow.arrayOffset(), outBytes - remaining);
+        overflow.position(outBytes - remaining);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+    int inOffset = in.position();
+    int uncompressLen =
+        Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
+          in.limit() - inOffset, out.array(), out.arrayOffset() + 
out.position());
+    out.position(uncompressLen + out.position());
+    out.flip();
+  }
+
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      try {
+        if (ShimLoader.getHadoopShims().getDirectDecompressor(
+            DirectCompressionType.SNAPPY) != null) {
+          direct = Boolean.valueOf(true);
+        } else {
+          direct = Boolean.valueOf(false);
+        }
+      } catch (UnsatisfiedLinkError ule) {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.SNAPPY);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // snappy allows no modifications
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java
new file mode 100644
index 0000000..3821645
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * The name of a stream within a stripe.
+ */
+class StreamName implements Comparable<StreamName> {
+  private final int column;
+  private final OrcProto.Stream.Kind kind;
+
+  public enum Area {
+    DATA, INDEX
+  }
+
+  public StreamName(int column, OrcProto.Stream.Kind kind) {
+    this.column = column;
+    this.kind = kind;
+  }
+
+  public boolean equals(Object obj) {
+    if (obj != null && obj instanceof StreamName) {
+      StreamName other = (StreamName) obj;
+      return other.column == column && other.kind == kind;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int compareTo(StreamName streamName) {
+    if (streamName == null) {
+      return -1;
+    }
+    Area area = getArea(kind);
+    Area otherArea = StreamName.getArea(streamName.kind);
+    if (area != otherArea) {
+      return -area.compareTo(otherArea);
+    }
+    if (column != streamName.column) {
+      return column < streamName.column ? -1 : 1;
+    }
+    return kind.compareTo(streamName.kind);
+  }
+
+  public int getColumn() {
+    return column;
+  }
+
+  public OrcProto.Stream.Kind getKind() {
+    return kind;
+  }
+
+  public Area getArea() {
+    return getArea(kind);
+  }
+
+  public static Area getArea(OrcProto.Stream.Kind kind) {
+    switch (kind) {
+      case ROW_INDEX:
+      case DICTIONARY_COUNT:
+      case BLOOM_FILTER:
+        return Area.INDEX;
+      default:
+        return Area.DATA;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Stream for column " + column + " kind " + kind;
+  }
+
+  @Override
+  public int hashCode() {
+    return column * 101 + kind.getNumber();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java
new file mode 100644
index 0000000..4248664
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * Statistics for string columns.
+ */
+public interface StringColumnStatistics extends ColumnStatistics {
+  /**
+   * Get the minimum string.
+   * @return the minimum
+   */
+  String getMinimum();
+
+  /**
+   * Get the maximum string.
+   * @return the maximum
+   */
+  String getMaximum();
+
+  /**
+   * Get the total length of all strings
+   * @return the sum (total length)
+   */
+  long getSum();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java
new file mode 100644
index 0000000..8835cef
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A red-black tree that stores strings. The strings are stored as UTF-8 bytes
+ * and an offset for each entry.
+ */
+class StringRedBlackTree extends RedBlackTree {
+  private final DynamicByteArray byteArray = new DynamicByteArray();
+  private final DynamicIntArray keyOffsets;
+  private String newKey;
+
+  public StringRedBlackTree(int initialCapacity) {
+    super(initialCapacity);
+    keyOffsets = new DynamicIntArray(initialCapacity);
+  }
+
+  public int add(String value) {
+    newKey = value;
+    return addNewKey();
+  }
+
+  private int addNewKey() {
+    // if the newKey is actually new, add it to our byteArray and store the 
offset & length
+    if (add()) {
+      int len = newKey.length();
+      keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len));
+    }
+    return lastAdd;
+  }
+
+  public int add(Text value) {
+    newKey = value.toString();
+    return addNewKey();
+  }
+
+  @Override
+  protected int compareValue(int position) {
+    int start = keyOffsets.get(position);
+    int end;
+    if (position + 1 == keyOffsets.size()) {
+      end = byteArray.size();
+    } else {
+      end = keyOffsets.get(position+1);
+    }
+    return byteArray.compare(newKey.getBytes(), 0, newKey.length(),
+                             start, end - start);
+  }
+
+  /**
+   * The information about each node.
+   */
+  public interface VisitorContext {
+    /**
+     * Get the position where the key was originally added.
+     * @return the number returned by add.
+     */
+    int getOriginalPosition();
+
+    /**
+     * Write the bytes for the string to the given output stream.
+     * @param out the stream to write to.
+     * @throws IOException
+     */
+    void writeBytes(OutputStream out) throws IOException;
+
+    /**
+     * Get the original string.
+     * @return the string
+     */
+    Text getText();
+
+    /**
+     * Get the number of bytes.
+     * @return the string's length in bytes
+     */
+    int getLength();
+  }
+
+  /**
+   * The interface for visitors.
+   */
+  public interface Visitor {
+    /**
+     * Called once for each node of the tree in sort order.
+     * @param context the information about each node
+     * @throws IOException
+     */
+    void visit(VisitorContext context) throws IOException;
+  }
+
+  private class VisitorContextImpl implements VisitorContext {
+    private int originalPosition;
+    private int start;
+    private int end;
+    private final Text text = new Text();
+
+    public int getOriginalPosition() {
+      return originalPosition;
+    }
+
+    public Text getText() {
+      byteArray.setText(text, start, end - start);
+      return text;
+    }
+
+    public void writeBytes(OutputStream out) throws IOException {
+      byteArray.write(out, start, end - start);
+    }
+
+    public int getLength() {
+      return end - start;
+    }
+
+    void setPosition(int position) {
+      originalPosition = position;
+      start = keyOffsets.get(originalPosition);
+      if (position + 1 == keyOffsets.size()) {
+        end = byteArray.size();
+      } else {
+        end = keyOffsets.get(originalPosition + 1);
+      }
+    }
+  }
+
+  private void recurse(int node, Visitor visitor, VisitorContextImpl context
+                      ) throws IOException {
+    if (node != NULL) {
+      recurse(getLeft(node), visitor, context);
+      context.setPosition(node);
+      visitor.visit(context);
+      recurse(getRight(node), visitor, context);
+    }
+  }
+
+  /**
+   * Visit all of the nodes in the tree in sorted order.
+   * @param visitor the action to be applied to each node
+   * @throws IOException
+   */
+  public void visit(Visitor visitor) throws IOException {
+    recurse(root, visitor, new VisitorContextImpl());
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  public void clear() {
+    super.clear();
+    byteArray.clear();
+    keyOffsets.clear();
+  }
+
+  public void getText(Text result, int originalPosition) {
+    int offset = keyOffsets.get(originalPosition);
+    int length;
+    if (originalPosition + 1 == keyOffsets.size()) {
+      length = byteArray.size() - offset;
+    } else {
+      length = keyOffsets.get(originalPosition + 1) - offset;
+    }
+    byteArray.setText(result, offset, length);
+  }
+
+  /**
+   * Get the size of the character data in the table.
+   * @return the bytes used by the table
+   */
+  public int getCharacterSize() {
+    return byteArray.size();
+  }
+
+  /**
+   * Calculate the approximate size in memory.
+   * @return the number of bytes used in storing the tree.
+   */
+  public long getSizeInBytes() {
+    return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() +
+      super.getSizeInBytes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java
new file mode 100644
index 0000000..62819c1
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * Information about the stripes in an ORC file that is provided by the Reader.
+ */
+public interface StripeInformation {
+  /**
+   * Get the byte offset of the start of the stripe.
+   * @return the bytes from the start of the file
+   */
+  long getOffset();
+
+  /**
+   * Get the total length of the stripe in bytes.
+   * @return the number of bytes in the stripe
+   */
+  long getLength();
+
+  /**
+   * Get the length of the stripe's indexes.
+   * @return the number of bytes in the index
+   */
+  long getIndexLength();
+
+  /**
+   * Get the length of the stripe's data.
+   * @return the number of bytes in the stripe
+   */
+  long getDataLength();
+
+  /**
+   * Get the length of the stripe's tail section, which contains its index.
+   * @return the number of bytes in the tail
+   */
+  long getFooterLength();
+
+  /**
+   * Get the number of rows in the stripe.
+   * @return a count of the number of rows
+   */
+  long getNumberOfRows();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java
new file mode 100644
index 0000000..013fc8e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.util.List;
+
+public class StripeStatistics {
+  private final List<OrcProto.ColumnStatistics> cs;
+
+  StripeStatistics(List<OrcProto.ColumnStatistics> list) {
+    this.cs = list;
+  }
+
+  /**
+   * Return list of column statistics
+   *
+   * @return column stats
+   */
+  public ColumnStatistics[] getColumnStatistics() {
+    ColumnStatistics[] result = new ColumnStatistics[cs.size()];
+    for (int i = 0; i < result.length; ++i) {
+      result[i] = ColumnStatisticsImpl.deserialize(cs.get(i));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java
new file mode 100644
index 0000000..6fad0ac
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.sql.Timestamp;
+
+/**
+ * Statistics for Timestamp columns.
+ */
+public interface TimestampColumnStatistics extends ColumnStatistics {
+  /**
+   * Get the minimum value for the column.
+   * @return minimum value
+   */
+  Timestamp getMinimum();
+
+  /**
+   * Get the maximum value for the column.
+   * @return maximum value
+   */
+  Timestamp getMaximum();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java
new file mode 100644
index 0000000..669b44f
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * The interface for writing ORC files.
+ */
+public interface Writer {
+  /**
+   * Add arbitrary meta-data to the ORC file. This may be called at any point
+   * until the Writer is closed. If the same key is passed a second time, the
+   * second value will replace the first.
+   * @param key a key to label the data with.
+   * @param value the contents of the metadata.
+   */
+  void addUserMetadata(String key, ByteBuffer value);
+
+  void addTuple(Tuple tuple) throws IOException;
+
+  /**
+   * Flush all of the buffers and close the file. No methods on this writer
+   * should be called afterwards.
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * Return the deserialized data size. Raw data size will be compute when
+   * writing the file footer. Hence raw data size value will be available only
+   * after closing the writer.
+   *
+   * @return raw data size
+   */
+  long getRawDataSize();
+
+  /**
+   * Return the number of rows in file. Row count gets updated when flushing
+   * the stripes. To get accurate row count this method should be called after
+   * closing the writer.
+   *
+   * @return row count
+   */
+  long getNumberOfRows();
+
+  /**
+   * Write an intermediate footer on the file such that if the file is
+   * truncated to the returned offset, it would be a valid ORC file.
+   * @return the offset that would be a valid end location for an ORC file
+   */
+  long writeIntermediateFooter() throws IOException;
+
+  /**
+   * Fast stripe append to ORC file. This interface is used for fast ORC file
+   * merge with other ORC files. When merging, the file to be merged should 
pass
+   * stripe in binary form along with stripe information and stripe statistics.
+   * After appending last stripe of a file, use appendUserMetadata() to append
+   * any user metadata.
+   * @param stripe - stripe as byte array
+   * @param offset - offset within byte array
+   * @param length - length of stripe within byte array
+   * @param stripeInfo - stripe information
+   * @param stripeStatistics - stripe statistics (Protobuf objects can be
+   *                         merged directly)
+   * @throws IOException
+   */
+  public void appendStripe(byte[] stripe, int offset, int length,
+                           StripeInformation stripeInfo,
+                           OrcProto.StripeStatistics stripeStatistics) throws 
IOException;
+
+  /**
+   * When fast stripe append is used for merging ORC stripes, after appending
+   * the last stripe from a file, this interface must be used to merge any
+   * user metadata.
+   * @param userMetadata - user metadata
+   */
+  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata);
+}

Reply via email to