TAJO-1340: Change the default output file format.

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/20c7e184
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/20c7e184
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/20c7e184

Branch: refs/heads/branch-0.11.0
Commit: 20c7e184c67b3ce517374cb45fb74f3dc454edc3
Parents: f0e22b7
Author: Jinho Kim <[email protected]>
Authored: Mon Sep 14 16:33:44 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Mon Sep 14 16:33:44 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/client/QueryClient.java     |   3 +-
 .../org/apache/tajo/client/QueryClientImpl.java |  75 +++++---
 .../apache/tajo/client/SessionConnection.java   |   1 +
 .../org/apache/tajo/client/TajoClientImpl.java  |  13 +-
 .../org/apache/tajo/client/TajoClientUtil.java  |  10 +-
 .../org/apache/tajo/jdbc/FetchResultSet.java    |  80 ++++++---
 .../apache/tajo/jdbc/TajoMemoryResultSet.java   |  70 ++++++--
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |  26 +--
 tajo-client/src/main/proto/ClientProtos.proto   |  10 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   3 +
 tajo-common/pom.xml                             |   4 +
 .../main/java/org/apache/tajo/SessionVars.java  |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   7 +-
 .../org/apache/tajo/tuple/BaseTupleBuilder.java |  11 ++
 .../tuple/memory/OffHeapRowBlockWriter.java     |  14 ++
 .../tajo/tuple/memory/OffHeapRowWriter.java     |  19 +-
 .../org/apache/tajo/util/CompressionUtil.java   |  73 ++++++++
 tajo-common/src/main/proto/DataTypes.proto      |   1 +
 tajo-common/src/main/proto/tajo_protos.proto    |   4 +
 .../tajo/client/TestQueryClientExceptions.java  |  13 +-
 .../tajo/querymaster/TestTaskStatusUpdate.java  |   6 +-
 .../testDistinctAggregation_case10.result       |   2 +-
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 .../testSelectResultWithNullFalse.result        |   2 +-
 .../testSelectResultWithNullTrue.result         |   2 +-
 ...estSelectResultWithNullTrueDeprecated.result |   2 +-
 .../TestTajoCli/testStopWhenError.result        |   2 +-
 .../testStopWhenErrorDeprecated.result          |   2 +-
 .../TestWindowQuery/testStdDevPop1.result       |   4 +-
 .../TestWindowQuery/testStdDevSamp1.result      |   8 +-
 .../tajo/engine/planner/global/DataChannel.java |   3 +-
 .../engine/planner/global/GlobalPlanner.java    |  14 +-
 .../planner/physical/PhysicalPlanUtil.java      |  17 +-
 .../engine/planner/physical/SeqScanExec.java    |  38 +++-
 .../tajo/master/TajoMasterClientService.java    |  27 ++-
 .../exec/NonForwardQueryResultFileScanner.java  | 177 +++++++++++++++++--
 .../exec/NonForwardQueryResultScanner.java      |  20 ++-
 .../NonForwardQueryResultSystemScanner.java     |  75 +++++++-
 .../apache/tajo/master/exec/QueryExecutor.java  |  53 ++++--
 .../java/org/apache/tajo/querymaster/Stage.java |   4 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  18 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |  75 ++++++--
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |  88 ++++++++-
 .../tajo/plan/logical/PersistentStoreNode.java  |   3 +-
 tajo-project/pom.xml                            |   5 +
 .../org/apache/tajo/rpc/RpcClientManager.java   |  12 +-
 47 files changed, 869 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9a08667..92986e0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -36,6 +36,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1340: Change the default output file format. (jinho)
+
     TAJO-1835: TajoClient::executeQueryAndGetResult should throw 
     Query(Failed|Killed)Exception. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java 
