Repository: tajo
Updated Branches:
  refs/heads/master 4e0a7a33c -> 550c0189b


TAJO-1983: Improve memory usage of ExternalSortExec.

Closes #869


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/550c0189
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/550c0189
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/550c0189

Branch: refs/heads/master
Commit: 550c0189b9ac06f20d4ef70feb7ef743c0b06d0f
Parents: 4e0a7a3
Author: Jinho Kim <[email protected]>
Authored: Thu Nov 26 11:57:02 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Thu Nov 26 11:57:02 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/tuple/BaseTupleBuilder.java |  23 +-
 .../tuple/memory/CompactRowBlockWriter.java     |  30 +-
 .../tajo/tuple/memory/OffHeapRowBlockUtils.java |  74 ++-
 .../tuple/memory/OffHeapRowBlockWriter.java     |  25 +-
 .../tajo/tuple/memory/OffHeapRowWriter.java     |  16 +
 .../tajo/tuple/memory/ResizableMemoryBlock.java |  10 +-
 .../org/apache/tajo/tuple/memory/RowWriter.java |   9 +-
 .../tajo/tuple/memory/UnSafeTupleList.java      | 105 ++++
 .../tajo/tuple/memory/TestMemoryRowBlock.java   |  40 +-
 .../physical/TestProgressExternalSortExec.java  |   8 +-
 .../planner/physical/TestUnSafeTuple.java       | 104 ++++
 .../tajo/querymaster/TestTaskStatusUpdate.java  |   4 +-
 .../planner/physical/ExternalSortExec.java      | 482 +++++++++++++------
 .../engine/planner/physical/PhysicalExec.java   |   4 +
 .../tajo/worker/ExecutionBlockContext.java      |   5 +
 .../java/org/apache/tajo/worker/Fetcher.java    |  17 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |  29 +-
 .../tajo/pullserver/TajoPullServerService.java  |  24 +-
 .../tajo/storage/BaseTupleComparator.java       |  45 +-
 .../org/apache/tajo/storage/NullScanner.java    |   1 -
 21 files changed, 831 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b9d73c8..00b9129 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1983: Improve memory usage of ExternalSortExec. (jinho)
+
     TAJO-1986: Rename the name 'option' to 'property' in TableMeta. (hyunsik)
 
     TAJO-1988: Remove some duplicated codes in toString() of Projectable.

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
index ebdcc26..d962218 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.tuple;
 
 import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.tuple.memory.*;
 import org.apache.tajo.unit.StorageUnit;
@@ -54,6 +55,11 @@ public class BaseTupleBuilder extends OffHeapRowWriter 
implements TupleBuilder,
   }
 
   @Override
+  public void backward(int length) {
+    memoryBlock.writerPosition(memoryBlock.writerPosition() - length);
+  }
+
+  @Override
   public boolean startRow() {
     memoryBlock.writerPosition(0);
     return super.startRow();
@@ -65,13 +71,18 @@ public class BaseTupleBuilder extends OffHeapRowWriter 
implements TupleBuilder,
   }
 
   @Override
-  public void addTuple(Tuple tuple) {
-    if (tuple instanceof UnSafeTuple) {
-      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, 
UnSafeTuple.class);
-      addTuple(unSafeTuple);
-    } else {
-      OffHeapRowBlockUtils.convert(tuple, this);
+  public boolean addTuple(Tuple tuple) {
+    try {
+      if (tuple instanceof UnSafeTuple) {
+        UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, 
UnSafeTuple.class);
+        addTuple(unSafeTuple);
+      } else {
+        OffHeapRowBlockUtils.convert(tuple, this);
+      }
+    } catch (ValueOutOfRangeException e) {
+      return false;
     }
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
index a88d2f1..67d5f8c 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
@@ -25,6 +25,7 @@ import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.BitArray;
 import org.apache.tajo.util.SizeOf;
@@ -182,6 +183,15 @@ public class CompactRowBlockWriter implements RowWriter {
     rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() 
+ length);
   }
 
+  /**
+   * Moves the writer position backward.
+   *
+   * @param length the number of bytes to move backward
+   */
+  public void backward(int length) {
+    rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() 
- length);
+  }
+
   public void ensureSize(int size) {
     rowBlock.getMemory().ensureSize(size);
   }
@@ -232,7 +242,7 @@ public class CompactRowBlockWriter implements RowWriter {
     return true;
   }
 
