TAJO-1271: Improve memory usage of Hash-shuffle.

Closes #837


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/411a26d5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/411a26d5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/411a26d5

Branch: refs/heads/master
Commit: 411a26d5d45fb466d4c3c22806c67aedde14b623
Parents: 10159c7
Author: Jinho Kim <[email protected]>
Authored: Tue Nov 17 12:00:43 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Tue Nov 17 12:00:43 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/TajoTestingCluster.java     |   5 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   4 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  10 +-
 .../org/apache/tajo/storage/BufferPool.java     |   6 +-
 .../org/apache/tajo/tuple/BaseTupleBuilder.java |   2 +-
 .../tuple/memory/CompactRowBlockWriter.java     | 408 +++++++++++++++
 .../tajo/tuple/memory/DirectBufTuple.java       |   6 +-
 .../tajo/tuple/memory/HeapRowBlockReader.java   |   5 +-
 .../org/apache/tajo/tuple/memory/HeapTuple.java |  11 +-
 .../tajo/tuple/memory/MemoryRowBlock.java       |  81 ++-
 .../tuple/memory/OffHeapRowBlockReader.java     |   8 +-
 .../tajo/tuple/memory/OffHeapRowBlockUtils.java | 133 +++--
 .../tajo/tuple/memory/ResizableLimitSpec.java   |  11 +-
 .../tajo/tuple/memory/ResizableMemoryBlock.java |  18 +-
 .../org/apache/tajo/tuple/memory/RowBlock.java  |  20 +
 .../apache/tajo/tuple/memory/UnSafeTuple.java   |  24 +-
 .../apache/tajo/tuple/memory/ZeroCopyTuple.java |  10 +-
 .../java/org/apache/tajo/util/BitArray.java     |   5 +-
 tajo-core-tests/pom.xml                         |   2 +-
 .../planner/physical/TestExternalSortExec.java  |   2 +-
 .../planner/physical/TestHashJoinExec.java      |   1 +
 .../planner/physical/TestPhysicalPlanner.java   |   3 +
 .../physical/TestProgressExternalSortExec.java  |  41 +-
 .../tajo/querymaster/TestTaskStatusUpdate.java  |   2 +-
 .../TestTajoCli/testHelpSessionVars.result      |   3 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   4 +-
 .../planner/physical/ExternalSortExec.java      | 117 +++--
 .../physical/HashShuffleFileWriteExec.java      | 201 +++++---
 .../tajo/engine/planner/physical/SortExec.java  |   3 +-
 .../engine/planner/physical/TupleSorter.java    |   5 +-
 .../planner/physical/VectorizedSorter.java      |   3 +-
 .../apache/tajo/querymaster/Repartitioner.java  |   5 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   4 +
 .../java/org/apache/tajo/worker/TaskImpl.java   |  15 +-
 .../tajo/plan/function/stream/BufferPool.java   |   7 +-
 .../storage/HashShuffleAppenderManager.java     | 162 ++++--
 .../storage/HashShuffleAppenderWrapper.java     | 107 ++--
 .../java/org/apache/tajo/storage/RawFile.java   | 498 ++++---------------
 .../storage/rawfile/DirectRawFileScanner.java   |  28 +-
 .../storage/rawfile/DirectRawFileWriter.java    | 145 ++++--
 41 files changed, 1282 insertions(+), 845 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 14d6a82..10fa37d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1271: Improve memory usage of Hash-shuffle. (jinho)
+
     TAJO-1966: Decrease memory usage of TajoTestingCluster. (jinho)
 
     TAJO-1941: PermGen elimination in JDK 8. (Dongkyu Hwangbo via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 046a224..9e0e060 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -157,8 +157,9 @@ public class TajoTestingCluster {
     // Python function path
     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, 
getClass().getResource("/python").toString());
 
-    // Query output file
-    conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, 
BuiltinStorages.DRAW);
+    // Buffer size
+    conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
+    conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
 
     /** decrease Hbase thread and memory cache for testing */
     //server handler

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java 
b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 46df687..08c12a0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -117,7 +117,9 @@ public enum SessionVars implements ConfigKey {
 
   // for physical Executors
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort 
buffer size for external sort (mb)", DEFAULT,
-      Long.class, Validators.min("0")),
+      Integer.class, Validators.min("0")),
+  HASH_SHUFFLE_BUFFER_SIZE(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE, 
"hash-shuffle buffer size for local disk I/O (mb)"
+      , DEFAULT, Integer.class, Validators.min("1")),
   HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited 
size for hash join (mb)", DEFAULT,
       Long.class, Validators.min("0")),
   INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD,

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index de52e53..a2c1fb8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -213,12 +213,11 @@ public class TajoConf extends Configuration {
     
SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, 
Validators.min("1")),
     SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, 
Validators.min("1")),
     
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 
2, Validators.min("0")),
-    
SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 
10000),
-    
SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 
30),
-    HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10),
+    
SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volume-mb", 
30),
+    SHUFFLE_HASH_PARENT_DIRS("tajo.shuffle.hash.parent.dirs.count", 64),
 
     // Query output Configuration 
--------------------------------------------------
-    QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", 
BuiltinStorages.TEXT, Validators.javaString()),
+    QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", 
BuiltinStorages.DRAW, Validators.javaString()),
 
     // Storage Configuration --------------------------------------------------
     ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100),
@@ -334,7 +333,7 @@ public class TajoConf extends Configuration {
     $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 10),
 
     // for physical Executors
-    
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 
200L),
+    
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 
200),
     
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb",
 64l, Validators.min("0")),
     
$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb",
 64l,
         Validators.min("0")),
@@ -342,6 +341,7 @@ public class TajoConf extends Configuration {
         Validators.min("0")),
     
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb",
 64l,
         Validators.min("0")),
+    $EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE("tajo.executor.hash-shuffle.buffer-mb", 
100, Validators.min("1")),
     $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means 
infinite
     $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code 