b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
index ad9bfc5..f436afb 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
@@ -35,6 +35,7 @@ import java.io.Closeable;
 import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 public interface QueryClient extends Closeable {
 
@@ -106,7 +107,7 @@ public interface QueryClient extends Closeable {
 
   GetQueryResultResponse getResultResponse(QueryId queryId) throws 
TajoException;
 
-  TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int 
fetchRowNum) throws TajoException;
+  Future<TajoMemoryResultSet> fetchNextQueryResultAsync(final QueryId queryId, 
final int fetchRowNum);
 
   boolean updateQuery(final String sql) throws TajoException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java 
b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 8b7f749..144e3b6 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.client;
 
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,8 +33,9 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
+import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.exception.*;
-import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import 
org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
 import org.apache.tajo.jdbc.FetchResultSet;
@@ -48,17 +50,21 @@ import java.net.InetSocketAddress;
 import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.tajo.exception.ExceptionUtil.throwIfError;
 import static org.apache.tajo.exception.ExceptionUtil.throwsIfThisError;
 import static org.apache.tajo.exception.ReturnStateUtil.ensureOk;
 import static org.apache.tajo.exception.ReturnStateUtil.isSuccess;
-import static org.apache.tajo.ipc.ClientProtos.*;
 import static 
org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
 
 public class QueryClientImpl implements QueryClient {
   private static final Log LOG = LogFactory.getLog(QueryClientImpl.class);
+  private static final CodecType DEFAULT_CODEC = CodecType.SNAPPY;
+  private final ExecutorService executor;
   private final SessionConnection conn;
   private final int defaultFetchRows;
   // maxRows number is limit value of resultSet. The value must be >= 0, and 0 
means there is not limit.
@@ -69,6 +75,7 @@ public class QueryClientImpl implements QueryClient {
     this.defaultFetchRows = 
this.conn.getProperties().getInt(SessionVars.FETCH_ROWNUM.getConfVars().keyname(),
         SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal);
     this.maxRows = 0;
+    this.executor = Executors.newSingleThreadExecutor();
   }
 
   @Override
@@ -93,6 +100,7 @@ public class QueryClientImpl implements QueryClient {
 
   @Override
   public void close() {
+    executor.shutdown();
   }
 
   @Override
@@ -150,7 +158,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public ClientProtos.SubmitQueryResponse executeQuery(final String sql) {
+  public SubmitQueryResponse executeQuery(final String sql) {
 
     final BlockingInterface stub = conn.getTMStub();
     final QueryRequest request = buildQueryRequest(sql, false);
@@ -171,7 +179,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String 
json) {
+  public SubmitQueryResponse executeQueryWithJson(final String json) {
     final BlockingInterface stub = conn.getTMStub();
     final QueryRequest request = buildQueryRequest(json, true);
 
@@ -185,7 +193,7 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public ResultSet executeQueryAndGetResult(String sql) throws TajoException {
 
-    ClientProtos.SubmitQueryResponse response = executeQuery(sql);
+    SubmitQueryResponse response = executeQuery(sql);
     throwIfError(response.getState());
 
     QueryId queryId = new QueryId(response.getQueryId());
@@ -203,7 +211,7 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public ResultSet executeJsonQueryAndGetResult(final String json) throws 
TajoException {
 
-    ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json);
+    SubmitQueryResponse response = executeQueryWithJson(json);
     throwIfError(response.getState());
 
     QueryId queryId = new QueryId(response.getQueryId());
@@ -325,30 +333,53 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final 
int fetchRowNum) throws TajoException {
+  public Future<TajoMemoryResultSet> fetchNextQueryResultAsync(final QueryId 
queryId, final int fetchRowNum) {
+
+    final SettableFuture<TajoMemoryResultSet> future = SettableFuture.create();
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          future.set(fetchNextQueryResult(queryId, fetchRowNum));
+        } catch (Throwable e) {
+          future.setException(e);
+        }
+      }
+    });
+    return future;
+  }
+
+  protected TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, 
final int fetchRowNum)
+      throws TajoException {
+
+    boolean compress = 
conn.getProperties().getBool(SessionVars.COMPRESSED_RESULT_TRANSFER);
 
     final BlockingInterface stub = conn.getTMStub();
-    final GetQueryResultDataRequest request = 
GetQueryResultDataRequest.newBuilder()
-        .setSessionId(conn.sessionId)
+    final GetQueryResultDataRequest.Builder request = 
GetQueryResultDataRequest.newBuilder();
+    request.setSessionId(conn.sessionId)
         .setQueryId(queryId.getProto())
-        .setFetchRowNum(fetchRowNum)
-        .build();
+        .setFetchRowNum(fetchRowNum);
+    if (compress) {
+      request.setCompressCodec(DEFAULT_CODEC);
+    }
 
     GetQueryResultDataResponse response;
     try {
-      response = stub.getQueryResultData(null, request);
+      response = stub.getQueryResultData(null, request.build());
     } catch (ServiceException e) {
       throw new RuntimeException(e);
     }
 
     throwIfError(response.getState());
 
-    ClientProtos.SerializedResultSet resultSet = response.getResultSet();
-    return new TajoMemoryResultSet(queryId,
-        new Schema(resultSet.getSchema()),
-        resultSet.getSerializedTuplesList(),
-        resultSet.getSerializedTuplesCount(),
-        getClientSideSessionVars());
+    if(response.hasResultSet()) {
+      SerializedResultSet resultSet = response.getResultSet();
+      return new TajoMemoryResultSet(queryId,
+          new Schema(resultSet.getSchema()),
+          resultSet, getClientSideSessionVars());
+    } else {
+      return TajoClientUtil.createNullResultSet(queryId);
+    }
   }
 
   @Override
@@ -388,7 +419,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public List<ClientProtos.BriefQueryInfo> getRunningQueryList() {
+  public List<BriefQueryInfo> getRunningQueryList() {
 
     final BlockingInterface stmb = conn.getTMStub();
 
@@ -404,7 +435,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() {
+  public List<BriefQueryInfo> getFinishedQueryList() {
 
     final BlockingInterface stub = conn.getTMStub();
 
@@ -420,7 +451,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public List<ClientProtos.WorkerResourceInfo> getClusterInfo() {
+  public List<WorkerResourceInfo> getClusterInfo() {
 
     final BlockingInterface stub = conn.getTMStub();
     final GetClusterInfoRequest request = GetClusterInfoRequest.newBuilder()
@@ -555,7 +586,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   private QueryIdRequest buildQueryIdRequest(QueryId queryId) {
-    return ClientProtos.QueryIdRequest.newBuilder()
+    return QueryIdRequest.newBuilder()
         .setSessionId(SessionIdProto.newBuilder().setId(getSessionId()))
         .setQueryId(queryId.getProto())
         .build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java 
b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index ac0ff52..3307ade 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -217,6 +217,7 @@ public class SessionConnection implements Closeable {
 
     ensureOk(response.getState());
     updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+    properties.putAll(sessionVarsCache);
     return Collections.unmodifiableMap(sessionVarsCache);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 3d69309..d9c7ff7 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -41,6 +41,7 @@ import java.net.URI;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.Future;
 
 @ThreadSafe
 public class TajoClientImpl extends SessionConnection implements TajoClient, 
QueryClient, CatalogAdminClient {
@@ -86,6 +87,13 @@ public class TajoClientImpl extends SessionConnection 
implements TajoClient, Que
   public TajoClientImpl(ServiceTracker serviceTracker, @Nullable String 
baseDatabase) throws SQLException {
     this(serviceTracker, baseDatabase, new KeyValueSet());
   }
+
+  @Override
+  public void close() {
+    queryClient.close();
+    super.close();
+  }
+
   /*------------------------------------------------------------------------*/
   // QueryClient wrappers
   /*------------------------------------------------------------------------*/
@@ -130,8 +138,9 @@ public class TajoClientImpl extends SessionConnection 
implements TajoClient, Que
     return queryClient.getResultResponse(queryId);
   }
 
-  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final 
int fetchRowNum) throws TajoException {
-    return queryClient.fetchNextQueryResult(queryId, fetchRowNum);
+  @Override
+  public Future<TajoMemoryResultSet> fetchNextQueryResultAsync(QueryId 
queryId, int fetchRowNum) {
+    return queryClient.fetchNextQueryResultAsync(queryId, fetchRowNum);
   }
 
   public boolean updateQuery(final String sql) throws TajoException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index 3f30f97..baafbe8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -33,7 +33,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 
 import java.io.IOException;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 
 public class TajoClientUtil {
 
@@ -101,16 +100,15 @@ public class TajoClientUtil {
       ClientProtos.SerializedResultSet serializedResultSet = 
response.getResultSet();
       return new TajoMemoryResultSet(new QueryId(response.getQueryId()),
           new Schema(serializedResultSet.getSchema()),
-          serializedResultSet.getSerializedTuplesList(),
-          response.getMaxRowNum(),
+          serializedResultSet,
           client.getClientSideSessionVars());
     }
   }
 
   public static final ResultSet NULL_RESULT_SET =
-      new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), 
null, 0, null);
+      new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), 
null, null);
 
-  public static ResultSet createNullResultSet(QueryId queryId) {
-    return new TajoMemoryResultSet(queryId, new Schema(), null, 0, null);
+  public static TajoMemoryResultSet createNullResultSet(QueryId queryId) {
+    return new TajoMemoryResultSet(queryId, new Schema(), null, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java 
b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index 8f15710..6bfa258 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -21,15 +21,23 @@ package org.apache.tajo.jdbc;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.client.QueryClient;
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.DefaultTajoException;
+import org.apache.tajo.exception.SQLExceptionUtil;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class FetchResultSet extends TajoResultSetBase {
   protected QueryClient tajoClient;
   private int fetchRowNum;
   private TajoMemoryResultSet currentResultSet;
+  private Future<TajoMemoryResultSet> nextResultSet;
   private boolean finished;
   // maxRows number is limit value of resultSet. The value must be >= 0, and 0 
means there is not limit.
   private int maxRows;
@@ -49,35 +57,44 @@ public class FetchResultSet extends TajoResultSetBase {
     }
 
     try {
-      Tuple tuple = null;
-      if (currentResultSet != null) {
-        currentResultSet.next();
-        tuple = currentResultSet.cur;
-      }
-      if (currentResultSet == null || tuple == null) {
+      while (!finished) {
+        Tuple tuple;
         if (currentResultSet != null) {
-          currentResultSet.close();
-          currentResultSet = null;
-        }
-        currentResultSet = tajoClient.fetchNextQueryResult(queryId, 
fetchRowNum);
-        if (currentResultSet == null) {
-          finished = true;
-          return null;
-        }
+          currentResultSet.next();
+          tuple = currentResultSet.cur;
 
-        currentResultSet.next();
-        tuple = currentResultSet.cur;
-      }
-      if (tuple == null) {
-        if (currentResultSet != null) {
-          currentResultSet.close();
-          currentResultSet = null;
+          if (tuple == null) {
+            currentResultSet.close();
+            currentResultSet = null;
+          } else {
+            return tuple;
+          }
+
+        } else {
+          if(nextResultSet == null) {
+            nextResultSet = tajoClient.fetchNextQueryResultAsync(queryId, 
fetchRowNum);
+          } else {
+            currentResultSet = nextResultSet.get();
+
+            if(currentResultSet.totalRow == 0) {
+              currentResultSet.close();
+              currentResultSet = null;
+              nextResultSet = null;
+              finished = true;
+            } else {
+              // pre-fetch
+              nextResultSet = tajoClient.fetchNextQueryResultAsync(queryId, 
fetchRowNum);
+            }
+          }
         }
-        finished = true;
       }
-      return tuple;
-    } catch (Throwable t) {
+
+      return null;
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
       throw new IOException(t.getMessage(), t);
+    } catch (Throwable t) {
+      throw new TajoInternalError(t);
     }
   }
 
@@ -87,6 +104,21 @@ public class FetchResultSet extends TajoResultSetBase {
       currentResultSet.close();
       currentResultSet = null;
     }
+
+    if (nextResultSet != null) {
+      try {
+        nextResultSet.get(1, TimeUnit.SECONDS).close();
+      } catch (ExecutionException e) {
+        Throwable t = e.getCause();
+        if (t instanceof DefaultTajoException) {
+          throw SQLExceptionUtil.toSQLException((DefaultTajoException) t);
+        } else {
+          throw new TajoSQLException(Errors.ResultCode.INTERNAL_ERROR, t, 
t.getMessage());
+        }
+      } catch (Throwable t) {
+        throw new TajoSQLException(Errors.ResultCode.INTERNAL_ERROR, t, 
t.getMessage());
+      }
+    }
     tajoClient.closeQuery(queryId);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java 
b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
index 4114d03..49c9e5a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
@@ -18,46 +18,78 @@
 
 package org.apache.tajo.jdbc;
 
-import com.google.protobuf.ByteString;
+import io.netty.buffer.Unpooled;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.memory.HeapRowBlockReader;
+import org.apache.tajo.tuple.memory.HeapTuple;
+import org.apache.tajo.tuple.memory.MemoryBlock;
+import org.apache.tajo.tuple.memory.ResizableMemoryBlock;
+import org.apache.tajo.util.CompressionUtil;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TajoMemoryResultSet extends TajoResultSetBase {
-  private List<ByteString> serializedTuples;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private RowStoreUtil.RowStoreDecoder decoder;
+  private MemoryBlock memory;
+  private RowBlockReader reader;
+  private volatile boolean closed;
 
-  public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> 
serializedTuples, int maxRowNum,
+
+  public TajoMemoryResultSet(QueryId queryId, Schema schema, 
SerializedResultSet resultSet,
                              Map<String, String> clientSideSessionVars) {
     super(queryId, schema, clientSideSessionVars);
-    this.totalRow = maxRowNum;
-    this.serializedTuples = serializedTuples;
-    this.decoder = RowStoreUtil.createDecoder(schema);
+    if(resultSet != null && resultSet.getRows() > 0) {
+      this.totalRow = resultSet.getRows();
+
+      try {
+        // decompress if a codec is specified
+        if (resultSet.hasDecompressCodec()) {
+          byte[] compressed = resultSet.getSerializedTuples().toByteArray();
+          byte[] uncompressed = 
CompressionUtil.decompress(resultSet.getDecompressCodec(), compressed);
+          memory = new 
ResizableMemoryBlock(Unpooled.wrappedBuffer(uncompressed));
+        } else {
+          memory = new 
ResizableMemoryBlock(resultSet.getSerializedTuples().asReadOnlyByteBuffer());
+        }
+      } catch (IOException e) {
+        throw new TajoInternalError(e);
+      }
+
+      reader = new HeapRowBlockReader(memory, SchemaUtil.toDataTypes(schema), 
resultSet.getRows());
+    } else {
+      this.totalRow = 0;
+      this.curRow = 0;
+    }
   }
 
   @Override
   protected void init() {
     cur = null;
     curRow = 0;
+    wasNull = false;
   }
 
   @Override
-  public synchronized void close() throws SQLException {
-    if (closed.getAndSet(true)) {
+  public void close() throws SQLException {
+    if (closed) {
       return;
     }
 
+    closed = true;
     cur = null;
     curRow = -1;
-    serializedTuples = null;
+    totalRow = 0;
+    reader = null;
+    if(memory != null) {
+      memory.release();
+      memory = null;
+    }
   }
 
   @Override
@@ -68,7 +100,13 @@ public class TajoMemoryResultSet extends TajoResultSetBase {
   @Override
   protected Tuple nextTuple() throws IOException {
     if (curRow < totalRow) {
-      cur = decoder.toTuple(serializedTuples.get(curRow).toByteArray());
+
+      HeapTuple heapTuple = new HeapTuple();
+      if (reader.next(heapTuple)) {
+        cur = heapTuple;
+      } else {
+        cur = null;
+      }
       return cur;
     } else {
       return null;
@@ -76,6 +114,6 @@ public class TajoMemoryResultSet extends TajoResultSetBase {
   }
 
   public boolean hasResult() {
-    return serializedTuples.size() > 0;
+    return totalRow > 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java 
b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index 3d8d9aa..a8d1239 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -106,7 +106,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private boolean getBoolean(Tuple tuple, int index) {
-    return handleNull(tuple, index) ? false : tuple.getBool(index);
+    return handleNull(tuple, index) ? false : tuple.asDatum(index).asBool();
   }
 
   @Override
@@ -120,7 +120,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private byte getByte(Tuple tuple, int index) {
-    return handleNull(tuple, index) ? 0 : tuple.getByte(index);
+    return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asByte();
   }
 
   @Override
@@ -134,7 +134,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private byte[] getBytes(Tuple tuple, int index) {
-    return handleNull(tuple, index) ? null : tuple.getBytes(index);
+    return handleNull(tuple, index) ? null : 
tuple.asDatum(index).asByteArray();
   }
 
   @Override
@@ -148,7 +148,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private double getDouble(Tuple tuple, int index) {
-    return handleNull(tuple, index) ? 0.0d : tuple.getFloat8(index);
+    return handleNull(tuple, index) ? 0.0d : tuple.asDatum(index).asFloat8();
   }
 
   @Override
@@ -162,7 +162,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private float getFloat(Tuple tuple, int index) throws SQLException {
-    return handleNull(tuple, index) ? 0.0f : tuple.getFloat4(index);
+    return handleNull(tuple, index) ? 0.0f : tuple.asDatum(index).asFloat4();
   }
 
   @Override
@@ -176,7 +176,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private int getInt(Tuple tuple, int index) throws SQLException {
-    return handleNull(tuple, index) ? 0 : tuple.getInt4(index);
+    return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt4();
   }
 
   @Override
@@ -190,7 +190,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private long getLong(Tuple tuple, int index) throws SQLException {
-    return handleNull(tuple, index) ? 0 : tuple.getInt8(index);
+    return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt8();
   }
 
   @Override
@@ -232,7 +232,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
         return toTimestamp(tuple.getTimeDate(index), timezone);
       }
       default:
-        return tuple.getText(index);
+        return tuple.asDatum(index).asChars();
     }
   }
 
@@ -247,7 +247,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private short getShort(Tuple tuple, int index) throws SQLException {
-    return handleNull(tuple, index) ? 0 : tuple.getInt2(index);
+    return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt2();
   }
 
   @Override
@@ -273,7 +273,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
       case TIMESTAMP:
         return TimestampDatum.asChars(tuple.getTimeDate(index), timezone, 
false);
       default :
-        return tuple.getText(index);
+        return tuple.asDatum(index).asChars();
     }
   }
 
@@ -298,7 +298,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private Date getDate(Tuple tuple, TimeZone tz, int index) throws 
SQLException {
-    return handleNull(tuple, index) ? null : toDate(tuple.getTimeDate(index), 
tz);
+    return handleNull(tuple, index) ? null : 
toDate(tuple.asDatum(index).asTimeMeta(), tz);
   }
 
   private Date toDate(TimeMeta tm, TimeZone tz) {
@@ -329,7 +329,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private Time getTime(Tuple tuple, TimeZone tz, int index) throws 
SQLException {
-    return handleNull(tuple, index) ? null : toTime(tuple.getTimeDate(index), 
tz);
+    return handleNull(tuple, index) ? null : 
toTime(tuple.asDatum(index).asTimeMeta(), tz);
   }
 
   private Time toTime(TimeMeta tm, TimeZone tz) {
@@ -360,7 +360,7 @@ public abstract class TajoResultSetBase implements 
ResultSet {
   }
 
   private Timestamp getTimestamp(Tuple tuple, TimeZone tz, int index) throws 
SQLException {
-    return handleNull(tuple, index) ? null : 
toTimestamp(tuple.getTimeDate(index), tz);
+    return handleNull(tuple, index) ? null : 
toTimestamp(tuple.asDatum(index).asTimeMeta(), tz);
   }
 
   private Timestamp toTimestamp(TimeMeta tm, TimeZone tz) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto 
b/tajo-client/src/main/proto/ClientProtos.proto
index 49b0c40..13cb04b 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -27,6 +27,7 @@ import "tajo_protos.proto";
 import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
+import "DataTypes.proto";
 
 message CreateSessionRequest {
   required string username = 1;
@@ -114,9 +115,11 @@ message GetQueryStatusRequest {
 }
 
 message SerializedResultSet {
-  optional SchemaProto schema = 1;
-  optional int32 bytesNum = 2;
-  repeated bytes serializedTuples = 3;
+  required SchemaProto schema = 1;
+  required int32 rows = 2;
+  optional int32 decompressedLength = 3;
+  optional CodecType decompressCodec = 4;
+  optional bytes serializedTuples = 5;
 }
 
 message SubmitQueryResponse {
@@ -161,6 +164,7 @@ message GetQueryResultDataRequest {
   required SessionIdProto sessionId = 1;
   required QueryIdProto queryId = 2;
   required int32 fetchRowNum = 3;
+  optional CodecType compressCodec = 4;
 }
 
 message GetQueryResultDataResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index bb690dd..8b178a6 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -158,6 +158,9 @@ public class TajoTestingCluster {
     // Python function path
     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
 
+    // Query output file
+    conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, BuiltinStorages.DRAW);
+
     /* Since Travis CI limits the size of standard output log up to 4MB */
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml
index 1e17198..91034f7 100644
--- a/tajo-common/pom.xml
+++ b/tajo-common/pom.xml
@@ -218,6 +218,10 @@
       <artifactId>netty-buffer</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 36234e0..7e419f0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -149,6 +149,8 @@ public enum SessionVars implements ConfigKey {
       Integer.class, Validators.min("0")),
   BLOCK_ON_RESULT(ConfVars.$RESULT_SET_BLOCK_WAIT, "Whether to block result set on query execution", DEFAULT,
       Boolean.class, Validators.bool()),
+  COMPRESSED_RESULT_TRANSFER(ConfVars.$COMPRESSED_RESULT_TRANSFER, "Use compression to optimize result transmission.",
+      CLI_SIDE_VAR, Boolean.class, Validators.bool()),
 
   //-------------------------------------------------------------------------------
   // Only for Unit Testing

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0f393f6..f9b9201 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.ConfigKey;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
@@ -204,7 +205,7 @@ public class TajoConf extends Configuration {
     // Shuffle Configuration --------------------------------------------------
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
-    SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
+    SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
         2, Validators.min("1")),
     SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192),
@@ -215,6 +216,9 @@ public class TajoConf extends Configuration {
     SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 30),
     HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10),
 
+    // Query output Configuration --------------------------------------------------
+    QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", BuiltinStorages.TEXT, Validators.javaString()),
+
     // Storage Configuration --------------------------------------------------
     ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100),
     MINIMUM_SPLIT_SIZE("tajo.min.split.size", (long) 1),
@@ -377,6 +381,7 @@ public class TajoConf extends Configuration {
       // ResultSet ---------------------------------------------------------
     $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
     $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),
+    $COMPRESSED_RESULT_TRANSFER("tajo.resultset.compression", false),
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
index 04a0267..cb417f3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -23,6 +23,7 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.tuple.memory.*;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.TUtil;
 
 public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
 
@@ -64,6 +65,16 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder,
   }
 
   @Override
+  public void addTuple(Tuple tuple) {
+    if (tuple instanceof UnSafeTuple) {
+      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class);
+      addTuple(unSafeTuple);
+    } else {
+      OffHeapRowBlockUtils.convert(tuple, this);
+    }
+  }
+
+  @Override
   public Tuple build() {
     return buildToHeapTuple();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
index 6832730..9f3d8a2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
@@ -20,6 +20,8 @@ package org.apache.tajo.tuple.memory;
 
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
 
 public class OffHeapRowBlockWriter extends OffHeapRowWriter {
   private RowBlock rowBlock;
@@ -60,4 +62,16 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter {
   public TajoDataTypes.DataType[] dataTypes() {
     return rowBlock.getDataTypes();
   }
+
+
+  @Override
+  public void addTuple(Tuple tuple) {
+    if (tuple instanceof UnSafeTuple) {
+      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class);
+      addTuple(unSafeTuple);
+      rowBlock.setRows(rowBlock.rows() + 1);
+    } else {
+      OffHeapRowBlockUtils.convert(tuple, this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
index c9b233f..f082762 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
@@ -23,9 +23,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.TUtil;
 
 /**
  *
@@ -290,16 +288,11 @@ public abstract class OffHeapRowWriter implements RowWriter {
     putBlob(val.asByteArray());
   }
 
-  @Override
-  public void addTuple(Tuple tuple) {
-    if (tuple instanceof UnSafeTuple) {
-      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class);
-      int length = unSafeTuple.getLength();
-      ensureSize(length);
-      PlatformDependent.copyMemory(unSafeTuple.address(), address() + position(), length);
-      forward(length);
-    } else {
-      OffHeapRowBlockUtils.convert(tuple, this);
-    }
+
+  protected void addTuple(UnSafeTuple tuple) {
+    int length = tuple.getLength();
+    ensureSize(length);
+    PlatformDependent.copyMemory(tuple.address(), address() + position(), length);
+    forward(length);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java
new file mode 100644
index 0000000..0026ea8
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.tajo.TajoProtos.CodecType;
+import org.apache.tajo.exception.UnsupportedException;
+import org.iq80.snappy.Snappy;
+
+import java.io.IOException;
+
+public class CompressionUtil {
+
+  public static byte[] compress(CodecType type, byte[] uncompressed) throws IOException {
+    switch (type) {
+    case SNAPPY:
+      return SnappyCodec.compress(uncompressed);
+    default:
+      throw new IOException(new UnsupportedException("Cannot support " + type));
+    }
+  }
+
+  public static byte[] decompress(CodecType type, byte[] compressed) throws IOException {
+    switch (type) {
+    case SNAPPY:
+      return SnappyCodec.uncompress(compressed);
+    default:
+      throw new IOException(new UnsupportedException("Cannot support " + type));
+    }
+  }
+
+  public static int maxCompressedLength(CodecType type, int byteSize) throws IOException {
+    switch (type) {
+    case SNAPPY:
+      return SnappyCodec.maxCompressedLength(byteSize);
+    default:
+      throw new IOException(new UnsupportedException("Cannot support " + type));
+    }
+  }
+
+  /**
+   * Java snappy codec
+   */
+  static class SnappyCodec {
+
+    static byte[] compress(byte[] uncompressed) throws IOException {
+      return Snappy.compress(uncompressed);
+    }
+
+    static byte[] uncompress(byte[] compressed) throws IOException {
+      return Snappy.uncompress(compressed, 0, compressed.length);
+    }
+
+    static int maxCompressedLength(int byteSize) throws IOException {
+      return Snappy.maxCompressedLength(byteSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/proto/DataTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/DataTypes.proto b/tajo-common/src/main/proto/DataTypes.proto
index 53c3d0f..1e10be2 100644
--- a/tajo-common/src/main/proto/DataTypes.proto
+++ b/tajo-common/src/main/proto/DataTypes.proto
@@ -114,3 +114,4 @@ message DataType {
    */
   optional int32 num_nested_fields = 4;
 }
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 8474f54..2a836d3 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -68,4 +68,8 @@ message NodeResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
   optional int32 disks = 3;
+}
+
+enum CodecType {
+      SNAPPY = 0;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java
index 0457e23..fd25db9 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java
@@ -24,11 +24,15 @@ import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.annotation.NotThreadSafe;
 import org.apache.tajo.error.Errors;
 import org.apache.tajo.exception.*;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import static org.junit.Assert.assertEquals;
 
 @NotThreadSafe
@@ -100,8 +104,13 @@ public class TestQueryClientExceptions {
   }
 
   @Test(expected = QueryNotFoundException.class)
-  public void testFetchNextQueryResult() throws TajoException {
-    client.fetchNextQueryResult(LocalTajoTestingUtility.newQueryId(), 100);
+  public void testAscynFetchNextQueryResult() throws Throwable {
+    Future<TajoMemoryResultSet> future = client.fetchNextQueryResultAsync(LocalTajoTestingUtility.newQueryId(), 100);
+    try {
+      future.get();
+    } catch (ExecutionException e) {
+      throw e.getCause();
+    }
   }
 
   @Test(expected = QueryNotFoundException.class)

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
index 4654d38..f845bb3 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
@@ -59,7 +59,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
 
       // tpch/lineitem.tbl
       long[] expectedNumRows = new long[]{5, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{604, 18, 18, 8};
+      long[] expectedNumBytes = new long[]{604, 18, 18, 48};
       long[] expectedReadBytes = new long[]{604, 604, 18, 0};
       QueryId queryId = getQueryId(res);
 
@@ -78,7 +78,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
 
       // tpch/lineitem.tbl
       long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194};
+      long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 236};
       long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0};
 
       QueryId queryId = getQueryId(res);
@@ -107,7 +107,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
 
       // in/out * stage(4)
       long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18};
+      long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 64};
       long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0};
 
       QueryId queryId = getQueryId(res);

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
 
b/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
index 2035d9f..ef41ed9 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
@@ -1,3 +1,3 @@
 ?sum,?sum_1
 -------------------------------
-3,414440.9
\ No newline at end of file
+3,414440.89999999997
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 51fb61f..2d87b56 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -42,4 +42,5 @@ Available Session Variables:
 \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
 \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
 \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
+\set COMPRESSED_RESULT_TRANSFER [true or false] - Use compression to optimize result transmission.
 \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result
index a5a6039..6169092 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result
@@ -5,4 +5,4 @@ c_custkey,  o_orderkey,  o_orderstatus
 3,  3,  F
 4,  ,  
 5,  ,  
-(5 rows, , 30 B selected)
+(5 rows, , 127 B selected)

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result
index 566262e..dfc920b 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result
@@ -5,4 +5,4 @@ c_custkey,  o_orderkey,  o_orderstatus
 3,  3,  F
 4,  testnull,  testnull
 5,  testnull,  testnull
-(5 rows, , 30 B selected)
+(5 rows, , 127 B selected)

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result
index 36ea548..1212ade 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result
@@ -6,4 +6,4 @@ c_custkey,  o_orderkey,  o_orderstatus
 3,  3,  F
 4,  testnull,  testnull
 5,  testnull,  testnull
-(5 rows, , 30 B selected)
\ No newline at end of file
+(5 rows, , 127 B selected)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result
index e2e6b34..a0044c2 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result
@@ -1,5 +1,5 @@
 ?count
 -------------------------------
 5
-(1 rows, , 2 B selected)
+(1 rows, , 16 B selected)
 ERROR: relation 'default.lineitem2' does not exist
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result
 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result
index 17b4d56..e3f1e6b 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result
@@ -2,5 +2,5 @@ Warning: deprecated to directly use config key in 
TajoConf.ConfVars. Please exec
 ?count
 -------------------------------
 5
-(1 rows, , 2 B selected)
+(1 rows, , 16 B selected)
 ERROR: relation 'default.lineitem2' does not exist
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result
 
b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result
index ef2e5d3..a2faac0 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result
@@ -3,5 +3,5 @@ 
linenumber_stddev_pop,suppkey_stddev_pop,extendedprice_stddev_pop,discount_stdde
 0.5,197.5,12407.465,0.02500000223517418
 0.5,197.5,12407.465,0.02500000223517418
 0.0,0.0,0.0,0.0
-0.5,2371.0,3630.790000000001,0.02000000141561031
-0.5,2371.0,3630.790000000001,0.02000000141561031
\ No newline at end of file
+0.5,2371.0,3630.790000000001,0.020000001415610313
+0.5,2371.0,3630.790000000001,0.020000001415610313
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result
 
b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result
index 5db6227..625f91b 100644
--- 
a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result
+++ 
b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result
@@ -1,7 +1,7 @@
 
linenumber_stddev_samp,suppkey_stddev_samp,extendedprice_stddev_samp,discount_stddev_samp
 -------------------------------
-0.7071067811865476,279.30717856868625,17546.805277669493,0.03535534222034101
-0.7071067811865476,279.30717856868625,17546.805277669493,0.03535534222034101
+0.7071067811865476,279.30717856868625,17546.805277669493,0.035355342220341014
+0.7071067811865476,279.30717856868625,17546.805277669493,0.035355342220341014
 null,null,null,null
-0.7071067811865476,3353.1003563866084,5134.7124601286105,0.0282842732494372
-0.7071067811865476,3353.1003563866084,5134.7124601286105,0.0282842732494372
\ No newline at end of file
+0.7071067811865476,3353.1003563866084,5134.7124601286105,0.028284273249437206
+0.7071067811865476,3353.1003563866084,5134.7124601286105,0.028284273249437206
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index e09684a..2d32785 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.global;
 
 import com.google.common.base.Preconditions;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -39,7 +40,7 @@ public class DataChannel {
 
   private Schema schema;
 
-  private String storeType = "RAW";
+  private String storeType = BuiltinStorages.RAW;
 
   public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
     this.srcId = srcId;

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 8c6aa8c..5049995 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -37,9 +37,9 @@ import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.Target;
@@ -69,6 +69,7 @@ public class GlobalPlanner {
 
   private final TajoConf conf;
   private final String storeType;
+  private final String finalOutputStoreType;
   private final CatalogService catalog;
   private final GlobalPlanRewriteEngine rewriteEngine;
 
@@ -77,7 +78,7 @@ public class GlobalPlanner {
     this.conf = conf;
     this.catalog = catalog;
     this.storeType = conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase();
-    Preconditions.checkArgument(storeType != null);
+    this.finalOutputStoreType = conf.getVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT).toUpperCase();
 
     Class<? extends GlobalPlanRewriteRuleProvider> clazz =
         (Class<? extends GlobalPlanRewriteRuleProvider>) conf.getClassVar(GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS);
@@ -159,10 +160,10 @@ public class GlobalPlanner {
     LOG.info("\n\nOptimized master plan\n" + masterPlan.toString());
   }
 
-  private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
+  private void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
     outputChannel.setShuffleType(NONE_SHUFFLE);
     outputChannel.setShuffleOutputNum(1);
-    outputChannel.setStoreType("TEXT");
+    outputChannel.setStoreType(finalOutputStoreType);
     outputChannel.setSchema(outputSchema);
   }
 
@@ -988,9 +989,8 @@ public class GlobalPlanner {
           ((TableSubQueryNode)child).getSubQuery().getType() == NodeType.UNION) {
         MasterPlan masterPlan = context.plan;
         for (DataChannel dataChannel : masterPlan.getIncomingChannels(execBlock.getId())) {
-          // This data channel will be stored in staging directory, but RawFile, default file type, does not support
-          // distributed file system. It needs to change the file format for distributed file system.
-          dataChannel.setStoreType("TEXT");
+
+          dataChannel.setStoreType(finalOutputStoreType);
           ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
 
           ProjectionNode copy = PlannerUtil.clone(plan, node);

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index 1e90d04..e9f4fb9 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -209,13 +210,11 @@ public class PhysicalPlanUtil {
    */
   private static void setNullCharForTextSerializer(TableMeta meta, String 
nullChar) {
     String storeType = meta.getStoreType();
-    if (storeType.equalsIgnoreCase("TEXT")) {
+    if (storeType.equalsIgnoreCase(BuiltinStorages.TEXT)) {
       meta.putOption(StorageConstants.TEXT_NULL, nullChar);
-    } else if (storeType.equalsIgnoreCase("TEXT")) {
-      meta.putOption(StorageConstants.TEXT_NULL, nullChar);
-    } else if (storeType.equalsIgnoreCase("RCFILE")) {
+    }  else if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
       meta.putOption(StorageConstants.RCFILE_NULL, nullChar);
-    } else if (storeType.equalsIgnoreCase("SEQUENCEFILE")) {
+    } else if (storeType.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
       meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar);
     }
   }
@@ -228,13 +227,11 @@ public class PhysicalPlanUtil {
    */
   public static boolean containsNullChar(TableMeta meta) {
     String storeType = meta.getStoreType();
-    if (storeType.equalsIgnoreCase("TEXT")) {
-      return meta.containsOption(StorageConstants.TEXT_NULL);
-    } else if (storeType.equalsIgnoreCase("TEXT")) {
+    if (storeType.equalsIgnoreCase(BuiltinStorages.TEXT)) {
       return meta.containsOption(StorageConstants.TEXT_NULL);
-    } else if (storeType.equalsIgnoreCase("RCFILE")) {
+    } else if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
       return meta.containsOption(StorageConstants.RCFILE_NULL);
-    } else if (storeType.equalsIgnoreCase("SEQUENCEFILE")) {
+    } else if (storeType.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
       return meta.containsOption(StorageConstants.SEQUENCEFILE_NULL);
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 1ecabf1..63b5445 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -34,6 +34,7 @@ import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.expr.FieldEval;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
@@ -61,6 +62,8 @@ public class SeqScanExec extends ScanExec {
   // scanner iterator with filter or without filter
   private ScanIterator scanIt;
 
+  private boolean needProjection;
+
   public SeqScanExec(TaskAttemptContext context, ScanNode plan,
                      CatalogProtos.FragmentProto [] fragments) throws 
IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -198,7 +201,7 @@ public class SeqScanExec extends ScanExec {
       // for non-projected fields.
       Schema actualInSchema = scanner.isProjectable() ? projectedFields : 
inSchema;
 
-      this.projector = new Projector(context, actualInSchema, outSchema, 
plan.getTargets());
+      initializeProjector(actualInSchema);
 
       if (plan.hasQual()) {
         qual.bind(context.getEvalContext(), actualInSchema);
@@ -210,6 +213,37 @@ public class SeqScanExec extends ScanExec {
     super.init();
   }
 
+  protected void initializeProjector(Schema actualInSchema){
+    Target[] realTargets;
+    if (plan.getTargets() == null) {
+      realTargets = PlannerUtil.schemaToTargets(outSchema);
+    } else {
+      realTargets = plan.getTargets();
+    }
+
+    // If all columns are selected in schema order and no target is an expression, projection can be skipped
+    if (realTargets.length == inSchema.size()) {
+      for (int i = 0; i < inSchema.size(); i++) {
+        if (realTargets[i].getEvalTree() instanceof FieldEval) {
+          FieldEval f = realTargets[i].getEvalTree();
+          if(!f.getColumnRef().equals(inSchema.getColumn(i))) {
+            needProjection = true;
+            break;
+          }
+        } else {
+          needProjection = true;
+          break;
+        }
+      }
+    } else {
+      needProjection = true;
+    }
+
+    if(needProjection) {
+      projector = new Projector(context, actualInSchema, outSchema, 
plan.getTargets());
+    }
+  }
+
   @Override
   public ScanNode getScanNode() {
     return plan;
@@ -253,6 +287,8 @@ public class SeqScanExec extends ScanExec {
 
     while(scanIt.hasNext()) {
       Tuple t = scanIt.next();
+      if(!needProjection) return t;
+
       Tuple outTuple = projector.eval(t);
       outTuple.setOffset(t.getOffset());
       return outTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 40bca04..cd43add 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.master;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
@@ -62,7 +61,6 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -533,7 +531,6 @@ public class TajoMasterClientService extends 
AbstractService {
     public GetQueryResultDataResponse getQueryResultData(RpcController 
controller, GetQueryResultDataRequest request)
         throws ServiceException {
       GetQueryResultDataResponse.Builder builder = 
GetQueryResultDataResponse.newBuilder();
-      SerializedResultSet.Builder resultSetBuilder = 
SerializedResultSet.newBuilder();
 
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
@@ -561,29 +558,25 @@ public class TajoMasterClientService extends 
AbstractService {
             scanNode.init(resultTableDesc);
           }
 
-          queryResultScanner =
-              new NonForwardQueryResultFileScanner(context.getConf(), 
session.getSessionId(), queryId, scanNode,
-                  resultTableDesc, Integer.MAX_VALUE);
+          if(request.hasCompressCodec()) {
+            queryResultScanner = new 
NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(),
+                queryId, scanNode, resultTableDesc, Integer.MAX_VALUE, 
request.getCompressCodec());
+          } else {
+            queryResultScanner = new 
NonForwardQueryResultFileScanner(context.getConf(),
+                session.getSessionId(), queryId, scanNode, resultTableDesc, 
Integer.MAX_VALUE);
+          }
+
           queryResultScanner.init();
           session.addNonForwardQueryResultScanner(queryResultScanner);
         }
 
-        List<ByteString> rows = 
queryResultScanner.getNextRows(request.getFetchRowNum());
-
-        
resultSetBuilder.setSchema(queryResultScanner.getLogicalSchema().getProto());
-        resultSetBuilder.addAllSerializedTuples(rows);
+        SerializedResultSet resultSet = 
queryResultScanner.nextRowBlock(request.getFetchRowNum());
 
-        builder.setResultSet(resultSetBuilder.build());
+        builder.setResultSet(resultSet);
         builder.setState(OK);
-
-        LOG.info("Send result to client for " +
-            request.getSessionId().getId() + "," + queryId + ", " + 
rows.size() + " rows");
-
       } catch (Throwable t) {
         printStackTraceIfError(LOG, t);
-
         builder.setState(returnError(t));
-        builder.setResultSet(resultSetBuilder.build()); // required field
       }
 
       return builder.build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 9e132b0..809b81f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -19,34 +19,48 @@
 package org.apache.tajo.master.exec;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec;
 import org.apache.tajo.engine.planner.physical.ScanExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.tuple.memory.MemoryBlock;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.util.CompressionUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultScanner {
+  private final static Log LOG = 
LogFactory.getLog(NonForwardQueryResultFileScanner.class);
 
   private QueryId queryId;
   private String sessionId;
@@ -54,13 +68,25 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
   private TableDesc tableDesc;
   private RowStoreEncoder rowEncoder;
   private int maxRow;
-  private int currentNumRows;
+  private boolean eof;
+  private volatile long totalRows;
+  private volatile int currentNumRows;
+  private volatile boolean isStopped;
   private TaskAttemptContext taskContext;
   private TajoConf tajoConf;
   private ScanNode scanNode;
+  private CodecType codecType;
+  private ExecutorService executor;
+  private MemoryRowBlock rowBlock;
+  private Future<MemoryRowBlock> nextFetch;
+
+  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, 
QueryId queryId, ScanNode scanNode,
+                                          TableDesc tableDesc, int maxRow) 
throws IOException {
+    this(tajoConf, sessionId, queryId, scanNode, tableDesc, maxRow, null);
+  }
 
   public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, 
QueryId queryId, ScanNode scanNode,
-      TableDesc tableDesc, int maxRow) throws IOException {
+      TableDesc tableDesc, int maxRow, CodecType codecType) throws IOException 
{
     this.tajoConf = tajoConf;
     this.sessionId = sessionId;
     this.queryId = queryId;
@@ -68,6 +94,7 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
     this.tableDesc = tableDesc;
     this.maxRow = maxRow;
     this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+    this.codecType = codecType;
   }
 
   public void init() throws IOException, TajoException {
@@ -91,8 +118,10 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
           new QueryContext(tajoConf), null,
           new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 
0),
           fragmentProtos, null);
-      scanExec = new PartitionMergeScanExec(taskContext, scanNode, 
fragmentProtos);
-      scanExec.init();
+      this.scanExec = new PartitionMergeScanExec(taskContext, scanNode, 
fragmentProtos);
+      this.scanExec.init();
+    } else {
+      close();
     }
   }
 
@@ -113,32 +142,52 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
     return currentNumRows;
   }
 
-  public void close() throws Exception {
+  public void close() throws IOException {
+    if(isStopped) {
+      return;
+    }
+
+    isStopped = true;
     if (scanExec != null) {
       scanExec.close();
       scanExec = null;
     }
+    
+    if(rowBlock != null) {
+      rowBlock.release();
+      rowBlock = null;
+    }
+
+    if(executor != null) {
+      executor.shutdown();
+    }
 
     //remove temporal final output
     if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
       Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, 
queryId);
       if (tableDesc.getUri().equals(temporalResultDir.toUri())) {
-        
temporalResultDir.getParent().getFileSystem(tajoConf).delete(temporalResultDir, 
true);
+        
temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), 
true);
       }
     }
+
+
+    LOG.info(String.format("\"Sent result to client for %s, queryId: %s %s 
rows: %d",
+        sessionId, queryId,
+        codecType != null ? ", compression: " + codecType : "",
+        totalRows
+    ));
   }
 
   public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
-    List<ByteString> rows = new ArrayList<ByteString>();
+    List<ByteString> rows = new ArrayList<>();
     if (scanExec == null) {
       return rows;
     }
     int rowCount = 0;
-    while (true) {
+    while (!eof) {
       Tuple tuple = scanExec.next();
       if (tuple == null) {
-        scanExec.close();
-        scanExec = null;
+        eof = true;
         break;
       }
 
@@ -149,15 +198,117 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
         break;
       }
       if (currentNumRows >= maxRow) {
-        scanExec.close();
-        scanExec = null;
+        eof = true;
         break;
       }
     }
+
+    if(eof) {
+      close();
+    }
     return rows;
   }
 
   @Override
+  public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException {
+    try {
+      final SerializedResultSet.Builder resultSetBuilder = 
SerializedResultSet.newBuilder();
+      resultSetBuilder.setSchema(getLogicalSchema().getProto());
+      resultSetBuilder.setRows(0);
+
+      if (isStopped) return resultSetBuilder.build();
+
+      if (nextFetch == null) {
+        nextFetch = fetchNextRowBlock(fetchRowNum);
+      }
+
+      MemoryRowBlock rowBlock = nextFetch.get();
+
+      if (rowBlock.rows() > 0) {
+        resultSetBuilder.setRows(rowBlock.rows());
+        MemoryBlock memoryBlock = rowBlock.getMemory();
+
+        if (codecType != null) {
+          byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()];
+          memoryBlock.getBuffer().getBytes(0, uncompressedBytes);
+
+          byte[] compressedBytes = CompressionUtil.compress(codecType, 
uncompressedBytes);
+          resultSetBuilder.setDecompressedLength(uncompressedBytes.length);
+          resultSetBuilder.setDecompressCodec(codecType);
+          
resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes));
+        } else {
+          ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, 
memoryBlock.readableBytes());
+          resultSetBuilder.setDecompressedLength(uncompressed.remaining());
+          
resultSetBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
+        }
+      }
+
+      // pre-fetch
+      if (!eof) {
+        nextFetch = fetchNextRowBlock(fetchRowNum);
+      } else {
+        close();
+      }
+      return resultSetBuilder.build();
+    } catch (Throwable t) {
+      throw new IOException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Asynchronously fetch final output data
+   */
+  private Future<MemoryRowBlock> fetchNextRowBlock(final int fetchRowNum) 
throws IOException {
+    final SettableFuture<MemoryRowBlock> future = SettableFuture.create();
+    if (rowBlock == null) {
+      rowBlock = new 
MemoryRowBlock(SchemaUtil.toDataTypes(tableDesc.getLogicalSchema()));
+    }
+
+    if (scanExec == null) {
+      rowBlock.clear();
+      future.set(rowBlock);
+      return future;
+    }
+
+    if (executor == null) {
+      executor = Executors.newSingleThreadExecutor();
+    }
+
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rowBlock.clear();
+          int endRow = currentNumRows + fetchRowNum;
+          while (currentNumRows < endRow) {
+            Tuple tuple = scanExec.next();
+            if (tuple == null) {
+              eof = true;
+              break;
+            } else {
+              rowBlock.getWriter().addTuple(tuple);
+              currentNumRows++;
+              if (currentNumRows >= maxRow) {
+                eof = true;
+                break;
+              }
+            }
+          }
+
+          if (rowBlock.rows() > 0) {
+            totalRows += rowBlock.rows();
+          }
+
+          future.set(rowBlock);
+        } catch (IOException e) {
+          future.setException(e);
+        }
+      }
+    });
+    return future;
+  }
+
+  @Override
   public Schema getLogicalSchema() {
     return tableDesc.getLogicalSchema();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
index a104c99..fab3a1f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
@@ -23,26 +23,30 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 
 import java.io.IOException;
 import java.util.List;
 
 public interface NonForwardQueryResultScanner {
 
-  public void close() throws Exception;
+  void close() throws Exception;
 
-  public Schema getLogicalSchema();
+  Schema getLogicalSchema();
 
-  public List<ByteString> getNextRows(int fetchRowNum) throws IOException;
+  @Deprecated
+  List<ByteString> getNextRows(int fetchRowNum) throws IOException;
 
-  public QueryId getQueryId();
+  SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException;
+
+  QueryId getQueryId();
   
-  public String getSessionId();
+  String getSessionId();
   
-  public TableDesc getTableDesc();
+  TableDesc getTableDesc();
 
-  public void init() throws IOException, TajoException;
+  void init() throws IOException, TajoException;
 
-  public int getCurrentRowNumber();
+  int getCurrentRowNumber();
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index e176818..14077b1 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -25,11 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -46,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.rm.NodeStatus;
 import org.apache.tajo.plan.LogicalPlan;
@@ -60,11 +57,14 @@ import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.memory.MemoryBlock;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +85,8 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
   private Schema outSchema;
   private RowStoreEncoder encoder;
   private PhysicalExec physicalExec;
+  private MemoryRowBlock rowBlock;
+  private boolean eof;
   
   public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan 
plan, QueryId queryId, 
       String sessionId, int maxRow) {
@@ -134,13 +136,16 @@ public class NonForwardQueryResultSystemScanner 
implements NonForwardQueryResult
     encoder = RowStoreUtil.createEncoder(getLogicalSchema());
     
     physicalExec.init();
+    eof = false;
   }
 
   @Override
-  public void close() throws Exception {
-    tableDesc = null;
-    outSchema = null;
-    encoder = null;
+  public void close() {
+    if(rowBlock != null) {
+      rowBlock.release();
+      rowBlock = null;
+    }
+
     if (physicalExec != null) {
       try {
         physicalExec.close();
@@ -617,6 +622,58 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
   }
 
   @Override
+  public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException {
+    int rowCount = 0;
+
+    SerializedResultSet.Builder resultSetBuilder = 
SerializedResultSet.newBuilder();
+    resultSetBuilder.setSchema(getLogicalSchema().getProto());
+    resultSetBuilder.setRows(rowCount);
+    int startRow = currentRow;
+    int endRow = startRow + fetchRowNum;
+
+    if (physicalExec == null) {
+      return resultSetBuilder.build();
+    }
+
+    while (currentRow < endRow) {
+      Tuple currentTuple = physicalExec.next();
+
+      if (currentTuple == null) {
+        eof = true;
+        break;
+      } else {
+        if (rowBlock == null) {
+          rowBlock = new 
MemoryRowBlock(SchemaUtil.toDataTypes(tableDesc.getLogicalSchema()));
+        }
+
+        rowBlock.getWriter().addTuple(currentTuple);
+        currentRow++;
+        rowCount++;
+
+        if(currentRow >= maxRow) {
+          eof = true;
+          break;
+        }
+      }
+    }
+
+    if (rowCount > 0) {
+      resultSetBuilder.setRows(rowCount);
+      MemoryBlock memoryBlock = rowBlock.getMemory();
+      ByteBuffer rows = memoryBlock.getBuffer().nioBuffer(0, 
memoryBlock.readableBytes());
+
+      resultSetBuilder.setDecompressedLength(rows.remaining());
+      resultSetBuilder.setSerializedTuples(ByteString.copyFrom(rows));
+      rowBlock.clear();
+    }
+
+    if (eof) {
+      close();
+    }
+    return resultSetBuilder.build();
+  }
+
+  @Override
   public QueryId getQueryId() {
     return queryId;
   }

Reply via email to