TAJO-1153: Merge off-heap package in block_iteration branch to master branch.

Closes #225


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/514ed847
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/514ed847
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/514ed847

Branch: refs/heads/index_support
Commit: 514ed8474a843c92b2fb772971e41f53010f5a4e
Parents: 080f4e1
Author: Hyunsik Choi <[email protected]>
Authored: Tue Nov 4 00:49:39 2014 -0800
Committer: Hyunsik Choi <[email protected]>
Committed: Tue Nov 4 00:49:39 2014 -0800

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/catalog/SchemaUtil.java     | 103 ++++
 .../java/org/apache/tajo/datum/TextDatum.java   |   6 +-
 .../java/org/apache/tajo/storage/Tuple.java     |  12 +-
 .../java/org/apache/tajo/storage/VTuple.java    |  55 +-
 .../org/apache/tajo/util/Deallocatable.java     |  23 +
 .../main/java/org/apache/tajo/util/SizeOf.java  | 159 +++++
 .../org/apache/tajo/util/UnsafeComparer.java    | 160 +++++
 .../java/org/apache/tajo/util/UnsafeUtil.java   | 115 ++++
 .../engine/planner/PhysicalPlannerImpl.java     |   5 +-
 .../tajo/engine/planner/global/DataChannel.java |   2 +-
 .../planner/physical/ExternalSortExec.java      | 130 ++++-
 .../planner/physical/HashFullOuterJoinExec.java |   2 +-
 .../engine/planner/physical/HashJoinExec.java   |   2 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   2 +-
 .../engine/planner/physical/MergeJoinExec.java  |   2 +-
 .../planner/physical/PhysicalPlanUtil.java      |   9 +-
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../physical/RightOuterMergeJoinExec.java       |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../tajo/engine/planner/physical/SortExec.java  |   7 +-
 .../engine/planner/physical/WindowAggExec.java  |   5 +-
 .../tajo/worker/RangeRetrieverHandler.java      |   6 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   3 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   2 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |   4 +-
 .../planner/TestUniformRangePartition.java      |   4 +-
 .../planner/physical/TestBSTIndexExec.java      |   4 +-
 .../planner/physical/TestExternalSortExec.java  |   2 +-
 .../planner/physical/TestPhysicalPlanner.java   |   2 +-
 .../physical/TestProgressExternalSortExec.java  |   2 +-
 .../apache/tajo/engine/util/TestTupleUtil.java  |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   4 +-
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |  17 +-
 .../tajo/plan/LogicalPlanPreprocessor.java      |   2 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   2 +-
 .../GreedyHeuristicJoinOrderAlgorithm.java      |   2 +-
 .../org/apache/tajo/plan/logical/ScanNode.java  |   2 +-
 .../tajo/plan/logical/TableSubQueryNode.java    |   2 +-
 .../rewrite/rules/ProjectionPushDownRule.java   |   2 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |   1 +
 .../org/apache/tajo/plan/util/SchemaUtil.java   |  88 ---
 .../tajo/storage/BaseTupleComparator.java       | 206 +++++++
 .../org/apache/tajo/storage/FrameTuple.java     |  14 +-
 .../java/org/apache/tajo/storage/LazyTuple.java |  13 +-
 .../org/apache/tajo/storage/RowStoreUtil.java   | 202 +++++--
 .../apache/tajo/storage/TupleComparator.java    | 159 +----
 .../org/apache/tajo/storage/TupleRange.java     |   2 +-
 .../apache/tajo/storage/index/IndexMethod.java  |   1 +
 .../apache/tajo/storage/index/bst/BSTIndex.java |  10 +-
 .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 ++++
 .../org/apache/tajo/tuple/RowBlockReader.java   |  33 ++
 .../org/apache/tajo/tuple/TupleBuilder.java     |  26 +
 .../tajo/tuple/offheap/DirectBufTuple.java      |  41 ++
 .../tajo/tuple/offheap/FixedSizeLimitSpec.java  |  32 +
 .../apache/tajo/tuple/offheap/HeapTuple.java    | 269 +++++++++
 .../tajo/tuple/offheap/OffHeapMemory.java       | 102 ++++
 .../tajo/tuple/offheap/OffHeapRowBlock.java     | 176 ++++++
 .../tuple/offheap/OffHeapRowBlockReader.java    |  63 ++
 .../tuple/offheap/OffHeapRowBlockUtils.java     |  54 ++
 .../tuple/offheap/OffHeapRowBlockWriter.java    |  58 ++
 .../tajo/tuple/offheap/OffHeapRowWriter.java    | 232 ++++++++
 .../tajo/tuple/offheap/ResizableLimitSpec.java  | 142 +++++
 .../apache/tajo/tuple/offheap/RowWriter.java    |  73 +++
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  | 308 ++++++++++
 .../offheap/UnSafeTupleBytesComparator.java     |  99 ++++
 .../tajo/tuple/offheap/ZeroCopyTuple.java       |  35 ++
 tajo-storage/src/main/proto/IndexProtos.proto   |   4 +-
 .../tajo/storage/TestTupleComparator.java       |   2 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |  20 +-
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../apache/tajo/tuple/TestBaseTupleBuilder.java |  76 +++
 .../tajo/tuple/offheap/TestHeapTuple.java       |  45 ++
 .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 +++++++++++++++++++
 .../tajo/tuple/offheap/TestResizableSpec.java   |  59 ++
 75 files changed, 3789 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 943869b..19dc1f6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -44,6 +44,9 @@ Release 0.9.1 - unreleased
 
   TASKS
 
