http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoIntObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoIntObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoIntObjectInspector.java
new file mode 100644
index 0000000..a0c2209
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoIntObjectInspector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.tajo.datum.Int4Datum;
+
+public class TajoIntObjectInspector extends TajoPrimitiveObjectInspector 
implements IntObjectInspector {
+  @Override
+  public int get(Object o) {
+    return ((Int4Datum)o).asInt4();
+  }
+
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.intTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.INT;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public Object getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return Integer.class;
+  }
+
+  @Override
+  public Object getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "INT";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoLongObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoLongObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoLongObjectInspector.java
new file mode 100644
index 0000000..b30b333
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoLongObjectInspector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.tajo.datum.Int8Datum;
+
+public class TajoLongObjectInspector extends TajoPrimitiveObjectInspector 
implements LongObjectInspector {
+  @Override
+  public long get(Object o) {
+    return ((Int8Datum)o).asInt8();
+  }
+
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.shortTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.LONG;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public Object getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return Long.class;
+  }
+
+  @Override
+  public Object getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "LONG";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoNullObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoNullObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoNullObjectInspector.java
new file mode 100644
index 0000000..49998ce
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoNullObjectInspector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class TajoNullObjectInspector extends TajoPrimitiveObjectInspector {
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.voidTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.VOID;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public Object getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return Void.class;
+  }
+
+  @Override
+  public Object getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "NULL";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoPrimitiveObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoPrimitiveObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoPrimitiveObjectInspector.java
new file mode 100644
index 0000000..90ac178
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoPrimitiveObjectInspector.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+
/**
 * Common base class for Tajo ObjectInspectors that expose Tajo datums as
 * Hive primitive values for ORC serialization. Subclasses provide the
 * concrete primitive category, type info, and value accessors.
 */
public abstract class TajoPrimitiveObjectInspector implements PrimitiveObjectInspector {
  @Override
  public Category getCategory() {
    // All subclasses inspect scalar values, never struct/list/map.
    return Category.PRIMITIVE;
  }

  @Override
  public int precision() {
    // Precision/scale are meaningful only for decimal-style types in Hive;
    // none of the Tajo inspectors in this package carry them, so both are 0.
    return 0;
  }