generation (todo this is broken)
     $AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000),

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java 
b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 97ac7b4..4913d3b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -42,10 +42,6 @@ public class BufferPool {
   }
 
   static {
-    /* TODO Enable thread cache
-    *  Create a pooled ByteBuf allocator but disables the thread-local cache.
-    *  Because the TaskRunner thread is newly created
-    * */
 
     if (TajoConstants.IS_TEST_MODE) {
       /* Disable pooling buffers for memory usage  */
@@ -55,7 +51,7 @@ public class BufferPool {
       ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
     } else {
       TajoConf tajoConf = new TajoConf();
-      ALLOCATOR = createPooledByteBufAllocator(true, 
tajoConf.getBoolean(ALLOW_CACHE, false), 0);
+      ALLOCATOR = createPooledByteBufAllocator(true, 
tajoConf.getBoolean(ALLOW_CACHE, true), 0);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
index cb417f3..ebdcc26 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -85,7 +85,7 @@ public class BaseTupleBuilder extends OffHeapRowWriter 
implements TupleBuilder,
 
   public UnSafeTuple buildToZeroCopyTuple() {
     UnSafeTuple zcTuple = new UnSafeTuple();
-    zcTuple.set(memoryBlock, memoryBlock.readerPosition(), 
memoryBlock.readableBytes(), dataTypes());
+    zcTuple.set(memoryBlock, memoryBlock.readerPosition(), dataTypes());
     return zcTuple;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
new file mode 100644
index 0000000..a88d2f1
--- /dev/null
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.BooleanDatum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.BitArray;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ * This class represents the row serialization format of RawFile.
+ *
+ * Row Record Structure
+ *
+ * | row length | null flags length | null flags | field 1 | field 2| ... | 
field N |;
+ *
+ * |  (4 bytes)        (2 bytes)      (N bytes)  |                             
     |;
+ *                Header                                         values
+ */
+public class CompactRowBlockWriter implements RowWriter {
+  private static final int RECORD_FIELD_SIZE = 4;
+  // Maximum size of a varint-encoded int32 is 5 bytes
+  private static final short MAXIMUM_VARIANT_INT32 = 5;
+  // Maximum size of a varint-encoded int64 is 10 bytes
+  private static final short MAXIMUM_VARIANT_INT64 = 10;
+
+  private final RowBlock rowBlock;
+  private final BitArray nullFlags;
+
+  /** row length (4 bytes) + null flags length (2 bytes) + null flags */
+  private final int headerSize;
+
+  private final DataType[] dataTypes;
+
+  private int curFieldIdx;
+  private int curOffset;
+
+
+  public CompactRowBlockWriter(RowBlock rowBlock) {
+    this.dataTypes = rowBlock.getDataTypes();
+    this.rowBlock = rowBlock;
+
+    // compute the number of bytes, representing the null flags
+    nullFlags = new BitArray(dataTypes.length);
+    headerSize = RECORD_FIELD_SIZE + SizeOf.SIZE_OF_SHORT + 
nullFlags.bytesLength();
+
+    if (!rowBlock.getMemory().hasAddress()) {
+      throw new 
TajoInternalError(rowBlock.getMemory().getClass().getSimpleName()
+          + " does not support to direct memory access");
+    }
+  }
+
+
+  /**
+   * Encode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
+   * into values that can be efficiently encoded with varint.  (Otherwise,
+   * negative values must be sign-extended to 64 bits to be varint encoded,
+   * thus always taking 10 bytes on the wire.)
+   *
+   * @param n A signed 32-bit integer.
+   * @return An unsigned 32-bit integer, stored in a signed int because
+   *         Java has no explicit unsigned support.
+   */
+  public static int encodeZigZag32(final int n) {
+    // Note:  the right-shift must be arithmetic
+    return (n << 1) ^ (n >> 31);
+  }
+
+  /**
+   * Encode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+   * into values that can be efficiently encoded with varint.  (Otherwise,
+   * negative values must be sign-extended to 64 bits to be varint encoded,
+   * thus always taking 10 bytes on the wire.)
+   *
+   * @param n A signed 64-bit integer.
+   * @return An unsigned 64-bit integer, stored in a signed long because
+   *         Java has no explicit unsigned support.
+   */
+  public static long encodeZigZag64(final long n) {
+    // Note:  the right-shift must be arithmetic
+    return (n << 1) ^ (n >> 63);
+  }
+
+  /**
+   * Encode and write a varint.  {@code value} is treated as
+   * unsigned, so it won't be sign-extended if negative.
+   */
+  public static short writeRawVarint32(long address, int value) {
+    short length = 0;
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        PlatformDependent.putByte(address + length, (byte) value);
+        length++;
+        return length;
+      } else {
+        PlatformDependent.putByte(address + length, (byte) ((value & 0x7F) | 
0x80));
+        value >>>= 7;
+        length++;
+      }
+    }
+  }
+
+  /**
+   * Encode and write a varint64.
+   */
+  public static short writeRawVarint64(long address, long value) {
+    short length = 0;
+    while (true) {
+      if ((value & ~0x7FL) == 0) {
+        PlatformDependent.putByte(address + length, (byte) value);
+        length++;
+        return length;
+      } else {
+        PlatformDependent.putByte(address + length, (byte) ((value & 0x7F) | 
0x80));
+        value >>>= 7;
+        length++;
+      }
+    }
+  }
+
+  /**
+   * Compute the number of bytes that would be needed to encode a varint.
+   * {@code value} is treated as unsigned, so it won't be sign-extended if
+   * negative.
+   */
+  public static int computeRawVarint32Size(final int value) {
+    if ((value & (0xffffffff <<  7)) == 0) return 1;
+    if ((value & (0xffffffff << 14)) == 0) return 2;
+    if ((value & (0xffffffff << 21)) == 0) return 3;
+    if ((value & (0xffffffff << 28)) == 0) return 4;
+    return 5;
+  }
+
+  /**
+   * Current memory address of the buffer
+   *
+   * @return The memory address
+   */
+  public long address() {
+    return rowBlock.getMemory().address();
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public int position() {
+    return rowBlock.getMemory().writerPosition();
+  }
+
+
+  /**
+   * Forward the address;
+   *
+   * @param length Length to be forwarded
+   */
+  public void forward(int length) {
+    rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() 
+ length);
+  }
+
+  public void ensureSize(int size) {
+    rowBlock.getMemory().ensureSize(size);
+  }
+
+  @Override
+  public DataType[] dataTypes() {
+    return rowBlock.getDataTypes();
+  }
+
+  /**
+   * Memory address of the first byte of the current row
+   *
+   * @return The memory address
+   */
+  public long recordStartAddr() {
+    return currentAddr() - curOffset;
+  }
+
+  /**
+   * Memory address of the current write position in the buffer
+   *
+   * @return The memory address
+   */
+  private long currentAddr() {
+    return address() + position();
+  }
+
+  public int offset() {
+    return position();
+  }
+
+
+  @Override
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+    nullFlags.clear();
+  }
+
+  @Override
+  public boolean startRow() {
+    ensureSize(headerSize);
+    nullFlags.clear();
+
+    curOffset = headerSize;
+    curFieldIdx = 0;
+    forward(headerSize);
+    return true;
+  }
+
+
+  public void endRow() {
+    long rowHeaderPos = recordStartAddr();
+    // curOffset is equivalent to a byte length of this row.
+    PlatformDependent.putInt(rowHeaderPos, curOffset);
+    rowHeaderPos += SizeOf.SIZE_OF_INT;
+
+    //set null flags
+    byte [] flags = nullFlags.toArray();
+    PlatformDependent.putShort(rowHeaderPos, (short) flags.length);
+    rowHeaderPos += SizeOf.SIZE_OF_SHORT;
+    PlatformDependent.copyMemory(flags, 0, rowHeaderPos, flags.length);
+
+    rowBlock.setRows(rowBlock.rows() + 1);
+  }
+
+  @Override
+  public void skipField() {
+    // set null flag
+    nullFlags.set(curFieldIdx);
+    curFieldIdx++;
+  }
+
+  /**
+   * set current buffer position and forward field length
+   * @param fieldLength
+   */
+  private void forwardField(int fieldLength) {
+    forward(fieldLength);
+    curOffset += fieldLength;
+
+  }
+
+  @Override
+  public void putByte(byte val) {
+    ensureSize(SizeOf.SIZE_OF_BYTE);
+    long addr = currentAddr();
+
+    PlatformDependent.putByte(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_BYTE);
+  }
+
+  @Override
+  public void putBool(boolean val) {
+    putByte(val ? BooleanDatum.TRUE_INT : BooleanDatum.FALSE_INT);
+  }
+
+  @Override
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    long addr = currentAddr();
+
+    PlatformDependent.putShort(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_SHORT);
+  }
+
+  @Override
+  public void putInt4(int val) {
+    ensureSize(MAXIMUM_VARIANT_INT32);
+
+    curFieldIdx++;
+    forwardField(writeRawVarint32(currentAddr(), encodeZigZag32(val)));
+  }
+
+  @Override
+  public void putInt8(long val) {
+    ensureSize(MAXIMUM_VARIANT_INT64);
+
+    curFieldIdx++;
+    forwardField(writeRawVarint64(currentAddr(), encodeZigZag64(val)));
+  }
+
+  @Override
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    long addr = currentAddr();
+
+    UnsafeUtil.unsafe.putFloat(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_FLOAT);
+  }
+
+  @Override
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    long addr = currentAddr();
+
+    UnsafeUtil.unsafe.putDouble(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_DOUBLE);
+  }
+
+  @Override
+  public void putText(String val) {
+    putText(val.getBytes(TextDatum.DEFAULT_CHARSET));
+  }
+
+  @Override
+  public void putText(byte[] val) {
+    putBlob(val);
+  }
+
+  @Override
+  public void putBlob(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(MAXIMUM_VARIANT_INT32 + bytesLen);
+    long addr = currentAddr();
+
+    short length = writeRawVarint32(addr, bytesLen);
+    PlatformDependent.copyMemory(val, 0, addr + length, bytesLen);
+    curFieldIdx++;
+    forwardField(length + bytesLen);
+  }
+
+  @Override
+  public void putDate(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_INT);
+  }
+
+  @Override
+  public void putTime(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    long addr = currentAddr();
+
+    PlatformDependent.putLong(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_LONG);
+  }
+
+  @Override
+  public void putTimestamp(long val) {
+    putTime(val);
+  }
+
+  /**
+   * Writes an interval field as two consecutive varints: the ZigZag-encoded
+   * months (int32) followed by the ZigZag-encoded milliseconds (int64).
+   *
+   * @param val the interval to serialize
+   */
+  @Override
+  public void putInterval(IntervalDatum val) {
+    ensureSize(MAXIMUM_VARIANT_INT32 + MAXIMUM_VARIANT_INT64);
+    long addr = currentAddr();
+
+    short length = writeRawVarint32(addr, encodeZigZag32(val.getMonths()));
+    // the milliseconds varint must start right after the months varint;
+    // writing it at addr would overwrite the months bytes
+    length += writeRawVarint64(addr + length, encodeZigZag64(val.getMilliSeconds()));
+
+    curFieldIdx++;
+    forwardField(length);
+  }
+
+  @Override
+  public void putInet4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, val);
+    curFieldIdx++;
+    forwardField(SizeOf.SIZE_OF_INT);
+  }
+
+  @Override
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+
+  @Override
+  public void addTuple(Tuple tuple) {
+    OffHeapRowBlockUtils.convert(tuple, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
index 10e493f..1852e6d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
@@ -28,11 +28,11 @@ import static org.apache.tajo.common.TajoDataTypes.DataType;
 public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
   private MemoryBlock memoryBlock;
 
-  public DirectBufTuple(int length, DataType[] types) {
-    ByteBuffer bb = 
ByteBuffer.allocateDirect(length).order(ByteOrder.LITTLE_ENDIAN);
+  public DirectBufTuple(DataType[] types) {
+    ByteBuffer bb = 
ByteBuffer.allocateDirect(getLength()).order(ByteOrder.LITTLE_ENDIAN);
     memoryBlock = new ResizableMemoryBlock(bb);
 
-    set(memoryBlock, 0, length, types);
+    set(memoryBlock, 0, types);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
index dd377cf..ec5033b 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
@@ -48,10 +48,9 @@ public class HeapRowBlockReader implements 
RowBlockReader<HeapTuple> {
   public boolean next(HeapTuple tuple) {
     if (curRowIdxForRead < rows) {
 
-      int recordLen = memoryBlock.getInt(curPosForRead);
-      tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes);
+      tuple.set(memoryBlock, curPosForRead, dataTypes);
 
-      curPosForRead += recordLen;
+      curPosForRead += tuple.getLength();
       curRowIdxForRead++;
       memoryBlock.readerPosition(curPosForRead);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
index 9f508b3..c6c7daf 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
@@ -41,16 +41,16 @@ public class HeapTuple extends ZeroCopyTuple implements 
Cloneable {
   private DataType[] types;
 
   @Override
-  public void set(MemoryBlock memoryBlock, int relativePos, int length, 
DataType[] types) {
+  public void set(MemoryBlock memoryBlock, int relativePos, DataType[] types) {
     this.buffer = memoryBlock.getBuffer();
     this.types = types;
-    super.set(relativePos, length);
+    super.set(relativePos);
   }
 
   protected void set(final byte[] bytes, final DataType[] types) {
     this.buffer = Unpooled.wrappedBuffer(bytes).order(ByteOrder.LITTLE_ENDIAN);
     this.types = types;
-    super.set(0, bytes.length);
+    super.set(0);
   }
 
   @Override
@@ -59,6 +59,11 @@ public class HeapTuple extends ZeroCopyTuple implements 
Cloneable {
   }
 
   @Override
+  public int getLength() {
+    return buffer.getInt(getRelativePos());
+  }
+
+  @Override
   public TajoDataTypes.Type type(int fieldId) {
     return types[fieldId].getType();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
index 922fc68..3d02f9a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
@@ -19,6 +19,8 @@
 package org.apache.tajo.tuple.memory;
 
 import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.BuiltinStorages;
+import org.apache.tajo.annotation.NotThreadSafe;
 import org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.tuple.RowBlockReader;
@@ -32,10 +34,12 @@ import java.nio.channels.ScatteringByteChannel;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
+@NotThreadSafe
 public class MemoryRowBlock implements RowBlock, Deallocatable {
   public static final int NULL_FIELD_OFFSET = -1;
 
-  private DataType[] dataTypes;
+  private final DataType[] dataTypes;
+  private final String dataFormat;
 
   // Basic States
   private int maxRowNum = Integer.MAX_VALUE; // optional
@@ -45,20 +49,20 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
   private MemoryBlock memory;
 
   public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, 
boolean isDirect) {
+    this(dataTypes, limitSpec, isDirect, BuiltinStorages.DRAW);
+  }
+
+  public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, 
boolean isDirect, String dataFormat) {
     this.memory = new ResizableMemoryBlock(limitSpec, isDirect);
     this.dataTypes = dataTypes;
+    this.dataFormat = dataFormat;
   }
 
   public MemoryRowBlock(MemoryRowBlock rowBlock) {
     this.memory = TUtil.checkTypeAndGet(rowBlock.getMemory().duplicate(), 
ResizableMemoryBlock.class);
     this.rowNum = rowBlock.rowNum;
     this.dataTypes = rowBlock.dataTypes;
-  }
-
-  public MemoryRowBlock(MemoryBlock memory, DataType[] dataTypes, int rowNum) {
-    this.memory = memory;
-    this.rowNum = rowNum;
-    this.dataTypes = dataTypes;
+    this.dataFormat = rowBlock.dataFormat;
   }
 
   public MemoryRowBlock(DataType[] dataTypes) {
@@ -69,8 +73,13 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
     this(dataTypes, new ResizableLimitSpec(bytes), true);
   }
 
-  public MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect) {
-    this(dataTypes, new ResizableLimitSpec(bytes), isDirect);
+  public MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect, 
String dataFormat) {
+    this(dataTypes, new ResizableLimitSpec(bytes), isDirect, dataFormat);
+  }
+
+  @Override
+  public String getDataFormat() {
+    return dataFormat;
   }
 
   @Override
@@ -91,6 +100,20 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
     return memory.capacity();
   }
 
+  @Override
+  public int usedMem() {
+    return memory.writerPosition();
+  }
+
+  @Override
+  public float usage() {
+    if (usedMem() > 0) {
+      return (usedMem() / (float) capacity());
+    } else {
+      return 0.0f;
+    }
+  }
+
   public int maxRowNum() {
     return maxRowNum;
   }
@@ -112,6 +135,15 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
 
   @Override
   public boolean copyFromChannel(ScatteringByteChannel channel) throws 
IOException {
+    switch (dataFormat) {
+    case BuiltinStorages.DRAW:
+      return fillDrawBuffer(channel);
+    default:
+      throw new TajoInternalError(new NotImplementedException("Heap memory 
writer not implemented yet"));
+    }
+  }
+
+  protected boolean fillDrawBuffer(ScatteringByteChannel channel) throws 
IOException {
     reset();
 
     int readBytes = memory.writeBytes(channel);
@@ -143,13 +175,23 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
   @Override
   public RowWriter getWriter() {
 
+    if (!getMemory().hasAddress()) {
+      throw new TajoInternalError(new NotImplementedException("Heap memory 
writer not implemented yet"));
+    }
+
     if (builder == null) {
-      if (!getMemory().hasAddress()) {
-        throw new TajoInternalError(new NotImplementedException("Heap memory 
writer not implemented yet"));
-      } else {
+      switch (dataFormat) {
+      case BuiltinStorages.DRAW:
         this.builder = new OffHeapRowBlockWriter(this);
+        break;
+      case BuiltinStorages.RAW:
+        this.builder = new CompactRowBlockWriter(this);
+        break;
+      default:
+        throw new TajoInternalError(new NotImplementedException(dataFormat + " 
memory writer not implemented yet"));
       }
     }
+
     return builder;
   }
 
@@ -165,10 +207,17 @@ public class MemoryRowBlock implements RowBlock, 
Deallocatable {
 
   @Override
   public RowBlockReader getReader() {
-    if (!getMemory().hasAddress()) {
-      return new HeapRowBlockReader(this);
-    } else {
-      return new OffHeapRowBlockReader(this);
+
+    switch (dataFormat) {
+    case BuiltinStorages.DRAW: {
+      if (!getMemory().hasAddress()) {
+        return new HeapRowBlockReader(this);
+      } else {
+        return new OffHeapRowBlockReader(this);
+      }
+    }
+    default:
+      throw new TajoInternalError(new NotImplementedException(dataFormat + " 
memory writer not implemented yet"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
index ccaeffc..c5673e3 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.tuple.memory;
 
-import io.netty.util.internal.PlatformDependent;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.tuple.RowBlockReader;
@@ -54,11 +53,8 @@ public class OffHeapRowBlockReader implements 
RowBlockReader<ZeroCopyTuple> {
   public boolean next(ZeroCopyTuple tuple) {
     if (curRowIdxForRead < rows) {
 
-      long recordStartPtr = memoryBlock.address() + curPosForRead;
-      int recordLen = PlatformDependent.getInt(recordStartPtr);
-      tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes);
-
-      curPosForRead += recordLen;
+      tuple.set(memoryBlock, curPosForRead, dataTypes);
+      curPosForRead += tuple.getLength();
       curRowIdxForRead++;
       memoryBlock.readerPosition(curPosForRead);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
index e8f219c..1aca22f 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
@@ -32,6 +32,11 @@ import java.util.Comparator;
 import java.util.List;
 
 public class OffHeapRowBlockUtils {
+  private static TupleConverter tupleConverter;
+
+  static {
+    tupleConverter = new TupleConverter();
+  }
 
   public static List<Tuple> sort(MemoryRowBlock rowBlock, Comparator<Tuple> 
comparator) {
     List<Tuple> tupleList = Lists.newArrayList();
@@ -80,62 +85,86 @@ public class OffHeapRowBlockUtils {
     return tuples;
   }
 
-  public static void convert(Tuple tuple, RowWriter writer) {
-    writer.startRow();
+  /**
+   * Converts a {@link Tuple} into a single row written through a {@link RowWriter}.
+   */
+  public static class TupleConverter {
 
-    for (int i = 0; i < writer.dataTypes().length; i++) {
-      if (tuple.isBlankOrNull(i)) {
-        writer.skipField();
-        continue;
+    public void convert(Tuple tuple, RowWriter writer) {
+      writer.startRow();
+
+      for (int i = 0; i < writer.dataTypes().length; i++) {
+        writeField(i, tuple, writer);
       }
-      switch (writer.dataTypes()[i].getType()) {
-      case BOOLEAN:
-        writer.putBool(tuple.getBool(i));
-        break;
-      case BIT:
-        writer.putByte(tuple.getByte(i));
-        break;
-      case INT1:
-      case INT2:
-        writer.putInt2(tuple.getInt2(i));
-        break;
-      case INT4:
-      case DATE:
-      case INET4:
-        writer.putInt4(tuple.getInt4(i));
-        break;
-      case INT8:
-      case TIMESTAMP:
-      case TIME:
-        writer.putInt8(tuple.getInt8(i));
-        break;
-      case FLOAT4:
-        writer.putFloat4(tuple.getFloat4(i));
-        break;
-      case FLOAT8:
-        writer.putFloat8(tuple.getFloat8(i));
-        break;
-      case CHAR:
-      case TEXT:
-        writer.putText(tuple.getBytes(i));
-        break;
-      case BLOB:
-        writer.putBlob(tuple.getBytes(i));
-        break;
-      case INTERVAL:
-        writer.putInterval((IntervalDatum) tuple.getInterval(i));
-        break;
-      case PROTOBUF:
-        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
-        break;
-      case NULL_TYPE:
+
+      writer.endRow();
+    }
+
+    protected void writeField(int colIdx, Tuple tuple, RowWriter writer) {
+
+      if (tuple.isBlankOrNull(colIdx)) {
         writer.skipField();
-        break;
-      default:
-        throw new TajoRuntimeException(
-            new UnsupportedException("unknown data type '" + 
writer.dataTypes()[i].getType().name() + "'"));
+      } else {
+        switch (writer.dataTypes()[colIdx].getType()) {
+        case BOOLEAN:
+          writer.putBool(tuple.getBool(colIdx));
+          break;
+        case BIT:
+          writer.putByte(tuple.getByte(colIdx));
+          break;
+        case INT1:
+        case INT2:
+          writer.putInt2(tuple.getInt2(colIdx));
+          break;
+        case INT4:
+          writer.putInt4(tuple.getInt4(colIdx));
+          break;
+        case DATE:
+          writer.putDate(tuple.getInt4(colIdx));
+          break;
+        case INT8:
+          writer.putInt8(tuple.getInt8(colIdx));
+          break;
+        case TIMESTAMP:
+          writer.putTimestamp(tuple.getInt8(colIdx));
+          break;
+        case TIME:
+          writer.putTime(tuple.getInt8(colIdx));
+          break;
+        case FLOAT4:
+          writer.putFloat4(tuple.getFloat4(colIdx));
+          break;
+        case FLOAT8:
+          writer.putFloat8(tuple.getFloat8(colIdx));
+          break;
+        case CHAR:
+        case TEXT:
+          writer.putText(tuple.getBytes(colIdx));
+          break;
+        case BLOB:
+          writer.putBlob(tuple.getBytes(colIdx));
+          break;
+        case INTERVAL:
+          writer.putInterval((IntervalDatum) tuple.getInterval(colIdx));
+          break;
+        case PROTOBUF:
+          writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(colIdx));
+          break;
+        case INET4:
+          writer.putInet4(tuple.getInt4(colIdx));
+          break;
+        case NULL_TYPE:
+          writer.skipField();
+          break;
+        default:
+          throw new TajoRuntimeException(
+              new UnsupportedException("unknown data type '" + 
writer.dataTypes()[colIdx].getType().name() + "'"));
+        }
       }
     }
-    writer.endRow();
+  }
+
+  public static void convert(Tuple tuple, RowWriter writer) {
+    tupleConverter.convert(tuple, writer);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
index 614b3fb..ddf50ab 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
@@ -21,6 +21,8 @@ package org.apache.tajo.tuple.memory;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.util.FileUtil;
 
 /**
@@ -29,7 +31,7 @@ import org.apache.tajo.util.FileUtil;
  * due to ByteBuffer.
  */
 public class ResizableLimitSpec {
-  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
 
   public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
   public static final ResizableLimitSpec DEFAULT_LIMIT = new 
ResizableLimitSpec(Integer.MAX_VALUE);
@@ -114,10 +116,11 @@ public class ResizableLimitSpec {
       return (int) initSize;
     }
 
-    if (currentSize > Integer.MAX_VALUE) {
-      LOG.warn("Current size already exceeds the maximum size (" + 
Integer.MAX_VALUE + " bytes)");
-      return Integer.MAX_VALUE;
+    if (currentSize == Integer.MAX_VALUE) {
+      throw new TajoRuntimeException(new UnsupportedException(
+          "Current size already exceeds the maximum size (" + 
Integer.MAX_VALUE + " bytes)"));
     }
+
     long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
 
     if (nextSize > limitBytes) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
index 22c2561..09faff9 100644
--- 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
+++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
@@ -39,10 +39,12 @@ public class ResizableMemoryBlock implements MemoryBlock {
 
   protected ByteBuf buffer;
   protected ResizableLimitSpec limitSpec;
+  private long memoryAddress;
 
   public ResizableMemoryBlock(ByteBuf buffer, ResizableLimitSpec limitSpec) {
     this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
     this.limitSpec = limitSpec;
+    this.memoryAddress = this.buffer.hasMemoryAddress() ? 
this.buffer.memoryAddress() : 0;
   }
 
   public ResizableMemoryBlock(ByteBuf buffer) {
@@ -50,13 +52,13 @@ public class ResizableMemoryBlock implements MemoryBlock {
   }
 
   public ResizableMemoryBlock(ByteBuffer buffer) {
-    this.buffer = 
Unpooled.wrappedBuffer(buffer).order(ByteOrder.LITTLE_ENDIAN);
-    this.limitSpec = new ResizableLimitSpec(buffer.capacity());
+    this(Unpooled.wrappedBuffer(buffer), new 
ResizableLimitSpec(buffer.capacity(), buffer.capacity()));
   }
 
   public ResizableMemoryBlock(ResizableLimitSpec limitSpec, boolean isDirect) {
     if (isDirect) {
       this.buffer = BufferPool.directBuffer((int) limitSpec.initialSize(), 
(int) limitSpec.limit());
+      this.memoryAddress = buffer.memoryAddress();
     } else {
       this.buffer = BufferPool.heapBuffer((int) limitSpec.initialSize(), (int) 
limitSpec.limit());
     }
@@ -65,7 +67,7 @@ public class ResizableMemoryBlock implements MemoryBlock {
 
   @Override
   public long address() {
-    return buffer.memoryAddress();
+    return memoryAddress;
   }
 
   @Override
@@ -123,15 +125,14 @@ public class ResizableMemoryBlock implements MemoryBlock {
     return buffer.writerIndex();
   }
 
-
   @Override
   public void ensureSize(int size) {
     if (!buffer.isWritable(size)) {
-      if (!limitSpec.canIncrease(buffer.capacity())) {
+      if (!limitSpec.canIncrease(size)) {
         throw new RuntimeException("Cannot increase RowBlock anymore.");
       }
 
-      int newBlockSize = limitSpec.increasedSize(buffer.capacity());
+      int newBlockSize = limitSpec.increasedSize(size);
       resize(newBlockSize);
       LOG.info("Increase DirectRowBlock to " + 
FileUtil.humanReadableByteCount(newBlockSize, false));
     }
@@ -144,17 +145,18 @@ public class ResizableMemoryBlock implements MemoryBlock {
       throw new RuntimeException("Resize cannot exceed the capacity limit");
     }
 
-    if (newSize < buffer.capacity()) {
+    if (newSize < buffer.writableBytes()) {
       LOG.warn("The capacity reduction is ignored.");
     }
 
     int newBlockSize = UnsafeUtil.alignedSize(newSize);
     buffer = BufferPool.ensureWritable(buffer, newBlockSize);
+    memoryAddress = buffer.memoryAddress();
   }
 
   @Override
   public void release() {
-    buffer.release();
+    if(buffer.refCnt() > 0) buffer.release();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
index 68902fb..1ab1042 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
@@ -25,11 +25,31 @@ import java.io.IOException;
 import java.nio.channels.ScatteringByteChannel;
 
 public interface RowBlock {
+  /**
+   * a data format for de/serialization
+   */
+  String getDataFormat();
 
+  /**
+   * reset the memory and writer
+   */
   void clear();
 
+  /**
+   * @return the number of bytes this memory block can contain.
+   */
   int capacity();
 
+  /**
+   * @return the number of written bytes in this memory block
+   */
+  int usedMem();
+
+  /**
+   * @return the ratio of written bytes to the capacity of this memory block (0.0 to 1.0)
+   */
+  float usage();
+
   void setRows(int rowNum);
 
   int rows();

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
index ec167f8..26f7df3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
@@ -42,22 +42,22 @@ import static org.apache.tajo.common.TajoDataTypes.DataType;
 public class UnSafeTuple extends ZeroCopyTuple {
   private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
 
-  private long address;
+  private MemoryBlock memoryBlock;
   private DataType[] types;
 
   @Override
-  public void set(MemoryBlock memoryBlock, int relativePos, int length, 
DataType[] types) {
+  public void set(MemoryBlock memoryBlock, int relativePos, DataType[] types) {
     Preconditions.checkArgument(memoryBlock.hasAddress());
 
-    this.address = memoryBlock.address();
+    this.memoryBlock = memoryBlock;
     this.types = types;
-    super.set(relativePos, length);
+    super.set(relativePos);
   }
 
   public void set(UnSafeTuple tuple) {
-    this.address = tuple.address;
+    this.memoryBlock = tuple.memoryBlock;
     this.types = tuple.types;
-    super.set(tuple.getRelativePos(), tuple.getLength());
+    super.set(tuple.getRelativePos());
   }
 
   @Override
@@ -66,6 +66,11 @@ public class UnSafeTuple extends ZeroCopyTuple {
   }
 
   @Override
+  public int getLength() {
+    return PlatformDependent.getInt(address());
+  }
+
+  @Override
   public TajoDataTypes.Type type(int fieldId) {
     return types[fieldId].getType();
   }
@@ -93,7 +98,7 @@ public class UnSafeTuple extends ZeroCopyTuple {
   }
 
   public long address() {
-    return address + getRelativePos();
+    return memoryBlock.address() + getRelativePos();
   }
 
   public HeapTuple toHeapTuple() {
@@ -110,8 +115,9 @@ public class UnSafeTuple extends ZeroCopyTuple {
 
   public long getFieldAddr(int fieldId) {
     int fieldOffset = getFieldOffset(fieldId);
-    if (fieldOffset == -1) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    if (fieldOffset < 0 || fieldOffset > getLength()) {
+      throw new RuntimeException("Invalid Access. Field : " + fieldId
+          + ", Offset:" + fieldOffset + ", Record length:" + getLength());
     }
     return address() + fieldOffset;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
index 1f4f57e..e9108f2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
@@ -24,22 +24,18 @@ import org.apache.tajo.storage.Tuple;
 public abstract class ZeroCopyTuple implements Tuple {
 
   protected int relativePos;
-  protected int length;
 
-  public abstract void set(MemoryBlock memoryBlock, int relativePos, int 
length, DataType[] types);
+  public abstract void set(MemoryBlock memoryBlock, int relativePos, 
DataType[] types);
 
-  void set(int relativePos, int length) {
+  void set(int relativePos) {
     this.relativePos = relativePos;
-    this.length = length;
   }
 
   public int getRelativePos() {
     return relativePos;
   }
 
-  public int getLength() {
-    return length;
-  }
+  public abstract int getLength();
 
   @Override
   public Tuple clone() throws CloneNotSupportedException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java 
b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
index e62496a..973266b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.util;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 public class BitArray {
   private byte [] data;
@@ -60,9 +61,7 @@ public class BitArray {
   }
 
   public void clear() {
-    for (int i = 0; i < data.length; i++) {
-      data[i] = 0;
-    }
+    Arrays.fill(data, (byte) 0);
   }
 
   public int bytesLength() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 2037f0b..6de5546 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -375,7 +375,7 @@
               <forkCount>${maven.fork.count}</forkCount>
               <reuseForks>true</reuseForks>
               <trimStackTrace>false</trimStackTrace>
-              <argLine>-Xms128m -Xmx800m -XX:+CMSClassUnloadingEnabled 
-Dfile.encoding=UTF-8 -Dderby.storage.pageSize=1024 
-Dderby.stream.error.file=/dev/null</argLine>
+              <argLine>-Xms128m -Xmx800m -XX:MinMetaspaceFreeRatio=10 
-XX:MaxMetaspaceFreeRatio=10 -XX:+CMSClassUnloadingEnabled 
-Dfile.encoding=UTF-8 -Dderby.storage.pageSize=1024 
-Dderby.stream.error.file=/dev/null</argLine>
               <useSystemClassLoader>true</useSystemClassLoader>
               <useManifestOnlyJar>true</useManifestOnlyJar>
               <systemProperties>

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 4ee2c9c..30c46e5 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -122,7 +122,7 @@ public class TestExternalSortExec {
   public final void testNext() throws IOException, TajoException {
     conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2);
     QueryContext queryContext = 
LocalTajoTestingUtility.createDummyContext(conf);
-    queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, 1024*1024);
+    queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 1);
 
     FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", 
employee.getMeta(),
         new Path(employee.getUri()), Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index a4afa7f..2f4d66f 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -216,6 +216,7 @@ public class TestHashJoinExec {
     HashJoinExec joinExec = proj.getChild();
 
     assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+    exec.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index bcecff7..adeb250 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -573,6 +573,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode = optimizer.optimize(plan);
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.close();
     assertTrue(exec instanceof SortBasedColPartitionStoreExec);
   }
 
@@ -597,6 +598,7 @@ public class TestPhysicalPlanner {
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.close();
     assertTrue(exec instanceof HashBasedColPartitionStoreExec);
   }
 
@@ -621,6 +623,7 @@ public class TestPhysicalPlanner {
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.close();
     assertTrue(exec instanceof SortBasedColPartitionStoreExec);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index 349aec0..b1f53da 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -20,10 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -42,6 +39,7 @@ import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
@@ -65,7 +63,7 @@ public class TestProgressExternalSortExec {
   private LogicalPlanner planner;
   private Path testDir;
 
-  private final int numTuple = 5000;
+  private final int numTuple = 50000;
   private Random rnd = new Random(System.currentTimeMillis());
 
   private TableDesc employee;
@@ -87,8 +85,8 @@ public class TestProgressExternalSortExec {
     schema.addColumn("empid", TajoDataTypes.Type.INT4);
     schema.addColumn("deptname", TajoDataTypes.Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
-    Path employeePath = new Path(testDir, "employee.csv");
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW);
+    Path employeePath = new Path(testDir, "employee.raw");
     Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
@@ -126,18 +124,24 @@ public class TestProgressExternalSortExec {
 
   @Test
   public void testExternalSortExecProgressWithMemTableScanner() throws 
Exception {
-    testProgress(testDataStats.getNumBytes() * 20);    //multiply 20 for 
memory fit
+    QueryContext queryContext = 
LocalTajoTestingUtility.createDummyContext(conf);
+    int bufferSize = (int) (testDataStats.getNumBytes() * 20) / 
StorageUnit.MB; //multiply 20 for memory fit
+    queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize);
+
+    testProgress(queryContext);
   }
 
   @Test
   public void testExternalSortExecProgressWithPairWiseMerger() throws 
Exception {
-    testProgress(testDataStats.getNumBytes());
+    QueryContext queryContext = 
LocalTajoTestingUtility.createDummyContext(conf);
+    int bufferSize = (int) Math.max((testDataStats.getNumBytes() / 
StorageUnit.MB), 1);
+    queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize);
+
+    testProgress(queryContext);
   }
 
-  private void testProgress(long sortBufferBytesNum) throws Exception {
+  private void testProgress(QueryContext queryContext) throws Exception {
     conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2);
-    QueryContext queryContext = 
LocalTajoTestingUtility.createDummyContext(conf);
-    queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, sortBufferBytesNum);
 
     FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", 
employee.getMeta(),
         new Path(employee.getUri()), Integer.MAX_VALUE);
@@ -191,9 +195,9 @@ public class TestProgressExternalSortExec {
     TableStats tableStats = exec.getInputStats();
     assertNotNull(tableStats);
     assertEquals(testDataStats.getNumBytes().longValue(), 
tableStats.getNumBytes().longValue());
-    assertEquals(cnt, testDataStats.getNumRows().longValue());
-    assertEquals(cnt, tableStats.getNumRows().longValue());
-    assertEquals(testDataStats.getNumBytes().longValue(), 
tableStats.getReadBytes().longValue());
+    assertEquals(testDataStats.getNumRows().longValue(), cnt);
+    assertEquals(testDataStats.getNumRows().longValue(), 
tableStats.getNumRows().longValue());
+    assertTrue(testDataStats.getNumBytes().longValue() <= 
tableStats.getReadBytes().longValue());
 
     // for rescan test
     preVal = null;
@@ -216,9 +220,10 @@ public class TestProgressExternalSortExec {
     tableStats = exec.getInputStats();
     assertNotNull(tableStats);
     assertEquals(testDataStats.getNumBytes().longValue(), 
tableStats.getNumBytes().longValue());
-    assertEquals(cnt, testDataStats.getNumRows().longValue());
-    assertEquals(cnt, tableStats.getNumRows().longValue());
-    assertEquals(testDataStats.getNumBytes().longValue(), 
tableStats.getReadBytes().longValue());
+    assertEquals(testDataStats.getNumRows().longValue(), cnt);
+    assertEquals(testDataStats.getNumRows().longValue(), 
tableStats.getNumRows().longValue());
+    // 'ReadBytes' is the number of bytes actually read, so it may exceed the table's byte size
+    assertTrue(testDataStats.getNumBytes().longValue() <= 
tableStats.getReadBytes().longValue());
 
     conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 
ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.defaultIntVal);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
index f845bb3..9f5ecb5 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
@@ -106,7 +106,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase 
{
       res = executeQuery();
 
       // in/out * stage(4)
-      long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2};
+      long[] expectedNumRows = new long[]{5, 5, 2, 2, 2, 2, 2, 2};
       long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 64};
       long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0};
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index d65c346..ec8344f 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -28,7 +28,8 @@ Available Session Variables:
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for 
partition table write (mb)
 \set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby 
enabled
 \set QUERY_EXECUTE_PARALLEL [int value] - Maximum parallel running of 
execution blocks for a query
-\set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
+\set EXTSORT_BUFFER_SIZE [int value] - sort buffer size for external sort (mb)
+\set HASH_SHUFFLE_BUFFER_SIZE [int value] - hash-shuffle buffer size for local 
disk I/O (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner 
join (mb)
 \set OUTER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash outer 
join (mb)

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c6db96d..e0da9e6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -904,14 +904,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
       throws IOException {
     // check if an input is sorted in the same order to the subsequence sort operator.
-    // TODO - it works only if input files are raw files. We should check the file format.
-    // Since the default intermediate file format is raw file, it is not problem right now.
     if (checkIfSortEquivalance(ctx, scanNode, node)) {
       if (ctx.getTable(scanNode.getCanonicalName()) == null) {
         return new SeqScanExec(ctx, scanNode, null);
       }
       FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
-      return new ExternalSortExec(ctx, (SortNode) node.peek(), fragments);
+      return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments);
     } else {
       Enforcer enforcer = ctx.getEnforcer();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index b9ab344..8abafec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -32,6 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -49,7 +51,6 @@ import java.util.List;
 import java.util.concurrent.*;
 
 import static org.apache.tajo.storage.RawFile.RawFileAppender;
-import static org.apache.tajo.storage.RawFile.RawFileScanner;
 
 /**
  * This external sort algorithm can be characterized by the followings:
@@ -69,7 +70,8 @@ public class ExternalSortExec extends SortExec {
   private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_";
 
   private SortNode plan;
-  private final TableMeta meta;
+  /** the data format of intermediate file*/
+  private TableMeta intermediateMeta;
   /** the defaultFanout of external sort */
   private final int defaultFanout;
   /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
@@ -87,9 +89,9 @@ public class ExternalSortExec extends SortExec {
   /** local file system */
   private final RawLocalFileSystem localFS;
   /** final output files which are used for cleaning */
-  private List<FileFragment> finalOutputFiles = null;
+  private List<Chunk> finalOutputFiles = null;
   /** for directly merging sorted inputs */
-  private List<FileFragment> mergedInputFragments = null;
+  private List<Chunk> mergedInputFragments = null;
 
   ///////////////////////////////////////////////////
   // transient variables
@@ -108,8 +110,6 @@ public class ExternalSortExec extends SortExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
 
     this.plan = plan;
-    this.meta = CatalogUtil.newTableMeta("ROWFILE");
-
     this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
     if (defaultFanout < 2) {
       throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
@@ -121,18 +121,19 @@ public class ExternalSortExec extends SortExec {
     this.inMemoryTable = new TupleList(context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE));
 
     this.sortTmpDir = getExecutorTmpDir();
-    localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-    localFS = new RawLocalFileSystem();
+    this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    this.localFS = new RawLocalFileSystem();
+    this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW); //TODO change to SHUFFLE_FILE_FORMAT
   }
 
-  public ExternalSortExec(final TaskAttemptContext context,final SortNode plan,
+  public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode,
                           final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
     this(context, plan);
 
     mergedInputFragments = TUtil.newList();
     for (CatalogProtos.FragmentProto proto : fragments) {
       FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
-      mergedInputFragments.add(fragment);
+      mergedInputFragments.add(new Chunk(fragment, scanNode.getTableDesc().getMeta()));
     }
   }
 
@@ -154,9 +155,9 @@ public class ExternalSortExec extends SortExec {
   /**
    * Sort a tuple block and store them into a chunk file
    */
-  private Path sortAndStoreChunk(int chunkId, TupleList tupleBlock)
+  private Chunk sortAndStoreChunk(int chunkId, TupleList tupleBlock)
       throws IOException {
-    TableMeta meta = CatalogUtil.newTableMeta("RAW");
+
     int rowNum = tupleBlock.size();
 
     long sortStart = System.currentTimeMillis();
@@ -165,7 +166,9 @@ public class ExternalSortExec extends SortExec {
 
     long chunkWriteStart = System.currentTimeMillis();
     Path outputPath = getChunkPathForWrite(0, chunkId);
-    final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
+    final RawFileAppender appender =
+        new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath);
+
     appender.init();
     for (Tuple t : sorted) {
       appender.addTuple(t);
@@ -179,7 +182,10 @@ public class ExternalSortExec extends SortExec {
         FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " +
         "sort time: " + (sortEnd - sortStart) + " msec, " +
         "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)");
-    return outputPath;
+
+    FileFragment frag = new FileFragment("", outputPath, 0,
+        new File(localFS.makeQualified(outputPath).toUri()).length());
+    return new Chunk(frag, intermediateMeta);
   }
 
   /**
@@ -188,10 +194,10 @@ public class ExternalSortExec extends SortExec {
    * @return All paths of chunks
    * @throws java.io.IOException
    */
-  private List<Path> sortAndStoreAllChunks() throws IOException {
+  private List<Chunk> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
     long memoryConsumption = 0;
-    List<Path> chunkPaths = TUtil.newList();
+    List<Chunk> chunkPaths = TUtil.newList();
 
     int chunkId = 0;
     long runStartTime = System.currentTimeMillis();
@@ -248,7 +254,8 @@ public class ExternalSortExec extends SortExec {
    * Get a local path from all temporal paths in round-robin manner.
    */
   private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException {
-    return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level +"_" + chunkId, context.getConf());
+    return localFS.makeQualified(localDirAllocator.getLocalPathForWrite(
+        sortTmpDir + "/" + level + "_" + chunkId, context.getConf()));
   }
 
   @Override
@@ -266,7 +273,7 @@ public class ExternalSortExec extends SortExec {
       } else {
         // Try to sort all data, and store them as multiple chunks if memory exceeds
         long startTimeOfChunkSplit = System.currentTimeMillis();
-        List<Path> chunks = sortAndStoreAllChunks();
+        List<Chunk> chunks = sortAndStoreAllChunks();
         long endTimeOfChunkSplit = System.currentTimeMillis();
         info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
 
@@ -276,14 +283,7 @@ public class ExternalSortExec extends SortExec {
         } else { // if input data exceeds main-memory at least once
 
           try {
-            List<FileFragment> fragments = TUtil.newList();
-            for (Path chunk : chunks) {
-              FileFragment frag = new FileFragment("", chunk, 0,
-                  new File(localFS.makeQualified(chunk).toUri()).length());
-              fragments.add(frag);
-            }
-
-            this.result = externalMergeAndSort(fragments);
+            this.result = externalMergeAndSort(chunks);
           } catch (Exception e) {
             throw new PhysicalPlanningException(e);
           }
@@ -324,11 +324,11 @@ public class ExternalSortExec extends SortExec {
     return computedFanout;
   }
 
-  private Scanner externalMergeAndSort(List<FileFragment> chunks)
+  private Scanner externalMergeAndSort(List<Chunk> chunks)
       throws IOException, ExecutionException, InterruptedException {
     int level = 0;
-    final List<FileFragment> inputFiles = TUtil.newList(chunks);
-    final List<FileFragment> outputFiles = TUtil.newList();
+    final List<Chunk> inputFiles = TUtil.newList(chunks);
+    final List<Chunk> outputFiles = TUtil.newList();
     int remainRun = inputFiles.size();
     int chunksSize = chunks.size();
 
@@ -341,7 +341,7 @@ public class ExternalSortExec extends SortExec {
       int remainInputRuns = inputFiles.size();
       int outChunkId = 0;
       int outputFileNum = 0;
-      List<Future<FileFragment>> futures = TUtil.newList();
+      List<Future<Chunk>> futures = TUtil.newList();
       // the number of files being merged in threads.
       List<Integer> numberOfMergingFiles = TUtil.newList();
 
@@ -364,7 +364,7 @@ public class ExternalSortExec extends SortExec {
           info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
               + ") and output files (" + outputFileNum + ") <= " + defaultFanout);
 
-          List<FileFragment> switched = TUtil.newList();
+          List<Chunk> switched = TUtil.newList();
           // switch the remain inputs to the next outputs
           for (int j = startIdx; j < inputFiles.size(); j++) {
             switched.add(inputFiles.get(j));
@@ -379,7 +379,7 @@ public class ExternalSortExec extends SortExec {
       // wait for all sort runners
       int finishedMerger = 0;
       int index = 0;
-      for (Future<FileFragment> future : futures) {
+      for (Future<Chunk> future : futures) {
         outputFiles.add(future.get());
         // Getting the number of merged files
         finishedMerger += numberOfMergingFiles.get(index++);
@@ -405,12 +405,12 @@ public class ExternalSortExec extends SortExec {
        * deleted at this point. However, for the ease of future code maintenance, we delete only type-C fragments here
        */
       int numDeletedFiles = 0;
-      for (FileFragment frag : inputFiles) {
-        if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
-          localFS.delete(frag.getPath(), true);
+      for (Chunk chunk : inputFiles) {
+        if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
+          localFS.delete(chunk.getFragment().getPath(), true);
           numDeletedFiles++;
 
-          if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag);
+          if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + chunk.getFragment());
         }
       }
       info(LOG, numDeletedFiles + " merged intermediate files deleted");
@@ -436,15 +436,15 @@ public class ExternalSortExec extends SortExec {
   /**
    * Merge Thread
    */
-  private class KWayMergerCaller implements Callable<FileFragment> {
+  private class KWayMergerCaller implements Callable<Chunk> {
     final int level;
     final int nextRunId;
-    final List<FileFragment> inputFiles;
+    final List<Chunk> inputFiles;
     final int startIdx;
     final int mergeFanout;
     final boolean updateInputStats;
 
-    public KWayMergerCaller(final int level, final int nextRunId, final List<FileFragment> inputFiles,
+    public KWayMergerCaller(final int level, final int nextRunId, final List<Chunk> inputFiles,
                             final int startIdx, final int mergeFanout, final boolean updateInputStats) {
       this.level = level;
       this.nextRunId = nextRunId;
@@ -455,11 +455,12 @@ public class ExternalSortExec extends SortExec {
     }
 
     @Override
-    public FileFragment call() throws Exception {
+    public Chunk call() throws Exception {
       final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
       info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
       long mergeStartTime = System.currentTimeMillis();
-      final RawFileAppender output = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
+      final RawFileAppender output =
+          new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath);
       output.init();
       final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
       merger.init();
@@ -475,7 +476,7 @@ public class ExternalSortExec extends SortExec {
           + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
       File f = new File(localFS.makeQualified(outputPath).toUri());
       FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length());
-      return frag;
+      return new Chunk(frag, intermediateMeta);
     }
   }
 
@@ -489,7 +490,7 @@ public class ExternalSortExec extends SortExec {
   /**
    * Create a merged file scanner or k-way merge scanner.
    */
-  private Scanner createFinalMerger(List<FileFragment> inputs) throws IOException {
+  private Scanner createFinalMerger(List<Chunk> inputs) throws IOException {
     if (inputs.size() == 1) {
       this.result = getFileScanner(inputs.get(0));
     } else {
@@ -498,11 +499,11 @@ public class ExternalSortExec extends SortExec {
     return result;
   }
 
-  private Scanner getFileScanner(FileFragment frag) throws IOException {
-    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
+  private Scanner getFileScanner(Chunk chunk) throws IOException {
+    return TablespaceManager.getLocalFs().getScanner(chunk.getMeta(), inSchema, chunk.getFragment(), outSchema);
   }
 
-  private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException {
+  private Scanner createKWayMerger(List<Chunk> inputs, final int startChunkId, final int num) throws IOException {
     final Scanner [] sources = new Scanner[num];
     for (int i = 0; i < num; i++) {
       sources[i] = getFileScanner(inputs.get(startChunkId + i));
@@ -773,6 +774,8 @@ public class ExternalSortExec extends SortExec {
 
   @Override
   public void close() throws IOException {
+    super.close();
+
     if (result != null) {
       result.close();
       try {
@@ -784,7 +787,8 @@ public class ExternalSortExec extends SortExec {
     }
 
     if (finalOutputFiles != null) {
-      for (FileFragment frag : finalOutputFiles) {
+      for (Chunk chunk : finalOutputFiles) {
+        FileFragment frag = chunk.getFragment();
         File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
         if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) {
           localFS.delete(frag.getPath(), true);
@@ -804,7 +808,6 @@ public class ExternalSortExec extends SortExec {
     }
 
     plan = null;
-    super.close();
   }
 
   @Override
@@ -833,4 +836,22 @@ public class ExternalSortExec extends SortExec {
       return inputStats;
     }
   }
+
+  private static class Chunk {
+    private FileFragment fragment;
+    private TableMeta meta;
+
+    public Chunk(FileFragment fragment, TableMeta meta) {
+      this.fragment = fragment;
+      this.meta = meta;
+    }
+
+    public FileFragment getFragment() {
+      return fragment;
+    }
+
+    public TableMeta getMeta() {
+      return meta;
+    }
+  }
 }

Reply via email to