-
+  @Override
   public void endRow() {
     long rowHeaderPos = recordStartAddr();
     // curOffset is equivalent to a byte length of this row.
@@ -249,6 +259,15 @@ public class CompactRowBlockWriter implements RowWriter {
   }
 
   @Override
+  public void cancelRow() {
+    // curOffset is equivalent to a byte length of current row.
+    backward(curOffset);
+    curOffset = 0;
+    nullFlags.clear();
+    curFieldIdx = 0;
+  }
+
+  @Override
   public void skipField() {
     // set null flag
     nullFlags.set(curFieldIdx);
@@ -402,7 +421,12 @@ public class CompactRowBlockWriter implements RowWriter {
   }
 
   @Override
-  public void addTuple(Tuple tuple) {
-    OffHeapRowBlockUtils.convert(tuple, this);
+  public boolean addTuple(Tuple tuple) {
+    try {
+      OffHeapRowBlockUtils.convert(tuple, this);
+    } catch (ValueOutOfRangeException e) {
+      return false;
+    }
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
index 1aca22f..3f27763 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
@@ -19,10 +19,13 @@
 package org.apache.tajo.tuple.memory;
 
 import com.google.common.collect.Lists;
+import com.google.common.primitives.*;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.tuple.RowBlockReader;
 
@@ -62,6 +65,11 @@ public class OffHeapRowBlockUtils {
     return tupleList;
   }
 
+  public static List<UnSafeTuple> sort(UnSafeTupleList list, 
Comparator<UnSafeTuple> comparator) {
+    Collections.sort(list, comparator);
+    return list;
+  }
+
   public static Tuple[] sortToArray(MemoryRowBlock rowBlock, Comparator<Tuple> 
comparator) {
     Tuple[] tuples = new Tuple[rowBlock.rows()];
 
@@ -85,18 +93,74 @@ public class OffHeapRowBlockUtils {
     return tuples;
   }
 
+  public static final int compareColumn(UnSafeTuple tuple1, UnSafeTuple 
tuple2, int index, TajoDataTypes.Type type,
+                                         boolean ascending, boolean nullFirst) 
{
+    final boolean n1 = tuple1.isBlankOrNull(index);
+    final boolean n2 = tuple2.isBlankOrNull(index);
+    if (n1 && n2) {
+      return 0;
+    }
+
+    if (n1 ^ n2) {
+      return nullFirst ? (n1 ? -1 : 1) : (n1 ? 1 : -1);
+    }
+
+    int compare;
+    switch (type) {
+    case BOOLEAN:
+      compare = Booleans.compare(tuple1.getBool(index), tuple2.getBool(index));
+      break;
+    case BIT:
+      compare = tuple1.getByte(index) - tuple2.getByte(index);
+      break;
+    case INT1:
+    case INT2:
+      compare = Shorts.compare(tuple1.getInt2(index), tuple2.getInt2(index));
+      break;
+    case DATE:
+    case INT4:
+      compare = Ints.compare(tuple1.getInt4(index), tuple2.getInt4(index));
+      break;
+    case INET4:
+      compare = UnsignedInts.compare(tuple1.getInt4(index), 
tuple2.getInt4(index));
+      break;
+    case TIME:
+    case TIMESTAMP:
+    case INT8:
+      compare = Longs.compare(tuple1.getInt8(index), tuple2.getInt8(index));
+      break;
+    case FLOAT4:
+      compare = Floats.compare(tuple1.getFloat4(index), 
tuple2.getFloat4(index));
+      break;
+    case FLOAT8:
+      compare = Doubles.compare(tuple1.getFloat8(index), 
tuple2.getFloat8(index));
+      break;
+    case CHAR:
+    case TEXT:
+    case BLOB:
+      compare = UnSafeTupleBytesComparator.compare(tuple1.getFieldAddr(index), 
tuple2.getFieldAddr(index));
+      break;
+    default:
+      throw new TajoRuntimeException(
+          new UnsupportedException("unknown data type '" + type.name() + "'"));
+    }
+    return ascending ? compare : -compare;
+  }
   /**
    * This class is tuple converter to the RowBlock
    */
   public static class TupleConverter {
 
     public void convert(Tuple tuple, RowWriter writer) {
-      writer.startRow();
-
-      for (int i = 0; i < writer.dataTypes().length; i++) {
-        writeField(i, tuple, writer);
+      try {
+        writer.startRow();
+        for (int i = 0; i < writer.dataTypes().length; i++) {
+          writeField(i, tuple, writer);
+        }
+      } catch (ValueOutOfRangeException e) {
+        writer.cancelRow();
+        throw e;
       }
-
       writer.endRow();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
index 9f3d8a2..594f927 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
@@ -20,6 +20,7 @@ package org.apache.tajo.tuple.memory;
 
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.TUtil;
 
@@ -48,6 +49,11 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter {
     rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() 
+ length);
   }
 
+  @Override
+  public void backward(int length) {
+    rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() 
- length);
+  }
+
   public void ensureSize(int size) {
     rowBlock.getMemory().ensureSize(size);
   }
@@ -65,13 +71,18 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter 
{
 
 
   @Override
-  public void addTuple(Tuple tuple) {
-    if (tuple instanceof UnSafeTuple) {
-      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, 
UnSafeTuple.class);
-      addTuple(unSafeTuple);
-      rowBlock.setRows(rowBlock.rows() + 1);
-    } else {
-      OffHeapRowBlockUtils.convert(tuple, this);
+  public boolean addTuple(Tuple tuple) {
+    try {
+      if (tuple instanceof UnSafeTuple) {
+        UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, 
UnSafeTuple.class);
+        addTuple(unSafeTuple);
+        rowBlock.setRows(rowBlock.rows() + 1);
+      } else {
+        OffHeapRowBlockUtils.convert(tuple, this);
+      }
+    } catch (ValueOutOfRangeException e) {
+      return false;
     }
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
index f082762..e992aed 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
@@ -94,6 +94,12 @@ public abstract class OffHeapRowWriter implements RowWriter {
    */
   public abstract void forward(int length);
 
+  /**
+   * Moves the writer position backward.
+   *
+   * @param length the number of bytes to move backward
+   */
+  public abstract void backward(int length);
 
   @Override
   public void clear() {
@@ -130,6 +136,16 @@ public abstract class OffHeapRowWriter implements 
RowWriter {
       PlatformDependent.putInt(rowHeaderPos, MemoryRowBlock.NULL_FIELD_OFFSET);
       rowHeaderPos += SizeOf.SIZE_OF_INT;
     }
+    curOffset = 0;
+  }
+
+  @Override
+  public void cancelRow() {
+    // curOffset is equivalent to a byte length of current row.
+    backward(curOffset);
+    curOffset = 0;
+    curFieldOffset = SizeOf.SIZE_OF_INT;
+    curFieldIdx = 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
index 09faff9..6b06486 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.storage.BufferPool;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.UnsafeUtil;
@@ -128,11 +129,12 @@ public class ResizableMemoryBlock implements MemoryBlock {
   @Override
   public void ensureSize(int size) {
     if (!buffer.isWritable(size)) {
-      if (!limitSpec.canIncrease(size)) {
-        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      int newBlockSize = limitSpec.increasedSize(size);
+
+      if (!limitSpec.canIncrease(buffer.writableBytes() + newBlockSize)) {
+        throw new ValueOutOfRangeException("Cannot increase RowBlock 
anymore.");
       }
 
-      int newBlockSize = limitSpec.increasedSize(size);
       resize(newBlockSize);
       LOG.info("Increase DirectRowBlock to " + 
FileUtil.humanReadableByteCount(newBlockSize, false));
     }
@@ -142,7 +144,7 @@ public class ResizableMemoryBlock implements MemoryBlock {
     Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 
bytes");
 
     if (newSize > limitSpec.limit()) {
-      throw new RuntimeException("Resize cannot exceed the capacity limit");
+      throw new ValueOutOfRangeException("Resize cannot exceed the capacity 
limit");
     }
 
     if (newSize < buffer.writableBytes()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
index 0393714..e055ab7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
@@ -30,6 +30,11 @@ import org.apache.tajo.storage.Tuple;
  *   startRow() -->  skipField() or putXXX --> endRow()
  * </pre>
  *
+ * To cancel the current row, the call sequence should be as follows:
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> cancelRow()
+ * </pre>
+ *
  * The total number of skipField and putXXX invocations must be equivalent to 
the number of fields.
  */
 public interface RowWriter {
@@ -40,6 +45,8 @@ public interface RowWriter {
 
   void endRow();
 
+  void cancelRow();
+
   void skipField();
 
   void clear();
@@ -76,5 +83,5 @@ public interface RowWriter {
 
   void putProtoDatum(ProtobufDatum datum);
 
-  void addTuple(Tuple tuple);
+  boolean addTuple(Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
new file mode 100644
index 0000000..4c4a6cb
--- /dev/null
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * In UnSafeTupleList, input tuples are copied to an off-heap memory page 
whenever the add() method is called.
+ * A new memory page is added automatically when the current page runs out of space.
+ * This instance must be released after use by calling release().
+ */
+public class UnSafeTupleList extends ArrayList<UnSafeTuple> {
+
+  private final DataType[] dataTypes;
+  private List<MemoryRowBlock> rowBlocks;
+  private MemoryRowBlock currentRowBlock;
+  private int totalUsedMem;
+  private int pageSize;
+
+  public UnSafeTupleList(DataType[] dataTypes, int initialArraySize) {
+    this(dataTypes, initialArraySize, StorageUnit.MB);
+
+  }
+
+  public UnSafeTupleList(DataType[] dataTypes, int initialArraySize, int 
pageSize) {
+    super(initialArraySize);
+    this.dataTypes = dataTypes;
+    this.pageSize = pageSize;
+    this.rowBlocks = Lists.newArrayList();
+    this.currentRowBlock = new MemoryRowBlock(dataTypes, new 
FixedSizeLimitSpec(pageSize), true);
+    this.rowBlocks.add(currentRowBlock);
+
+  }
+
+  @Override
+  public boolean add(UnSafeTuple tuple) {
+    return addTuple(tuple);
+  }
+
+  public boolean addTuple(Tuple tuple) {
+
+    int prevPos = currentRowBlock.getMemory().writerPosition();
+    if (currentRowBlock.getWriter().addTuple(tuple)) {
+      UnSafeTuple unSafeTuple = new UnSafeTuple();
+      unSafeTuple.set(currentRowBlock.getMemory(), prevPos, dataTypes);
+      return super.add(unSafeTuple);
+    } else {
+      this.totalUsedMem += currentRowBlock.usedMem();
+      this.currentRowBlock = new MemoryRowBlock(dataTypes, new 
FixedSizeLimitSpec(pageSize), true);
+      this.rowBlocks.add(currentRowBlock);
+      return this.addTuple(tuple);
+    }
+  }
+
+  /**
+   * Release the cached pages
+   */
+  public void release() {
+    for (MemoryRowBlock rowBlock : rowBlocks) {
+      rowBlock.release();
+    }
+    super.clear();
+    rowBlocks.clear();
+    totalUsedMem = 0;
+  }
+
+  /**
+   * Total used memory
+   */
+  public int usedMem() {
+    return totalUsedMem + currentRowBlock.usedMem();
+  }
+
+  /**
+   * Release and reset
+   */
+  @Override
+  public void clear() {
+    release();
+    this.currentRowBlock = new MemoryRowBlock(dataTypes, new 
FixedSizeLimitSpec(pageSize), true);
+    this.rowBlocks.add(currentRowBlock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
 
b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
index a6003c7..4e77d68 100644
--- 
a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
+++ 
b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.ValueOutOfRangeException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -39,9 +40,7 @@ import java.util.Comparator;
 import java.util.List;
 
 import static org.apache.tajo.common.TajoDataTypes.Type;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestMemoryRowBlock {
   private static final Log LOG = LogFactory.getLog(TestMemoryRowBlock.class);
@@ -115,6 +114,41 @@ public class TestMemoryRowBlock {
   }
 
   @Test
+  public void testPutAndCancelValidation() {
+    VTuple vTuple = new VTuple(schema.length);
+    fillVTuple(0, vTuple);
+
+    //get memory size of 1 row
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema);
+    fillRow(0, rowBlock.getWriter());
+    int rowSize = rowBlock.usedMem();
+    rowBlock.release();
+
+    rowBlock = new MemoryRowBlock(schema, new FixedSizeLimitSpec(rowSize / 2, 
0.0f), true);
+    assertFalse(rowBlock.getWriter().addTuple(vTuple));
+    try {
+      OffHeapRowBlockUtils.convert(vTuple, rowBlock.getWriter());
+      fail();
+    } catch (Exception e) {
+      assertEquals(ValueOutOfRangeException.class, e.getClass());
+    }
+    rowBlock.release();
+
+    //allow 1 row
+    rowBlock = new MemoryRowBlock(schema, new FixedSizeLimitSpec(rowSize, 
0.0f), true);
+    assertTrue(rowBlock.getWriter().addTuple(vTuple));
+    assertFalse(rowBlock.getWriter().addTuple(vTuple));
+    assertEquals(1, rowBlock.rows());
+
+    ZeroCopyTuple tuple = new UnSafeTuple();
+    RowBlockReader reader = rowBlock.getReader();
+    assertTrue(reader.next(tuple));
+    validateTupleResult(0, tuple);
+    assertFalse(reader.next(tuple));
+    rowBlock.release();
+  }
+
+  @Test
   public void testNullityValidation() {
     int rowNum = 1000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index b1f53da..15250a3 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -110,6 +110,7 @@ public class TestProgressExternalSortExec {
     catalog.createTable(employee);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    employeePath.getFileSystem(conf).deleteOnExit(employeePath);
   }
 
   @After
@@ -125,9 +126,8 @@ public class TestProgressExternalSortExec {
   @Test
   public void testExternalSortExecProgressWithMemTableScanner() throws 
Exception {
     QueryContext queryContext = 
LocalTajoTestingUtility.createDummyContext(conf);
-    int bufferSize = (int) (testDataStats.getNumBytes() * 20) / 
StorageUnit.MB; //multiply 2 for memory fit
+    int bufferSize = (int) (testDataStats.getNumBytes() * 2) / StorageUnit.MB; 
//multiply by 2 so that the data fits in memory
     queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize);
-
     testProgress(queryContext);
   }
 
@@ -197,7 +197,7 @@ public class TestProgressExternalSortExec {
     assertEquals(testDataStats.getNumBytes().longValue(), 
tableStats.getNumBytes().longValue());
     assertEquals(testDataStats.getNumRows().longValue(), cnt);
     assertEquals(testDataStats.getNumRows().longValue(), 
tableStats.getNumRows().longValue());
-    assertTrue(testDataStats.getNumBytes().longValue() <= 
tableStats.getReadBytes().longValue());
+    assertTrue(testDataStats.getNumBytes() <= tableStats.getReadBytes());
 
     // for rescan test
     preVal = null;
@@ -223,7 +223,7 @@ public class TestProgressExternalSortExec {
     assertEquals(testDataStats.getNumRows().longValue(), cnt);
     assertEquals(testDataStats.getNumRows().longValue(), 
tableStats.getNumRows().longValue());
     //'ReadBytes' is actual read bytes
-    assertTrue(testDataStats.getNumBytes().longValue() <= 
tableStats.getReadBytes().longValue());
+    assertTrue(testDataStats.getNumBytes() <= tableStats.getReadBytes());
 
     conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 
ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.defaultIntVal);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java
new file mode 100644
index 0000000..48170f6
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
+import org.apache.tajo.unit.StorageUnit;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestUnSafeTuple {
+
+  private static final Random rnd = new Random(-1);
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setupClass() {
+    Column col0 = new Column("col0", Type.BOOLEAN);
+    Column col1 = new Column("col1", Type.INT4);
+    Column col2 = new Column("col2", Type.INT8);
+    Column col3 = new Column("col3", Type.FLOAT4);
+    Column col4 = new Column("col4", Type.FLOAT8);
+
+    schema = new Schema(new Column[]{col0, col1, col2, col3, col4});
+  }
+
+  @Test
+  public final void testMemoryPageAndValidation() {
+
+    Datum[] datums = new Datum[]{
+        DatumFactory.createBool(rnd.nextBoolean()),
+        DatumFactory.createInt4(rnd.nextInt()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createFloat4(rnd.nextFloat()),
+        DatumFactory.createFloat8(rnd.nextDouble())};
+    Tuple tuple = new VTuple(datums);
+
+    int pageSize = StorageUnit.KB;
+    UnSafeTupleList unSafeTupleList = new 
UnSafeTupleList(SchemaUtil.toDataTypes(schema), 100, StorageUnit.KB);
+    assertEquals(0, unSafeTupleList.usedMem());
+    assertEquals(0, unSafeTupleList.size());
+
+    unSafeTupleList.addTuple(tuple);
+    //get the memory bytes of tuple
+    int tupleSize = unSafeTupleList.usedMem();
+    assertEquals(1, unSafeTupleList.size());
+    assertEquals(tuple, unSafeTupleList.get(0));
+
+    unSafeTupleList.clear();
+    assertEquals(0, unSafeTupleList.usedMem());
+    assertEquals(0, unSafeTupleList.size());
+
+    //test only 1 page
+    int testCount = pageSize / tupleSize;
+    Tuple[] tuples = new Tuple[testCount];
+
+    for (int i = 0; i < testCount; i++) {
+      datums = new Datum[]{
+          DatumFactory.createBool(rnd.nextBoolean()),
+          DatumFactory.createInt4(rnd.nextInt()),
+          DatumFactory.createInt8(rnd.nextLong()),
+          DatumFactory.createFloat4(rnd.nextFloat()),
+          DatumFactory.createFloat8(rnd.nextDouble())};
+      tuples[i] = new VTuple(datums);
+      unSafeTupleList.addTuple(tuples[i]);
+    }
+
+    assertEquals(testCount, unSafeTupleList.size());
+    assertEquals(tupleSize * testCount, unSafeTupleList.usedMem());
+
+    for (int i = 0; i < testCount; i++) {
+      assertEquals(tuples[i], unSafeTupleList.get(i));
+    }
+
+    unSafeTupleList.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
index 9f5ecb5..425a7d6 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
@@ -79,7 +79,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
       // tpch/lineitem.tbl
       long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2};
       long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 236};
-      long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0};
+      long[] expectedReadBytes = new long[]{604, 604, 236, 0, 138, 0};
 
       QueryId queryId = getQueryId(res);
       assertStatus(queryId, 3, expectedNumRows, expectedNumBytes, expectedReadBytes);
@@ -108,7 +108,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
       // in/out * stage(4)
       long[] expectedNumRows = new long[]{5, 5, 2, 2, 2, 2, 2, 2};
       long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 64};
-      long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0};
+      long[] expectedReadBytes = new long[]{20, 20, 8, 8, 64, 0, 34, 0};
 
       QueryId queryId = getQueryId(res);
       assertStatus(queryId, 4, expectedNumRows, expectedNumBytes, expectedReadBytes);

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 8abafec..69631b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.*;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -26,18 +29,24 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
+import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
@@ -48,9 +57,10 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.*;
-
-import static org.apache.tajo.storage.RawFile.RawFileAppender;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
  * This external sort algorithm can be characterized by the followings:
@@ -75,15 +85,19 @@ public class ExternalSortExec extends SortExec {
   /** the defaultFanout of external sort */
   private final int defaultFanout;
   /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
-  private long sortBufferBytesNum;
+  private int sortBufferBytesNum;
   /** the number of available cores */
   private final int allocatedCoreNum;
   /** If there are available multiple cores, it tries parallel merge. */
   private ExecutorService executorService;
   /** used for in-memory sort of each chunk. */
-  private TupleList inMemoryTable;
+  private UnSafeTupleList inMemoryTable;
+  /** for zero copy tuple comparison */
+  private Comparator<UnSafeTuple> unSafeComparator;
+  /** for other type tuple comparison */
+  private Comparator<Tuple> primitiveComparator;
   /** temporal dir */
-  private final Path sortTmpDir;
+  private Path sortTmpDir;
   /** It enables round-robin disks allocation */
   private final LocalDirAllocator localDirAllocator;
   /** local file system */
@@ -98,12 +112,10 @@ public class ExternalSortExec extends SortExec {
   ///////////////////////////////////////////////////
   /** already sorted or not */
   private boolean sorted = false;
-  /** a flag to point whether sorted data resides in memory or not */
-  private boolean memoryResident = true;
   /** the final result */
   private Scanner result;
   /** total bytes of input data */
-  private long sortAndStoredBytes;
+  private long inputBytes;
 
   private ExternalSortExec(final TaskAttemptContext context, final SortNode plan)
       throws PhysicalPlanningException {
@@ -115,15 +127,12 @@ public class ExternalSortExec extends SortExec {
       throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
     }
     // TODO - sort buffer and core num should be changed to use the allocated container resource.
-    this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB;
+    this.sortBufferBytesNum = context.getQueryContext().getInt(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB;
     this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
-    this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
-    this.inMemoryTable = new TupleList(context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE));
-
-    this.sortTmpDir = getExecutorTmpDir();
     this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
     this.localFS = new RawLocalFileSystem();
-    this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW); //TODO change to SHUFFLE_FILE_FORMAT
+    this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
+    this.inputStats = new TableStats();
   }
 
   public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode,
@@ -133,7 +142,7 @@ public class ExternalSortExec extends SortExec {
     mergedInputFragments = TUtil.newList();
     for (CatalogProtos.FragmentProto proto : fragments) {
       FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
-      mergedInputFragments.add(new Chunk(fragment, scanNode.getTableDesc().getMeta()));
+      mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
     }
   }
 
@@ -143,8 +152,19 @@ public class ExternalSortExec extends SortExec {
     setChild(child);
   }
 
+  @Override
   public void init() throws IOException {
-    inputStats = new TableStats();
+    if(allocatedCoreNum > 1) {
+      this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
+    }
+
+    this.sortTmpDir = getExecutorTmpDir();
+
+    int initialArraySize = context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE);
+    this.inMemoryTable = new UnSafeTupleList(SchemaUtil.toDataTypes(inSchema), initialArraySize);
+    this.unSafeComparator = new UnSafeComparator(inSchema, sortSpecs);
+    this.primitiveComparator = new PrimitiveComparator(inSchema, sortSpecs);
+
     super.init();
   }
 
@@ -155,29 +175,25 @@ public class ExternalSortExec extends SortExec {
   /**
    * Sort a tuple block and store them into a chunk file
    */
-  private Chunk sortAndStoreChunk(int chunkId, TupleList tupleBlock)
+  private Chunk sortAndStoreChunk(int chunkId, UnSafeTupleList tupleBlock)
       throws IOException {
-
     int rowNum = tupleBlock.size();
 
     long sortStart = System.currentTimeMillis();
-    Iterable<Tuple> sorted = getSorter(tupleBlock).sort();
+    OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator);
     long sortEnd = System.currentTimeMillis();
 
     long chunkWriteStart = System.currentTimeMillis();
     Path outputPath = getChunkPathForWrite(0, chunkId);
-    final RawFileAppender appender =
-        new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath);
-
+    final DirectRawFileWriter appender =
+        new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath);
     appender.init();
-    for (Tuple t : sorted) {
+    for (Tuple t : tupleBlock) {
       appender.addTuple(t);
     }
     appender.close();
-    tupleBlock.clear();
     long chunkWriteEnd = System.currentTimeMillis();
 
-
     info(LOG, "Chunk #" + chunkId + " sort and written (" +
         FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " +
         "sort time: " + (sortEnd - sortStart) + " msec, " +
@@ -185,7 +201,7 @@ public class ExternalSortExec extends SortExec {
 
     FileFragment frag = new FileFragment("", outputPath, 0,
         new File(localFS.makeQualified(outputPath).toUri()).length());
-    return new Chunk(frag, intermediateMeta);
+    return new Chunk(inSchema, frag, intermediateMeta);
   }
 
   /**
@@ -196,26 +212,23 @@ public class ExternalSortExec extends SortExec {
    */
   private List<Chunk> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
-    long memoryConsumption = 0;
     List<Chunk> chunkPaths = TUtil.newList();
 
     int chunkId = 0;
     long runStartTime = System.currentTimeMillis();
+
     while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start
-      inMemoryTable.add(tuple);
-      memoryConsumption += MemoryUtil.calculateMemorySize(tuple);
+      inMemoryTable.addTuple(tuple);
 
-      if (memoryConsumption > sortBufferBytesNum) {
+      if (inMemoryTable.usedMem() > sortBufferBytesNum) { // if input data exceeds main-memory at least once
         long runEndTime = System.currentTimeMillis();
-        info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec");
+        info(LOG, "Chunk #" + chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec");
         runStartTime = runEndTime;
 
-        info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes");
-        memoryResident = false;
+        info(LOG, "Memory consumption exceeds " + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false));
 
         chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
-
-        memoryConsumption = 0;
+        inMemoryTable.clear();
         chunkId++;
 
         // When the volume of sorting data once exceed the size of sort buffer,
@@ -233,19 +246,15 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
-    if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples
-      // check if data exceeds a sort buffer. If so, it store the remain data into a chunk.
-      long start = System.currentTimeMillis();
-      int rowNum = inMemoryTable.size();
-      chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
-      long end = System.currentTimeMillis();
-      info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
+    if(inMemoryTable.size() > 0) { //if there are at least one or more input tuples
+      //store the remain data into a memory chunk.
+      chunkPaths.add(new Chunk(inSchema, inMemoryTable, intermediateMeta));
     }
 
     // get total loaded (or stored) bytes and total row numbers
     TableStats childTableStats = child.getInputStats();
     if (childTableStats != null) {
-      sortAndStoredBytes = childTableStats.getNumBytes();
+      inputBytes = childTableStats.getNumBytes();
     }
     return chunkPaths;
   }
@@ -267,6 +276,7 @@ public class ExternalSortExec extends SortExec {
       if (mergedInputFragments != null) {
         try {
           this.result = externalMergeAndSort(mergedInputFragments);
+          this.inputBytes = result.getInputStats().getNumBytes();
         } catch (Exception e) {
           throw new PhysicalPlanningException(e);
         }
@@ -275,19 +285,16 @@ public class ExternalSortExec extends SortExec {
         long startTimeOfChunkSplit = System.currentTimeMillis();
         List<Chunk> chunks = sortAndStoreAllChunks();
         long endTimeOfChunkSplit = System.currentTimeMillis();
-        info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
-
-        if (memoryResident) { // if all sorted data reside in a main-memory table.
-          TupleSorter sorter = getSorter(inMemoryTable);
-          result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes);
-        } else { // if input data exceeds main-memory at least once
+        info(LOG, chunks.size() + " Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
 
+        if(chunks.size() == 0) {
+          this.result = new NullScanner(context.getConf(), inSchema, intermediateMeta, null);
+        } else {
           try {
             this.result = externalMergeAndSort(chunks);
           } catch (Exception e) {
             throw new PhysicalPlanningException(e);
           }
-
         }
       }
 
@@ -324,8 +331,7 @@ public class ExternalSortExec extends SortExec {
     return computedFanout;
   }
 
-  private Scanner externalMergeAndSort(List<Chunk> chunks)
-      throws IOException, ExecutionException, InterruptedException {
+  private Scanner externalMergeAndSort(List<Chunk> chunks) throws Exception {
     int level = 0;
     final List<Chunk> inputFiles = TUtil.newList(chunks);
     final List<Chunk> outputFiles = TUtil.newList();
@@ -352,8 +358,14 @@ public class ExternalSortExec extends SortExec {
         // how many files are merged in ith thread?
         numberOfMergingFiles.add(fanout);
         // launch a merger runner
-        futures.add(executorService.submit(
-            new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
+        if(allocatedCoreNum > 1) {
+          futures.add(executorService.submit(
+              new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
+        } else {
+          final SettableFuture<Chunk> future = SettableFuture.create();
+          future.set(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false).call());
+          futures.add(future);
+        }
         outputFileNum++;
 
         startIdx += fanout;
@@ -406,14 +418,23 @@ public class ExternalSortExec extends SortExec {
        */
       int numDeletedFiles = 0;
       for (Chunk chunk : inputFiles) {
-        if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
+        if (chunk.isMemory()) {
+          if (LOG.isDebugEnabled()) {
+            debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
+          }
+          chunk.getMemoryTuples().release();
+        } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
           localFS.delete(chunk.getFragment().getPath(), true);
           numDeletedFiles++;
 
-          if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + chunk.getFragment());
+          if (LOG.isDebugEnabled()) {
+            debug(LOG, "Delete merged intermediate file: " + chunk.getFragment());
+          }
         }
       }
-      info(LOG, numDeletedFiles + " merged intermediate files deleted");
+      if(LOG.isDebugEnabled()) {
+        debug(LOG, numDeletedFiles + " merged intermediate files deleted");
+      }
 
       // switch input files to output files, and then clear outputFiles
       inputFiles.clear();
@@ -459,11 +480,14 @@ public class ExternalSortExec extends SortExec {
       final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
       info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
       long mergeStartTime = System.currentTimeMillis();
-      final RawFileAppender output =
-          new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath);
-      output.init();
+
       final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
       merger.init();
+
+      final DirectRawFileWriter output =
+          new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath);
+      output.init();
+
       Tuple mergeTuple;
       while((mergeTuple = merger.next()) != null) {
         output.addTuple(mergeTuple);
@@ -476,7 +500,7 @@ public class ExternalSortExec extends SortExec {
           + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
       File f = new File(localFS.makeQualified(outputPath).toUri());
       FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length());
-      return new Chunk(frag, intermediateMeta);
+      return new Chunk(inSchema, frag, intermediateMeta);
     }
   }
 
@@ -492,23 +516,35 @@ public class ExternalSortExec extends SortExec {
    */
   private Scanner createFinalMerger(List<Chunk> inputs) throws IOException {
     if (inputs.size() == 1) {
-      this.result = getFileScanner(inputs.get(0));
+      this.result = getScanner(inputs.get(0));
     } else {
       this.result = createKWayMerger(inputs, 0, inputs.size());
     }
     return result;
   }
 
-  private Scanner getFileScanner(Chunk chunk) throws IOException {
-    return TablespaceManager.getLocalFs().getScanner(chunk.getMeta(), inSchema, chunk.getFragment(), outSchema);
+  private Scanner getScanner(Chunk chunk) throws IOException {
+    if (chunk.isMemory()) {
+      long sortStart = System.currentTimeMillis();
+
+      OffHeapRowBlockUtils.sort(inMemoryTable, unSafeComparator);
+      Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem());
+      if(LOG.isDebugEnabled()) {
+        debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)
+            + " bytes, " + inMemoryTable.size() + " rows, sort time: "
+            + (System.currentTimeMillis() - sortStart) + " msec)");
+      }
+      return scanner;
+    } else {
+      return TablespaceManager.getLocalFs().getScanner(chunk.meta, chunk.schema, chunk.fragment, chunk.schema);
+    }
   }
 
   private Scanner createKWayMerger(List<Chunk> inputs, final int startChunkId, final int num) throws IOException {
     final Scanner [] sources = new Scanner[num];
     for (int i = 0; i < num; i++) {
-      sources[i] = getFileScanner(inputs.get(startChunkId + i));
+      sources[i] = getScanner(inputs.get(startChunkId + i));
     }
-
     return createKWayMergerInternal(sources, 0, num);
   }
 
@@ -518,27 +554,24 @@ public class ExternalSortExec extends SortExec {
       final int mid = (int) Math.ceil((float)num / 2);
       Scanner left = createKWayMergerInternal(sources, startIdx, mid);
       Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid);
-      if (ComparableVector.isVectorizable(sortSpecs)) {
-        return new VectorComparePairWiseMerger(inSchema, left, right, comparator);
-      }
-      return new PairWiseMerger(inSchema, left, right, comparator);
+      return new PairWiseMerger(inSchema, left, right, primitiveComparator);
     } else {
       return sources[startIdx];
     }
   }
 
-  private static class MemTableScanner extends AbstractScanner {
-    final Iterable<Tuple> iterable;
+  private static class MemTableScanner<T extends Tuple> extends AbstractScanner {
+    final Iterable<T> iterable;
     final long sortAndStoredBytes;
     final int totalRecords;
 
-    Iterator<Tuple> iterator;
+    Iterator<T> iterator;
     // for input stats
     float scannerProgress;
     int numRecords;
     TableStats scannerTableStats;
 
-    public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) {
+    public MemTableScanner(Iterable<T> iterable, int length, long inBytes) {
       this.iterable = iterable;
       this.totalRecords = length;
       this.sortAndStoredBytes = inBytes;
@@ -601,30 +634,6 @@ public class ExternalSortExec extends SortExec {
     CLOSED
   }
 
-  private static class VectorComparePairWiseMerger extends PairWiseMerger {
-
-    private ComparableVector comparable;
-
-    public VectorComparePairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner,
-                                       BaseTupleComparator comparator) throws IOException {
-      super(schema, leftScanner, rightScanner, null);
-      comparable = new ComparableVector(2, comparator.getSortSpecs(), comparator.getSortKeyIds());
-    }
-
-    @Override
-    protected Tuple prepare(int index, Tuple tuple) {
-      if (tuple != null) {
-        comparable.set(index, tuple);
-      }
-      return tuple;
-    }
-
-    @Override
-    protected int compare() {
-      return comparable.compare(0, 1);
-    }
-  }
-
   /**
    * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
    */
@@ -638,8 +647,10 @@ public class ExternalSortExec extends SortExec {
 
     private Tuple leftTuple;
     private Tuple rightTuple;
+    private boolean leftEOF;
+    private boolean rightEOF;
 
-    private final Tuple outTuple;
+    private Tuple outTuple;
 
     private float mergerProgress;
     private TableStats mergerInputStats;
@@ -652,7 +663,6 @@ public class ExternalSortExec extends SortExec {
       this.leftScan = leftScanner;
       this.rightScan = rightScanner;
       this.comparator = comparator;
-      this.outTuple = new VTuple(schema.size());
     }
 
     private void setState(State state) {
@@ -665,8 +675,6 @@ public class ExternalSortExec extends SortExec {
         leftScan.init();
         rightScan.init();
 
-        prepareTuplesForFirstComparison();
-
         mergerInputStats = new TableStats();
         mergerProgress = 0.0f;
 
@@ -676,31 +684,46 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
-    private void prepareTuplesForFirstComparison() throws IOException {
-      leftTuple = prepare(0, leftScan.next());
-      rightTuple = prepare(1, rightScan.next());
-    }
-
-    protected Tuple prepare(int index, Tuple tuple) {
-      return tuple;
-    }
-
     protected int compare() {
       return comparator.compare(leftTuple, rightTuple);
     }
 
     @Override
     public Tuple next() throws IOException {
-      if (leftTuple == null && rightTuple == null) {
-        return null;
+      if(!leftEOF && leftTuple == null) {
+        leftTuple = leftScan.next();
       }
-      if (rightTuple == null || (leftTuple != null && compare() < 0)) {
-        outTuple.put(leftTuple.getValues());
-        leftTuple = prepare(0, leftScan.next());
+
+      if(!rightEOF && rightTuple == null) {
+        rightTuple = rightScan.next();
+      }
+
+      if (leftTuple != null && rightTuple != null) {
+        if (compare() < 0) {
+          outTuple = leftTuple;
+          leftTuple = null;
+        } else {
+          outTuple = rightTuple;
+          rightTuple = null;
+        }
         return outTuple;
       }
-      outTuple.put(rightTuple.getValues());
-      rightTuple = prepare(1, rightScan.next());
+
+      if (leftTuple == null) {
+        leftEOF = true;
+
+        if (rightTuple != null) {
+          outTuple = rightTuple;
+          rightTuple = null;
+        } else {
+          rightEOF = true;
+          outTuple = null;
+        }
+      } else {
+        rightEOF = true;
+        outTuple = leftTuple;
+        leftTuple = null;
+      }
       return outTuple;
     }
 
@@ -712,8 +735,11 @@ public class ExternalSortExec extends SortExec {
 
         leftTuple = null;
         rightTuple = null;
+        outTuple = null;
+
+        leftEOF = false;
+        rightEOF = false;
 
-        prepareTuplesForFirstComparison();
       } else {
         throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
       }
@@ -778,27 +804,25 @@ public class ExternalSortExec extends SortExec {
 
     if (result != null) {
       result.close();
-      try {
-        inputStats = (TableStats)result.getInputStats().clone();
-      } catch (CloneNotSupportedException e) {
-        LOG.warn(e.getMessage());
-      }
-      result = null;
     }
 
     if (finalOutputFiles != null) {
       for (Chunk chunk : finalOutputFiles) {
-        FileFragment frag = chunk.getFragment();
-        File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
-        if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) {
-          localFS.delete(frag.getPath(), true);
-          LOG.info("Delete file: " + frag);
+        if (!chunk.isMemory()) {
+          FileFragment frag = chunk.getFragment();
+          File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
+          if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) {
+            localFS.delete(frag.getPath(), true);
+            if(LOG.isDebugEnabled()) {
+              debug(LOG, "Delete file: " + frag);
+            }
+          }
         }
       }
     }
 
-    if(inMemoryTable != null){
-      inMemoryTable.clear();
+    if(inMemoryTable != null) {
+      inMemoryTable.release();
       inMemoryTable = null;
     }
 
@@ -831,21 +855,35 @@ public class ExternalSortExec extends SortExec {
   @Override
   public TableStats getInputStats() {
     if (result != null) {
-      return result.getInputStats();
-    } else {
-      return inputStats;
+
+      TableStats tableStats = result.getInputStats();
+      inputStats.setNumRows(tableStats.getNumRows());
+      inputStats.setNumBytes(inputBytes);
+      inputStats.setReadBytes(tableStats.getReadBytes());
     }
+    return inputStats;
   }
 
   private static class Chunk {
     private FileFragment fragment;
     private TableMeta meta;
+    private Schema schema;
+    private UnSafeTupleList memoryTuples;
+    private boolean isMemory;
 
-    public Chunk(FileFragment fragment, TableMeta meta) {
+    public Chunk(Schema schema, FileFragment fragment, TableMeta meta) {
+      this.schema = schema;
       this.fragment = fragment;
       this.meta = meta;
     }
 
+    public Chunk(Schema schema, UnSafeTupleList tuples, TableMeta meta) {
+      this.memoryTuples = tuples;
+      this.isMemory = true;
+      this.schema = schema;
+      this.meta = meta;
+    }
+
     public FileFragment getFragment() {
       return fragment;
     }
@@ -853,5 +891,169 @@ public class ExternalSortExec extends SortExec {
     public TableMeta getMeta() {
       return meta;
     }
+
+    public UnSafeTupleList getMemoryTuples() {
+      return memoryTuples;
+    }
+
+    public boolean isMemory() {
+      return isMemory;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+  }
+
+  /**
+   * The Comparator class for UnSafeTuples
+   *
+   * @see UnSafeTuple
+   */
+  static class UnSafeComparator implements Comparator<UnSafeTuple> {
+    private final int[] sortKeyIds;
+    private final TajoDataTypes.Type[] sortKeyTypes;
+    private final boolean[] asc;
+    private final boolean[] nullFirsts;
+
+    /**
+     * @param schema   The schema of input tuples
+     * @param sortKeys The description of sort keys
+     */
+    public UnSafeComparator(Schema schema, SortSpec[] sortKeys) {
+      Preconditions.checkArgument(sortKeys.length > 0,
+          "At least one sort key must be specified.");
+
+      this.sortKeyIds = new int[sortKeys.length];
+      this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length];
+      this.asc = new boolean[sortKeys.length];
+      this.nullFirsts = new boolean[sortKeys.length];
+      for (int i = 0; i < sortKeys.length; i++) {
+        if (sortKeys[i].getSortKey().hasQualifier()) {
+          this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+        } else {
+          this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+        }
+
+        this.asc[i] = sortKeys[i].isAscending();
+        this.nullFirsts[i] = sortKeys[i].isNullsFirst();
+        this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType();
+      }
+    }
+
+    @Override
+    public int compare(UnSafeTuple tuple1, UnSafeTuple tuple2) {
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        int compare = OffHeapRowBlockUtils.compareColumn(tuple1, tuple2,
+            sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]);
+
+        if (compare != 0) {
+          return compare;
+        }
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * The Comparator class for raw file
+   */
+  static class PrimitiveComparator implements Comparator<Tuple> {
+    private final int[] sortKeyIds;
+    private final TajoDataTypes.Type[] sortKeyTypes;
+    private final boolean[] asc;
+    private final boolean[] nullFirsts;
+
+    /**
+     * @param schema   The schema of input tuples
+     * @param sortKeys The description of sort keys
+     */
+    public PrimitiveComparator(Schema schema, SortSpec[] sortKeys) {
+      Preconditions.checkArgument(sortKeys.length > 0,
+          "At least one sort key must be specified.");
+
+      this.sortKeyIds = new int[sortKeys.length];
+      this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length];
+      this.asc = new boolean[sortKeys.length];
+      this.nullFirsts = new boolean[sortKeys.length];
+      for (int i = 0; i < sortKeys.length; i++) {
+        if (sortKeys[i].getSortKey().hasQualifier()) {
+          this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+        } else {
+          this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+        }
+
+        this.asc[i] = sortKeys[i].isAscending();
+        this.nullFirsts[i] = sortKeys[i].isNullsFirst();
+        this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType();
+      }
+    }
+
+    @Override
+    public int compare(Tuple tuple1, Tuple tuple2) {
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        int compare = compareColumn(tuple1, tuple2,
+            sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]);
+
+        if (compare != 0) {
+          return compare;
+        }
+      }
+      return 0;
+    }
+
+    public int compareColumn(Tuple tuple1, Tuple tuple2, int index, TajoDataTypes.Type type,
+                             boolean ascending, boolean nullFirst) {
+      final boolean n1 = tuple1.isBlankOrNull(index);
+      final boolean n2 = tuple2.isBlankOrNull(index);
+      if (n1 && n2) {
+        return 0;
+      }
+
+      if (n1 ^ n2) {
+        return nullFirst ? (n1 ? -1 : 1) : (n1 ? 1 : -1);
+      }
+
+      int compare;
+      switch (type) {
+      case BOOLEAN:
+        compare = Booleans.compare(tuple1.getBool(index), tuple2.getBool(index));
+        break;
+      case BIT:
+        compare = tuple1.getByte(index) - tuple2.getByte(index);
+        break;
+      case INT1:
+      case INT2:
+        compare = Shorts.compare(tuple1.getInt2(index), tuple2.getInt2(index));
+        break;
+      case DATE:
+      case INT4:
+        compare = Ints.compare(tuple1.getInt4(index), tuple2.getInt4(index));
+        break;
+      case INET4:
+        compare = UnsignedInts.compare(tuple1.getInt4(index), tuple2.getInt4(index));
+        break;
+      case TIME:
+      case TIMESTAMP:
+      case INT8:
+        compare = Longs.compare(tuple1.getInt8(index), tuple2.getInt8(index));
+        break;
+      case FLOAT4:
+        compare = Floats.compare(tuple1.getFloat4(index), tuple2.getFloat4(index));
+        break;
+      case FLOAT8:
+        compare = Doubles.compare(tuple1.getFloat8(index), tuple2.getFloat8(index));
+        break;
+      case CHAR:
+      case TEXT:
+      case BLOB:
+        compare = TextDatum.COMPARATOR.compare(tuple1.getBytes(index), tuple2.getBytes(index));
+        break;
+      default:
+        throw new TajoRuntimeException(
+            new UnsupportedException("unknown data type '" + type.name() + "'"));
+      }
+      return ascending ? compare : -compare;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 87a19a9..c70e1ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -70,6 +70,10 @@ public abstract class PhysicalExec implements SchemaObject {
     log.info("["+ context.getTaskId() + "] " + message);
   }
 
+  protected void debug(Log log, String message) { // debug-level variant of warn()
+    log.debug("["+ context.getTaskId() + "] " + message); // prefix with the task id
+  }
+
   protected void warn(Log log, String message) {
     log.warn("[" + context.getTaskId() + "] " + message);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 9feae7e..65cb6ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -166,6 +166,11 @@ public class ExecutionBlockContext {
       return;
     }
 
+    LOG.info("Worker's task counter - total:" + completedTasksNum.intValue() +
+        ", succeeded: " + succeededTasksNum.intValue()
+        + ", killed: " + killedTasksNum.intValue()
+        + ", failed: " + failedTasksNum.intValue());
+
     try {
       reporter.stop();
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 2e1639f..b5abffe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -26,6 +26,7 @@ import io.netty.handler.codec.http.*;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -152,7 +153,9 @@ public class Fetcher {
       request.headers().set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
       request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, 
HttpHeaders.Values.GZIP);
 
-      LOG.info("Status: " + getState() + ", URI:" + uri);
+      if(LOG.isDebugEnabled()) { // per-request detail: emit only when debug logging is on
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
+      }
       // Send the HTTP request.
       channel.writeAndFlush(request);
 
@@ -168,7 +171,17 @@ public class Fetcher {
       }
 
       this.finishTime = System.currentTimeMillis();
-      LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + 
getState() + ", URI:" + uri);
+      long elapsedMills = finishTime - startTime;
+      String transferSpeed;
+      if(elapsedMills > 1000) {
+        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
+        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
+      } else {
+        transferSpeed = 
FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
+      }
+
+      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, 
state:%s, URL:%s",
+          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index fbd1948..873d9e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -110,7 +110,9 @@ public class TaskImpl implements Task {
     this.descs = Maps.newHashMap();
 
     Path baseDirPath = executionBlockContext.createBaseDir();
-    LOG.info("Task basedir is created (" + baseDirPath +")");
+
+    if(LOG.isDebugEnabled()) LOG.debug("Task basedir is created (" + 
baseDirPath +")");
+
     TaskAttemptId taskAttemptId = request.getId();
 
     this.taskDir = StorageUtil.concatPath(baseDirPath,
@@ -146,24 +148,21 @@ public class TaskImpl implements Task {
     }
 
     this.localChunks = Collections.synchronizedList(new ArrayList<>());
-    LOG.info("==================================");
-    LOG.info("* Stage " + request.getId() + " is initialized");
-    LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
-        ", Fragments (num: " + request.getFragments().size() + ")" +
-        ", Fetches (total:" + request.getFetches().size() + ") :");
+
+    LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: 
%s, Fragments: %d, Fetches:%d, " +
+        "Local dir: %s", request.getId(), interQuery, shuffleType, 
request.getFragments().size(),
+        request.getFetches().size(), taskDir));
 
     if(LOG.isDebugEnabled()) {
       for (FetchImpl f : request.getFetches()) {
         LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + 
f.getSimpleURIs());
       }
     }
-    LOG.info("* Local task dir: " + taskDir);
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("* plan:\n");
       LOG.debug(plan.toString());
     }
-    LOG.info("==================================");
   }
 
   private void updateDescsForScanNodes(NodeType nodeType) {
@@ -191,7 +190,6 @@ public class TaskImpl implements Task {
 
   @Override
   public void init() throws IOException {
-    LOG.info("Initializing: " + getId());
 
     initPlan();
     startScriptExecutors();
@@ -210,8 +208,10 @@ public class TaskImpl implements Task {
         for (String inputTable : context.getInputTables()) {
           tableDir = new Path(inputTableBaseDir, inputTable);
           if (!localFS.exists(tableDir)) {
-            LOG.info("the directory is created  " + tableDir.toUri());
             localFS.mkdirs(tableDir);
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("the directory is created  " + tableDir.toUri());
+            }
           }
         }
       }
@@ -456,11 +456,8 @@ public class TaskImpl implements Task {
         queryMasterStub.done(null, report, NullCallback.get());
       }
       endTime = System.currentTimeMillis();
-      LOG.info(context.getTaskId() + " completed. " +
-          "Worker's task counter - total:" + 
executionBlockContext.completedTasksNum.intValue() +
-          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+      LOG.info(String.format("%s is complete. %d ms elapsed, final state:%s",
+          context.getTaskId(), endTime - startTime, context.getState()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index b3ab85d..6bcb7b4 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -392,10 +392,7 @@ public class TajoPullServerService extends AbstractService {
 
     public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
       long fileSendTime = System.currentTimeMillis() - fileStartTime;
-      if (fileSendTime > 20 * 1000) {
-        LOG.info("PullServer send too long time: filePos=" + 
filePart.position() + ", fileLen=" + filePart.count());
-         SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile+ 1);
-      }
+
       if (fileSendTime > maxTime) {
         maxTime = fileSendTime;
       }
@@ -403,12 +400,20 @@ public class TajoPullServerService extends AbstractService {
         minTime = fileSendTime;
       }
 
+      if (fileSendTime > 20 * 1000) { // 20 seconds: threshold for counting a file as slow
+        LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " +
+            "length:" + filePart.count() + ", URI:" + requestUri); // count() is the region's byte length; position() is only its file offset
+        SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1);
+      }
+
       REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
       if (REMAIN_FILE_UPDATER.get(this) <= 0) {
         processingStatusMap.remove(requestUri);
-        LOG.info("PullServer processing status: totalTime=" + 
(System.currentTimeMillis() - startTime) + " ms, "
-            + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + 
minTime + " ms, maxTime=" + maxTime + " ms, "
-            + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("PullServer processing status: totalTime=" + 
(System.currentTimeMillis() - startTime) + " ms, "
+              + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + 
minTime + " ms, maxTime=" + maxTime + " ms, "
+              + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
+        }
       }
     }
   }
@@ -431,7 +436,10 @@ public class TajoPullServerService extends AbstractService {
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
       accepted.add(ctx.channel());
-      LOG.info(String.format("Current number of shuffle connections (%d)", 
accepted.size()));
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Current number of shuffle connections (%d)", 
accepted.size()));
+      }
       super.channelRegistered(ctx);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
index c8f46a6..6d4b137 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -40,11 +40,7 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject<
   private final int[] sortKeyIds;
   private final boolean[] asc;
   @SuppressWarnings("unused")
-  private final boolean[] nullFirsts;  
-
-  private Datum left;
-  private Datum right;
-  private int compVal;
+  private final boolean[] nullFirsts;
 
   /**
    * @param schema The schema of input tuples
@@ -110,29 +106,30 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject<
 
   @Override
   public int compare(Tuple tuple1, Tuple tuple2) {
+
     for (int i = 0; i < sortKeyIds.length; i++) {
-      left = tuple1.asDatum(sortKeyIds[i]);
-      right = tuple2.asDatum(sortKeyIds[i]);
-
-      if (left.isNull() || right.isNull()) {
-        if (!left.equals(right)) {
-          if (left.isNull()) {
-            compVal = nullFirsts[i] ? -1 : 1;
-          } else {
-            compVal = nullFirsts[i] ? 1 : -1;
-          }
-        } else {
-          compVal = 0;
-        }
+      final boolean n1 = tuple1.isBlankOrNull(sortKeyIds[i]);
+      final boolean n2 = tuple2.isBlankOrNull(sortKeyIds[i]);
+
+      if (n1 && n2) {
+        continue;
+      }
+
+      if (n1 ^ n2) {
+        return nullFirsts[i] ? (n1 ? -1 : 1) : (n1 ? 1 : -1);
+      }
+
+      Datum left = tuple1.asDatum(sortKeyIds[i]);
+      Datum right = tuple2.asDatum(sortKeyIds[i]);
+
+      int compVal;
+      if (asc[i]) {
+        compVal = left.compareTo(right);
       } else {
-        if (asc[i]) {
-          compVal = left.compareTo(right);
-        } else {
-          compVal = right.compareTo(left);
-        }
+        compVal = right.compareTo(left);
       }
 
-      if (compVal < 0 || compVal > 0) {
+      if (compVal != 0) {
         return compVal;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/550c0189/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
index a20adf7..7c91197 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -59,7 +59,6 @@ public class NullScanner implements Scanner {
 
   @Override
   public Tuple next() throws IOException {
-    progress = 1.0f;
     return null;
   }
 

Reply via email to