+    TAJO-1153: Merge off-heap package in block_iteration branch to master 
+    branch. (hyunsik)
+
     TAJO-1032: Improve TravisCI scripts to adjust log4j log level. (jinho)
 
     TAJO-1141: Refactor the packages hierarchy of tajo-client. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
new file mode 100644
index 0000000..ee670ef
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+
+public class SchemaUtil {
+  // See TAJO-914 bug.
+  //
+  // Its essential problem is that constant value is evaluated multiple times 
at each scan.
+  // As a result, join nodes can take the child nodes which have the same 
named fields.
+  // Because current schema does not allow the same name and ignore the 
duplicated schema,
+  // it finally causes the in-out schema mismatch between the parent and child 
nodes.
+  //
+  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated 
constant values as different name fields.
+  // The essential solution would be 
https://issues.apache.org/jira/browse/TAJO-895.
+  static int tmpColumnSeq = 0;
+  public static Schema merge(Schema left, Schema right) {
+    Schema merged = new Schema();
+    for(Column col : left.getColumns()) {
+      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn(col);
+      }
+    }
+    for(Column col : right.getColumns()) {
+      if (merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
+      } else {
+        merged.addColumn(col);
+      }
+    }
+
+    // if overflow
+    if (tmpColumnSeq < 0) {
+      tmpColumnSeq = 0;
+    }
+    return merged;
+  }
+
+  /**
+   * Get common columns to be used as join keys of natural joins.
+   */
+  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
+    Schema common = new Schema();
+    for (Column outer : left.getColumns()) {
+      if (!common.containsByName(outer.getSimpleName()) && 
right.containsByName(outer.getSimpleName())) {
+        common.addColumn(new Column(outer.getSimpleName(), 
outer.getDataType()));
+      }
+    }
+    
+    return common;
+  }
+
+  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String 
tableName) {
+    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    if (tableName != null) {
+      logicalSchema.setQualifier(tableName);
+    }
+    return logicalSchema;
+  }
+
+  public static <T extends Schema> T clone(Schema schema) {
+    try {
+      T copy = (T) schema.clone();
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static DataType [] toDataTypes(Schema schema) {
+    DataType[] types = new DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      types[i] = schema.getColumn(i).getDataType();
+    }
+    return types;
+  }
+
+  public static Type [] toTypes(Schema schema) {
+    Type [] types = new Type[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      types[i] = schema.getColumn(i).getDataType().getType();
+    }
+    return types;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java 
b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index ca76ed2..9a9b37e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -30,7 +30,7 @@ import java.nio.charset.Charset;
 import java.util.Comparator;
 
 public class TextDatum extends Datum {
-  static Charset defaultCharset = Charset.forName("UTF-8");
+  public static Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
   @Expose private final int size;
   /* encoded in UTF-8 */
@@ -47,7 +47,7 @@ public class TextDatum extends Datum {
   }
 
   public TextDatum(String string) {
-    this(string.getBytes(defaultCharset));
+    this(string.getBytes(DEFAULT_CHARSET));
   }
 
   @Override
@@ -92,7 +92,7 @@ public class TextDatum extends Datum {
 
   @Override
   public String asChars() {
-    return new String(this.bytes, defaultCharset);
+    return new String(this.bytes, DEFAULT_CHARSET);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java 
b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
index c183171..1ba1926 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.storage;
 
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.ProtobufDatum;
 
 public interface Tuple extends Cloneable {
   
@@ -28,16 +27,19 @@ public interface Tuple extends Cloneable {
        public boolean contains(int fieldid);
 
   public boolean isNull(int fieldid);
+
+  @SuppressWarnings("unused")
+  public boolean isNotNull(int fieldid);
        
        public void clear();
        
        public void put(int fieldId, Datum value);
 
-  public void put(int fieldId, Datum [] values);
+  public void put(int fieldId, Datum[] values);
 
   public void put(int fieldId, Tuple tuple);
        
-       public void put(Datum [] values);
+       public void put(Datum[] values);
        
        public Datum get(int fieldId);
        
@@ -65,7 +67,9 @@ public interface Tuple extends Cloneable {
        
        public String getText(int fieldId);
 
-  public ProtobufDatum getProtobufDatum(int fieldId);
+  public Datum getProtobufDatum(int fieldId);
+
+  public Datum getInterval(int fieldId);
 
   public char [] getUnicodeChars(int fieldId);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java 
b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 4fb35f9..6304734 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.Inet4Datum;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnimplementedException;
 
@@ -38,10 +38,9 @@ public class VTuple implements Tuple, Cloneable {
 
   public VTuple(Tuple tuple) {
     this.values = tuple.getValues().clone();
-    this.offset = ((VTuple)tuple).offset;
   }
 
-  public VTuple(Datum [] datum) {
+  public VTuple(Datum[] datum) {
     this(datum.length);
     put(datum);
   }
@@ -57,7 +56,12 @@ public class VTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return values[fieldid] instanceof NullDatum;
+    return values[fieldid].isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -179,6 +183,11 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) values[fieldId];
+  }
+
+  @Override
   public char[] getUnicodeChars(int fieldId) {
     return values[fieldId].asUnicodeChars();
   }
@@ -193,23 +202,7 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   public String toString() {
-               boolean first = true;
-               StringBuilder str = new StringBuilder();
-               str.append("(");
-               for(int i=0; i < values.length; i++) {                  
-                       if(values[i] != null) {
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       str.append(", ");
-                               }
-                               str.append(i)
-                               .append("=>")
-                               .append(values[i]);
-                       }
-               }
-               str.append(")");
-               return str.toString();
+               return toDisplayString(getValues());
        }
 
        @Override
@@ -230,4 +223,24 @@ public class VTuple implements Tuple, Cloneable {
     }
     return false;
   }
+
+  public static String toDisplayString(Datum [] values) {
+    boolean first = true;
+    StringBuilder str = new StringBuilder();
+    str.append("(");
+    for(int i=0; i < values.length; i++) {
+      if(values[i] != null) {
+        if(first) {
+          first = false;
+        } else {
+          str.append(", ");
+        }
+        str.append(i)
+            .append("=>")
+            .append(values[i]);
+      }
+    }
+    str.append(")");
+    return str.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java 
b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
new file mode 100644
index 0000000..8202a4b
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+public interface Deallocatable {
+  void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java 
b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
new file mode 100644
index 0000000..b7ca10c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
@@ -0,0 +1,159 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import static org.apache.tajo.util.UnsafeUtil.*;
+
+
+public final class SizeOf {
+
+  public static final int BITS_PER_BYTE = 8;
+  public static final int BYTES_PER_WORD = SizeOf.SIZE_OF_LONG;
+  public static final int BITS_PER_WORD = SizeOf.SIZE_OF_LONG * BITS_PER_BYTE;
+
+  public static final byte SIZE_OF_BOOL = 1;
+  public static final byte SIZE_OF_BYTE = 1;
+  public static final byte SIZE_OF_SHORT = 2;
+  public static final byte SIZE_OF_INT = 4;
+  public static final byte SIZE_OF_LONG = 8;
+  public static final byte SIZE_OF_FLOAT = 4;
+  public static final byte SIZE_OF_DOUBLE = 8;
+
+  public static long sizeOf(boolean[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(byte[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(short[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(char[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(int[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(long[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(float[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(double[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * 
array.length);
+  }
+
+  public static long sizeOf(Object[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * 
array.length);
+  }
+
+
+  public static long sizeOfBooleanArray(int length)
+  {
+    return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * 
length);
+  }
+
+  public static long sizeOfByteArray(int length)
+  {
+    return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfShortArray(int length)
+  {
+    return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * 
length);
+  }
+
+  public static long sizeOfCharArray(int length)
+  {
+    return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfIntArray(int length)
+  {
+    return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfLongArray(int length)
+  {
+    return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfFloatArray(int length)
+  {
+    return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * 
length);
+  }
+
+  public static long sizeOfDoubleArray(int length)
+  {
+    return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * 
length);
+  }
+
+  public static long sizeOfObjectArray(int length)
+  {
+    return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * 
length);
+  }
+
+  private SizeOf()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java 
b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
new file mode 100644
index 0000000..fec4e5a
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
@@ -0,0 +1,160 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedLongs;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Comparator;
+
+/**
+ * This is borrowed from Guava's UnsignedBytes.
+ */
+public class UnsafeComparer implements Comparator<byte[]> {
+
+  public static final UnsafeComparer INSTANCE;
+
+  static {
+    INSTANCE = new UnsafeComparer();
+  }
+
+  public Comparator<byte []> get() {
+    return INSTANCE;
+  }
+
+  private UnsafeComparer() {}
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+      /*
+       * The following static final fields exist for performance reasons.
+       *
+       * In UnsignedBytesBenchmark, accessing the following objects via static
+       * final fields is the fastest (more than twice as fast as the Java
+       * implementation, vs ~1.5x with non-final static fields, on x86_32)
+       * under the Hotspot server compiler. The reason is obviously that the
+       * non-final fields need to be reloaded inside the loop.
+       *
+       * And, no, defining (final or not) local variables out of the loop still
+       * isn't as good because the null check on the theUnsafe object remains
+       * inside the loop and BYTE_ARRAY_BASE_OFFSET doesn't get
+       * constant-folded.
+       *
+       * The compiler can treat static final fields as compile-time constants
+       * and can constant-fold them while (final or not) local variables are
+       * run time values.
+       */
+
+  static final Unsafe theUnsafe;
+
+  /** The offset to the first element in a byte array. */
+  static final int BYTE_ARRAY_BASE_OFFSET;
+
+  static {
+    theUnsafe = (Unsafe) AccessController.doPrivileged(
+        new PrivilegedAction<Object>() {
+          @Override
+          public Object run() {
+            try {
+              Field f = Unsafe.class.getDeclaredField("theUnsafe");
+              f.setAccessible(true);
+              return f.get(null);
+            } catch (NoSuchFieldException e) {
+              // It doesn't matter what we throw;
+              // it's swallowed in getBestComparator().
+              throw new Error();
+            } catch (IllegalAccessException e) {
+              throw new Error();
+            }
+          }
+        });
+
+    BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+
+    // sanity check - this should never fail
+    if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
+      throw new AssertionError();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static int compareStatic(byte[] left, byte[] right) {
+    return INSTANCE.compare(left, right);
+  }
+
+  @Override public int compare(byte[] left, byte[] right) {
+    int minLength = Math.min(left.length, right.length);
+    int minWords = minLength / Longs.BYTES;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = theUnsafe.getLong(left, BYTE_ARRAY_BASE_OFFSET + (long) i);
+      long rw = theUnsafe.getLong(right, BYTE_ARRAY_BASE_OFFSET + (long) i);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = UnsignedBytes.compare(left[i], right[i]);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return left.length - right.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java 
b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
new file mode 100644
index 0000000..ae88f73
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
@@ -0,0 +1,115 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.util;
+
+import com.google.common.base.Preconditions;
+import sun.misc.Cleaner;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+public class UnsafeUtil {
+  public static final Unsafe unsafe;
+
+  // offsets
+  public static final int ARRAY_BOOLEAN_BASE_OFFSET;
+  public static final int ARRAY_BYTE_BASE_OFFSET;
+  public static final int ARRAY_SHORT_BASE_OFFSET;
+  public static final int ARRAY_CHAR_BASE_OFFSET;
+  public static final int ARRAY_INT_BASE_OFFSET;
+  public static final int ARRAY_LONG_BASE_OFFSET;
+  public static final int ARRAY_FLOAT_BASE_OFFSET;
+  public static final int ARRAY_DOUBLE_BASE_OFFSET;
+  public static final int ARRAY_OBJECT_BASE_OFFSET;
+
+  // scale
+  public static final int ARRAY_BOOLEAN_INDEX_SCALE;
+  public static final int ARRAY_BYTE_INDEX_SCALE;
+  public static final int ARRAY_SHORT_INDEX_SCALE;
+  public static final int ARRAY_CHAR_INDEX_SCALE;
+  public static final int ARRAY_INT_INDEX_SCALE;
+  public static final int ARRAY_LONG_INDEX_SCALE;
+  public static final int ARRAY_FLOAT_INDEX_SCALE;
+  public static final int ARRAY_DOUBLE_INDEX_SCALE;
+  public static final int ARRAY_OBJECT_INDEX_SCALE;
+
+  static {
+    Field field;
+    try {
+      field = Unsafe.class.getDeclaredField("theUnsafe");
+
+      field.setAccessible(true);
+      unsafe = (Unsafe) field.get(null);
+      if (unsafe == null) {
+        throw new RuntimeException("Unsafe access not available");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    ARRAY_BOOLEAN_BASE_OFFSET = unsafe.arrayBaseOffset(boolean[].class);
+    ARRAY_BYTE_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
+    ARRAY_SHORT_BASE_OFFSET = unsafe.arrayBaseOffset(short[].class);
+    ARRAY_CHAR_BASE_OFFSET = unsafe.arrayBaseOffset(char[].class);
+    ARRAY_INT_BASE_OFFSET = unsafe.arrayBaseOffset(int[].class);
+    ARRAY_LONG_BASE_OFFSET = unsafe.arrayBaseOffset(long[].class);
+    ARRAY_FLOAT_BASE_OFFSET = unsafe.arrayBaseOffset(float[].class);
+    ARRAY_DOUBLE_BASE_OFFSET = unsafe.arrayBaseOffset(double[].class);
+    ARRAY_OBJECT_BASE_OFFSET = unsafe.arrayBaseOffset(Object[].class);
+
+    ARRAY_BOOLEAN_INDEX_SCALE = unsafe.arrayIndexScale(boolean[].class);
+    ARRAY_BYTE_INDEX_SCALE = unsafe.arrayIndexScale(byte[].class);
+    ARRAY_SHORT_INDEX_SCALE = unsafe.arrayIndexScale(short[].class);
+    ARRAY_CHAR_INDEX_SCALE = unsafe.arrayIndexScale(char[].class);
+    ARRAY_INT_INDEX_SCALE = unsafe.arrayIndexScale(int[].class);
+    ARRAY_LONG_INDEX_SCALE = unsafe.arrayIndexScale(long[].class);
+    ARRAY_FLOAT_INDEX_SCALE = unsafe.arrayIndexScale(float[].class);
+    ARRAY_DOUBLE_INDEX_SCALE = unsafe.arrayIndexScale(double[].class);
+    ARRAY_OBJECT_INDEX_SCALE = unsafe.arrayIndexScale(Object[].class);
+  }
+
+  public static int alignedSize(int size) {
+    int remain = size % SizeOf.BYTES_PER_WORD;
+    if (remain > 0) {
+      return size + (SizeOf.BYTES_PER_WORD - remain);
+    } else {
+      return size;
+    }
+  }
+
+  public static long getAddress(ByteBuffer buffer) {
+    Preconditions.checkArgument(buffer instanceof DirectBuffer, "ByteBuffer 
must be DirectBuffer");
+    return ((DirectBuffer)buffer).address();
+  }
+
+  public static void free(Deallocatable obj) {
+    obj.release();
+  }
+
+  public static void free(ByteBuffer bb) {
+    Preconditions.checkNotNull(bb);
+    Preconditions.checkState(bb instanceof DirectBuffer);
+
+    Cleaner cleaner = ((DirectBuffer) bb).cleaner();
+    if (cleaner != null) {
+      cleaner.clean();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index f7f071f..99b79ec 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -45,8 +45,9 @@ import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAg
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.TupleComparator;
@@ -1185,7 +1186,7 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), 
annotation.getSortKeys());
     Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), 
"index");
 
-    TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
+    TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());
     return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new 
Path(indexPath, indexName),
         annotation.getKeySchema(), comp, annotation.getDatum());

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 4a2cd19..11548d3 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4c0caea..121e6bd 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -529,9 +529,9 @@ public class ExternalSortExec extends SortExec {
       throws IOException {
     if (num > 1) {
       final int mid = (int) Math.ceil((float)num / 2);
-      return new PairWiseMerger(
+      return new PairWiseMerger(inSchema,
           createKWayMergerInternal(sources, startIdx, mid),
-          createKWayMergerInternal(sources, startIdx + mid, num - mid));
+          createKWayMergerInternal(sources, startIdx + mid, num - mid), 
getComparator());
     } else {
       return sources[startIdx];
     }
@@ -626,6 +626,12 @@ public class ExternalSortExec extends SortExec {
     }
   }
 
+  enum State {
+    NEW,
+    INITED,
+    CLOSED
+  }
+
   /**
    * Two-way merger scanner that reads two input sources and outputs one 
output tuples sorted in some order.
    */
@@ -633,59 +639,132 @@ public class ExternalSortExec extends SortExec {
     private Scanner leftScan;
     private Scanner rightScan;
 
-    private Tuple leftTuple;
-    private Tuple rightTuple;
+    private VTuple outTuple;
+    private VTuple leftTuple;
+    private VTuple rightTuple;
 
-    private final Comparator<Tuple> comparator = getComparator();
+    private final Schema schema;
+    private final Comparator<Tuple> comparator;
 
     private float mergerProgress;
     private TableStats mergerInputStats;
 
-    public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws 
IOException {
+    private State state = State.NEW;
+
+    public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner 
rightScanner, Comparator<Tuple> comparator)
+        throws IOException {
+      this.schema = schema;
       this.leftScan = leftScanner;
       this.rightScan = rightScanner;
+      this.comparator = comparator;
+    }
+
+    private void setState(State state) {
+      this.state = state;
     }
 
     @Override
     public void init() throws IOException {
-      leftScan.init();
-      rightScan.init();
+      if (state == State.NEW) {
+        leftScan.init();
+        rightScan.init();
+
+        prepareTuplesForFirstComparison();
+
+        mergerInputStats = new TableStats();
+        mergerProgress = 0.0f;
+
+        setState(State.INITED);
+      } else {
+        throw new IllegalStateException("Illegal State: init() is not allowed 
in " + state.name());
+      }
+    }
 
-      leftTuple = leftScan.next();
-      rightTuple = rightScan.next();
+    private void prepareTuplesForFirstComparison() throws IOException {
+      Tuple lt = leftScan.next();
+      if (lt != null) {
+        leftTuple = new VTuple(lt);
+      } else {
+        leftTuple = null; // TODO - missed free
+      }
 
-      mergerInputStats = new TableStats();
-      mergerProgress = 0.0f;
+      Tuple rt = rightScan.next();
+      if (rt != null) {
+        rightTuple = new VTuple(rt);
+      } else {
+        rightTuple = null; // TODO - missed free
+      }
     }
 
     public Tuple next() throws IOException {
-      Tuple outTuple;
+
       if (leftTuple != null && rightTuple != null) {
         if (comparator.compare(leftTuple, rightTuple) < 0) {
-          outTuple = leftTuple;
-          leftTuple = leftScan.next();
+          outTuple = new VTuple(leftTuple);
+
+          Tuple lt = leftScan.next();
+          if (lt != null) {
+            leftTuple = new VTuple(lt);
+          } else {
+            leftTuple = null; // TODO - missed free
+          }
         } else {
-          outTuple = rightTuple;
-          rightTuple = rightScan.next();
+          outTuple = new VTuple(rightTuple);
+
+          Tuple rt = rightScan.next();
+          if (rt != null) {
+            rightTuple = new VTuple(rt);
+          } else {
+            rightTuple = null; // TODO - missed free
+          }
         }
         return outTuple;
       }
 
       if (leftTuple == null) {
-        outTuple = rightTuple;
-        rightTuple = rightScan.next();
+        if (rightTuple != null) {
+          outTuple = new VTuple(rightTuple);
+        } else {
+          outTuple = null;
+        }
+
+        Tuple rt = rightScan.next();
+        if (rt != null) {
+          rightTuple = new VTuple(rt);
+        } else {
+          rightTuple = null; // TODO - missed free
+        }
       } else {
-        outTuple = leftTuple;
-        leftTuple = leftScan.next();
+        if (leftTuple != null) {
+          outTuple = new VTuple(leftTuple);
+        } else {
+          outTuple = null;
+        }
+
+        Tuple lt = leftScan.next();
+        if (lt != null) {
+          leftTuple = new VTuple(lt);
+        } else {
+          leftTuple = null; // TODO - missed free
+        }
       }
       return outTuple;
     }
 
     @Override
     public void reset() throws IOException {
-      leftScan.reset();
-      rightScan.reset();
-      init();
+      if (state == State.INITED) {
+        leftScan.reset();
+        rightScan.reset();
+
+        outTuple = null;
+        leftTuple = null;
+        rightTuple = null;
+
+        prepareTuplesForFirstComparison();
+      } else {
+        throw new IllegalStateException("Illegal State: init() is not allowed 
in " + state.name());
+      }
     }
 
     public void close() throws IOException {
@@ -694,6 +773,7 @@ public class ExternalSortExec extends SortExec {
       leftScan = null;
       rightScan = null;
       mergerProgress = 1.0f;
+      setState(State.CLOSED);
     }
 
     @Override
@@ -721,7 +801,7 @@ public class ExternalSortExec extends SortExec {
 
     @Override
     public Schema getSchema() {
-      return inSchema;
+      return schema;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index cf333b0..28d9a3e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -23,7 +23,7 @@ import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 18be6b9..701297f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -22,7 +22,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index c38be88..c1b6522 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.expr.AlgebraicUtil;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.expr.EvalTreeUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 20df210..13104ee 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -52,7 +52,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   private Iterator<Tuple> innerIterator;
 
   private JoinTupleComparator joincomparator = null;
-  private TupleComparator[] tupleComparator = null;
+  private TupleComparator [] tupleComparator = null;
 
   private final static int INITIAL_TUPLE_SLOT = 10000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index 1b824b5..ec5915f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -34,6 +34,7 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.PersistentStoreNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.TupleComparator;
@@ -53,11 +54,11 @@ public class PhysicalPlanUtil {
     return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz);
   }
 
-  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode 
joinQual, Schema leftSchema, Schema rightSchema) {
+  public static TupleComparator [] getComparatorsFromJoinQual(EvalNode 
joinQual, Schema leftSchema, Schema rightSchema) {
     SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(joinQual, 
leftSchema, rightSchema);
-    TupleComparator[] comparators = new TupleComparator[2];
-    comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
-    comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
+    BaseTupleComparator[] comparators = new BaseTupleComparator[2];
+    comparators[0] = new BaseTupleComparator(leftSchema, sortSpecs[0]);
+    comparators[1] = new BaseTupleComparator(rightSchema, sortSpecs[1]);
     return comparators;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index dd72910..79993da 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -71,7 +71,7 @@ public class RangeShuffleFileWriteExec extends 
UnaryPhysicalExec {
     }
 
     BSTIndex bst = new BSTIndex(new TajoConf());
-    this.comp = new TupleComparator(keySchema, sortSpecs);
+    this.comp = new BaseTupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);
     this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 9d8122f..a02d00b 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -51,7 +51,7 @@ public class RightOuterMergeJoinExec extends 
BinaryPhysicalExec {
   private List<Tuple> innerTupleSlots;
 
   private JoinTupleComparator joinComparator = null;
-  private TupleComparator[] tupleComparator = null;
+  private TupleComparator [] tupleComparator = null;
 
   private final static int INITIAL_TUPLE_SLOT = 10000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c6a4f55..3cbb7c9 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -30,7 +30,7 @@ import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.ConstEval;
 import org.apache.tajo.plan.expr.EvalNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index a4a8d37..c0703dd 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -18,24 +18,25 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.Comparator;
 
 public abstract class SortExec extends UnaryPhysicalExec {
-  private final Comparator<Tuple> comparator;
+  private final TupleComparator comparator;
   private final SortSpec [] sortSpecs;
 
   public SortExec(TaskAttemptContext context, Schema inSchema,
                   Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) 
{
     super(context, inSchema, outSchema, child);
     this.sortSpecs = sortSpecs;
-    this.comparator = new TupleComparator(inSchema, sortSpecs);
+    this.comparator = new BaseTupleComparator(inSchema, sortSpecs);
   }
 
   public SortSpec[] getSortSpecs() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 3f4b22b..e36dcd8 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -27,6 +27,7 @@ import org.apache.tajo.plan.expr.WindowFunctionEval;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.WindowAggNode;
 import org.apache.tajo.plan.logical.WindowSpec;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
@@ -285,9 +286,9 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
     for (int idx = 0; idx < functions.length; idx++) {
       if (orderedFuncFlags[idx]) {
-        comp = new TupleComparator(inSchema, functions[idx].getSortSpecs());
+        comp = new BaseTupleComparator(inSchema, 
functions[idx].getSortSpecs());
         Collections.sort(accumulatedInTuples, comp);
-        comp = new TupleComparator(schemaForOrderBy, 
functions[idx].getSortSpecs());
+        comp = new BaseTupleComparator(schemaForOrderBy, 
functions[idx].getSortSpecs());
         Collections.sort(evaluatedTuples, comp);
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index be33a12..2b58196 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.worker.dataserver.retriever.FileChunk;
 import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -57,10 +57,10 @@ public class RangeRetrieverHandler implements 
RetrieverHandler {
   private final File file;
   private final BSTIndex.BSTIndexReader idxReader;
   private final Schema schema;
-  private final TupleComparator comp;
+  private final BaseTupleComparator comp;
   private final RowStoreDecoder decoder;
 
-  public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator 
comp) throws IOException {
+  public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator 
comp) throws IOException {
     this.file = outDir;
     BSTIndex index = new BSTIndex(new TajoConf());
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 3858c96..a7b52c7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -52,6 +52,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
@@ -185,7 +186,7 @@ public class Task {
       if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
         SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = 
PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
-        this.sortComp = new TupleComparator(finalSchema, 
sortNode.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, 
sortNode.getSortKeys());
       }
     } else {
       // The final result of a task will be written in a file named 
part-ss-nnnnnnn,

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 4287191..73d89ec 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -41,7 +41,7 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.serder.EvalTreeProtoDeserializer;
 import org.apache.tajo.plan.serder.EvalTreeProtoSerializer;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 90ec9b1..5cc89b8 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -36,10 +36,10 @@ import org.apache.tajo.engine.function.builtin.SumInt;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil;
 import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
@@ -299,7 +299,7 @@ public class TestPlannerUtil {
     FieldEval f4 = new FieldEval("people.fid2", 
CatalogUtil.newSimpleDataType(Type.INT4));
 
     EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
-    TupleComparator [] comparators = 
PhysicalPlanUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
+    TupleComparator[] comparators = 
PhysicalPlanUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
 
     Tuple t1 = new VTuple(2);
     t1.put(0, DatumFactory.createInt4(1));

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index b044384..c5ef4ef 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -23,8 +23,8 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.junit.Test;
@@ -484,7 +484,7 @@ public class TestUniformRangePartition {
     TupleRange expected = new TupleRange(sortSpecs, s, e);
 
     UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
-    TupleComparator comp = new TupleComparator(schema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs);
 
     Tuple tuple = s;
     Tuple prevTuple = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 868a82e..c7fbf39 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -68,7 +68,7 @@ public class TestBSTIndexExec {
   private LogicalOptimizer optimizer;
   private StorageManager sm;
   private Schema idxSchema;
-  private TupleComparator comp;
+  private BaseTupleComparator comp;
   private BSTIndex.BSTIndexWriter writer;
   private HashMap<Integer , Integer> randomValues ;
   private int rndKey = -1;
@@ -104,7 +104,7 @@ public class TestBSTIndexExec {
     idxSchema.addColumn("managerid", Type.INT4);
     SortSpec[] sortKeys = new SortSpec[1];
     sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
-    this.comp = new TupleComparator(idxSchema, sortKeys);
+    this.comp = new BaseTupleComparator(idxSchema, sortKeys);
 
     this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
         BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 46f55c6..f09d104 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -152,7 +152,7 @@ public class TestExternalSortExec {
     int cnt = 0;
     exec.init();
     long start = System.currentTimeMillis();
-    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+    BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
         new SortSpec[]{
             new SortSpec(new Column("managerid", Type.INT4)),
             new SortSpec(new Column("empid", Type.INT4))

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 341ce9e..42338a8 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -1073,7 +1073,7 @@ public class TestPhysicalPlanner {
     keySchema.addColumn("?empId", Type.INT4);
     SortSpec[] sortSpec = new SortSpec[1];
     sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpec);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, 
"output/index"),
         keySchema, comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index a02f2ce..9da9dee 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -170,7 +170,7 @@ public class TestProgressExternalSortExec {
     Tuple curVal;
     int cnt = 0;
     exec.init();
-    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+    BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
         new SortSpec[]{
             new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)),
             new SortSpec(new Column("empid", TajoDataTypes.Type.INT4))

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 62c959c..b8114e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -112,7 +112,7 @@ public class TestTupleUtil {
         sortSpecs);
     TupleRange [] ranges = partitioner.partition(5);
     assertTrue(5 <= ranges.length);
-    TupleComparator comp = new TupleComparator(schema, 
PlannerUtil.schemaToSortSpecs(schema));
+    BaseTupleComparator comp = new BaseTupleComparator(schema, 
PlannerUtil.schemaToSortSpecs(schema));
     TupleRange prev = ranges[0];
     for (int i = 1; i < ranges.length; i++) {
       assertTrue(comp.compare(prev.getStart(), ranges[i].getStart()) < 0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 124976a..e35b80c 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -178,7 +178,7 @@ public class TestRangeRetrieverHandler {
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);
@@ -302,7 +302,7 @@ public class TestRangeRetrieverHandler {
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java 
b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index 5dae67e..6c8ef5d 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -17,6 +17,7 @@ package org.apache.tajo.jdbc; /**
  */
 
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
@@ -47,7 +48,12 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public boolean isNull(int fieldid) {
-    return values.get(fieldid) == null || values.get(fieldid) instanceof 
NullDatum;
+    return values.get(fieldid) == null || values.get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -142,7 +148,12 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedException();
+    throw new UnsupportedException("getProtobufDatum");
+  }
+
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    throw new UnsupportedException("getInterval");
   }
 
   @Override
@@ -157,6 +168,6 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public Datum[] getValues(){
-    throw new UnsupportedException();
+    throw new UnsupportedException("getValues");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
index 6bdfb9d..68f2186 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
@@ -32,7 +32,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.nameresolver.NameResolver;
 import org.apache.tajo.plan.nameresolver.NameResolvingMode;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
 import org.apache.tajo.util.TUtil;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 84942a1..e953ccf 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -47,7 +47,7 @@ import org.apache.tajo.plan.nameresolver.NameResolvingMode;
 import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
 import org.apache.tajo.plan.util.ExprFinder;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.Pair;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
index a916d5c..0fad8a8 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
@@ -23,7 +23,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.expr.AlgebraicUtil;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.util.TUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
index a7ef543..3e4abe3 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.util.TUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
index c8fbecd..4e5f41c 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
@@ -23,7 +23,7 @@ import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.Target;
 
 public class TableSubQueryNode extends RelationNode implements Projectable {

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index 167da5b..f7fd90d 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -31,7 +31,7 @@ import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.RewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.TUtil;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 5fe1515..6868b6c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -25,6 +25,7 @@ import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes.DataType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
deleted file mode 100644
index 78135ab..0000000
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.plan.util;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-public class SchemaUtil {
-  // See TAJO-914 bug.
-  //
-  // Its essential problem is that constant value is evaluated multiple times 
at each scan.
-  // As a result, join nodes can take the child nodes which have the same 
named fields.
-  // Because current schema does not allow the same name and ignore the 
duplicated schema,
-  // it finally causes the in-out schema mismatch between the parent and child 
nodes.
-  //
-  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated 
constant values as different name fields.
-  // The essential solution would be 
https://issues.apache.org/jira/browse/TAJO-895.
-  static int tmpColumnSeq = 0;
-  public static Schema merge(Schema left, Schema right) {
-    Schema merged = new Schema();
-    for(Column col : left.getColumns()) {
-      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn(col);
-      }
-    }
-    for(Column col : right.getColumns()) {
-      if (merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
-      } else {
-        merged.addColumn(col);
-      }
-    }
-
-    // if overflow
-    if (tmpColumnSeq < 0) {
-      tmpColumnSeq = 0;
-    }
-    return merged;
-  }
-
-  /**
-   * Get common columns to be used as join keys of natural joins.
-   */
-  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
-    Schema common = new Schema();
-    for (Column outer : left.getColumns()) {
-      if (!common.containsByName(outer.getSimpleName()) && 
right.containsByName(outer.getSimpleName())) {
-        common.addColumn(new Column(outer.getSimpleName(), 
outer.getDataType()));
-      }
-    }
-    
-    return common;
-  }
-
-  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String 
tableName) {
-    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
-    if (tableName != null) {
-      logicalSchema.setQualifier(tableName);
-    }
-    return logicalSchema;
-  }
-
-  public static <T extends Schema> T clone(Schema schema) {
-    try {
-      T copy = (T) schema.clone();
-      return copy;
-    } catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..b829f60
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import static 
org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ * 
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements 
ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  private final int[] sortKeyIds;
+  private final boolean[] asc;
+  @SuppressWarnings("unused")
+  private final boolean[] nullFirsts;  
+
+  private Datum left;
+  private Datum right;
+  private int compVal;
+
+  /**
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0, 
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = 
schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = 
schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+          
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i]= sortKeys[i].isNullFirst();
+    }
+  }
+
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = sortSepcProto.getColumnId();
+      asc[i] = sortSepcProto.getAscending();
+      nullFirsts[i] = sortSepcProto.getNullFirst();
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  @Override
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      left = tuple1.get(sortKeyIds[i]);
+      right = tuple2.get(sortKeyIds[i]);
+
+      if (left.isNull() || right.isNull()) {
+        if (!left.equals(right)) {
+          if (left.isNull()) {
+            compVal = 1;
+          } else if (right.isNull()) {
+            compVal = -1;
+          }
+          if (nullFirsts[i]) {
+            if (compVal != 0) {
+              compVal *= -1;
+            }
+          }
+        } else {
+          compVal = 0;
+        }
+      } else {
+        if (asc[i]) {
+          compVal = left.compareTo(right);
+        } else {
+          compVal = right.compareTo(left);
+        }
+      }
+
+      if (compVal < 0 || compVal > 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(sortKeyIds);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof BaseTupleComparator) {
+      BaseTupleComparator other = (BaseTupleComparator) obj;
+      if (sortKeyIds.length != other.sortKeyIds.length) {
+        return false;
+      }
+
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        if (sortKeyIds[i] != other.sortKeyIds[i] ||
+            asc[i] != other.asc[i] ||
+            nullFirsts[i] != other.nullFirsts[i]) {
+          return false;
+        }
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (int i = 0; i < sortSpecs.length; i++) {
+      builder.addSortSpecs(sortSpecs[i].getProto());
+    }
+
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+        .append(",Asc=").append(asc[i])
+        .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index e0f8a2e..8b7e2e0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -23,7 +23,7 @@ package org.apache.tajo.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
 
@@ -71,7 +71,12 @@ public class FrameTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -177,6 +182,11 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
   public char [] getUnicodeChars(int fieldId) {
     return get(fieldId).asUnicodeChars();
   }

Reply via email to