  @Override
  public int scale() {
    return 0;
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoShortObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoShortObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoShortObjectInspector.java
new file mode 100644
index 0000000..d32bee1
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoShortObjectInspector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.tajo.datum.Int2Datum;
+
+public class TajoShortObjectInspector extends TajoPrimitiveObjectInspector 
implements ShortObjectInspector {
+  @Override
+  public short get(Object o) {
+    return ((Int2Datum)o).asInt2();
+  }
+
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.shortTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.SHORT;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public Object getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return Short.class;
+  }
+
+  @Override
+  public Object getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "SHORT";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStringObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStringObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStringObjectInspector.java
new file mode 100644
index 0000000..b9331da
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStringObjectInspector.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+
+public class TajoStringObjectInspector extends TajoPrimitiveObjectInspector 
implements StringObjectInspector {
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.stringTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.STRING;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public Text getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return null;
+  }
+
+  @Override
+  public String getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "STRING";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
new file mode 100644
index 0000000..a8b4b49
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TajoStructObjectInspector extends StructObjectInspector {
+  private final static Log LOG = 
LogFactory.getLog(TajoStructObjectInspector.class);
+  private List<TajoStructField> structFields;
+
+  static class TajoStructField implements StructField {
+    private String name;
+    private ObjectInspector oi;
+    private String comment;
+
+    TajoStructField(String name, ObjectInspector oi) {
+      this(name, oi, null);
+    }
+
+    TajoStructField(String name, ObjectInspector oi, String comment) {
+      this.name = name;
+      this.oi = oi;
+      this.comment = comment;
+    }
+
+    @Override
+    public String getFieldName() {
+      return name;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return oi;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return comment;
+    }
+  }
+
+  TajoStructObjectInspector(Schema schema) {
+    structFields = new ArrayList<TajoStructField>(schema.size());
+
+    for (Column c: schema.getRootColumns()) {
+      try {
+        TajoStructField field = new TajoStructField(c.getSimpleName(),
+          
ObjectInspectorFactory.buildObjectInspectorByType(c.getDataType().getType()));
+        structFields.add(field);
+      } catch (UnsupportedException e) {
+        LOG.error(e.getMessage());
+      }
+    }
+  }
+
+  @Override
+  public List<? extends StructField> getAllStructFieldRefs() {
+    return structFields;
+  }
+
+  @Override
+  public StructField getStructFieldRef(String s) {
+    for (TajoStructField field:structFields) {
+      if (field.getFieldName().equals(s)) {
+        return field;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public Object getStructFieldData(Object o, StructField structField) {
+    return null;
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(Object o) {
+    return null;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "STRUCT";
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.STRUCT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoTimestampObjectInspector.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoTimestampObjectInspector.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoTimestampObjectInspector.java
new file mode 100644
index 0000000..bb887e7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoTimestampObjectInspector.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc.objectinspector;
+
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.sql.Timestamp;
+
+public class TajoTimestampObjectInspector extends TajoPrimitiveObjectInspector 
implements TimestampObjectInspector {
+  @Override
+  public PrimitiveTypeInfo getTypeInfo() {
+    return TypeInfoFactory.timestampTypeInfo;
+  }
+
+  @Override
+  public PrimitiveCategory getPrimitiveCategory() {
+    return PrimitiveCategory.TIMESTAMP;
+  }
+
+  @Override
+  public Class<?> getPrimitiveWritableClass() {
+    return null;
+  }
+
+  @Override
+  public TimestampWritable getPrimitiveWritableObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Class<?> getJavaPrimitiveClass() {
+    return null;
+  }
+
+  @Override
+  public Timestamp getPrimitiveJavaObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public Object copyObject(Object o) {
+    return null;
+  }
+
+  @Override
+  public boolean preferWritable() {
+    return false;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "TIMESTAMP";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/package-info.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/package-info.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/package-info.java
new file mode 100644
index 0000000..a987bb9
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/package-info.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * Provides read and write support for ORC files. Tajo schemas are
+ * converted to ORC struct type
+ * </p>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Tajo type</th>
+ *     <th>ORC type</th>
+ *   </tr>
+ *   <tr>
+ *     <td>NULL_TYPE</td>
+ *     <td>BOOLEAN, but all fields are marked as null</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BOOLEAN</td>
+ *     <td>BOOLEAN</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BYTE</td>
+ *     <td>BYTE</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INT2</td>
+ *     <td>SHORT</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INT4</td>
+ *     <td>INTEGER</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INT8</td>
+ *     <td>LONG</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FLOAT4</td>
+ *     <td>FLOAT</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FLOAT8</td>
+ *     <td>DOUBLE</td>
+ *   </tr>
+ *   <tr>
+ *     <td>CHAR/TEXT</td>
+ *     <td>STRING</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BLOB/PROTOBUF</td>
+ *     <td>BINARY</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INET4</td>
+ *     <td>INTEGER</td>
+ *   </tr>
+ *   <tr>
+ *     <td>TIMESTAMP</td>
+ *     <td>TIMESTAMP</td>
+ *   </tr>
+ *   <tr>
+ *     <td>DATE</td>
+ *     <td>DATE</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>
+ * Because any Tajo field can be NULL, every ORC column is treated as nullable.
+ * </p>
+ *
+ * <p>
+ * The conversion from Tajo to ORC is lossy without the original Tajo
+ * schema. As a result, ORC files are read using the Tajo schema saved in
+ * the Tajo catalog for the table the ORC files belong to, which was
+ * defined when the table was created.
+ * </p>
+ */
+
+package org.apache.tajo.storage.orc;

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 086ad77..d9d2639 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -244,7 +244,7 @@ public class TextFieldSerializerDeserializer implements 
FieldSerializerDeseriali
             byte[] bytes = new byte[buf.readableBytes()];
             buf.readBytes(bytes);
             protobufJsonFormat.merge(bytes, builder);
-            datum = factory.createDatum(builder.build());
+            datum = ProtobufDatumFactory.createDatum(builder.build());
           } catch (IOException e) {
             e.printStackTrace();
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BinaryColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BinaryColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BinaryColumnStatistics.java
new file mode 100644
index 0000000..bee29fb
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BinaryColumnStatistics.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
/**
 * Statistics for binary columns.
 */
public interface BinaryColumnStatistics extends ColumnStatistics {
  /**
   * Returns the sum for the column. Per ORC's binary statistics this is the
   * total length in bytes of the binary values — TODO confirm against the
   * writer implementation.
   */
  long getSum();
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BitFieldWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BitFieldWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BitFieldWriter.java
new file mode 100644
index 0000000..23719bd
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BitFieldWriter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+
/**
 * Packs fixed-width bit fields into bytes and forwards the completed bytes
 * to a run-length-encoded byte stream. Bits are written MSB-first: the
 * first field written occupies the highest bits of the first byte.
 */
class BitFieldWriter {
  // Downstream RLE-encoded byte sink that receives each completed byte.
  private RunLengthByteWriter output;
  // Number of bits each value written via write(int) occupies.
  private final int bitSize;
  // Byte currently being filled with bits.
  private byte current = 0;
  // How many unused bits remain in 'current' (8 = empty, 0 = full).
  private int bitsLeft = 8;

  BitFieldWriter(PositionedOutputStream output,
                 int bitSize) throws IOException {
    this.output = new RunLengthByteWriter(output);
    this.bitSize = bitSize;
  }

  // Emits the in-progress byte downstream and resets the bit buffer.
  private void writeByte() throws IOException {
    output.write(current);
    current = 0;
    bitsLeft = 8;
  }

  /**
   * Flushes any partially-filled byte (zero-padded in its low bits) and
   * then flushes the downstream writer.
   */
  void flush() throws IOException {
    if (bitsLeft != 8) {
      writeByte();
    }
    output.flush();
  }

  /**
   * Writes the low {@code bitSize} bits of {@code value} into the stream,
   * splitting across byte boundaries as needed.
   */
  void write(int value) throws IOException {
    int bitsToWrite = bitSize;
    while (bitsToWrite > bitsLeft) {
      // add the bits to the bottom of the current word
      current |= value >>> (bitsToWrite - bitsLeft);
      // subtract out the bits we just added
      bitsToWrite -= bitsLeft;
      // zero out the bits above bitsToWrite
      value &= (1 << bitsToWrite) - 1;
      writeByte();
    }
    // Remaining bits fit in the current byte; left-align them after the
    // bits already present.
    bitsLeft -= bitsToWrite;
    current |= value << bitsLeft;
    if (bitsLeft == 0) {
      writeByte();
    }
  }

  /**
   * Records the downstream byte position plus the bit offset (0-7) within
   * the in-progress byte, so a reader can seek back to this exact bit.
   */
  void getPosition(PositionRecorder recorder) throws IOException {
    output.getPosition(recorder);
    recorder.addPosition(8 - bitsLeft);
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BloomFilterIO.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BloomFilterIO.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BloomFilterIO.java
new file mode 100644
index 0000000..9d7c09c
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BloomFilterIO.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.primitives.Longs;
+import org.apache.tajo.storage.thirdparty.orc.util.BloomFilter;
+
/**
 * Bloom filter used for ORC row-group level predicate push-down.
 * Thin wrapper around {@link BloomFilter} that adds construction from the
 * ORC protobuf representation.
 */
public class BloomFilterIO extends BloomFilter {

  /** Creates a filter sized for {@code expectedEntries} at the default FPP. */
  public BloomFilterIO(long expectedEntries) {
    super(expectedEntries, DEFAULT_FPP);
  }

  /** Creates a filter sized for {@code expectedEntries} at false-positive probability {@code fpp}. */
  public BloomFilterIO(long expectedEntries, double fpp) {
    super(expectedEntries, fpp);
  }

  /**
   * Initializes the BloomFilter from the given Orc BloomFilter.
   */
  public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
    this.bitSet = new BitSet(Longs.toArray(bloomFilter.getBitsetList()));
    this.numHashFunctions = bloomFilter.getNumHashFunctions();
    this.numBits = (int) this.bitSet.bitSize();
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BooleanColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BooleanColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BooleanColumnStatistics.java
new file mode 100644
index 0000000..0f55697
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/BooleanColumnStatistics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
/**
 * Statistics for boolean columns.
 */
public interface BooleanColumnStatistics extends ColumnStatistics {
  /**
   * Get the number of false values in the column.
   * @return the count of false values
   */
  long getFalseCount();

  /**
   * Get the number of true values in the column.
   * @return the count of true values
   */
  long getTrueCount();
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatistics.java
new file mode 100644
index 0000000..b317e41
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatistics.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
/**
 * Statistics that are available for all types of columns.
 */
public interface ColumnStatistics {
  /**
   * Get the number of values in this column. It will differ from the number
   * of rows because of NULL values and repeated values.
   * @return the number of values
   */
  long getNumberOfValues();

  /**
   * Returns true if there are nulls in the scope of column statistics
   * (file, stripe, or row group, depending on the level).
   * @return true if null present else false
   */
  boolean hasNull();
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatisticsImpl.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatisticsImpl.java
new file mode 100644
index 0000000..d74f989
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ColumnStatisticsImpl.java
@@ -0,0 +1,1017 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.tajo.datum.Datum;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+
+class ColumnStatisticsImpl implements ColumnStatistics {
+
  /** Tracks the number of true values for boolean columns (false count is derived). */
  private static final class BooleanStatisticsImpl extends ColumnStatisticsImpl
      implements BooleanColumnStatistics {
    private long trueCount = 0;

    // Rebuild from the serialized protobuf; bucket count(0) holds the true count.
    BooleanStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.BucketStatistics bkt = stats.getBucketStatistics();
      trueCount = bkt.getCount(0);
    }

    BooleanStatisticsImpl() {
    }

    @Override
    void reset() {
      super.reset();
      trueCount = 0;
    }

    @Override
    void updateBoolean(boolean value) {
      if (value) {
        trueCount += 1;
      }
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof BooleanStatisticsImpl) {
        BooleanStatisticsImpl bkt = (BooleanStatisticsImpl) other;
        trueCount += bkt.trueCount;
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && trueCount != 0) {
          throw new IllegalArgumentException("Incompatible merging of boolean column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder builder = super.serialize();
      OrcProto.BucketStatistics.Builder bucket =
        OrcProto.BucketStatistics.newBuilder();
      bucket.addCount(trueCount);
      builder.setBucketStatistics(bucket);
      return builder;
    }

    @Override
    public long getFalseCount() {
      // false count is implicit: every non-null value is either true or false
      return getNumberOfValues() - trueCount;
    }

    @Override
    public long getTrueCount() {
      return trueCount;
    }

    @Override
    public String toString() {
      return super.toString() + " true: " + trueCount;
    }
  }
+
  /**
   * Min/max/sum statistics for all integral columns (byte/short/int/long).
   * The sum is tracked until signed-64-bit overflow is detected, after which
   * it is dropped from the serialized form.
   */
  private static final class IntegerStatisticsImpl extends ColumnStatisticsImpl
      implements IntegerColumnStatistics {

    private long minimum = Long.MAX_VALUE;
    private long maximum = Long.MIN_VALUE;
    private long sum = 0;
    // true once at least one value has been observed (min/max are valid)
    private boolean hasMinimum = false;
    // true once the running sum has overflowed and is no longer meaningful
    private boolean overflow = false;

    IntegerStatisticsImpl() {
    }

    IntegerStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.IntegerStatistics intStat = stats.getIntStatistics();
      if (intStat.hasMinimum()) {
        hasMinimum = true;
        minimum = intStat.getMinimum();
      }
      if (intStat.hasMaximum()) {
        maximum = intStat.getMaximum();
      }
      if (intStat.hasSum()) {
        sum = intStat.getSum();
      } else {
        // a missing sum in the file means it overflowed when written
        overflow = true;
      }
    }

    @Override
    void reset() {
      super.reset();
      hasMinimum = false;
      minimum = Long.MAX_VALUE;
      maximum = Long.MIN_VALUE;
      sum = 0;
      overflow = false;
    }

    @Override
    void updateInteger(long value) {
      if (!hasMinimum) {
        hasMinimum = true;
        minimum = value;
        maximum = value;
      } else if (value < minimum) {
        minimum = value;
      } else if (value > maximum) {
        maximum = value;
      }
      if (!overflow) {
        // Overflow check: adding two operands of the same sign must not flip
        // the sign of the result; operands of opposite sign cannot overflow.
        boolean wasPositive = sum >= 0;
        sum += value;
        if ((value >= 0) == wasPositive) {
          overflow = (sum >= 0) != wasPositive;
        }
      }
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof IntegerStatisticsImpl) {
        IntegerStatisticsImpl otherInt = (IntegerStatisticsImpl) other;
        if (!hasMinimum) {
          hasMinimum = otherInt.hasMinimum;
          minimum = otherInt.minimum;
          maximum = otherInt.maximum;
        } else if (otherInt.hasMinimum) {
          if (otherInt.minimum < minimum) {
            minimum = otherInt.minimum;
          }
          if (otherInt.maximum > maximum) {
            maximum = otherInt.maximum;
          }
        }

        overflow |= otherInt.overflow;
        if (!overflow) {
          // same sign-flip overflow test as updateInteger
          boolean wasPositive = sum >= 0;
          sum += otherInt.sum;
          if ((otherInt.sum >= 0) == wasPositive) {
            overflow = (sum >= 0) != wasPositive;
          }
        }
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && hasMinimum) {
          throw new IllegalArgumentException("Incompatible merging of integer column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder builder = super.serialize();
      OrcProto.IntegerStatistics.Builder intb =
        OrcProto.IntegerStatistics.newBuilder();
      if (hasMinimum) {
        intb.setMinimum(minimum);
        intb.setMaximum(maximum);
      }
      if (!overflow) {
        intb.setSum(sum);
      }
      builder.setIntStatistics(intb);
      return builder;
    }

    @Override
    public long getMinimum() {
      return minimum;
    }

    @Override
    public long getMaximum() {
      return maximum;
    }

    @Override
    public boolean isSumDefined() {
      return !overflow;
    }

    @Override
    public long getSum() {
      // only meaningful when isSumDefined() returns true
      return sum;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (hasMinimum) {
        buf.append(" min: ");
        buf.append(minimum);
        buf.append(" max: ");
        buf.append(maximum);
      }
      if (!overflow) {
        buf.append(" sum: ");
        buf.append(sum);
      }
      return buf.toString();
    }
  }
+
  /** Min/max/sum statistics for float and double columns. */
  private static final class DoubleStatisticsImpl extends ColumnStatisticsImpl
       implements DoubleColumnStatistics {
    // true once at least one value has been observed (min/max are valid)
    private boolean hasMinimum = false;
    // NOTE(review): Double.MIN_VALUE is the smallest *positive* double, not the
    // most negative; harmless here because min/max are only read when
    // hasMinimum is true, but Double.NEGATIVE_INFINITY would be clearer.
    private double minimum = Double.MAX_VALUE;
    private double maximum = Double.MIN_VALUE;
    private double sum = 0;

    DoubleStatisticsImpl() {
    }

    DoubleStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.DoubleStatistics dbl = stats.getDoubleStatistics();
      if (dbl.hasMinimum()) {
        hasMinimum = true;
        minimum = dbl.getMinimum();
      }
      if (dbl.hasMaximum()) {
        maximum = dbl.getMaximum();
      }
      if (dbl.hasSum()) {
        sum = dbl.getSum();
      }
    }

    @Override
    void reset() {
      super.reset();
      hasMinimum = false;
      minimum = Double.MAX_VALUE;
      maximum = Double.MIN_VALUE;
      sum = 0;
    }

    @Override
    void updateDouble(double value) {
      if (!hasMinimum) {
        hasMinimum = true;
        minimum = value;
        maximum = value;
      } else if (value < minimum) {
        minimum = value;
      } else if (value > maximum) {
        maximum = value;
      }
      sum += value;
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof DoubleStatisticsImpl) {
        DoubleStatisticsImpl dbl = (DoubleStatisticsImpl) other;
        if (!hasMinimum) {
          hasMinimum = dbl.hasMinimum;
          minimum = dbl.minimum;
          maximum = dbl.maximum;
        } else if (dbl.hasMinimum) {
          if (dbl.minimum < minimum) {
            minimum = dbl.minimum;
          }
          if (dbl.maximum > maximum) {
            maximum = dbl.maximum;
          }
        }
        sum += dbl.sum;
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && hasMinimum) {
          throw new IllegalArgumentException("Incompatible merging of double column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder builder = super.serialize();
      OrcProto.DoubleStatistics.Builder dbl =
        OrcProto.DoubleStatistics.newBuilder();
      if (hasMinimum) {
        dbl.setMinimum(minimum);
        dbl.setMaximum(maximum);
      }
      // sum is always written (0 for an empty column), unlike the integer case
      dbl.setSum(sum);
      builder.setDoubleStatistics(dbl);
      return builder;
    }

    @Override
    public double getMinimum() {
      return minimum;
    }

    @Override
    public double getMaximum() {
      return maximum;
    }

    @Override
    public double getSum() {
      return sum;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (hasMinimum) {
        buf.append(" min: ");
        buf.append(minimum);
        buf.append(" max: ");
        buf.append(maximum);
      }
      buf.append(" sum: ");
      buf.append(sum);
      return buf.toString();
    }
  }
+
  /**
   * Min/max statistics for string/char/varchar columns, plus the total
   * character length of all values (tracked in {@code sum}).
   */
  protected static final class StringStatisticsImpl extends ColumnStatisticsImpl
      implements StringColumnStatistics {
    private String minimum = null;
    private String maximum = null;
    // total length (in chars) of all values seen
    private long sum = 0;

    StringStatisticsImpl() {
    }

    StringStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.StringStatistics str = stats.getStringStatistics();
      if (str.hasMaximum()) {
        maximum = str.getMaximum();
      }
      if (str.hasMinimum()) {
        minimum = str.getMinimum();
      }
      if(str.hasSum()) {
        sum = str.getSum();
      }
    }

    @Override
    void reset() {
      super.reset();
      minimum = null;
      maximum = null;
      sum = 0;
    }

    @Override
    void updateString(String value) {
      if (minimum == null) {
        // first value seen: it is both the minimum and the maximum
        maximum = minimum = value;
      } else if (minimum.compareTo(value) > 0) {
        minimum = value;
      } else if (maximum.compareTo(value) < 0) {
        maximum = value;
      }
      sum += value.length();
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof StringStatisticsImpl) {
        StringStatisticsImpl str = (StringStatisticsImpl) other;
        if (minimum == null) {
          if (str.minimum != null) {
            maximum = str.getMaximum();
            minimum = str.getMinimum();
          } else {
          /* both are empty */
            maximum = minimum = null;
          }
        } else if (str.minimum != null) {
          if (minimum.compareTo(str.minimum) > 0) {
            minimum = str.getMinimum();
          }
          if (maximum.compareTo(str.maximum) < 0) {
            maximum = str.getMaximum();
          }
        }
        sum += str.sum;
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && minimum != null) {
          throw new IllegalArgumentException("Incompatible merging of string column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder result = super.serialize();
      OrcProto.StringStatistics.Builder str =
        OrcProto.StringStatistics.newBuilder();
      // min/max are non-null whenever at least one value has been recorded
      if (getNumberOfValues() != 0) {
        str.setMinimum(getMinimum());
        str.setMaximum(getMaximum());
        str.setSum(sum);
      }
      result.setStringStatistics(str);
      return result;
    }

    @Override
    public String getMinimum() {
      return minimum;
    }

    @Override
    public String getMaximum() {
      return maximum;
    }

    @Override
    public long getSum() {
      return sum;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (getNumberOfValues() != 0) {
        buf.append(" min: ");
        buf.append(getMinimum());
        buf.append(" max: ");
        buf.append(getMaximum());
        buf.append(" sum: ");
        buf.append(sum);
      }
      return buf.toString();
    }
  }
+
+  protected static final class BinaryStatisticsImpl extends 
ColumnStatisticsImpl implements
+    BinaryColumnStatistics {
+
+    private long sum = 0;
+
+    BinaryStatisticsImpl() {
+    }
+
+    BinaryStatisticsImpl(OrcProto.ColumnStatistics stats) {
+      super(stats);
+      OrcProto.BinaryStatistics binStats = stats.getBinaryStatistics();
+      if (binStats.hasSum()) {
+        sum = binStats.getSum();
+      }
+    }
+
+    @Override
+    void reset() {
+      super.reset();
+      sum = 0;
+    }
+
+    @Override
+    void updateBinary(Datum value) {
+      sum += value.size();
+    }
+
+    @Override
+    void merge(ColumnStatisticsImpl other) {
+      if (other instanceof BinaryColumnStatistics) {
+        BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other;
+        sum += bin.sum;
+      } else {
+        if (isStatsExists() && sum != 0) {
+          throw new IllegalArgumentException("Incompatible merging of binary 
column statistics");
+        }
+      }
+      super.merge(other);
+    }
+
+    @Override
+    public long getSum() {
+      return sum;
+    }
+
+    @Override
+    OrcProto.ColumnStatistics.Builder serialize() {
+      OrcProto.ColumnStatistics.Builder result = super.serialize();
+      OrcProto.BinaryStatistics.Builder bin = 
OrcProto.BinaryStatistics.newBuilder();
+      bin.setSum(sum);
+      result.setBinaryStatistics(bin);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder(super.toString());
+      if (getNumberOfValues() != 0) {
+        buf.append(" sum: ");
+        buf.append(sum);
+      }
+      return buf.toString();
+    }
+  }
+
  /**
   * Min/max/sum statistics for decimal columns. A null {@code sum} marks a
   * sum that was absent from a deserialized file (i.e. unknown/overflowed).
   */
  private static final class DecimalStatisticsImpl extends ColumnStatisticsImpl
      implements DecimalColumnStatistics {
    private HiveDecimal minimum = null;
    private HiveDecimal maximum = null;
    private HiveDecimal sum = HiveDecimal.ZERO;

    DecimalStatisticsImpl() {
    }

    DecimalStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.DecimalStatistics dec = stats.getDecimalStatistics();
      if (dec.hasMaximum()) {
        maximum = HiveDecimal.create(dec.getMaximum());
      }
      if (dec.hasMinimum()) {
        minimum = HiveDecimal.create(dec.getMinimum());
      }
      if (dec.hasSum()) {
        sum = HiveDecimal.create(dec.getSum());
      } else {
        // missing sum in the file: treat as unknown
        sum = null;
      }
    }

    @Override
    void reset() {
      super.reset();
      minimum = null;
      maximum = null;
      sum = HiveDecimal.ZERO;
    }

    @Override
    void updateDecimal(HiveDecimal value) {
      if (minimum == null) {
        // first value seen: it is both the minimum and the maximum
        minimum = value;
        maximum = value;
      } else if (minimum.compareTo(value) > 0) {
        minimum = value;
      } else if (maximum.compareTo(value) < 0) {
        maximum = value;
      }
      if (sum != null) {
        sum = sum.add(value);
      }
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof DecimalStatisticsImpl) {
        DecimalStatisticsImpl dec = (DecimalStatisticsImpl) other;
        if (minimum == null) {
          minimum = dec.minimum;
          maximum = dec.maximum;
          sum = dec.sum;
        } else if (dec.minimum != null) {
          if (minimum.compareTo(dec.minimum) > 0) {
            minimum = dec.minimum;
          }
          if (maximum.compareTo(dec.maximum) < 0) {
            maximum = dec.maximum;
          }
          // unknown on either side makes the merged sum unknown
          if (sum == null || dec.sum == null) {
            sum = null;
          } else {
            sum = sum.add(dec.sum);
          }
        }
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && minimum != null) {
          throw new IllegalArgumentException("Incompatible merging of decimal column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder result = super.serialize();
      OrcProto.DecimalStatistics.Builder dec =
          OrcProto.DecimalStatistics.newBuilder();
      if (getNumberOfValues() != 0 && minimum != null) {
        dec.setMinimum(minimum.toString());
        dec.setMaximum(maximum.toString());
      }
      // an unknown sum is simply omitted from the serialized form
      if (sum != null) {
        dec.setSum(sum.toString());
      }
      result.setDecimalStatistics(dec);
      return result;
    }

    @Override
    public HiveDecimal getMinimum() {
      return minimum;
    }

    @Override
    public HiveDecimal getMaximum() {
      return maximum;
    }

    @Override
    public HiveDecimal getSum() {
      // may be null when the sum is unknown
      return sum;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (getNumberOfValues() != 0) {
        buf.append(" min: ");
        buf.append(minimum);
        buf.append(" max: ");
        buf.append(maximum);
        if (sum != null) {
          buf.append(" sum: ");
          buf.append(sum);
        }
      }
      return buf.toString();
    }
  }
+
  /**
   * Min/max statistics for date columns, stored internally as days since the
   * epoch (matching the ORC DateStatistics protobuf encoding).
   */
  private static final class DateStatisticsImpl extends ColumnStatisticsImpl
      implements DateColumnStatistics {
    // null until the first value is observed
    private Integer minimum = null;
    private Integer maximum = null;

    DateStatisticsImpl() {
    }

    DateStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.DateStatistics dateStats = stats.getDateStatistics();
      // min,max values serialized/deserialized as int (days since epoch)
      if (dateStats.hasMaximum()) {
        maximum = dateStats.getMaximum();
      }
      if (dateStats.hasMinimum()) {
        minimum = dateStats.getMinimum();
      }
    }

    @Override
    void reset() {
      super.reset();
      minimum = null;
      maximum = null;
    }

    @Override
    void updateDate(int daysSinceEpoch) {
      if (minimum == null) {
        minimum = daysSinceEpoch;
        maximum = daysSinceEpoch;
      } else if (minimum > daysSinceEpoch) {
        minimum = daysSinceEpoch;
      } else if (maximum < daysSinceEpoch) {
        maximum = daysSinceEpoch;
      }
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof DateStatisticsImpl) {
        DateStatisticsImpl dateStats = (DateStatisticsImpl) other;
        if (minimum == null) {
          minimum = dateStats.minimum;
          maximum = dateStats.maximum;
        } else if (dateStats.minimum != null) {
          if (minimum > dateStats.minimum) {
            minimum = dateStats.minimum;
          }
          if (maximum < dateStats.maximum) {
            maximum = dateStats.maximum;
          }
        }
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && minimum != null) {
          throw new IllegalArgumentException("Incompatible merging of date column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder result = super.serialize();
      OrcProto.DateStatistics.Builder dateStats =
          OrcProto.DateStatistics.newBuilder();
      if (getNumberOfValues() != 0 && minimum != null) {
        dateStats.setMinimum(minimum);
        dateStats.setMaximum(maximum);
      }
      result.setDateStatistics(dateStats);
      return result;
    }

    // Reusable scratch objects for converting days-since-epoch to java.sql.Date.
    // NOTE(review): these are mutated by the getters below, so the getters are
    // not thread-safe — confirm callers never read stats concurrently.
    private transient final DateWritable minDate = new DateWritable();
    private transient final DateWritable maxDate = new DateWritable();

    @Override
    public Date getMinimum() {
      if (minimum == null) {
        return null;
      }
      minDate.set(minimum);
      return minDate.get();
    }

    @Override
    public Date getMaximum() {
      if (maximum == null) {
        return null;
      }
      maxDate.set(maximum);
      return maxDate.get();
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (getNumberOfValues() != 0) {
        buf.append(" min: ");
        buf.append(getMinimum());
        buf.append(" max: ");
        buf.append(getMaximum());
      }
      return buf.toString();
    }
  }
+
  /**
   * Min/max statistics for timestamp columns, stored internally as
   * milliseconds since the epoch (matching the ORC TimestampStatistics
   * protobuf encoding).
   */
  private static final class TimestampStatisticsImpl extends ColumnStatisticsImpl
      implements TimestampColumnStatistics {
    // null until the first value is observed
    private Long minimum = null;
    private Long maximum = null;

    TimestampStatisticsImpl() {
    }

    TimestampStatisticsImpl(OrcProto.ColumnStatistics stats) {
      super(stats);
      OrcProto.TimestampStatistics timestampStats = stats.getTimestampStatistics();
      // min,max values serialized/deserialized as int (milliseconds since epoch)
      if (timestampStats.hasMaximum()) {
        maximum = timestampStats.getMaximum();
      }
      if (timestampStats.hasMinimum()) {
        minimum = timestampStats.getMinimum();
      }
    }

    @Override
    void reset() {
      super.reset();
      minimum = null;
      maximum = null;
    }

    @Override
    void updateTimestamp(Timestamp value) {
      if (minimum == null) {
        minimum = value.getTime();
        maximum = value.getTime();
      } else if (minimum > value.getTime()) {
        minimum = value.getTime();
      } else if (maximum < value.getTime()) {
        maximum = value.getTime();
      }
    }

    @Override
    void merge(ColumnStatisticsImpl other) {
      if (other instanceof TimestampStatisticsImpl) {
        TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other;
        if (minimum == null) {
          minimum = timestampStats.minimum;
          maximum = timestampStats.maximum;
        } else if (timestampStats.minimum != null) {
          if (minimum > timestampStats.minimum) {
            minimum = timestampStats.minimum;
          }
          if (maximum < timestampStats.maximum) {
            maximum = timestampStats.maximum;
          }
        }
      } else {
        // Merging a different stats type is only legal when this side is empty.
        if (isStatsExists() && minimum != null) {
          throw new IllegalArgumentException("Incompatible merging of timestamp column statistics");
        }
      }
      super.merge(other);
    }

    @Override
    OrcProto.ColumnStatistics.Builder serialize() {
      OrcProto.ColumnStatistics.Builder result = super.serialize();
      OrcProto.TimestampStatistics.Builder timestampStats = OrcProto.TimestampStatistics
          .newBuilder();
      if (getNumberOfValues() != 0 && minimum != null) {
        timestampStats.setMinimum(minimum);
        timestampStats.setMaximum(maximum);
      }
      result.setTimestampStatistics(timestampStats);
      return result;
    }

    @Override
    public Timestamp getMinimum() {
      return minimum == null ? null : new Timestamp(minimum);
    }

    @Override
    public Timestamp getMaximum() {
      return maximum == null ? null : new Timestamp(maximum);
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder(super.toString());
      if (getNumberOfValues() != 0) {
        buf.append(" min: ");
        buf.append(getMinimum());
        buf.append(" max: ");
        buf.append(getMaximum());
      }
      return buf.toString();
    }
  }
+
  // number of non-null values recorded in this scope
  private long count = 0;
  // whether any null has been recorded in this scope
  private boolean hasNull = false;

  ColumnStatisticsImpl(OrcProto.ColumnStatistics stats) {
    if (stats.hasNumberOfValues()) {
      count = stats.getNumberOfValues();
    }

    // Older files omit hasNull; conservatively assume nulls may be present.
    hasNull = !stats.hasHasNull() || stats.getHasNull();
  }

  ColumnStatisticsImpl() {
  }

  /** Records one more non-null value. */
  void increment() {
    count += 1;
  }

  /** Records that a null was observed in this scope. */
  void setNull() {
    hasNull = true;
  }

  // The update* methods below are overridden by the type-specific subclasses;
  // the base implementations reject values of a mismatched type.
  void updateBoolean(boolean value) {
    throw new UnsupportedOperationException("Can't update boolean");
  }

  void updateInteger(long value) {
    throw new UnsupportedOperationException("Can't update integer");
  }

  void updateDouble(double value) {
    throw new UnsupportedOperationException("Can't update double");
  }

  void updateString(String value) {
    throw new UnsupportedOperationException("Can't update string");
  }

  void updateBinary(Datum value) {
    throw new UnsupportedOperationException("Can't update binary");
  }

  void updateDecimal(HiveDecimal value) {
    throw new UnsupportedOperationException("Can't update decimal");
  }

  void updateDate(int days) {
    throw new UnsupportedOperationException("Can't update date");
  }

  void updateTimestamp(Timestamp value) {
    throw new UnsupportedOperationException("Can't update timestamp");
  }

  /** Returns true if any value (or null) has been recorded in this scope. */
  boolean isStatsExists() {
    return (count > 0 || hasNull);
  }

  /** Folds another scope's counts into this one. Subclasses extend this. */
  void merge(ColumnStatisticsImpl stats) {
    count += stats.count;
    hasNull |= stats.hasNull;
  }

  /** Clears this object for reuse on the next scope. Subclasses extend this. */
  void reset() {
    count = 0;
    hasNull = false;
  }

  @Override
  public long getNumberOfValues() {
    return count;
  }

  @Override
  public boolean hasNull() {
    return hasNull;
  }

  @Override
  public String toString() {
    return "count: " + count + " hasNull: " + hasNull;
  }

  /** Serializes the common fields; subclasses add their type-specific stats. */
  OrcProto.ColumnStatistics.Builder serialize() {
    OrcProto.ColumnStatistics.Builder builder =
      OrcProto.ColumnStatistics.newBuilder();
    builder.setNumberOfValues(count);
    builder.setHasNull(hasNull);
    return builder;
  }
+
  /**
   * Creates an empty statistics collector appropriate for the column type
   * described by the given Hive object inspector. Unsupported categories
   * fall back to the plain count/hasNull base implementation.
   */
  static ColumnStatisticsImpl create(ObjectInspector inspector) {
    switch (inspector.getCategory()) {
      case PRIMITIVE:
        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
          case BOOLEAN:
            return new BooleanStatisticsImpl();
          case BYTE:
          case SHORT:
          case INT:
          case LONG:
            // all integral widths share the long-based collector
            return new IntegerStatisticsImpl();
          case FLOAT:
          case DOUBLE:
            return new DoubleStatisticsImpl();
          case STRING:
          case CHAR:
          case VARCHAR:
            return new StringStatisticsImpl();
          case DECIMAL:
            return new DecimalStatisticsImpl();
          case DATE:
            return new DateStatisticsImpl();
          case TIMESTAMP:
            return new TimestampStatisticsImpl();
          case BINARY:
            return new BinaryStatisticsImpl();
          default:
            return new ColumnStatisticsImpl();
        }
      default:
        // complex types (struct/list/map/union) keep only count/hasNull
        return new ColumnStatisticsImpl();
    }
  }
+
  /**
   * Rehydrates a statistics object from its protobuf form, choosing the
   * concrete subclass from whichever type-specific sub-message is present.
   */
  static ColumnStatisticsImpl deserialize(OrcProto.ColumnStatistics stats) {
    if (stats.hasBucketStatistics()) {
      return new BooleanStatisticsImpl(stats);
    } else if (stats.hasIntStatistics()) {
      return new IntegerStatisticsImpl(stats);
    } else if (stats.hasDoubleStatistics()) {
      return new DoubleStatisticsImpl(stats);
    } else if (stats.hasStringStatistics()) {
      return new StringStatisticsImpl(stats);
    } else if (stats.hasDecimalStatistics()) {
      return new DecimalStatisticsImpl(stats);
    } else if (stats.hasDateStatistics()) {
      return new DateStatisticsImpl(stats);
    } else if (stats.hasTimestampStatistics()) {
      return new TimestampStatisticsImpl(stats);
    } else if(stats.hasBinaryStatistics()) {
      return new BinaryStatisticsImpl(stats);
    } else {
      // no type-specific sub-message: only count/hasNull are available
      return new ColumnStatisticsImpl(stats);
    }
  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionCodec.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionCodec.java
new file mode 100644
index 0000000..769ca50
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionCodec.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+public interface CompressionCodec {
+
+  public enum Modifier {
+    /* speed/compression tradeoffs */
+    FASTEST,
+    FAST,
+    DEFAULT,
+    /* data sensitivity modifiers */
+    TEXT,
+    BINARY
+  };
+
+  /**
+   * Compress the in buffer to the out buffer.
+   * @param in the bytes to compress
+   * @param out the uncompressed bytes
+   * @param overflow put any additional bytes here
+   * @return true if the output is smaller than input
+   * @throws IOException
+   */
+  boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
+  ) throws IOException;
+
+  /**
+   * Decompress the in buffer to the out buffer.
+   * @param in the bytes to decompress
+   * @param out the decompressed bytes
+   * @throws IOException
+   */
+  void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
+
+  /**
+   * Produce a modified compression codec if the underlying algorithm allows
+   * modification.
+   *
+   * This does not modify the current object, but returns a new object if
+   * modifications are possible. Returns the same object if no modifications
+   * are possible.
+   * @param modifiers compression modifiers
+   * @return codec for use after optional modification
+   */
+  CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers);
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionKind.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionKind.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionKind.java
new file mode 100644
index 0000000..8b16c67
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/CompressionKind.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
/**
 * The generic compression algorithms that can be applied to ORC files.
 */
public enum CompressionKind {
  /** No compression. */
  NONE,
  /** zlib (deflate) compression. */
  ZLIB,
  /** Snappy compression. */
  SNAPPY,
  /** LZO compression. */
  LZO
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DateColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DateColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DateColumnStatistics.java
new file mode 100644
index 0000000..cb3405e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DateColumnStatistics.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.util.Date;
+
+/**
+ * Statistics for DATE columns.
+ */
+public interface DateColumnStatistics extends ColumnStatistics {
+  /**
+   * Get the minimum value for the column.
+   * @return minimum value
+   */
+  Date getMinimum();
+
+  /**
+   * Get the maximum value for the column.
+   * @return maximum value
+   */
+  Date getMaximum();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
new file mode 100644
index 0000000..27cdac2
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+/**
+ * Statistics for decimal columns.
+ */
+public interface DecimalColumnStatistics extends ColumnStatistics {
+
+  /**
+   * Get the minimum value for the column.
+   * @return the minimum value
+   */
+  HiveDecimal getMinimum();
+
+  /**
+   * Get the maximum value for the column.
+   * @return the maximum value
+   */
+  HiveDecimal getMaximum();
+
+  /**
+   * Get the sum of the values of the column.
+   * @return the sum
+   */
+  HiveDecimal getSum();
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
new file mode 100644
index 0000000..5333052
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+  public boolean isAvailable();
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
new file mode 100644
index 0000000..ddce8f7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+/**
+ * Statistics for float and double columns.
+ */
+public interface DoubleColumnStatistics extends ColumnStatistics {
+
+  /**
+   * Get the smallest value in the column. Only defined if getNumberOfValues
+   * is non-zero.
+   * @return the minimum
+   */
+  double getMinimum();
+
+  /**
+   * Get the largest value in the column. Only defined if getNumberOfValues
+   * is non-zero.
+   * @return the maximum
+   */
+  double getMaximum();
+
+  /**
+   * Get the sum of the values in the column.
+   * @return the sum
+   */
+  double getSum();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
new file mode 100644
index 0000000..1d44f77
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A class that is a growable array of bytes. Growth is managed in terms of
+ * chunks that are allocated when needed.
+ */
+final class DynamicByteArray {
+  static final int DEFAULT_CHUNKSIZE = 32 * 1024;
+  static final int DEFAULT_NUM_CHUNKS = 128;
+
+  private final int chunkSize;        // our allocation sizes
+  private byte[][] data;              // the real data
+  private int length;                 // max set element index +1
+  private int initializedChunks = 0;  // the number of chunks created
+
+  public DynamicByteArray() {
+    this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
+  }
+
+  public DynamicByteArray(int numChunks, int chunkSize) {
+    if (chunkSize == 0) {
+      throw new IllegalArgumentException("bad chunksize");
+    }
+    this.chunkSize = chunkSize;
+    data = new byte[numChunks][];
+  }
+
+  /**
+   * Ensure that the given index is valid.
+   */
+  private void grow(int chunkIndex) {
+    if (chunkIndex >= initializedChunks) {
+      if (chunkIndex >= data.length) {
+        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+        byte[][] newChunk = new byte[newSize][];
+        System.arraycopy(data, 0, newChunk, 0, data.length);
+        data = newChunk;
+      }
+      for(int i=initializedChunks; i <= chunkIndex; ++i) {
+        data[i] = new byte[chunkSize];
+      }
+      initializedChunks = chunkIndex + 1;
+    }
+  }
+
+  public byte get(int index) {
+    if (index >= length) {
+      throw new IndexOutOfBoundsException("Index " + index +
+                                            " is outside of 0.." +
+                                            (length - 1));
+    }
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    return data[i][j];
+  }
+
+  public void set(int index, byte value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] = value;
+  }
+
+  public int add(byte value) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow(i);
+    data[i][j] = value;
+    int result = length;
+    length += 1;
+    return result;
+  }
+
+  /**
+   * Copy a slice of a byte array into our buffer.
+   * @param value the array to copy from
+   * @param valueOffset the first location to copy from value
+   * @param valueLength the number of bytes to copy from value
+   * @return the offset of the start of the value
+   */
+  public int add(byte[] value, int valueOffset, int valueLength) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow((length + valueLength) / chunkSize);
+    int remaining = valueLength;
+    while (remaining > 0) {
+      int size = Math.min(remaining, chunkSize - j);
+      System.arraycopy(value, valueOffset, data[i], j, size);
+      remaining -= size;
+      valueOffset += size;
+      i += 1;
+      j = 0;
+    }
+    int result = length;
+    length += valueLength;
+    return result;
+  }
+
+  /**
+   * Read the entire stream into this array.
+   * @param in the stream to read from
+   * @throws IOException
+   */
+  public void readAll(InputStream in) throws IOException {
+    int currentChunk = length / chunkSize;
+    int currentOffset = length % chunkSize;
+    grow(currentChunk);
+    int currentLength = in.read(data[currentChunk], currentOffset,
+      chunkSize - currentOffset);
+    while (currentLength > 0) {
+      length += currentLength;
+      currentOffset = length % chunkSize;
+      if (currentOffset == 0) {
+        currentChunk = length / chunkSize;
+        grow(currentChunk);
+      }
+      currentLength = in.read(data[currentChunk], currentOffset,
+        chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Byte compare a set of bytes against the bytes in this dynamic array.
+   * @param other source of the other bytes
+   * @param otherOffset start offset in the other array
+   * @param otherLength number of bytes in the other array
+   * @param ourOffset the offset in our array
+   * @param ourLength the number of bytes in our array
+   * @return negative for less, 0 for equal, positive for greater
+   */
+  public int compare(byte[] other, int otherOffset, int otherLength,
+                     int ourOffset, int ourLength) {
+    int currentChunk = ourOffset / chunkSize;
+    int currentOffset = ourOffset % chunkSize;
+    int maxLength = Math.min(otherLength, ourLength);
+    while (maxLength > 0 &&
+      other[otherOffset] == data[currentChunk][currentOffset]) {
+      otherOffset += 1;
+      currentOffset += 1;
+      if (currentOffset == chunkSize) {
+        currentChunk += 1;
+        currentOffset = 0;
+      }
+      maxLength -= 1;
+    }
+    if (maxLength == 0) {
+      return otherLength - ourLength;
+    }
+    int otherByte = 0xff & other[otherOffset];
+    int ourByte = 0xff & data[currentChunk][currentOffset];
+    return otherByte > ourByte ? 1 : -1;
+  }
+
+  /**
+   * Get the size of the array.
+   * @return the number of bytes in the array
+   */
+  public int size() {
+    return length;
+  }
+
+  /**
+   * Clear the array to its original pristine state.
+   */
+  public void clear() {
+    length = 0;
+    for(int i=0; i < data.length; ++i) {
+      data[i] = null;
+    }
+    initializedChunks = 0;
+  }
+
+  /**
+   * Set a text value from the bytes in this dynamic array.
+   * @param result the value to set
+   * @param offset the start of the bytes to copy
+   * @param length the number of bytes to copy
+   */
+  public void setText(Text result, int offset, int length) {
+    result.clear();
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    int currentLength = Math.min(length, chunkSize - currentOffset);
+    while (length > 0) {
+      result.append(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+      currentLength = Math.min(length, chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Write out a range of this dynamic array to an output stream.
+   * @param out the stream to write to
+   * @param offset the first offset to write
+   * @param length the number of bytes to write
+   * @throws IOException
+   */
+  public void write(OutputStream out, int offset,
+                    int length) throws IOException {
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    while (length > 0) {
+      int currentLength = Math.min(length, chunkSize - currentOffset);
+      out.write(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+    }
+  }
+
+  @Override
+  public String toString() {
+    int i;
+    StringBuilder sb = new StringBuilder(length * 3);
+
+    sb.append('{');
+    int l = length - 1;
+    for (i=0; i<l; i++) {
+      sb.append(Integer.toHexString(get(i)));
+      sb.append(',');
+    }
+    sb.append(get(i));
+    sb.append('}');
+
+    return sb.toString();
+  }
+
+  public void setByteBuffer(ByteBuffer result, int offset, int length) {
+    result.clear();
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    int currentLength = Math.min(length, chunkSize - currentOffset);
+    while (length > 0) {
+      result.put(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+      currentLength = Math.min(length, chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Gets all the bytes of the array.
+   *
+   * @return Bytes of the array
+   */
+  public byte[] get() {
+    byte[] result = null;
+    if (length > 0) {
+      int currentChunk = 0;
+      int currentOffset = 0;
+      int currentLength = Math.min(length, chunkSize);
+      int destOffset = 0;
+      result = new byte[length];
+      int totalLength = length;
+      while (totalLength > 0) {
+        System.arraycopy(data[currentChunk], currentOffset, result, 
destOffset, currentLength);
+        destOffset += currentLength;
+        totalLength -= currentLength;
+        currentChunk += 1;
+        currentOffset = 0;
+        currentLength = Math.min(totalLength, chunkSize - currentOffset);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Get the size of the buffers.
+   */
+  public long getSizeInBytes() {
+    return initializedChunks * chunkSize;
+  }
+}

Reply via email to