http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
new file mode 100644
index 0000000..a347706
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * Dynamic int array that uses primitive types and chunks to avoid copying
+ * large number of integers when it resizes.
+ *
+ * The motivation for this class is memory optimization, i.e. space efficient
+ * storage of potentially huge arrays without good a-priori size guesses.
+ *
+ * The API of this class is between a primitive array and a AbstractList. It's
+ * not a Collection implementation because it handles primitive types, but the
+ * API could be extended to support iterators and the like.
+ *
+ * NOTE: Like standard Collection implementations/arrays, this class is not
+ * synchronized.
+ */
+final class DynamicIntArray {
+  static final int DEFAULT_CHUNKSIZE = 8 * 1024;
+  static final int INIT_CHUNKS = 128;
+
+  private final int chunkSize;       // our allocation size
+  private int[][] data;              // the real data
+  private int length;                // max set element index +1
+  private int initializedChunks = 0; // the number of created chunks
+
+  public DynamicIntArray() {
+    this(DEFAULT_CHUNKSIZE);
+  }
+
+  public DynamicIntArray(int chunkSize) {
+    this.chunkSize = chunkSize;
+
+    data = new int[INIT_CHUNKS][];
+  }
+
+  /**
+   * Ensure that the given index is valid.
+   */
+  private void grow(int chunkIndex) {
+    if (chunkIndex >= initializedChunks) {
+      if (chunkIndex >= data.length) {
+        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+        int[][] newChunk = new int[newSize][];
+        System.arraycopy(data, 0, newChunk, 0, data.length);
+        data = newChunk;
+      }
+      for (int i=initializedChunks; i <= chunkIndex; ++i) {
+        data[i] = new int[chunkSize];
+      }
+      initializedChunks = chunkIndex + 1;
+    }
+  }
+
+  public int get(int index) {
+    if (index >= length) {
+      throw new IndexOutOfBoundsException("Index " + index +
+                                            " is outside of 0.." +
+                                            (length - 1));
+    }
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    return data[i][j];
+  }
+
+  public void set(int index, int value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] = value;
+  }
+
+  public void increment(int index, int value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] += value;
+  }
+
+  public void add(int value) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow(i);
+    data[i][j] = value;
+    length += 1;
+  }
+
+  public int size() {
+    return length;
+  }
+
+  public void clear() {
+    length = 0;
+    for(int i=0; i < data.length; ++i) {
+      data[i] = null;
+    }
+    initializedChunks = 0;
+  }
+
+  public String toString() {
+    int i;
+    StringBuilder sb = new StringBuilder(length * 4);
+
+    sb.append('{');
+    int l = length - 1;
+    for (i=0; i<l; i++) {
+      sb.append(get(i));
+      sb.append(',');
+    }
+    sb.append(get(i));
+    sb.append('}');
+
+    return sb.toString();
+  }
+
+  public int getSizeInBytes() {
+    return 4 * initializedChunks * chunkSize;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
new file mode 100644
index 0000000..208454f
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * Statistics for all of the integer columns, such as byte, short, int, and
+ * long.
+ */
+public interface IntegerColumnStatistics extends ColumnStatistics {
+  /**
+   * Get the smallest value in the column. Only defined if getNumberOfValues
+   * is non-zero.
+   * @return the minimum
+   */
+  long getMinimum();
+
+  /**
+   * Get the largest value in the column. Only defined if getNumberOfValues
+   * is non-zero.
+   * @return the maximum
+   */
+  long getMaximum();
+
+  /**
+   * Is the sum defined? If the sum overflowed the counter this will be false.
+   * @return is the sum available
+   */
+  boolean isSumDefined();
+
+  /**
+   * Get the sum of the column. Only valid if isSumDefined returns true.
+   * @return the sum of the column
+   */
+  long getSum();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
new file mode 100644
index 0000000..6872882
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+interface IntegerWriter {
+
+  /**
+   * Get position from the stream.
+   * @param recorder
+   * @throws IOException
+   */
+  void getPosition(PositionRecorder recorder) throws IOException;
+
+  /**
+   * Write the integer value
+   * @param value
+   * @throws IOException
+   */
+  void write(long value) throws IOException;
+
+  /**
+   * Flush the buffer
+   * @throws IOException
+   */
+  void flush() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
new file mode 100644
index 0000000..8cd40f7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By managing the size of each allocation, we try to cut down the size of each
+ * allocation and keep the task from running out of memory.
+ *
+ * This class is not thread safe, but is re-entrant - ensure creation and all
+ * invocations are triggered from the same thread.
+ */
+class MemoryManager {
+
+  private static final Log LOG = LogFactory.getLog(MemoryManager.class);
+
+  /**
+   * How often should we check the memory sizes? Measured in rows added
+   * to all of the writers.
+   */
+  private static final int ROWS_BETWEEN_CHECKS = 5000;
+  private final long totalMemoryPool;
+  private final Map<Path, WriterInfo> writerList =
+      new HashMap<Path, WriterInfo>();
+  private long totalAllocation = 0;
+  private double currentScale = 1;
+  private int rowsAddedSinceCheck = 0;
+  private final OwnedLock ownerLock = new OwnedLock();
+
+  @SuppressWarnings("serial")
+  private static class OwnedLock extends ReentrantLock {
+    public Thread getOwner() {
+      return super.getOwner();
+    }
+  }
+
+  private static class WriterInfo {
+    long allocation;
+    Callback callback;
+    WriterInfo(long allocation, Callback callback) {
+      this.allocation = allocation;
+      this.callback = callback;
+    }
+  }
+
+  public interface Callback {
+    /**
+     * The writer needs to check its memory usage
+     * @param newScale the current scale factor for memory allocations
+     * @return true if the writer was over the limit
+     * @throws IOException
+     */
+    boolean checkMemory(double newScale) throws IOException;
+  }
+
+  /**
+   * Create the memory manager.
+   * @param conf use the configuration to find the maximum size of the memory
+   *             pool.
+   */
+  MemoryManager(Configuration conf) {
+    HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
+    double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
+    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * maxLoad);
+    ownerLock.lock();
+  }
+
+  /**
+   * Light weight thread-safety check for multi-threaded access patterns
+   */
+  private void checkOwner() {
+    Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
+      "Owner thread expected %s, got %s",
+      ownerLock.getOwner(),
+      Thread.currentThread());
+  }
+
+  /**
+   * Add a new writer's memory allocation to the pool. We use the path
+   * as a unique key to ensure that we don't get duplicates.
+   * @param path the file that is being written
+   * @param requestedAllocation the requested buffer size
+   */
+  void addWriter(Path path, long requestedAllocation,
+                              Callback callback) throws IOException {
+    checkOwner();
+    WriterInfo oldVal = writerList.get(path);
+    // this should always be null, but we handle the case where the memory
+    // manager wasn't told that a writer wasn't still in use and the task
+    // starts writing to the same path.
+    if (oldVal == null) {
+      oldVal = new WriterInfo(requestedAllocation, callback);
+      writerList.put(path, oldVal);
+      totalAllocation += requestedAllocation;
+    } else {
+      // handle a new writer that is writing to the same path
+      totalAllocation += requestedAllocation - oldVal.allocation;
+      oldVal.allocation = requestedAllocation;
+      oldVal.callback = callback;
+    }
+    updateScale(true);
+  }
+
+  /**
+   * Remove the given writer from the pool.
+   * @param path the file that has been closed
+   */
+  void removeWriter(Path path) throws IOException {
+    checkOwner();
+    WriterInfo val = writerList.get(path);
+    if (val != null) {
+      writerList.remove(path);
+      totalAllocation -= val.allocation;
+      if (writerList.isEmpty()) {
+        rowsAddedSinceCheck = 0;
+      }
+      updateScale(false);
+    }
+    if(writerList.isEmpty()) {
+      rowsAddedSinceCheck = 0;
+    }
+  }
+
+  /**
+   * Get the total pool size that is available for ORC writers.
+   * @return the number of bytes in the pool
+   */
+  long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * The scaling factor for each allocation to ensure that the pool isn't
+   * oversubscribed.
+   * @return a fraction between 0.0 and 1.0 of the requested size that is
+   * available for each writer.
+   */
+  double getAllocationScale() {
+    return currentScale;
+  }
+
+  /**
+   * Give the memory manager an opportunity for doing a memory check.
+   * @throws IOException
+   */
+  void addedRow() throws IOException {
+    if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+      notifyWriters();
+    }
+  }
+
+  /**
+   * Notify all of the writers that they should check their memory usage.
+   * @throws IOException
+   */
+  void notifyWriters() throws IOException {
+    checkOwner();
+    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+    for(WriterInfo writer: writerList.values()) {
+      boolean flushed = writer.callback.checkMemory(currentScale);
+      if (LOG.isDebugEnabled() && flushed) {
+        LOG.debug("flushed " + writer.toString());
+      }
+    }
+    rowsAddedSinceCheck = 0;
+  }
+
+  /**
+   * Update the currentScale based on the current allocation and pool size.
+   * This also updates the notificationTrigger.
+   * @param isAllocate is this an allocation?
+   */
+  private void updateScale(boolean isAllocate) throws IOException {
+    if (totalAllocation <= totalMemoryPool) {
+      currentScale = 1;
+    } else {
+      currentScale = (double) totalMemoryPool / totalAllocation;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
new file mode 100644
index 0000000..dfa4c36
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class Metadata {
+
+  private final OrcProto.Metadata metadata;
+
+  Metadata(OrcProto.Metadata m) {
+    this.metadata = m;
+  }
+
+  /**
+   * Return list of stripe level column statistics
+   *
+   * @return list of stripe statistics
+   */
+  public List<StripeStatistics> getStripeStatistics() {
+    List<StripeStatistics> result = Lists.newArrayList();
+    for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
+      result.add(new StripeStatistics(ss.getColStatsList()));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java
new file mode 100644
index 0000000..b704666
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java
@@ -0,0 +1,149 @@
+//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.conf.Configuration;
+
+// All configs in this class also appear in HiveConf, so any changes here 
should also be made there
+// This is because only HiveConf can provide type checking through the CLI, 
and Presto depends on
+// open source Hive, and so won't work with any variables not in open source 
HiveConf
+public class OrcConf {
+
+  public enum ConfVars {
+    
HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold",
 0.8f),
+
+    // Maximum fraction of heap that can be used by ORC file writers
+    HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+    HIVE_ORC_FILE_MIN_MEMORY_ALLOCATION("hive.exec.orc.min.mem.allocation", 
4194304L), // 4 Mb
+    HIVE_ORC_FILE_ENABLE_LOW_MEMORY_MODE("hive.exec.orc.low.memory", false),
+    HIVE_ORC_ROW_BUFFER_SIZE("hive.exec.orc.row.buffer.size", 100),
+
+    HIVE_ORC_EAGER_HDFS_READ("hive.exec.orc.eager.hdfs.read", true),
+    HIVE_ORC_EAGER_HDFS_READ_BYTES("hive.exec.orc.eager.hdfs.read.bytes", 
193986560), // 185 Mb
+
+    
HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK("hive.orc.row.index.stride.dictionary.check",
 true),
+    HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size", 64L * 
1024 * 1024),
+    HIVE_ORC_DEFAULT_BLOCK_SIZE("hive.exec.orc.default.block.size", 256L * 
1024 * 1024),
+    
HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 
10000),
+    HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 
1024),
+    HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", 
true),
+    HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
+    HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", null), // 0.11 or 0.12
+    HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED"),
+    HIVE_ORC_COMPRESSION_STRATEGY("hive.exec.orc.compression.strategy", 
"SPEED"),
+    HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 
0.05f),
+
+    ;
+
+    public final String varname;
+    public final String defaultVal;
+    public final int defaultIntVal;
+    public final long defaultLongVal;
+    public final float defaultFloatVal;
+    public final boolean defaultBoolVal;
+
+
+    ConfVars(String varname, String defaultVal) {
+      this.varname = varname;
+      this.defaultVal = defaultVal;
+      this.defaultIntVal = -1;
+      this.defaultLongVal = -1;
+      this.defaultFloatVal = -1;
+      this.defaultBoolVal = false;
+    }
+
+    ConfVars(String varname, int defaultIntVal) {
+      this.varname = varname;
+      this.defaultVal = Integer.toString(defaultIntVal);
+      this.defaultIntVal = defaultIntVal;
+      this.defaultLongVal = -1;
+      this.defaultFloatVal = -1;
+      this.defaultBoolVal = false;
+    }
+
+    ConfVars(String varname, long defaultLongVal) {
+      this.varname = varname;
+      this.defaultVal = Long.toString(defaultLongVal);
+      this.defaultIntVal = -1;
+      this.defaultLongVal = defaultLongVal;
+      this.defaultFloatVal = -1;
+      this.defaultBoolVal = false;
+    }
+
+    ConfVars(String varname, float defaultFloatVal) {
+      this.varname = varname;
+      this.defaultVal = Float.toString(defaultFloatVal);
+      this.defaultIntVal = -1;
+      this.defaultLongVal = -1;
+      this.defaultFloatVal = defaultFloatVal;
+      this.defaultBoolVal = false;
+    }
+
+    ConfVars(String varname, boolean defaultBoolVal) {
+      this.varname = varname;
+      this.defaultVal = Boolean.toString(defaultBoolVal);
+      this.defaultIntVal = -1;
+      this.defaultLongVal = -1;
+      this.defaultFloatVal = -1;
+      this.defaultBoolVal = defaultBoolVal;
+    }
+  }
+
+  public static int getIntVar(Configuration conf, ConfVars var) {
+    return conf.getInt(var.varname, var.defaultIntVal);
+  }
+
+  public static void setIntVar(Configuration conf, ConfVars var, int val) {
+    conf.setInt(var.varname, val);
+  }
+
+  public static long getLongVar(Configuration conf, ConfVars var) {
+    return conf.getLong(var.varname, var.defaultLongVal);
+  }
+
+  public static void setLongVar(Configuration conf, ConfVars var, long val) {
+    conf.setLong(var.varname, val);
+  }
+
+  public static float getFloatVar(Configuration conf, ConfVars var) {
+    return conf.getFloat(var.varname, var.defaultFloatVal);
+  }
+
+  public static void setFloatVar(Configuration conf, ConfVars var, float val) {
+    conf.setFloat(var.varname, val);
+  }
+
+  public static boolean getBoolVar(Configuration conf, ConfVars var) {
+    return conf.getBoolean(var.varname, var.defaultBoolVal);
+  }
+
+  public static void setBoolVar(Configuration conf, ConfVars var, boolean val) 
{
+    conf.setBoolean(var.varname, val);
+  }
+
+  public static String getVar(Configuration conf, ConfVars var) {
+    return conf.get(var.varname, var.defaultVal);
+  }
+
+  public static void setVar(Configuration conf, ConfVars var, String val) {
+    conf.set(var.varname, val);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
new file mode 100644
index 0000000..a291953
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import static org.apache.tajo.storage.thirdparty.orc.OrcConf.ConfVars.*;
+
+import java.io.IOException;
+import java.util.TimeZone;
+
+/**
+ * Contains factory methods to read or write ORC files.
+ */
+public final class OrcFile {
+
+  public static final String MAGIC = "ORC";
+
+  /**
+   * Create a version number for the ORC file format, so that we can add
+   * non-forward compatible changes in the future. To make it easier for users
+   * to understand the version numbers, we use the Hive release number that
+   * first wrote that version of ORC files.
+   *
+   * Thus, if you add new encodings or other non-forward compatible changes
+   * to ORC files, which prevent the old reader from reading the new format,
+   * you should change these variable to reflect the next Hive release number.
+   * Non-forward compatible changes should never be added in patch releases.
+   *
+   * Do not make any changes that break backwards compatibility, which would
+   * prevent the new reader from reading ORC files generated by any released
+   * version of Hive.
+   */
+  public static enum Version {
+    V_0_11("0.11", 0, 11),
+      V_0_12("0.12", 0, 12);
+
+    public static final Version CURRENT = V_0_12;
+
+    private final String name;
+    private final int major;
+    private final int minor;
+
+    private Version(String name, int major, int minor) {
+      this.name = name;
+      this.major = major;
+      this.minor = minor;
+    }
+
+    public static Version byName(String name) {
+      for(Version version: values()) {
+        if (version.name.equals(name)) {
+          return version;
+        }
+      }
+      throw new IllegalArgumentException("Unknown ORC version " + name);
+    }
+
+    /**
+     * Get the human readable name for the version.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Get the major version number.
+     */
+    public int getMajor() {
+      return major;
+    }
+
+    /**
+     * Get the minor version number.
+     */
+    public int getMinor() {
+      return minor;
+    }
+  }
+
+  /**
+   * Records the version of the writer in terms of which bugs have been fixed.
+   * For bugs in the writer, but the old readers already read the new data
+   * correctly, bump this version instead of the Version.
+   */
+  public static enum WriterVersion {
+    ORIGINAL(0),
+      HIVE_8732(1); // corrupted stripe/file maximum column statistics
+
+    private final int id;
+
+    public int getId() {
+      return id;
+    }
+
+    private WriterVersion(int id) {
+      this.id = id;
+    }
+  }
+
+  public static enum EncodingStrategy {
+    SPEED, COMPRESSION;
+  }
+
+  public static enum CompressionStrategy {
+    SPEED, COMPRESSION;
+  }
+
+  // Note : these string definitions for table properties are deprecated,
+  // and retained only for backward compatibility, please do not add to
+  // them, add to OrcTableProperties below instead
+  @Deprecated public static final String COMPRESSION = "orc.compress";
+  @Deprecated public static final String COMPRESSION_BLOCK_SIZE = 
"orc.compress.size";
+  @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size";
+  @Deprecated public static final String ROW_INDEX_STRIDE = 
"orc.row.index.stride";
+  @Deprecated public static final String ENABLE_INDEXES = "orc.create.index";
+  @Deprecated public static final String BLOCK_PADDING = "orc.block.padding";
+
+  /**
+   * Enum container for all orc table properties.
+   * If introducing a new orc-specific table property,
+   * add it here.
+   */
+  public static enum OrcTableProperties {
+    COMPRESSION("orc.compress"),
+    COMPRESSION_BLOCK_SIZE("orc.compress.size"),
+    STRIPE_SIZE("orc.stripe.size"),
+    BLOCK_SIZE("orc.block.size"),
+    ROW_INDEX_STRIDE("orc.row.index.stride"),
+    ENABLE_INDEXES("orc.create.index"),
+    BLOCK_PADDING("orc.block.padding"),
+    ENCODING_STRATEGY("orc.encoding.strategy"),
+    BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"),
+    BLOOM_FILTER_FPP("orc.bloom.filter.fpp");
+
+    private final String propName;
+
+    OrcTableProperties(String propName) {
+      this.propName = propName;
+    }
+
+    public String getPropName(){
+      return this.propName;
+    }
+  }
+
+  // unused
+  private OrcFile() {}
+
+  public static interface WriterContext {
+    Writer getWriter();
+  }
+
+  public static interface WriterCallback {
+    public void preStripeWrite(WriterContext context) throws IOException;
+    public void preFooterWrite(WriterContext context) throws IOException;
+  }
+
+  /**
+   * Options for creating ORC file writers.
+   */
+  public static class WriterOptions {
+    private final Configuration configuration;
+    private FileSystem fileSystemValue = null;
+    private ObjectInspector inspectorValue = null;
+    private long stripeSizeValue;
+    private long blockSizeValue;
+    private int rowIndexStrideValue;
+    private int bufferSizeValue;
+    private boolean blockPaddingValue;
+    private CompressionKind compressValue;
+    private MemoryManager memoryManagerValue;
+    private Version versionValue;
+    private WriterCallback callback;
+    private EncodingStrategy encodingStrategy;
+    private CompressionStrategy compressionStrategy;
+    private float paddingTolerance;
+    private String bloomFilterColumns;
+    private double bloomFilterFpp;
+    private TimeZone timezone;
+
+    WriterOptions(Configuration conf) {
+      configuration = conf;
+      memoryManagerValue = getMemoryManager(conf);
+      stripeSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE);
+      blockSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE);
+      rowIndexStrideValue = OrcConf.getIntVar(conf, 
HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE);
+      bufferSizeValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE);
+      blockPaddingValue = OrcConf.getBoolVar(conf, 
HIVE_ORC_DEFAULT_BLOCK_PADDING);
+      compressValue = CompressionKind.valueOf(OrcConf.getVar(conf, 
HIVE_ORC_DEFAULT_COMPRESS));
+      String versionName = OrcConf.getVar(conf, HIVE_ORC_WRITE_FORMAT);
+      if (versionName == null) {
+        versionValue = Version.CURRENT;
+      } else {
+        versionValue = Version.byName(versionName);
+      }
+      String enString =
+          conf.get(OrcConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
+      if (enString == null) {
+        encodingStrategy = EncodingStrategy.SPEED;
+      } else {
+        encodingStrategy = EncodingStrategy.valueOf(enString);
+      }
+
+      String compString = conf
+          .get(OrcConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
+      if (compString == null) {
+        compressionStrategy = CompressionStrategy.SPEED;
+      } else {
+        compressionStrategy = CompressionStrategy.valueOf(compString);
+      }
+
+      paddingTolerance = 
conf.getFloat(OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
+          OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
+      bloomFilterFpp = BloomFilterIO.DEFAULT_FPP;
+    }
+
+    /**
+     * Provide the filesystem for the path, if the client has it available.
+     * If it is not provided, it will be found from the path.
+     */
+    public WriterOptions fileSystem(FileSystem value) {
+      fileSystemValue = value;
+      return this;
+    }
+
+    /**
+     * Set the stripe size for the file. The writer stores the contents of the
+     * stripe in memory until this memory limit is reached and the stripe
+     * is flushed to the HDFS file and the next stripe started.
+     */
+    public WriterOptions stripeSize(long value) {
+      stripeSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Set the file system block size for the file. For optimal performance,
+     * set the block size to be multiple factors of stripe size.
+     */
+    public WriterOptions blockSize(long value) {
+      blockSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Set the distance between entries in the row index. The minimum value is
+     * 1000 to prevent the index from overwhelming the data. If the stride is
+     * set to 0, no indexes will be included in the file.
+     */
+    public WriterOptions rowIndexStride(int value) {
+      rowIndexStrideValue = value;
+      return this;
+    }
+
+    /**
+     * The size of the memory buffers used for compressing and storing the
+     * stripe in memory.
+     */
+    public WriterOptions bufferSize(int value) {
+      bufferSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Sets whether the HDFS blocks are padded to prevent stripes from
+     * straddling blocks. Padding improves locality and thus the speed of
+     * reading, but costs space.
+     */
+    public WriterOptions blockPadding(boolean value) {
+      blockPaddingValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the encoding strategy that is used to encode the data.
+     */
+    public WriterOptions encodingStrategy(EncodingStrategy strategy) {
+      encodingStrategy = strategy;
+      return this;
+    }
+
+    /**
+     * Sets the tolerance for block padding as a percentage of stripe size.
+     */
+    public WriterOptions paddingTolerance(float value) {
+      paddingTolerance = value;
+      return this;
+    }
+
+    /**
+     * Comma separated values of column names for which bloom filter is to be 
created.
+     */
+    public WriterOptions bloomFilterColumns(String columns) {
+      bloomFilterColumns = columns;
+      return this;
+    }
+
+    /**
+     * Specify the false positive probability for bloom filter.
+     * @param fpp - false positive probability
+     * @return
+     */
+    public WriterOptions bloomFilterFpp(double fpp) {
+      bloomFilterFpp = fpp;
+      return this;
+    }
+
+    /**
+     * Sets the generic compression that is used to compress the data.
+     */
+    public WriterOptions compress(CompressionKind value) {
+      compressValue = value;
+      return this;
+    }
+
+    /**
+     * A required option that sets the object inspector for the rows. Used
+     * to determine the schema for the file.
+     */
+    public WriterOptions inspector(ObjectInspector value) {
+      inspectorValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the version of the file that will be written.
+     */
+    public WriterOptions version(Version value) {
+      versionValue = value;
+      return this;
+    }
+
+    /**
+     * Add a listener for when the stripe and file are about to be closed.
+     * @param callback the object to be called when the stripe is closed
+     * @return
+     */
+    public WriterOptions callback(WriterCallback callback) {
+      this.callback = callback;
+      return this;
+    }
+
+    /**
+     * A package local option to set the memory manager.
+     */
+    WriterOptions memory(MemoryManager value) {
+      memoryManagerValue = value;
+      return this;
+    }
+
+    /**
+     * Tajo-specific
+     */
+    WriterOptions timezone(TimeZone value) {
+      timezone = value;
+      return this;
+    }
+  }
+
+  /**
+   * Create a default set of write options that can be modified.
+   */
+  public static WriterOptions writerOptions(Configuration conf) {
+    return new WriterOptions(conf);
+  }
+
+  /**
+   * Create an ORC file writer. This is the public interface for creating
+   * writers going forward and new options will only be added to this method.
+   * @param path filename to write to
+   * @param opts the options
+   * @return a new ORC file writer
+   * @throws IOException
+   */
+  public static Writer createWriter(Path path,
+                                    WriterOptions opts
+                                    ) throws IOException {
+    FileSystem fs = opts.fileSystemValue == null ?
+      path.getFileSystem(opts.configuration) : opts.fileSystemValue;
+
+    return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+                          opts.stripeSizeValue, opts.compressValue,
+                          opts.bufferSizeValue, opts.rowIndexStrideValue,
+                          opts.memoryManagerValue, opts.blockPaddingValue,
+                          opts.versionValue, opts.callback,
+                          opts.encodingStrategy, opts.compressionStrategy,
+                          opts.paddingTolerance, opts.blockSizeValue,
+                          opts.bloomFilterColumns, opts.bloomFilterFpp,
+                          opts.timezone);
+  }
+
+  /**
+   * Create an ORC file writer. This method is provided for API backward
+   * compatability with Hive 0.11.
+   * @param fs file system
+   * @param path filename to write to
+   * @param inspector the ObjectInspector that inspects the rows
+   * @param stripeSize the number of bytes in a stripe
+   * @param compress how to compress the file
+   * @param bufferSize the number of bytes to compress at once
+   * @param rowIndexStride the number of rows between row index entries or
+   *                       0 to suppress all indexes
+   * @return a new ORC file writer
+   * @throws IOException
+   */
+  public static Writer createWriter(FileSystem fs,
+                                    Path path,
+                                    Configuration conf,
+                                    ObjectInspector inspector,
+                                    long stripeSize,
+                                    CompressionKind compress,
+                                    int bufferSize,
+                                    int rowIndexStride,
+                                    TimeZone timeZone) throws IOException {
+    return createWriter(path,
+                        writerOptions(conf)
+                        .fileSystem(fs)
+                        .inspector(inspector)
+                        .stripeSize(stripeSize)
+                        .compress(compress)
+                        .bufferSize(bufferSize)
+                        .rowIndexStride(rowIndexStride)
+                        .timezone(timeZone));
+  }
+
+  private static ThreadLocal<MemoryManager> memoryManager = null;
+
+  private static synchronized MemoryManager getMemoryManager(final 
Configuration conf) {
+    if (memoryManager == null) {
+      memoryManager = new ThreadLocal<MemoryManager>() {
+        @Override
+        protected MemoryManager initialValue() {
+          return new MemoryManager(conf);
+        }
+      };
+    }
+    return memoryManager.get();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
new file mode 100644
index 0000000..847c10c
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OrcUtils {
+  private static final Log LOG = LogFactory.getLog(OrcUtils.class);
+
+  /**
+   * Returns selected columns as a boolean array with true value set for 
specified column names.
+   * The result will contain number of elements equal to flattened number of 
columns.
+   * For example:
+   * selectedColumns - a,b,c
+   * allColumns - a,b,c,d
+   * If column c is a complex type, say list<string> and other types are 
primitives then result will
+   * be [false, true, true, true, true, true, false]
+   * Index 0 is the root element of the struct which is set to false by 
default, index 1,2
+   * corresponds to columns a and b. Index 3,4 correspond to column c which is 
list<string> and
+   * index 5 correspond to column d. After flattening list<string> gets 2 
columns.
+   *
+   * @param selectedColumns - comma separated list of selected column names
+   * @param allColumns      - comma separated list of all column names
+   * @param inspector       - object inspector
+   * @return - boolean array with true value set for the specified column names
+   */
+  public static boolean[] includeColumns(String selectedColumns, String 
allColumns,
+      ObjectInspector inspector) {
+    int numFlattenedCols = getFlattenedColumnsCount(inspector);
+    boolean[] results = new boolean[numFlattenedCols];
+    if ("*".equals(selectedColumns)) {
+      Arrays.fill(results, true);
+      return results;
+    }
+    if (selectedColumns != null && !selectedColumns.isEmpty()) {
+      includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, 
inspector);
+    }
+    return results;
+  }
+
+  private static void includeColumnsImpl(boolean[] includeColumns, String 
selectedColumns,
+      String allColumns,
+      ObjectInspector inspector) {
+      Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, 
inspector);
+      LOG.info("columnSpanMap: " + columnSpanMap);
+
+      String[] selCols = selectedColumns.split(",");
+      for (String sc : selCols) {
+        if (columnSpanMap.containsKey(sc)) {
+          List<Integer> colSpan = columnSpanMap.get(sc);
+          int start = colSpan.get(0);
+          int end = colSpan.get(1);
+          for (int i = start; i <= end; i++) {
+            includeColumns[i] = true;
+          }
+        }
+      }
+
+      LOG.info("includeColumns: " + Arrays.toString(includeColumns));
+    }
+
+  private static Map<String, List<Integer>> getColumnSpan(String allColumns,
+      ObjectInspector inspector) {
+    // map that contains the column span for each column. Column span is the 
number of columns
+    // required after flattening. For a given object inspector this map 
contains the start column
+    // id and end column id (both inclusive) after flattening.
+    // EXAMPLE:
+    // schema: struct<a:int, b:float, c:map<string,int>>
+    // column span map for the above struct will be
+    // a => [1,1], b => [2,2], c => [3,5]
+    Map<String, List<Integer>> columnSpanMap = new HashMap<String, 
List<Integer>>();
+    if (allColumns != null) {
+      String[] columns = allColumns.split(",");
+      int startIdx = 0;
+      int endIdx = 0;
+      if (inspector instanceof StructObjectInspector) {
+        StructObjectInspector soi = (StructObjectInspector) inspector;
+        List<? extends StructField> fields = soi.getAllStructFieldRefs();
+        for (int i = 0; i < fields.size(); i++) {
+          StructField sf = fields.get(i);
+
+          // we get the type (category) from object inspector but column name 
from the argument.
+          // The reason for this is hive (FileSinkOperator) does not pass the 
actual column names,
+          // instead it passes the internal column names (_col1,_col2).
+          ObjectInspector sfOI = sf.getFieldObjectInspector();
+          String colName = columns[i];
+
+          startIdx = endIdx + 1;
+          switch (sfOI.getCategory()) {
+            case PRIMITIVE:
+              endIdx += 1;
+              break;
+            case STRUCT:
+              endIdx += 1;
+              StructObjectInspector structInsp = (StructObjectInspector) sfOI;
+              List<? extends StructField> structFields = 
structInsp.getAllStructFieldRefs();
+              for (int j = 0; j < structFields.size(); ++j) {
+                endIdx += 
getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector());
+              }
+              break;
+            case MAP:
+              endIdx += 1;
+              MapObjectInspector mapInsp = (MapObjectInspector) sfOI;
+              endIdx += 
getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
+              endIdx += 
getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
+              break;
+            case LIST:
+              endIdx += 1;
+              ListObjectInspector listInsp = (ListObjectInspector) sfOI;
+              endIdx += 
getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
+              break;
+            case UNION:
+              endIdx += 1;
+              UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI;
+              List<ObjectInspector> choices = unionInsp.getObjectInspectors();
+              for (int j = 0; j < choices.size(); ++j) {
+                endIdx += getFlattenedColumnsCount(choices.get(j));
+              }
+              break;
+            default:
+              throw new IllegalArgumentException("Bad category: " +
+                  inspector.getCategory());
+          }
+
+          columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx));
+        }
+      }
+    }
+    return columnSpanMap;
+  }
+
+  /**
+   * Returns the number of columns after flatting complex types.
+   *
+   * @param inspector - object inspector
+   * @return
+   */
+  public static int getFlattenedColumnsCount(ObjectInspector inspector) {
+    int numWriters = 0;
+    switch (inspector.getCategory()) {
+      case PRIMITIVE:
+        numWriters += 1;
+        break;
+      case STRUCT:
+        numWriters += 1;
+        StructObjectInspector structInsp = (StructObjectInspector) inspector;
+        List<? extends StructField> fields = 
structInsp.getAllStructFieldRefs();
+        for (int i = 0; i < fields.size(); ++i) {
+          numWriters += 
getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector());
+        }
+        break;
+      case MAP:
+        numWriters += 1;
+        MapObjectInspector mapInsp = (MapObjectInspector) inspector;
+        numWriters += 
getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
+        numWriters += 
getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
+        break;
+      case LIST:
+        numWriters += 1;
+        ListObjectInspector listInsp = (ListObjectInspector) inspector;
+        numWriters += 
getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
+        break;
+      case UNION:
+        numWriters += 1;
+        UnionObjectInspector unionInsp = (UnionObjectInspector) inspector;
+        List<ObjectInspector> choices = unionInsp.getObjectInspectors();
+        for (int i = 0; i < choices.size(); ++i) {
+          numWriters += getFlattenedColumnsCount(choices.get(i));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Bad category: " +
+            inspector.getCategory());
+    }
+    return numWriters;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
new file mode 100644
index 0000000..f6cfd57
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class OutStream extends PositionedOutputStream {
+
+  interface OutputReceiver {
+    /**
+     * Output the given buffer to the final destination
+     * @param buffer the buffer to output
+     * @throws IOException
+     */
+    void output(ByteBuffer buffer) throws IOException;
+  }
+
+  static final int HEADER_SIZE = 3;
+  private final String name;
+  private final OutputReceiver receiver;
+  // if enabled the stream will be suppressed when writing stripe
+  private boolean suppress;
+
+  /**
+   * Stores the uncompressed bytes that have been serialized, but not
+   * compressed yet. When this fills, we compress the entire buffer.
+   */
+  private ByteBuffer current = null;
+
+  /**
+   * Stores the compressed bytes until we have a full buffer and then outputs
+   * them to the receiver. If no compression is being done, this (and overflow)
+   * will always be null and the current buffer will be sent directly to the
+   * receiver.
+   */
+  private ByteBuffer compressed = null;
+
+  /**
+   * Since the compressed buffer may start with contents from previous
+   * compression blocks, we allocate an overflow buffer so that the
+   * output of the codec can be split between the two buffers. After the
+   * compressed buffer is sent to the receiver, the overflow buffer becomes
+   * the new compressed buffer.
+   */
+  private ByteBuffer overflow = null;
+  private final int bufferSize;
+  private final CompressionCodec codec;
+  private long compressedBytes = 0;
+  private long uncompressedBytes = 0;
+
+  OutStream(String name,
+            int bufferSize,
+            CompressionCodec codec,
+            OutputReceiver receiver) throws IOException {
+    this.name = name;
+    this.bufferSize = bufferSize;
+    this.codec = codec;
+    this.receiver = receiver;
+    this.suppress = false;
+  }
+
+  public void clear() throws IOException {
+    flush();
+    suppress = false;
+  }
+
+  /**
+   * Write the length of the compressed bytes. Life is much easier if the
+   * header is constant length, so just use 3 bytes. Considering most of the
+   * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
+   * be plenty. We also use the low bit for whether it is the original or
+   * compressed bytes.
+   * @param buffer the buffer to write the header to
+   * @param position the position in the buffer to write at
+   * @param val the size in the file
+   * @param original is it uncompressed
+   */
+  private static void writeHeader(ByteBuffer buffer,
+                                  int position,
+                                  int val,
+                                  boolean original) {
+    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+    buffer.put(position + 1, (byte) (val >> 7));
+    buffer.put(position + 2, (byte) (val >> 15));
+  }
+
+  private void getNewInputBuffer() throws IOException {
+    if (codec == null) {
+      current = ByteBuffer.allocate(bufferSize);
+    } else {
+      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+      writeHeader(current, 0, bufferSize, true);
+      current.position(HEADER_SIZE);
+    }
+  }
+
+  /**
+   * Allocate a new output buffer if we are compressing.
+   */
+  private ByteBuffer getNewOutputBuffer() throws IOException {
+    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+  }
+
+  private void flip() throws IOException {
+    current.limit(current.position());
+    current.position(codec == null ? 0 : HEADER_SIZE);
+  }
+
+  @Override
+  public void write(int i) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    if (current.remaining() < 1) {
+      spill();
+    }
+    uncompressedBytes += 1;
+    current.put((byte) i);
+  }
+
+  @Override
+  public void write(byte[] bytes, int offset, int length) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    int remaining = Math.min(current.remaining(), length);
+    current.put(bytes, offset, remaining);
+    uncompressedBytes += remaining;
+    length -= remaining;
+    while (length != 0) {
+      spill();
+      offset += remaining;
+      remaining = Math.min(current.remaining(), length);
+      current.put(bytes, offset, remaining);
+      uncompressedBytes += remaining;
+      length -= remaining;
+    }
+  }
+
+  private void spill() throws IOException {
+    // if there isn't anything in the current buffer, don't spill
+    if (current == null ||
+        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+      return;
+    }
+    flip();
+    if (codec == null) {
+      receiver.output(current);
+      getNewInputBuffer();
+    } else {
+      if (compressed == null) {
+        compressed = getNewOutputBuffer();
+      } else if (overflow == null) {
+        overflow = getNewOutputBuffer();
+      }
+      int sizePosn = compressed.position();
+      compressed.position(compressed.position() + HEADER_SIZE);
+      if (codec.compress(current, compressed, overflow)) {
+        uncompressedBytes = 0;
+        // move position back to after the header
+        current.position(HEADER_SIZE);
+        current.limit(current.capacity());
+        // find the total bytes in the chunk
+        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+        if (overflow != null) {
+          totalBytes += overflow.position();
+        }
+        compressedBytes += totalBytes + HEADER_SIZE;
+        writeHeader(compressed, sizePosn, totalBytes, false);
+        // if we have less than the next header left, spill it.
+        if (compressed.remaining() < HEADER_SIZE) {
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = overflow;
+          overflow = null;
+        }
+      } else {
+        compressedBytes += uncompressedBytes + HEADER_SIZE;
+        uncompressedBytes = 0;
+        // we are using the original, but need to spill the current
+        // compressed buffer first. So back up to where we started,
+        // flip it and add it to done.
+        if (sizePosn != 0) {
+          compressed.position(sizePosn);
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = null;
+          // if we have an overflow, clear it and make it the new compress
+          // buffer
+          if (overflow != null) {
+            overflow.clear();
+            compressed = overflow;
+            overflow = null;
+          }
+        } else {
+          compressed.clear();
+          if (overflow != null) {
+            overflow.clear();
+          }
+        }
+
+        // now add the current buffer into the done list and get a new one.
+        current.position(0);
+        // update the header with the current length
+        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+        receiver.output(current);
+        getNewInputBuffer();
+      }
+    }
+  }
+
+  void getPosition(PositionRecorder recorder) throws IOException {
+    if (codec == null) {
+      recorder.addPosition(uncompressedBytes);
+    } else {
+      recorder.addPosition(compressedBytes);
+      recorder.addPosition(uncompressedBytes);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    spill();
+    if (compressed != null && compressed.position() != 0) {
+      compressed.flip();
+      receiver.output(compressed);
+      compressed = null;
+    }
+    uncompressedBytes = 0;
+    compressedBytes = 0;
+    overflow = null;
+    current = null;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public long getBufferSize() {
+    long result = 0;
+    if (current != null) {
+      result += current.capacity();
+    }
+    if (compressed != null) {
+      result += compressed.capacity();
+    }
+    if (overflow != null) {
+      result += overflow.capacity();
+    }
+    return result;
+  }
+
+  /**
+   * Set suppress flag
+   */
+  public void suppress() {
+    suppress = true;
+  }
+
+  /**
+   * Returns the state of suppress flag
+   * @return value of suppress flag
+   */
+  public boolean isSuppressed() {
+    return suppress;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
new file mode 100644
index 0000000..a39926e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+interface PositionRecorder {
+  void addPosition(long offset);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
new file mode 100644
index 0000000..748c98c
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+abstract class PositionedOutputStream extends OutputStream {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
+  abstract void getPosition(PositionRecorder recorder) throws IOException;
+
+  /**
+   * Get the memory size currently allocated as buffer associated with this
+   * stream.
+   * @return the number of bytes used by buffers.
+   */
+  abstract long getBufferSize();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
new file mode 100644
index 0000000..2482f93
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * A memory efficient red-black tree that does not allocate any objects per
+ * an element. This class is abstract and assumes that the child class
+ * handles the key and comparisons with the key.
+ */
+abstract class RedBlackTree {
+  public static final int NULL = -1;
+
+  // Various values controlling the offset of the data within the array.
+  private static final int LEFT_OFFSET = 0;
+  private static final int RIGHT_OFFSET = 1;
+  private static final int ELEMENT_SIZE = 2;
+
+  protected int size = 0;
+  private final DynamicIntArray data;
+  protected int root = NULL;
+  protected int lastAdd = 0;
+  private boolean wasAdd = false;
+
+  /**
+   * Create a set with the given initial capacity.
+   */
+  public RedBlackTree(int initialCapacity) {
+    data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
+  }
+
+  /**
+   * Insert a new node into the data array, growing the array as necessary.
+   *
+   * @return Returns the position of the new node.
+   */
+  private int insert(int left, int right, boolean isRed) {
+    int position = size;
+    size += 1;
+    setLeft(position, left, isRed);
+    setRight(position, right);
+    return position;
+  }
+
+  /**
+   * Compare the value at the given position to the new value.
+   * @return 0 if the values are the same, -1 if the new value is smaller and
+   *         1 if the new value is larger.
+   */
+  protected abstract int compareValue(int position);
+
+  /**
+   * Is the given node red as opposed to black? To prevent having an extra word
+   * in the data array, we just the low bit on the left child index.
+   */
+  protected boolean isRed(int position) {
+    return position != NULL &&
+        (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
+  }
+
+  /**
+   * Set the red bit true or false.
+   */
+  private void setRed(int position, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    if (isRed) {
+      data.set(offset, data.get(offset) | 1);
+    } else {
+      data.set(offset, data.get(offset) & ~1);
+    }
+  }
+
+  /**
+   * Get the left field of the given position.
+   */
+  protected int getLeft(int position) {
+    return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
+  }
+
+  /**
+   * Get the right field of the given position.
+   */
+  protected int getRight(int position) {
+    return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left 
pointer.
+   */
+  private void setLeft(int position, int left) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (data.get(offset) & 1));
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left 
pointer.
+   */
+  private void setLeft(int position, int left, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (isRed ? 1 : 0));
+  }
+
+  /**
+   * Set the right field of the given position.
+   */
+  private void setRight(int position, int right) {
+    data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
+  }
+
+  /**
+   * Insert or find a given key in the tree and rebalance the tree correctly.
+   * Rebalancing restores the red-black aspect of the tree to maintain the
+   * invariants:
+   *   1. If a node is red, both of its children are black.
+   *   2. Each child of a node has the same black height (the number of black
+   *      nodes between it and the leaves of the tree).
+   *
+   * Inserted nodes are at the leaves and are red, therefore there is at most a
+   * violation of rule 1 at the node we just put in. Instead of always keeping
+   * the parents, this routine passing down the context.
+   *
+   * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are
+   * left-right mirror images of each other). See Algorighms by Cormen,
+   * Leiserson, and Rivest for the explaination of the subcases.
+   *
+   * @param node The node that we are fixing right now.
+   * @param fromLeft Did we come down from the left?
+   * @param parent Nodes' parent
+   * @param grandparent Parent's parent
+   * @param greatGrandparent Grandparent's parent
+   * @return Does parent also need to be checked and/or fixed?
+   */
+  private boolean add(int node, boolean fromLeft, int parent,
+                      int grandparent, int greatGrandparent) {
+    if (node == NULL) {
+      if (root == NULL) {
+        lastAdd = insert(NULL, NULL, false);
+        root = lastAdd;
+        wasAdd = true;
+        return false;
+      } else {
+        lastAdd = insert(NULL, NULL, true);
+        node = lastAdd;
+        wasAdd = true;
+        // connect the new node into the tree
+        if (fromLeft) {
+          setLeft(parent, node);
+        } else {
+          setRight(parent, node);
+        }
+      }
+    } else {
+      int compare = compareValue(node);
+      boolean keepGoing;
+
+      // Recurse down to find where the node needs to be added
+      if (compare < 0) {
+        keepGoing = add(getLeft(node), true, node, parent, grandparent);
+      } else if (compare > 0) {
+        keepGoing = add(getRight(node), false, node, parent, grandparent);
+      } else {
+        lastAdd = node;
+        wasAdd = false;
+        return false;
+      }
+
+      // we don't need to fix the root (because it is always set to black)
+      if (node == root || !keepGoing) {
+        return false;
+      }
+    }
+
+
+    // Do we need to fix this node? Only if there are two reds right under each
+    // other.
+    if (isRed(node) && isRed(parent)) {
+      if (parent == getLeft(grandparent)) {
+        int uncle = getRight(grandparent);
+        if (isRed(uncle)) {
+          // case 1.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getRight(parent)) {
+            // case 1.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // left-rotate on node
+            setLeft(grandparent, parent);
+            setRight(node, getLeft(parent));
+            setLeft(parent, node);
+          }
+
+          // case 1.2 and 1.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+
+          // right-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getLeft(greatGrandparent) == grandparent) {
+            setLeft(greatGrandparent, parent);
+          } else {
+            setRight(greatGrandparent, parent);
+          }
+          setLeft(grandparent, getRight(parent));
+          setRight(parent, grandparent);
+          return false;
+        }
+      } else {
+        int uncle = getLeft(grandparent);
+        if (isRed(uncle)) {
+          // case 2.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getLeft(parent)) {
+            // case 2.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // right-rotate on node
+            setRight(grandparent, parent);
+            setLeft(node, getRight(parent));
+            setRight(parent, node);
+          }
+          // case 2.2 and 2.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+          // left-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getRight(greatGrandparent) == grandparent) {
+            setRight(greatGrandparent, parent);
+          } else {
+            setLeft(greatGrandparent, parent);
+          }
+          setRight(grandparent, getLeft(parent));
+          setLeft(parent, grandparent);
+          return false;
+        }
+      }
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * Add the new key to the tree.
+   * @return true if the element is a new one.
+   */
+  protected boolean add() {
+    add(root, false, NULL, NULL, NULL);
+    if (wasAdd) {
+      setRed(root, false);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Get the number of elements in the set.
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  public void clear() {
+    root = NULL;
+    size = 0;
+    data.clear();
+  }
+
+  /**
+   * Get the buffer size in bytes.
+   */
+  public long getSizeInBytes() {
+    return data.getSizeInBytes();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
new file mode 100644
index 0000000..0953cdd
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of bytes. A control byte is written 
before
+ * each run with positive values 0 to 127 meaning 2 to 129 repetitions. If the
+ * bytes is -1 to -128, 1 to 128 literal byte values follow.
+ */
+class RunLengthByteWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_LITERAL_SIZE = 128;
+  static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+
+  RunLengthByteWriter(PositionedOutputStream output) {
+    this.output = output;
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write(literals, 0, 1);
+     } else {
+        output.write(-numLiterals);
+        output.write(literals, 0, numLiterals);
+      }
+      repeat = false;
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  void write(byte value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          writeValues();
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..867f041
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of integers. A control byte is 
written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
+ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
+ * literal vint values follow.
+ */
+class RunLengthIntegerWriter implements IntegerWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_DELTA = 127;
+  static final int MIN_DELTA = -128;
+  static final int MAX_LITERAL_SIZE = 128;
+  private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final boolean signed;
+  private final long[] literals = new long[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private long delta = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+  private SerializationUtils utils;
+
+  RunLengthIntegerWriter(PositionedOutputStream output,
+                         boolean signed) {
+    this.output = output;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write((byte) delta);
+        if (signed) {
+          utils.writeVslong(output, literals[0]);
+        } else {
+          utils.writeVulong(output, literals[0]);
+        }
+      } else {
+        output.write(-numLiterals);
+        for(int i=0; i < numLiterals; ++i) {
+          if (signed) {
+            utils.writeVslong(output, literals[i]);
+          } else {
+            utils.writeVulong(output, literals[i]);
+          }
+        }
+      }
+      repeat = false;
+      numLiterals = 0;
+      tailRunLength = 0;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  @Override
+  public void write(long value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0] + delta * numLiterals) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (tailRunLength == 1) {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      } else if (value == literals[numLiterals - 1] + delta) {
+        tailRunLength += 1;
+      } else {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          long base = literals[numLiterals];
+          writeValues();
+          literals[0] = base;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+
+}

Reply via email to