Repository: tajo
Updated Branches:
  refs/heads/master 10159c7f8 -> 411a26d5d


http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 35a204a..49b0e11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -19,37 +19,62 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
-import org.apache.tajo.storage.HashShuffleAppenderWrapper;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.tuple.memory.RowBlock;
+import org.apache.tajo.tuple.memory.RowWriter;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
  * file outputs associated with shuffle keys. The file outputs are stored on local disks.
  */
 public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
-  private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
-  private ShuffleFileWriteNode plan;
+  private static final Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
+  private static final int MAXIMUM_INITIAL_BUFFER_SIZE = StorageUnit.MB;
+  private static final int MINIMUM_INITIAL_BUFFER_SIZE = 4 * StorageUnit.KB;
+  // If buffer usage exceeds this threshold, the buffer will be flushed to local storage
+  private static final float BUFFER_THRESHOLD_FACTOR = 0.8f;
+
+  private final ShuffleFileWriteNode plan;
   private final TableMeta meta;
-  private Partitioner partitioner;
-  private Map<Integer, HashShuffleAppenderWrapper> appenderMap = new HashMap<>();
+  private final Partitioner partitioner;
   private final int numShuffleOutputs;
-  private final int [] shuffleKeyIds;
-  private HashShuffleAppenderManager hashShuffleAppenderManager;
-  private int numHashShuffleBufferTuples;
+  private final int[] shuffleKeyIds;
+  private final HashShuffleAppenderManager hashShuffleAppenderManager;
+  private final int maxBufferSize;
+  private final int bufferThreshold;
+  private final int initialBufferSize;
+  private final DataType[] dataTypes;
+
+  private final Map<Integer, MemoryRowBlock> partitionMemoryMap;
+  private long writtenBytes = 0;
+  private long usedBufferSize = 0;
+  private long totalBufferCapacity = 0;
 
   public HashShuffleFileWriteExec(TaskAttemptContext context,
                                  final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
@@ -71,105 +96,151 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
     }
     this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
     this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager();
-    this.numHashShuffleBufferTuples = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE);
+    this.maxBufferSize = context.getQueryContext().getInt(SessionVars.HASH_SHUFFLE_BUFFER_SIZE) * StorageUnit.MB;
+    this.bufferThreshold = (int) (maxBufferSize * BUFFER_THRESHOLD_FACTOR);
+    this.dataTypes = SchemaUtil.toDataTypes(outSchema);
+
+    if (numShuffleOutputs > 0) {
+      // calculate the initial buffer size from the total number of partitions; each buffer will be 4KB ~ 1MB
+      this.initialBufferSize = Math.min(MAXIMUM_INITIAL_BUFFER_SIZE,
+          Math.max(maxBufferSize / numShuffleOutputs, MINIMUM_INITIAL_BUFFER_SIZE));
+    } else {
+      this.initialBufferSize = MINIMUM_INITIAL_BUFFER_SIZE;
+    }
+
+    this.partitionMemoryMap = Maps.newHashMap();
   }
 
   @Override
   public void init() throws IOException {
     super.init();
   }
-  
-  private HashShuffleAppenderWrapper getAppender(int partId) throws IOException {
-    HashShuffleAppenderWrapper appender = appenderMap.get(partId);
-    if (appender == null) {
-      appender = hashShuffleAppenderManager.getAppender(context.getConf(),
-          context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
-      appenderMap.put(partId, appender);
-    }
-    return appender;
-  }
-
-  Map<Integer, TupleList> partitionTuples = new HashMap<>();
-  long writtenBytes = 0L;
 
   @Override
   public Tuple next() throws IOException {
     try {
       Tuple tuple;
       int partId;
-      int tupleCount = 0;
       long numRows = 0;
       while (!context.isStopped() && (tuple = child.next()) != null) {
-        tupleCount++;
-        numRows++;
 
         partId = partitioner.getPartition(tuple);
-        TupleList partitionTupleList = partitionTuples.get(partId);
-        if (partitionTupleList == null) {
-          partitionTupleList = new TupleList(1000);
-          partitionTuples.put(partId, partitionTupleList);
+        MemoryRowBlock rowBlock = partitionMemoryMap.get(partId);
+        if (rowBlock == null) {
+          rowBlock = new MemoryRowBlock(dataTypes, initialBufferSize, true, plan.getStorageType());
+          partitionMemoryMap.put(partId, rowBlock);
+          totalBufferCapacity += rowBlock.capacity();
         }
-        partitionTupleList.add(tuple);
-        if (tupleCount >= numHashShuffleBufferTuples) {
-          for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
-            int appendPartId = entry.getKey();
-            HashShuffleAppenderWrapper appender = getAppender(appendPartId);
-            int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
-            writtenBytes += appendedSize;
-            entry.getValue().clear();
+
+        RowWriter writer = rowBlock.getWriter();
+        long prevUsedMem = rowBlock.usedMem();
+        totalBufferCapacity -= rowBlock.capacity();
+
+        writer.addTuple(tuple);
+        numRows++;
+
+        totalBufferCapacity += rowBlock.capacity(); // re-add the capacity, which may have grown during the write
+        usedBufferSize += (rowBlock.usedMem() - prevUsedMem);
+
+        // if the total buffer capacity exceeds maxBufferSize,
+        // all partitions are flushed and their buffers are released
+        if (totalBufferCapacity > maxBufferSize) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Too low buffer usage. threshold: %s, total capacity: %s, used: %s",
+                FileUtil.humanReadableByteCount(maxBufferSize, false),
+                FileUtil.humanReadableByteCount(totalBufferCapacity, false),
+                FileUtil.humanReadableByteCount(usedBufferSize, false)));
           }
-          tupleCount = 0;
+
+          //flush and release buffer
+          flushBuffer(partitionMemoryMap, true);
+          writtenBytes += usedBufferSize;
+          totalBufferCapacity = usedBufferSize = 0;
+
+        } else if (usedBufferSize > bufferThreshold) {
+          //flush and reuse buffer
+          flushBuffer(partitionMemoryMap, false);
+          writtenBytes += usedBufferSize;
+          usedBufferSize = 0;
         }
       }
 
-      // processing remained tuples
-      for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
-        int appendPartId = entry.getKey();
-        HashShuffleAppenderWrapper appender = getAppender(appendPartId);
-        int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
-        writtenBytes += appendedSize;
-        entry.getValue().clear();
-      }
+      // flush remaining buffers
+      flushBuffer(partitionMemoryMap, true);
 
+      writtenBytes += usedBufferSize;
+      usedBufferSize = totalBufferCapacity = 0;
       TableStats aggregated = (TableStats) child.getInputStats().clone();
       aggregated.setNumBytes(writtenBytes);
       aggregated.setNumRows(numRows);
       context.setResultStats(aggregated);
 
-      partitionTuples.clear();
-
       return null;
     } catch (RuntimeException e) {
       LOG.error(e.getMessage(), e);
       throw new IOException(e);
-    } catch (Exception e) {
+    } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
       throw new IOException(e);
     }
   }
 
+  /**
+   * Flush all buffers to local storage.
+   */
+  private void flushBuffer(Map<Integer, MemoryRowBlock> partitionMemoryMap, boolean releaseBuffer)
+      throws IOException, ExecutionException, InterruptedException {
+    List<Future<MemoryRowBlock>> resultList = Lists.newArrayList();
+    ArrayList<Integer> unusedBuffer = Lists.newArrayList();
+
+    for (Map.Entry<Integer, MemoryRowBlock> entry : partitionMemoryMap.entrySet()) {
+      int appendPartId = entry.getKey();
+
+      MemoryRowBlock memoryRowBlock = entry.getValue();
+      if (memoryRowBlock.getMemory().isReadable()) {
+        //flush and release buffer
+        resultList.add(hashShuffleAppenderManager.
+            writePartitions(meta, outSchema, context.getTaskId(), appendPartId, memoryRowBlock, releaseBuffer));
+      } else {
+        if (releaseBuffer) {
+          memoryRowBlock.release();
+        } else {
+          unusedBuffer.add(appendPartId);
+        }
+      }
+    }
+
+    // wait for flush to storage
+    for (Future<MemoryRowBlock> future : resultList) {
+      future.get();
+    }
+
+    if (releaseBuffer) {
+      partitionMemoryMap.clear();
+    } else {
+      // release the unused partition
+      for (Integer id : unusedBuffer) {
+        MemoryRowBlock memoryRowBlock = partitionMemoryMap.remove(id);
+        memoryRowBlock.release();
+      }
+    }
+  }
+
   @Override
   public void rescan() throws IOException {
-    // nothing to do   
+    throw new TajoRuntimeException(new UnsupportedException());
   }
 
   @Override
   public void close() throws IOException{
-    super.close();
-    if (appenderMap != null) {
-      appenderMap.clear();
-      appenderMap = null;
-    }
-
-    for (TupleList eachList : partitionTuples.values()) {
-      eachList.clear();
+    if (partitionMemoryMap.size() > 0) {
+      for (RowBlock rowBlock : partitionMemoryMap.values()) {
+        rowBlock.release();
+      }
+      partitionMemoryMap.clear();
     }
-    partitionTuples.clear();
-    partitionTuples = null;
-
-    partitioner = null;
-    plan = null;
 
     progress = 1.0f;
+    super.close();
   }
 }
\ No newline at end of file
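
[Editor's note] The rewritten HashShuffleFileWriteExec above replaces the old
tuple-count flush trigger with a byte-based, two-tier policy: when the total
(resizable) buffer capacity exceeds the session's HASH_SHUFFLE_BUFFER_SIZE,
every partition is flushed and its buffers are released; when the bytes
actually written cross 80% of that limit (BUFFER_THRESHOLD_FACTOR), the
partitions are flushed but their buffers are kept for reuse. The following
minimal, self-contained sketch illustrates that policy only; the class name
and the long[] counters standing in for MemoryRowBlock are hypothetical.

import java.util.HashMap;
import java.util.Map;

public class TwoTierFlushPolicy {
  private static final float BUFFER_THRESHOLD_FACTOR = 0.8f;

  private final long maxBufferSize;    // hard cap on total buffer capacity
  private final long bufferThreshold;  // soft cap on bytes actually written
  private final long initialBufferSize;

  // partId -> {capacity, usedBytes}; a stand-in for Map<Integer, MemoryRowBlock>
  private final Map<Integer, long[]> buffers = new HashMap<>();
  private long totalCapacity = 0;
  private long usedBytes = 0;

  public TwoTierFlushPolicy(long maxBufferSize, long initialBufferSize) {
    this.maxBufferSize = maxBufferSize;
    this.bufferThreshold = (long) (maxBufferSize * BUFFER_THRESHOLD_FACTOR);
    this.initialBufferSize = initialBufferSize;
  }

  public void add(int partId, long tupleBytes) {
    long[] buf = buffers.get(partId);
    if (buf == null) {
      buf = new long[]{initialBufferSize, 0};
      buffers.put(partId, buf);
      totalCapacity += initialBufferSize;
    }

    // model a resizable row block: capacity doubles whenever usage outgrows it
    buf[1] += tupleBytes;
    while (buf[1] > buf[0]) {
      totalCapacity += buf[0];
      buf[0] *= 2;
    }
    usedBytes += tupleBytes;

    if (totalCapacity > maxBufferSize) {
      flushAll(true);               // flush everything and release the buffers
      totalCapacity = usedBytes = 0;
    } else if (usedBytes > bufferThreshold) {
      flushAll(false);              // flush but keep the buffers for reuse
      usedBytes = 0;
    }
  }

  private void flushAll(boolean release) {
    // stand-in for HashShuffleAppenderManager.writePartitions(...)
    for (long[] buf : buffers.values()) {
      buf[1] = 0;
    }
    if (release) {
      buffers.clear();
    }
  }

  public static void main(String[] args) {
    TwoTierFlushPolicy policy = new TwoTierFlushPolicy(1 << 20, 4 << 10); // 1MB cap, 4KB initial
    for (int i = 0; i < 100_000; i++) {
      policy.add(i % 8, 100); // 100-byte tuples spread over 8 partitions
    }
  }
}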

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index b652b0a..cabbf4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -26,6 +26,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.List;
 
 public abstract class SortExec extends UnaryPhysicalExec {
 
@@ -39,7 +40,7 @@ public abstract class SortExec extends UnaryPhysicalExec {
     this.comparator = new BaseTupleComparator(inSchema, sortSpecs);
   }
 
-  protected TupleSorter getSorter(TupleList tupleSlots) {
+  protected TupleSorter getSorter(List tupleSlots) {
     if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) {
      return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
index abf2808..39c67d6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
@@ -22,6 +22,7 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 
 import java.util.Collections;
+import java.util.List;
 
 public interface TupleSorter {
 
@@ -29,10 +30,10 @@ public interface TupleSorter {
 
   class DefaultSorter implements TupleSorter {
 
-    private final TupleList target;
+    private final List target;
     private final TupleComparator comparator;
 
-    public DefaultSorter(TupleList target, TupleComparator comparator) {
+    public DefaultSorter(List target, TupleComparator comparator) {
       this.target = target;
       this.comparator = comparator;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
index 82f7153..44649cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
@@ -26,6 +26,7 @@ import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
 
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting
@@ -35,7 +36,7 @@ public class VectorizedSorter extends ComparableVector implements IndexedSortabl
 
   private final int[] mappings;         // index indirection
 
-  public VectorizedSorter(TupleList source, SortSpec[] sortKeys, int[] keyIndex) {
+  public VectorizedSorter(List source, SortSpec[] sortKeys, int[] keyIndex) {
     super(source.size(), sortKeys, keyIndex);
     source.toArray(tuples);   // wish it's array list
     mappings = new int[tuples.length];

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 815c44b..e64cd51 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -194,8 +194,11 @@ public class Repartitioner {
       long maxStats = Long.MIN_VALUE;
       int maxStatsScanIdx = -1;
       StringBuilder nonLeafScanNamesBuilder = new StringBuilder();
+
+      String intermediateDataFormat = schedulerContext.getMasterContext().getConf().getVar(ConfVars.SHUFFLE_FILE_FORMAT);
       for (int i = 0; i < scans.length; i++) {
-        if (scans[i].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase("RAW")) {
+
+        if (scans[i].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase(intermediateDataFormat)) {
           // Intermediate data scan
           hasNonLeafNode = true;
           largeScanIndexList.add(i);

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4b0dac9..c7cac4f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -338,6 +338,10 @@ public class TajoWorker extends CompositeService {
       return;
     }
 
+    if (hashShuffleAppenderManager != null) {
+      hashShuffleAppenderManager.shutdown();
+    }
+
     if(webServer != null) {
       try {
         webServer.stop();

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index be920a1..fbd1948 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -25,10 +25,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
@@ -548,6 +545,7 @@ public class TaskImpl implements Task {
 
   private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
       throws IOException {
+
     Configuration c = new Configuration(systemConf);
     c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
     FileSystem fs = FileSystem.get(c);
@@ -561,7 +559,7 @@ public class TaskImpl implements Task {
       if (f.getLen() == 0) {
         continue;
       }
-      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
+      tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen());
       listTablets.add(tablet);
     }
 
@@ -569,7 +567,7 @@ public class TaskImpl implements Task {
     synchronized (localChunks) {
       for (FileChunk chunk : localChunks) {
         if (name.equals(chunk.getEbId())) {
-          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
+          tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())), chunk.startOffset(), chunk.length());
           listTablets.add(tablet);
         }
       }
@@ -611,10 +609,9 @@ public class TaskImpl implements Task {
           try {
             FileChunk fetched = fetcher.get();
            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
-          && fetched.getFile() != null) {
+                && fetched.getFile() != null) {
               if (fetched.fromRemote() == false) {
-          localChunks.add(fetched);
-          LOG.info("Add a new FileChunk to local chunk list");
+                localChunks.add(fetched);
               }
               break;
             }
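
[Editor's note] The TaskImpl change above wraps fragment paths in
fs.makeQualified(...) so that scheme-less local paths resolve against the
local file system instead of whatever default FS happens to be configured.
A minimal sketch of that behavior using only standard Hadoop FileSystem
APIs (the /tmp path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifiedPathDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // pin the default FS to the local file system, as localizeFetchedData does
    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
    FileSystem fs = FileSystem.get(conf);

    Path bare = new Path("/tmp/fetched-chunk.dat"); // no scheme
    Path qualified = fs.makeQualified(bare);

    // prints something like: /tmp/fetched-chunk.dat -> file:/tmp/fetched-chunk.dat
    // the explicit scheme keeps later consumers from resolving the path
    // against a different default file system (e.g. HDFS)
    System.out.println(bare + " -> " + qualified);
  }
}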

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
index 55fa45d..b4d4f2b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
@@ -40,11 +40,6 @@ public class BufferPool {
   }
 
   static {
-    /* TODO Enable thread cache
-    *  Create a pooled ByteBuf allocator but disables the thread-local cache.
-    *  Because the TaskRunner thread is newly created
-    * */
-
     if (TajoConstants.IS_TEST_MODE) {
       /* Disable pooling buffers for memory usage  */
       ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
@@ -53,7 +48,7 @@ public class BufferPool {
       ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
     } else {
       TajoConf tajoConf = new TajoConf();
-      ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0);
+      ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, true), 0);
     }
   }
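
[Editor's note] The BufferPool change above flips the default for Netty's
thread-local allocation caches from off to on. A rough sketch of what that
toggle means in plain Netty 4 terms (Tajo's createPooledByteBufAllocator
helper configures more knobs than this simplified constructor):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class ThreadCacheDemo {
  public static void main(String[] args) {
    // Netty's pooled allocator keeps per-thread caches of freed buffers, so a
    // thread that allocates and releases repeatedly can often skip the shared
    // arena and its synchronization entirely.
    PooledByteBufAllocator allocator = new PooledByteBufAllocator(true /* preferDirect */);

    ByteBuf buf = allocator.directBuffer(4096);
    try {
      buf.writeLong(42L);
    } finally {
      buf.release(); // a freed buffer may land in this thread's local cache
    }
  }
}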
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 62df9a3..79203a2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -30,6 +33,9 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.Pair;
 
 import java.io.IOException;
@@ -37,13 +43,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 
 public class HashShuffleAppenderManager {
  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
 
-  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
-          new ConcurrentHashMap<>();
+  private ConcurrentMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = Maps.newConcurrentMap();
+  private ConcurrentMap<Integer, ExecutorService> executors = Maps.newConcurrentMap(); // for parallel writing
+  private List<String> temporalPaths = Lists.newArrayList();
+
   private TajoConf systemConf;
   private FileSystem defaultFS;
   private FileSystem localFS;
@@ -59,66 +67,88 @@ public class HashShuffleAppenderManager {
     // initialize DFS and LocalFileSystems
     defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
     localFS = FileSystem.getLocal(systemConf);
-    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * StorageUnit.MB;
+
+    Iterable<Path> allLocalPath = lDirAllocator.getAllLocalPathsToRead(".", systemConf);
+
+    // register one async hash shuffle writer (a single-thread executor) per local directory
+    for (Path path : allLocalPath) {
+      temporalPaths.add(localFS.makeQualified(path).toString());
+      executors.put(temporalPaths.size() - 1, Executors.newSingleThreadExecutor());
+    }
   }
 
-  public HashShuffleAppenderWrapper getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
-                                                TableMeta meta, Schema outSchema) throws IOException {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+  protected int getVolumeId(Path path) {
+    int i = 0;
+    for (String rootPath : temporalPaths) {
+      if (path.toString().startsWith(rootPath)) {
+        break;
+      }
+      i++;
+    }
+    Preconditions.checkPositionIndex(i, temporalPaths.size() - 1);
+    return i;
+  }
+
+  public synchronized HashShuffleAppenderWrapper getAppender(MemoryRowBlock memoryRowBlock, ExecutionBlockId ebId,
+                                                             int partId, TableMeta meta, Schema outSchema)
+      throws IOException {
+
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+    if (partitionAppenderMap == null) {
+      partitionAppenderMap = new ConcurrentHashMap<>();
+      appenderMap.put(ebId, partitionAppenderMap);
+    }
 
-      if (partitionAppenderMap == null) {
-        partitionAppenderMap = new ConcurrentHashMap<>();
-        appenderMap.put(ebId, partitionAppenderMap);
+    PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+    if (partitionAppenderMeta == null) {
+      Path dataFile = getDataFile(ebId, partId);
+      FileSystem fs = dataFile.getFileSystem(systemConf);
+      if (fs.exists(dataFile)) {
+        FileStatus status = fs.getFileStatus(dataFile);
+        LOG.info("File " + dataFile + " already exists, size=" + 
status.getLen());
       }
 
-      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
-      if (partitionAppenderMeta == null) {
-        Path dataFile = getDataFile(ebId, partId);
-        FileSystem fs = dataFile.getFileSystem(systemConf);
-        if (fs.exists(dataFile)) {
-          FileStatus status = fs.getFileStatus(dataFile);
-          LOG.info("File " + dataFile + " already exists, size=" + 
status.getLen());
-        }
-
-        if (!fs.exists(dataFile.getParent())) {
-          fs.mkdirs(dataFile.getParent());
-        }
-
-        FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri());
-        FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
-        appender.enableStats();
-        appender.init();
-
-        partitionAppenderMeta = new PartitionAppenderMeta();
-        partitionAppenderMeta.partId = partId;
-        partitionAppenderMeta.dataFile = dataFile;
-        partitionAppenderMeta.appender = new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender);
-        partitionAppenderMeta.appender.init();
-        partitionAppenderMap.put(partId, partitionAppenderMeta);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Create Hash shuffle file(partId=" + partId + "): " + 
dataFile);
-        }
+      if (!fs.exists(dataFile.getParent())) {
+        fs.mkdirs(dataFile.getParent());
       }
 
-      return partitionAppenderMeta.appender;
+      DirectRawFileWriter appender =
+          new DirectRawFileWriter(systemConf, null, outSchema, meta, dataFile, memoryRowBlock);
+      appender.enableStats();
+      appender.init();
+
+      partitionAppenderMeta = new PartitionAppenderMeta();
+      partitionAppenderMeta.partId = partId;
+      partitionAppenderMeta.dataFile = dataFile;
+      partitionAppenderMeta.appender =
+          new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender, getVolumeId(dataFile));
+      partitionAppenderMeta.appender.init();
+      partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Create Hash shuffle file(partId=" + partId + "): " + 
dataFile);
+      }
     }
+
+    return partitionAppenderMeta.appender;
   }
 
   public static int getPartParentId(int partId, TajoConf tajoConf) {
-    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+    return partId % tajoConf.getIntVar(ConfVars.SHUFFLE_HASH_PARENT_DIRS);
   }
 
  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
     try {
       // the base dir for an output dir
      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
-      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+      Path baseDirPath = lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf);
       //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
 
      // If an EB has many partitions, too many shuffle files end up in a single directory.
-      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+      return localFS.makeQualified(
+          StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw new IOException(e);
@@ -126,10 +156,7 @@ public class HashShuffleAppenderManager {
   }
 
  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
-    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
-    synchronized (appenderMap) {
-      partitionAppenderMap = appenderMap.remove(ebId);
-    }
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.remove(ebId);
 
     if (partitionAppenderMap == null) {
       LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", not a hash 
shuffle");
@@ -158,16 +185,43 @@ public class HashShuffleAppenderManager {
   }
 
   public void finalizeTask(TaskAttemptId taskId) {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
         appenderMap.get(taskId.getTaskId().getExecutionBlockId());
-      if (partitionAppenderMap == null) {
-        return;
-      }
+    if (partitionAppenderMap == null) {
+      return;
+    }
 
-      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
-        eachAppender.appender.taskFinished(taskId);
+    for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
+      eachAppender.appender.taskFinished(taskId);
+    }
+  }
+
+  /**
+   * Asynchronously write partitions.
+   */
+  public Future<MemoryRowBlock> writePartitions(TableMeta meta, Schema schema, final TaskAttemptId taskId, int partId,
+                                                final MemoryRowBlock rowBlock,
+                                                final boolean release) throws IOException {
+
+    HashShuffleAppenderWrapper appender =
+        getAppender(rowBlock, taskId.getTaskId().getExecutionBlockId(), partId, meta, schema);
+    ExecutorService executor = executors.get(appender.getVolumeId());
+    return executor.submit(new Callable<MemoryRowBlock>() {
+      @Override
+      public MemoryRowBlock call() throws Exception {
+        appender.writeRowBlock(taskId, rowBlock);
+
+        if (release) rowBlock.release();
+        else rowBlock.clear();
+
+        return rowBlock;
       }
+    });
+  }
+
+  public void shutdown() {
+    for (ExecutorService service : executors.values()) {
+      service.shutdownNow();
     }
   }
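
[Editor's note] The writePartitions/shutdown additions above give each local
directory (volume) its own single-thread executor, so flushes to different
disks proceed in parallel while writes to the same disk stay ordered. Below
is a minimal sketch of that pattern; the VolumeWriter class and the byte[]
payloads are hypothetical stand-ins for HashShuffleAppenderWrapper and
MemoryRowBlock.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class VolumeWriter {
  // one single-thread executor per volume: writes to the same disk are
  // serialized, writes to different disks run concurrently
  private final Map<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

  public Future<Integer> write(int volumeId, final byte[] data) {
    ExecutorService executor =
        executors.computeIfAbsent(volumeId, id -> Executors.newSingleThreadExecutor());
    // the submitted task stands in for HashShuffleAppenderWrapper.writeRowBlock(...)
    return executor.submit(() -> data.length);
  }

  public void shutdown() {
    for (ExecutorService service : executors.values()) {
      service.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    VolumeWriter writer = new VolumeWriter();
    List<Future<Integer>> pending = Arrays.asList(
        writer.write(0, new byte[1024]),   // volume 0
        writer.write(1, new byte[2048]));  // volume 1, flushed in parallel
    for (Future<Integer> f : pending) {
      System.out.println("wrote " + f.get() + " bytes");
    }
    writer.shutdown();
  }
}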
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
index bcd4388..ccd528b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.util.Pair;
 
 import java.io.Closeable;
@@ -36,9 +38,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class HashShuffleAppenderWrapper implements Closeable {
   private static Log LOG = LogFactory.getLog(HashShuffleAppenderWrapper.class);
 
-  private FileAppender appender;
+  private DirectRawFileWriter appender;
   private AtomicBoolean closed = new AtomicBoolean(false);
   private int partId;
+  private int volumeId;
 
   //<taskId,<page start offset,<task start, task end>>>
  private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
@@ -56,11 +59,13 @@ public class HashShuffleAppenderWrapper implements Closeable {
 
   private ExecutionBlockId ebId;
 
-  public HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
+  public HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize,
+                                    DirectRawFileWriter appender, int volumeId) {
     this.ebId = ebId;
     this.partId = partId;
     this.appender = appender;
     this.pageSize = pageSize;
+    this.volumeId = volumeId;
   }
 
   public void init() throws IOException {
@@ -73,41 +78,36 @@ public class HashShuffleAppenderWrapper implements Closeable {
   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible for the specified partition.
   * After writing, if the current page exceeds pageSize, a pageOffset will be added.
    * @param taskId
-   * @param tuples
+   * @param rowBlock
    * @return written bytes
    * @throws java.io.IOException
    */
-  public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return 0;
-      }
-      long currentPos = appender.getOffset();
+  public MemoryRowBlock writeRowBlock(TaskAttemptId taskId, MemoryRowBlock rowBlock) throws IOException {
+    if (closed.get()) {
+      return rowBlock;
+    }
 
-      for (Tuple eachTuple: tuples) {
-        appender.addTuple(eachTuple);
-      }
-      long posAfterWritten = appender.getOffset();
+    appender.writeRowBlock(rowBlock);
+    appender.flush();
 
-      int writtenBytes = (int)(posAfterWritten - currentPos);
+    int rows = rowBlock.rows();
+    long posAfterWritten = appender.getOffset();
 
-      int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
-      if (taskIndexes == null) {
-        taskIndexes = new ArrayList<>();
-        taskTupleIndexes.put(taskId, taskIndexes);
-      }
-      taskIndexes.add(
-              new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
-      rowNumInPage = nextRowNum;
-
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
-        nextPage(posAfterWritten);
-        rowNumInPage = 0;
-      }
+    int nextRowNum = rowNumInPage + rows;
+    List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+    if (taskIndexes == null) {
+      taskIndexes = new ArrayList<>();
+      taskTupleIndexes.put(taskId, taskIndexes);
+    }
+    taskIndexes.add(
+        new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
+    rowNumInPage = nextRowNum;
 
-      return writtenBytes;
+    if (posAfterWritten - currentPage.getFirst() > pageSize) {
+      nextPage(posAfterWritten);
+      rowNumInPage = 0;
     }
+    return rowBlock;
   }
 
   public long getOffset() throws IOException {
@@ -129,42 +129,35 @@ public class HashShuffleAppenderWrapper implements Closeable {
   }
 
   public void flush() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
+    if (closed.get()) {
+      return;
     }
+    appender.flush();
   }
 
   @Override
   public void close() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-      offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
-        nextPage(offset);
-      }
-      appender.close();
-      if (LOG.isDebugEnabled()) {
-        if (!pages.isEmpty()) {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
-        } else {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
-        }
+    if (closed.getAndSet(true)) {
+      return;
+    }
+    appender.flush();
+    offset = appender.getOffset();
+    if (offset > currentPage.getFirst()) {
+      nextPage(offset);
+    }
+    appender.close();
+    if (LOG.isDebugEnabled()) {
+      if (!pages.isEmpty()) {
+        LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+            + ", lastPage=" + pages.get(pages.size() - 1));
+      } else {
+        LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
       }
-      closed.set(true);
     }
   }
 
   public TableStats getStats() {
-    synchronized(appender) {
-      return appender.getStats();
-    }
+    return appender.getStats();
   }
 
   public List<Pair<Long, Integer>> getPages() {
@@ -184,4 +177,8 @@ public class HashShuffleAppenderWrapper implements Closeable {
   public void taskFinished(TaskAttemptId taskId) {
     taskTupleIndexes.remove(taskId);
   }
+
+  public int getVolumeId() {
+    return volumeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index cda39f9..26bd135 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -36,14 +36,15 @@ import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.BitArray;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 
 public class RawFile {
@@ -99,7 +100,7 @@ public class RawFile {
       }
 
       if(buf == null) {
-        buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+        buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)).order(ByteOrder.LITTLE_ENDIAN);
         buffer = buf.nioBuffer(0, buf.capacity());
       }
 
@@ -282,96 +283,94 @@ public class RawFile {
         }
 
         switch (columnTypes[i].getType()) {
-          case BOOLEAN :
-            outTuple.put(i, DatumFactory.createBool(buffer.get()));
-            break;
-
-          case BIT :
-            outTuple.put(i, DatumFactory.createBit(buffer.get()));
-            break;
-
-          case CHAR :
-            int realLen = readRawVarint32();
-            byte[] buf = new byte[realLen];
-            buffer.get(buf);
-            outTuple.put(i, DatumFactory.createChar(buf));
-            break;
-
-          case INT2 :
-            outTuple.put(i, DatumFactory.createInt2(buffer.getShort()));
-            break;
-
-          case INT4 :
-            outTuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
-            break;
-
-          case INT8 :
-            outTuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
-            break;
-
-          case FLOAT4 :
-            outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
-            break;
-
-          case FLOAT8 :
-            outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
-            break;
-
-          case TEXT : {
-            int len = readRawVarint32();
-            byte [] strBytes = new byte[len];
-            buffer.get(strBytes);
-            outTuple.put(i, DatumFactory.createText(strBytes));
-            break;
-          }
+        case BOOLEAN:
+          outTuple.put(i, DatumFactory.createBool(buffer.get()));
+          break;
 
-          case BLOB : {
-            int len = readRawVarint32();
-            byte [] rawBytes = new byte[len];
-            buffer.get(rawBytes);
-            outTuple.put(i, DatumFactory.createBlob(rawBytes));
-            break;
-          }
+        case BIT:
+          outTuple.put(i, DatumFactory.createBit(buffer.get()));
+          break;
 
-          case PROTOBUF: {
-            int len = readRawVarint32();
-            byte [] rawBytes = new byte[len];
-            buffer.get(rawBytes);
+        case CHAR:
+          int realLen = readRawVarint32();
+          byte[] buf = new byte[realLen];
+          buffer.get(buf);
+          outTuple.put(i, DatumFactory.createChar(buf));
+          break;
 
-            outTuple.put(i, ProtobufDatumFactory.createDatum(columnTypes[i], rawBytes));
-            break;
-          }
+        case INT2:
+          outTuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+          break;
 
-          case INET4 :
-            byte [] ipv4Bytes = new byte[4];
-            buffer.get(ipv4Bytes);
-            outTuple.put(i, DatumFactory.createInet4(ipv4Bytes));
-            break;
-
-          case DATE: {
-            int val = buffer.getInt();
-            if (val < Integer.MIN_VALUE + 1) {
-              outTuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
-            }
-            break;
+        case INT4:
+          outTuple.put(i, 
DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
+          break;
+
+        case INT8:
+          outTuple.put(i, 
DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
+          break;
+
+        case FLOAT4:
+          outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+          break;
+
+        case FLOAT8:
+          outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+          break;
+
+        case TEXT: {
+          int len = readRawVarint32();
+          byte[] strBytes = new byte[len];
+          buffer.get(strBytes);
+          outTuple.put(i, DatumFactory.createText(strBytes));
+          break;
+        }
+
+        case BLOB: {
+          int len = readRawVarint32();
+          byte[] rawBytes = new byte[len];
+          buffer.get(rawBytes);
+          outTuple.put(i, DatumFactory.createBlob(rawBytes));
+          break;
+        }
+
+        case PROTOBUF: {
+          int len = readRawVarint32();
+          byte[] rawBytes = new byte[len];
+          buffer.get(rawBytes);
+
+          outTuple.put(i, ProtobufDatumFactory.createDatum(columnTypes[i], rawBytes));
+          break;
+        }
+
+        case INET4:
+          outTuple.put(i, DatumFactory.createInet4(buffer.getInt()));
+          break;
+
+        case DATE: {
+          int val = buffer.getInt();
+          if (val < Integer.MIN_VALUE + 1) {
+            outTuple.put(i, DatumFactory.createNullDatum());
+          } else {
+            outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
           }
-          case TIME:
-          case TIMESTAMP: {
-            long val = buffer.getLong();
-            if (val < Long.MIN_VALUE + 1) {
-              outTuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
-            }
-            break;
+          break;
+        }
+        case TIME:
+        case TIMESTAMP: {
+          long val = buffer.getLong();
+          if (val < Long.MIN_VALUE + 1) {
+            outTuple.put(i, DatumFactory.createNullDatum());
+          } else {
+            outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
           }
-          case NULL_TYPE:
-            outTuple.put(i, NullDatum.get());
-            break;
+          break;
+        }
+        case NULL_TYPE:
+          outTuple.put(i, NullDatum.get());
+          break;
 
-          default:
+        default:
         }
       }
 
@@ -400,6 +399,7 @@ public class RawFile {
       buffer.clear();
       forceFillBuffer = true;
       filePosition = fragment.getStartKey();
+      recordCount = 0;
       channel.position(filePosition);
       eos = false;
     }
@@ -461,331 +461,11 @@ public class RawFile {
     }
   }
 
-  public static class RawFileAppender extends FileAppender {
-    private FileChannel channel;
-    private RandomAccessFile randomAccessFile;
-    private DataType[] columnTypes;
-
-    private ByteBuffer buffer;
-    private ByteBuf buf;
-    private BitArray nullFlags;
-    private int headerSize = 0;
-    private static final int RECORD_SIZE = 4;
-    private long pos;
-
-    private TableStatistics stats;
-
-    public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
-                           Schema schema, TableMeta meta, Path workDir) throws IOException {
-      super(conf, taskAttemptId, schema, meta, workDir);
-    }
-
-    public void init() throws IOException {
-      File file;
-      try {
-        if (path.toUri().getScheme() != null) {
-          file = new File(path.toUri());
-        } else {
-          file = new File(path.toString());
-        }
-      } catch (IllegalArgumentException iae) {
-        throw new IOException(iae);
-      }
-
-      randomAccessFile = new RandomAccessFile(file, "rw");
-      channel = randomAccessFile.getChannel();
-      pos = 0;
-
-      columnTypes = new DataType[schema.size()];
-      for (int i = 0; i < schema.size(); i++) {
-        columnTypes[i] = schema.getColumn(i).getDataType();
-      }
-
-      buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
-      buffer = buf.nioBuffer(0, buf.capacity());
-
-      // comput the number of bytes, representing the null flags
-
-      nullFlags = new BitArray(schema.size());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-      if (tableStatsEnabled) {
-        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
-      }
-
-      super.init();
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    private void flushBuffer() throws IOException {
-      buffer.flip();
-      channel.write(buffer);
-      buffer.clear();
-    }
-
-    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
-        throws IOException {
-
-      // if the buffer reaches the limit,
-      // write the bytes from 0 to the previous record.
-      if (buffer.remaining() < sizeToBeWritten) {
-
-        int limit = buffer.position();
-        buffer.limit(recordOffset);
-        buffer.flip();
-        channel.write(buffer);
-        buffer.position(recordOffset);
-        buffer.limit(limit);
-        buffer.compact();
-
-        //increase the write-buffer
-        if(buffer.remaining() < sizeToBeWritten) {
-          buf.setIndex(buffer.position(), buffer.limit());
-          buf.ensureWritable(sizeToBeWritten);
-          buffer = buf.nioBuffer(0, buf.capacity());
-          buffer.position(buf.readerIndex());
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    /**
-     * Encode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n A signed 32-bit integer.
-     * @return An unsigned 32-bit integer, stored in a signed int because
-     *         Java has no explicit unsigned support.
-     */
-    public static int encodeZigZag32(final int n) {
-      // Note:  the right-shift must be arithmetic
-      return (n << 1) ^ (n >> 31);
-    }
-
-    /**
-     * Encode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n A signed 64-bit integer.
-     * @return An unsigned 64-bit integer, stored in a signed int because
-     *         Java has no explicit unsigned support.
-     */
-    public static long encodeZigZag64(final long n) {
-      // Note:  the right-shift must be arithmetic
-      return (n << 1) ^ (n >> 63);
-    }
-
-    /**
-     * Encode and write a varint.  {@code value} is treated as
-     * unsigned, so it won't be sign-extended if negative.
-     */
-    public void writeRawVarint32(int value) throws IOException {
-      while (true) {
-        if ((value & ~0x7F) == 0) {
-          buffer.put((byte) value);
-          return;
-        } else {
-          buffer.put((byte) ((value & 0x7F) | 0x80));
-          value >>>= 7;
-        }
-      }
-    }
-
-    /**
-     * Compute the number of bytes that would be needed to encode a varint.
-     * {@code value} is treated as unsigned, so it won't be sign-extended if
-     * negative.
-     */
-    public static int computeRawVarint32Size(final int value) {
-      if ((value & (0xffffffff <<  7)) == 0) return 1;
-      if ((value & (0xffffffff << 14)) == 0) return 2;
-      if ((value & (0xffffffff << 21)) == 0) return 3;
-      if ((value & (0xffffffff << 28)) == 0) return 4;
-      return 5;
-    }
-
-    /** Encode and write a varint. */
-    public void writeRawVarint64(long value) throws IOException {
-      while (true) {
-        if ((value & ~0x7FL) == 0) {
-          buffer.put((byte) value);
-          return;
-        } else {
-          buffer.put((byte) ((value & 0x7F) | 0x80));
-          value >>>= 7;
-        }
-      }
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-
-      if (buffer.remaining() < headerSize) {
-        flushBuffer();
-      }
-
-      // skip the row header
-      int recordOffset = buffer.position();
-      buffer.position(recordOffset + headerSize);
-      // reset the null flags
-      nullFlags.clear();
-      for (int i = 0; i < schema.size(); i++) {
-        // it is to calculate min/max values, and it is only used for the intermediate file.
-        if (tableStatsEnabled) {
-          stats.analyzeField(i, t);
-        }
-
-        if (t.isBlankOrNull(i)) {
-          nullFlags.set(i);
-          continue;
-        }
-
-        // 10 is the maximum bytes size of all types
-        if (flushBufferAndReplace(recordOffset, 10)) {
-          recordOffset = 0;
-        }
-
-        switch(columnTypes[i].getType()) {
-          case NULL_TYPE:
-            nullFlags.set(i);
-            continue;
-
-          case BOOLEAN:
-          case BIT:
-            buffer.put(t.getByte(i));
-            break;
-
-          case INT2 :
-            buffer.putShort(t.getInt2(i));
-            break;
-
-          case INT4 :
-            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
-            break;
-
-          case INT8 :
-            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
-            break;
-
-          case FLOAT4 :
-            buffer.putFloat(t.getFloat4(i));
-            break;
-
-          case FLOAT8 :
-            buffer.putDouble(t.getFloat8(i));
-            break;
-
-          case CHAR:
-          case TEXT: {
-            byte [] strBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(strBytes.length);
-            buffer.put(strBytes);
-            break;
-          }
+  public static class RawFileAppender extends DirectRawFileWriter {
 
-        case DATE:
-          buffer.putInt(t.getInt4(i));
-          break;
-
-        case TIME:
-        case TIMESTAMP:
-          buffer.putLong(t.getInt8(i));
-          break;
-
-          case BLOB : {
-            byte [] rawBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case PROTOBUF: {
-            byte [] rawBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case INET4 :
-            buffer.put(t.getBytes(i));
-            break;
-
-          default:
-            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
-        }
-      }
-
-      // write a record header
-      int bufferPos = buffer.position();
-      buffer.position(recordOffset);
-      buffer.putInt(bufferPos - recordOffset);
-      byte [] flags = nullFlags.toArray();
-      buffer.putShort((short) flags.length);
-      buffer.put(flags);
-
-      pos += bufferPos - recordOffset;
-      buffer.position(bufferPos);
-
-      if (tableStatsEnabled) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if(buffer != null){
-        flushBuffer();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flush();
-      if (tableStatsEnabled) {
-        stats.setNumBytes(getOffset());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " 
+ path);
-      }
-
-      if(buf != null){
-        buffer.clear();
-        buffer = null;
-
-        buf.release();
-        buf = null;
-      }
-
-      IOUtils.cleanup(LOG, channel, randomAccessFile);
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (tableStatsEnabled) {
-        stats.setNumBytes(pos);
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
+    public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema,
+                           TableMeta meta, Path workDir) throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir, null);
     }
   }
 }
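
[Editor's note] The ZigZag/varint helpers deleted above move into
DirectRawFileWriter rather than disappearing. They are easy to sanity-check
in isolation; the small worked example below pairs the encoder from the
removed code with the matching decoder (the decoder form shown is the
standard protobuf one, included here for illustration):

public class ZigZagDemo {
  // from the removed RawFileAppender: maps signed ints to small unsigned ones
  static int encodeZigZag32(final int n) {
    return (n << 1) ^ (n >> 31); // note: arithmetic right shift
  }

  // the matching decoder used on the read path
  static int decodeZigZag32(final int n) {
    return (n >>> 1) ^ -(n & 1); // note: logical right shift
  }

  public static void main(String[] args) {
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... : small magnitudes stay small,
    // so the subsequent varint encoding needs few bytes even for negatives
    for (int v : new int[]{0, -1, 1, -2, 2, -64, 64}) {
      int z = encodeZigZag32(v);
      System.out.println(v + " -> " + z + " -> " + decodeZigZag32(z));
    }
  }
}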

http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
index 0172484..550de63 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -35,7 +35,6 @@ import org.apache.tajo.tuple.RowBlockReader;
 import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.tuple.memory.RowBlock;
 import org.apache.tajo.tuple.memory.UnSafeTuple;
-import org.apache.tajo.tuple.memory.ZeroCopyTuple;
 import org.apache.tajo.unit.StorageUnit;
 
 import java.io.File;
@@ -45,6 +44,9 @@ import java.io.IOException;
 public class DirectRawFileScanner extends FileScanner implements SeekableScanner {
   private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class);
 
+  public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes";
+  public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
+
   private SeekableInputChannel channel;
 
   private boolean eos = false;
@@ -53,7 +55,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   private long filePosition;
   private long endOffset;
 
-  private ZeroCopyTuple unSafeTuple = new UnSafeTuple();
+  private UnSafeTuple unSafeTuple = new UnSafeTuple();
   private RowBlock tupleBuffer;
   private RowBlockReader reader;
 
@@ -61,15 +63,19 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
     super(conf, schema, meta, fragment);
   }
 
+  @Override
   public void init() throws IOException {
     initChannel();
 
-    tupleBuffer = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), 64 * StorageUnit.KB, true);
-
-    reader = tupleBuffer.getReader();
-
-    fetchNeeded = !next(tupleBuffer);
+    if (tupleBuffer == null) {
+      tupleBuffer = new MemoryRowBlock(SchemaUtil.toDataTypes(schema),
+          conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+    } else {
+      tupleBuffer.clear();
+    }
 
+    fetchNeeded = true;
+    eos = false;
     super.init();
   }
 
@@ -121,12 +127,12 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   public void seek(long offset) throws IOException {
     channel.seek(offset);
     filePosition = channel.position();
-    tupleBuffer.getMemory().clear();
+    tupleBuffer.clear();
     fetchNeeded = true;
   }
 
   public boolean next(RowBlock rowblock) throws IOException {
-    long remain = reader.remainForRead();
+    long remain = reader == null ? 0 : reader.remainForRead();
     boolean ret = rowblock.copyFromChannel(channel);
     reader = rowblock.getReader();
     filePosition += rowblock.getMemory().writerPosition() - remain;
@@ -136,7 +142,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   private boolean fetchNeeded = true;
 
   @Override
-  public Tuple next() throws IOException {
+  public UnSafeTuple next() throws IOException {
     if(eos) {
       return null;
     }
@@ -164,9 +170,9 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   public void reset() throws IOException {
     // reload initial buffer
     filePosition = fragment.getStartKey();
+    recordCount = 0;
     seek(filePosition);
     eos = false;
-    reader.reset();
   }
 
   @Override

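The scanner's tuple buffer is now sized from configuration rather than the
previous hard-coded 64 KB. A minimal tuning sketch, assuming scanners pick up
a Hadoop Configuration at init time (the 512 KB figure and the helper class
are illustrative only, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
import org.apache.tajo.unit.StorageUnit;

public class ReadBufferTuning {
  // Returns a copy of the base configuration whose raw-file scanners
  // allocate a 512 KB read buffer instead of the 128 KB default.
  public static Configuration withLargerReadBuffer(Configuration base) {
    Configuration conf = new Configuration(base);
    conf.setInt(DirectRawFileScanner.READ_BUFFER_SIZE, 512 * StorageUnit.KB);
    return conf;
  }
}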
http://git-wip-us.apache.org/repos/asf/tajo/blob/411a26d5/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
index 23ef059..9cbb7a0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -26,11 +26,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FileAppender;
@@ -38,6 +41,9 @@ import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils.TupleConverter;
+import org.apache.tajo.tuple.memory.RowWriter;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
 import org.apache.tajo.unit.StorageUnit;
 
 import java.io.File;
@@ -46,23 +52,36 @@ import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 
 public class DirectRawFileWriter extends FileAppender {
-  public static final String FILE_EXTENSION = "draw";
-  private static final int BUFFER_SIZE = 64 * StorageUnit.KB;
   private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
 
-  private FileChannel channel;
-  private RandomAccessFile randomAccessFile;
-  private FSDataOutputStream fos;
-  private boolean isLocal;
-  private long pos;
+  public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes";
+  public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
+  private static final float OVERFLOW_RATIO = 1.1f;
+  protected FileChannel channel;
 
-  private TableStatistics stats;
-  private ShuffleType shuffleType;
-  private MemoryRowBlock memoryRowBlock;
+  protected RandomAccessFile randomAccessFile;
+  protected FSDataOutputStream fos;
+  protected long pos;
+  protected TableStatistics stats;
+
+  protected TupleConverter tupleConverter;
+  protected MemoryRowBlock rowBlock;
+  protected boolean analyzeField;
+  protected boolean hasExternalBuf;
+  protected boolean isLocal;
+
+  public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
+                             final Schema schema, final TableMeta meta, final Path path)
+      throws IOException {
+    this(conf, taskAttemptId, schema, meta, path, null);
+  }
 
   public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
-                             final Schema schema, final TableMeta meta, final Path path) throws IOException {
+                             final Schema schema, final TableMeta meta, final Path path,
+                             MemoryRowBlock rowBlock) throws IOException {
     super(conf, taskAttemptId, schema, meta, path);
+    this.rowBlock = rowBlock;
+    this.hasExternalBuf = rowBlock != null;
   }
 
   @Override
@@ -91,19 +110,89 @@ public class DirectRawFileWriter extends FileAppender {
 
     if (tableStatsEnabled) {
       this.stats = new TableStatistics(this.schema, columnStatsEnabled);
-      this.shuffleType = PlannerUtil.getShuffleType(
+      if (ShuffleType.RANGE_SHUFFLE == PlannerUtil.getShuffleType(
           meta.getOption(StorageConstants.SHUFFLE_TYPE,
-              PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
+              PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)))) {
+        this.analyzeField = true;
+      }
+    }
+
+    if (rowBlock == null) {
+      int bufferSize = (int) (conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE) * OVERFLOW_RATIO);
+      rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), bufferSize, true, meta.getDataFormat());
     }
 
-    memoryRowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), BUFFER_SIZE, true);
+    tupleConverter = initConverter();
+
     pos = 0;
     super.init();
   }
 
+  public TupleConverter initConverter() {
+    switch (meta.getDataFormat()) {
+    case BuiltinStorages.DRAW:
+      return getDrawConverter();
+    case BuiltinStorages.RAW:
+      return getRawConverter();
+    default:
+      throw new TajoInternalError(new UnsupportedException());
+    }
+  }
+
+  private TupleConverter getDrawConverter() {
+    return new TupleConverter() {
+
+      @Override
+      public void convert(Tuple tuple, RowWriter writer) {
+        if (analyzeField) {
+          if (tuple instanceof UnSafeTuple) {
+
+            for (int i = 0; i < writer.dataTypes().length; i++) {
+              // calculates min/max values; only used for the intermediate file.
+              stats.analyzeField(i, tuple);
+            }
+            // write directly to memory
+            writer.addTuple(tuple);
+          } else {
+            writer.startRow();
+
+            for (int i = 0; i < writer.dataTypes().length; i++) {
+              // calculates min/max values; only used for the intermediate file.
+              stats.analyzeField(i, tuple);
+              writeField(i, tuple, writer);
+            }
+            writer.endRow();
+          }
+        } else {
+          // write directly to memory
+          writer.addTuple(tuple);
+        }
+      }
+    };
+  }
+
+  private TupleConverter getRawConverter() {
+    return new TupleConverter() {
+
+      @Override
+      public void convert(Tuple tuple, RowWriter writer) {
+        writer.startRow();
+
+        for (int i = 0; i < writer.dataTypes().length; i++) {
+          // calculates min/max values; only used for the intermediate file.
+          if (analyzeField) {
+            stats.analyzeField(i, tuple);
+          }
+          writeField(i, tuple, writer);
+        }
+        writer.endRow();
+      }
+    };
+  }
+
   @Override
   public long getOffset() throws IOException {
-    return pos + memoryRowBlock.getMemory().writerPosition();
+    return hasExternalBuf ? pos : pos + rowBlock.getMemory().writerPosition();
   }
 
   public void writeRowBlock(MemoryRowBlock rowBlock) throws IOException {
@@ -113,33 +202,27 @@ public class DirectRawFileWriter extends FileAppender {
       pos += rowBlock.getMemory().writeTo(fos);
     }
 
-    rowBlock.getMemory().clear();
-
     if (tableStatsEnabled) {
-      stats.incrementRows(rowBlock.rows() - stats.getNumRows());
+      stats.incrementRows(rowBlock.rows());
     }
   }
 
   @Override
   public void addTuple(Tuple t) throws IOException {
-    if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-      // it is to calculate min/max values, and it is only used for the intermediate file.
-      for (int i = 0; i < schema.size(); i++) {
-        stats.analyzeField(i, t);
-      }
-    }
 
-    memoryRowBlock.getWriter().addTuple(t);
+    tupleConverter.convert(t, rowBlock.getWriter());
 
-    if(memoryRowBlock.getMemory().readableBytes() >= BUFFER_SIZE) {
-      writeRowBlock(memoryRowBlock);
+    if(rowBlock.usedMem() > DEFAULT_BUFFER_SIZE) {
+      writeRowBlock(rowBlock);
+      rowBlock.clear();
     }
   }
 
   @Override
   public void flush() throws IOException {
-    if(memoryRowBlock.getMemory().isReadable()) {
-      writeRowBlock(memoryRowBlock);
+    if(!hasExternalBuf && rowBlock.getMemory().isReadable()) {
+      writeRowBlock(rowBlock);
+      rowBlock.clear();
     }
   }
 
@@ -155,7 +238,9 @@ public class DirectRawFileWriter extends FileAppender {
     }
 
     IOUtils.cleanup(LOG, channel, randomAccessFile, fos);
-    memoryRowBlock.release();
+    if(!hasExternalBuf && rowBlock != null) {
+      rowBlock.release();
+    }
   }
 
   @Override

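The rewritten addTuple() batches rows into a MemoryRowBlock and spills to the
file channel once usedMem() exceeds DEFAULT_BUFFER_SIZE; the OVERFLOW_RATIO
padding on the initial allocation leaves room for the row that crosses the
threshold. A stripped-down sketch of this batch-and-spill pattern (the
BatchingWriter class and its encode() hook are illustrative stand-ins for the
TupleConverter/RowWriter pair, not Tajo API):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public abstract class BatchingWriter {
  private static final int BUFFER_SIZE = 128 * 1024;  // mirrors DEFAULT_BUFFER_SIZE
  private static final float OVERFLOW_RATIO = 1.1f;   // head-room past the threshold

  // Allocate slightly more than the flush threshold so the record that
  // crosses it still fits before the spill happens.
  private final ByteBuffer buffer =
      ByteBuffer.allocateDirect((int) (BUFFER_SIZE * OVERFLOW_RATIO));

  // Illustrative hook: encodes one record into the buffer.
  protected abstract void encode(Object record, ByteBuffer dst);

  public void add(Object record, WritableByteChannel channel) throws IOException {
    encode(record, buffer);
    if (buffer.position() > BUFFER_SIZE) {  // same check as usedMem() > DEFAULT_BUFFER_SIZE
      spill(channel);
    }
  }

  // Drains the buffer to the channel and resets it, like writeRowBlock()
  // followed by rowBlock.clear() in the diff above.
  public void spill(WritableByteChannel channel) throws IOException {
    buffer.flip();
    while (buffer.hasRemaining()) {
      channel.write(buffer);
    }
    buffer.clear();
  }
}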