Repository: tajo
Updated Branches:
  refs/heads/master a1d5aeee7 -> c294d88e1


TAJO-1701: Remove forward or non-forward query concept in TajoClient.

Closes #645


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c294d88e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c294d88e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c294d88e

Branch: refs/heads/master
Commit: c294d88e12216a0ccee695957d01b51d57edeb87
Parents: a1d5aee
Author: Hyunsik Choi <[email protected]>
Authored: Fri Jul 24 15:45:05 2015 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Fri Jul 24 15:45:05 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  | 45 +++++++++-------
 .../org/apache/tajo/client/QueryClientImpl.java | 43 +++++----------
 .../org/apache/tajo/client/TajoClientUtil.java  |  5 +-
 tajo-client/src/main/proto/ClientProtos.proto   | 26 +++++----
 .../org/apache/tajo/master/GlobalEngine.java    |  1 -
 .../org/apache/tajo/master/QueryManager.java    |  1 +
 .../tajo/master/TajoMasterClientService.java    |  1 -
 .../apache/tajo/master/exec/QueryExecutor.java  | 55 +++++++++++++-------
 .../tajo/webapp/QueryExecutorServlet.java       | 39 +++++++-------
 .../org/apache/tajo/jdbc/TajoStatement.java     | 21 +++++---
 11 files changed, 133 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4403673..287c371 100644
--- a/CHANGES
+++ b/CHANGES
@@ -32,6 +32,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1701: Remove forward or non-forward query concept in TajoClient. 
+    (hyunsik)
+
     TAJO-1651: Too long fetcher default retries. (jinho)
 
     TAJO-1700: Add better exception handling in TajoMasterClientService. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index bc5fa7a..c6409f1 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -31,9 +31,9 @@ import org.apache.tajo.cli.tsql.ParsedResult.StatementType;
 import org.apache.tajo.cli.tsql.SimpleParser.ParsingState;
 import org.apache.tajo.cli.tsql.commands.*;
 import org.apache.tajo.client.*;
-import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.FileUtil;
@@ -42,7 +42,10 @@ import java.io.*;
 import java.lang.reflect.Constructor;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 public class TajoCli {
   public static final String ERROR_PREFIX = "ERROR: ";
@@ -491,18 +494,21 @@ public class TajoCli {
     ClientProtos.SubmitQueryResponse response = 
client.executeQueryWithJson(json);
     if (response == null) {
       onError("response is null", null);
+
     } else if (ReturnStateUtil.isSuccess(response.getState())) {
-      if (response.getIsForwarded()) {
+
+      switch (response.getResultType()) {
+      case FETCH:
         QueryId queryId = new QueryId(response.getQueryId());
         waitForQueryCompleted(queryId);
-      } else {
-        if (!response.hasTableDesc() && !response.hasResultSet()) {
-          displayFormatter.printMessage(sout, "OK");
-          wasError = true;
-        } else {
-          localQueryCompleted(response, startTime);
-        }
+        break;
+      case ENCLOSED:
+        localQueryCompleted(response, startTime);
+        break;
+      default:
+        displayFormatter.printMessage(sout, "OK");
       }
+
     } else {
       if (ReturnStateUtil.isError(response.getState())) {
         onError(response.getState().getMessage(), null);
@@ -521,17 +527,21 @@ public class TajoCli {
     }
 
     if (response != null) {
+
       if (ReturnStateUtil.isSuccess(response.getState())) {
-        if (response.getIsForwarded()) {
+
+        switch (response.getResultType()) {
+        case FETCH:
           QueryId queryId = new QueryId(response.getQueryId());
           waitForQueryCompleted(queryId);
-        } else {
-          if (!response.hasTableDesc() && !response.hasResultSet()) {
-            displayFormatter.printMessage(sout, "OK");
-          } else {
-            localQueryCompleted(response, startTime);
-          }
+          break;
+        case ENCLOSED:
+          localQueryCompleted(response, startTime);
+          break;
+        default:
+          displayFormatter.printMessage(sout, "OK");
         }
+
       } else {
         if (ReturnStateUtil.isError(response.getState())) {
           onError(response.getState().getMessage(), null);
@@ -585,7 +595,6 @@ public class TajoCli {
       int initRetries = 0;
       int progressRetries = 0;
       while (true) {
-        // TODO - configurable
         status = client.getQueryStatus(queryId);
         if(TajoClientUtil.isQueryWaitingForSchedule(status.getState())) {
           Thread.sleep(Math.min(20 * initRetries, 1000));

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 80a49c2..aeca72b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -183,24 +183,13 @@ public class QueryClientImpl implements QueryClient {
 
     QueryId queryId = new QueryId(response.getQueryId());
 
-    if (response.getIsForwarded()) {
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        return this.createNullResultSet(queryId);
-      } else {
+    switch (response.getResultType()) {
+      case ENCLOSED:
+        return TajoClientUtil.createResultSet(this, response, 
defaultFetchRows);
+      case FETCH:
         return this.getQueryResultAndWait(queryId);
-      }
-
-    } else {
-      // If a non-forwarded insert into query
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && 
response.getMaxRowNum() == 0) {
+      default:
         return this.createNullResultSet(queryId);
-      } else {
-        if (response.hasResultSet() || response.hasTableDesc()) {
-          return TajoClientUtil.createResultSet(this, response, 
defaultFetchRows);
-        } else {
-          return this.createNullResultSet(queryId);
-        }
-      }
     }
   }
 
@@ -211,22 +200,14 @@ public class QueryClientImpl implements QueryClient {
     throwIfError(response.getState());
 
     QueryId queryId = new QueryId(response.getQueryId());
-    if (response.getIsForwarded()) {
-
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        return this.createNullResultSet(queryId);
-      } else {
-        return this.getQueryResultAndWait(queryId);
-      }
-
-    } else {
-
-      if (response.hasResultSet() || response.hasTableDesc()) {
-        return TajoClientUtil.createResultSet(this, response, 
defaultFetchRows);
-      } else {
-        return this.createNullResultSet(queryId);
-      }
 
+    switch (response.getResultType()) {
+    case ENCLOSED:
+      return TajoClientUtil.createResultSet(this, response, defaultFetchRows);
+    case FETCH:
+      return this.getQueryResultAndWait(queryId);
+    default:
+      return this.createNullResultSet(queryId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index 90894fe..358f1a0 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -106,9 +106,8 @@ public class TajoClientUtil {
     }
   }
 
-  public static ResultSet createNullResultSet() {
-    return new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), 
null, 0, null);
-  }
+  public static final ResultSet NULL_RESULT_SET =
+      new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), 
null, 0, null);
 
   public static ResultSet createNullResultSet(QueryId queryId) {
     return new TajoMemoryResultSet(queryId, new Schema(), null, 0, null);

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 021cfe7..ecfbbd9 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -121,18 +121,26 @@ message SerializedResultSet {
 
 message SubmitQueryResponse {
   required ReturnState state = 1;
-  optional QueryIdProto queryId = 2;
-  optional string userName = 3;
-  optional bool isForwarded = 4 [default = false];
 
-  optional string queryMasterHost = 5;
-  optional int32 queryMasterPort = 6;
+  enum ResultType {
+    NO_RESULT = 0; // this query does not have any result.
+    ENCLOSED  = 1; // the response encloses the query result.
+    FETCH     = 2; // the query result should be fetched
+  }
+
+  optional ResultType result_type = 2;
+
+  optional QueryIdProto queryId = 3;
+  optional string userName = 4;
+
+  optional string queryMasterHost = 6;
+  optional int32 queryMasterPort = 7;
 
-  optional SerializedResultSet resultSet = 7;
-  optional TableDescProto tableDesc = 8;
-  optional int32 maxRowNum = 9;
+  optional SerializedResultSet resultSet = 8;
+  optional TableDescProto tableDesc = 9;
+  optional int32 maxRowNum = 10;
 
-  optional KeyValueSetProto sessionVars = 12;
+  optional KeyValueSetProto sessionVars = 11;
 }
 
 message GetQueryStatusResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8bfe65a..9cd20b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -203,7 +203,6 @@ public class GlobalEngine extends AbstractService {
       SubmitQueryResponse.Builder responseBuilder = 
SubmitQueryResponse.newBuilder();
       responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setIsForwarded(true);
       responseBuilder.setState(ReturnStateUtil.returnError(t));
       return responseBuilder.build();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 9a9ec50..2578c9f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -239,6 +239,7 @@ public class QueryManager extends CompositeService {
     if (queryInProgress == null) {
       queryInProgress = runningQueries.get(queryId);
     }
+
     return queryInProgress;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index baf1320..78fc0f5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -309,7 +309,6 @@ public class TajoMasterClientService extends AbstractService {
         return ClientProtos.SubmitQueryResponse.newBuilder()
             .setState(returnError(t))
             .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto())
-            .setIsForwarded(true)
             .setUserName(context.getConf().getVar(ConfVars.USERNAME))
             .build();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index ceb3c4a..42e5f61 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -40,7 +40,9 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec;
 import org.apache.tajo.engine.planner.physical.InsertRowsExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.ResultType;
 import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.master.QueryManager;
 import org.apache.tajo.master.TajoMaster;
@@ -94,7 +96,6 @@ public class QueryExecutor {
                       LogicalPlan plan) throws Exception {
 
     SubmitQueryResponse.Builder response = SubmitQueryResponse.newBuilder();
-    response.setIsForwarded(false);
     response.setUserName(queryContext.get(SessionVars.USERNAME));
 
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
@@ -105,12 +106,12 @@ public class QueryExecutor {
 
     } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
       ddlExecutor.execute(queryContext, plan);
-      response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      response.setState(OK);
 
+      response.setState(OK);
+      response.setResultType(ResultType.NO_RESULT);
 
     } else if (plan.isExplain()) { // explain query
-      execExplain(plan, queryContext, plan.isExplainGlobal(), response);
+      execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), 
response);
 
     } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
       execQueryOnVirtualTable(queryContext, session, sql, plan, response);
@@ -121,7 +122,7 @@ public class QueryExecutor {
 
       // NonFromQuery indicates a form of 'select a, x+y;'
     } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
-      execNonFromQuery(queryContext, plan, response);
+      execNonFromQuery(queryContext, session, sql, plan, response);
 
     } else { // it requires distributed execution. So, the query is forwarded 
to a query master.
       executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, 
response);
@@ -162,9 +163,9 @@ public class QueryExecutor {
     response.setState(OK);
   }
 
-  public void execExplain(LogicalPlan plan, QueryContext queryContext, boolean 
isGlobal,
-                          SubmitQueryResponse.Builder response)
-      throws Exception {
+  public void execExplain(Session session, String query, LogicalPlan plan, 
QueryContext queryContext, boolean isGlobal,
+                          SubmitQueryResponse.Builder response) throws 
Exception {
+
     String explainStr;
     boolean isTest = 
queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED);
     if (isTest) {
@@ -188,7 +189,7 @@ public class QueryExecutor {
     schema.addColumn("explain", TajoDataTypes.Type.TEXT);
     RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
 
-    ClientProtos.SerializedResultSet.Builder serializedResBuilder = 
ClientProtos.SerializedResultSet.newBuilder();
+    SerializedResultSet.Builder serializedResBuilder = 
SerializedResultSet.newBuilder();
 
     VTuple tuple = new VTuple(1);
     String[] lines = explainStr.split("\n");
@@ -202,10 +203,14 @@ public class QueryExecutor {
     serializedResBuilder.setSchema(schema.getProto());
     serializedResBuilder.setBytesNum(bytesNum);
 
+    QueryInfo queryInfo = 
context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
+        (LogicalRootNode) plan.getRootBlock().getRoot());
+
     response.setState(OK);
+    response.setQueryId(queryInfo.getQueryId().getProto());
+    response.setResultType(ResultType.ENCLOSED);
     response.setResultSet(serializedResBuilder.build());
     response.setMaxRowNum(lines.length);
-    response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
   }
 
   public void execQueryOnVirtualTable(QueryContext queryContext, Session 
session, String query, LogicalPlan plan,
@@ -215,16 +220,22 @@ public class QueryExecutor {
       LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
       maxRow = (int) limitNode.getFetchFirstNum();
     }
-    QueryId queryId = 
QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+    QueryInfo queryInfo = 
context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
+        (LogicalRootNode) plan.getRootBlock().getRoot());
 
-    NonForwardQueryResultScanner queryResultScanner =
-            new NonForwardQueryResultSystemScanner(context, plan, queryId, 
session.getSessionId(), maxRow);
+    NonForwardQueryResultScanner queryResultScanner = new 
NonForwardQueryResultSystemScanner(
+        context,
+        plan,
+        queryInfo.getQueryId(),
+        session.getSessionId(),
+        maxRow);
 
     queryResultScanner.init();
     session.addNonForwardQueryResultScanner(queryResultScanner);
 
     response.setState(OK);
-    response.setQueryId(queryId.getProto());
+    response.setQueryId(queryInfo.getQueryId().getProto());
+    response.setResultType(ResultType.ENCLOSED);
     response.setMaxRowNum(maxRow);
     response.setTableDesc(queryResultScanner.getTableDesc().getProto());
   }
@@ -261,11 +272,12 @@ public class QueryExecutor {
 
     response.setState(OK);
     response.setQueryId(queryInfo.getQueryId().getProto());
+    response.setResultType(ResultType.ENCLOSED);
     response.setMaxRowNum(maxRow);
     response.setTableDesc(desc.getProto());
   }
 
-  public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, 
SubmitQueryResponse.Builder responseBuilder)
+  public void execNonFromQuery(QueryContext queryContext, Session session, 
String query, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder)
       throws Exception {
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
@@ -291,15 +303,19 @@ public class QueryExecutor {
         Schema schema = PlannerUtil.targetToSchema(targets);
         RowStoreUtil.RowStoreEncoder encoder = 
RowStoreUtil.createEncoder(schema);
         byte[] serializedBytes = encoder.toBytes(outTuple);
-        ClientProtos.SerializedResultSet.Builder serializedResBuilder = 
ClientProtos.SerializedResultSet.newBuilder();
+        SerializedResultSet.Builder serializedResBuilder = 
SerializedResultSet.newBuilder();
         
serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
         serializedResBuilder.setSchema(schema.getProto());
         serializedResBuilder.setBytesNum(serializedBytes.length);
 
+        QueryInfo queryInfo = 
context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
+            (LogicalRootNode) plan.getRootBlock().getRoot());
+
         responseBuilder.setState(OK);
+        responseBuilder.setResultType(ResultType.ENCLOSED);
+        responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
         responseBuilder.setResultSet(serializedResBuilder);
         responseBuilder.setMaxRowNum(1);
-        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
       }
     } finally {
       // stop script executor
@@ -461,6 +477,7 @@ public class QueryExecutor {
       // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only 
number of inserted rows.
       responseBuilder.setMaxRowNum(-1);
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultType(ResultType.NO_RESULT);
       responseBuilder.setState(OK);
     } catch (Throwable t) {
       throw new RuntimeException(t);
@@ -495,9 +512,9 @@ public class QueryExecutor {
 
     queryInfo = queryManager.scheduleQuery(session, queryContext, sql, 
jsonExpr, rootNode);
 
-    responseBuilder.setIsForwarded(true);
-    responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
     responseBuilder.setState(OK);
+    responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+    responseBuilder.setResultType(ResultType.FETCH);
     if (queryInfo.getQueryMasterHost() != null) {
       responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 1df8e7a..24534b0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -308,30 +308,17 @@ public class QueryExecutorServlet extends HttpServlet {
     }
 
     public void run() {
+
       startTime = System.currentTimeMillis();
+
       try {
-        if (!tajoClient.getCurrentDatabase().equals(database))
+        if (!tajoClient.getCurrentDatabase().equals(database)) {
           tajoClient.selectDatabase(database);
+        }
 
         response = tajoClient.executeQuery(query);
 
-        if (response == null) {
-          LOG.error("Internal Error: SubmissionResponse is NULL");
-          error = new Exception("Internal Error: SubmissionResponse is NULL");
-
-        } else if (isSuccess(response.getState())) {
-          if (response.getIsForwarded()) {
-            queryId = new QueryId(response.getQueryId());
-            getQueryResult(queryId);
-          } else {
-            if (!response.hasTableDesc() && !response.hasResultSet()) {
-            } else {
-              getSimpleQueryResult(response);
-            }
-
-            progress.set(100);
-          }
-        } else if (isError(response.getState())) {
+        if (isError(response.getState())) {
           StringBuffer errorMessage = new 
StringBuffer(response.getState().getMessage());
           String modifiedMessage;
 
@@ -345,7 +332,23 @@ public class QueryExecutorServlet extends HttpServlet {
           modifiedMessage = modifiedMessage.replaceAll(lineSeparator, "<br/>");
 
           error = new Exception(modifiedMessage);
+
+        } else {
+
+          switch (response.getResultType()) {
+          case ENCLOSED:
+            getSimpleQueryResult(response);
+            break;
+          case FETCH:
+            queryId = new QueryId(response.getQueryId());
+            getQueryResult(queryId);
+            break;
+          default:;
+          }
+
+          progress.set(100);
         }
+
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         error = e;

http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index 60f7ca3..ebb19e4 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -23,12 +23,15 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.exception.SQLExceptionUtil;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.ClientProtos;
 
 import java.sql.*;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.tajo.client.TajoClientUtil.NULL_RESULT_SET;
+
 public class TajoStatement implements Statement {
   protected JdbcConnection conn;
   protected TajoClient tajoClient;
@@ -161,18 +164,22 @@ public class TajoStatement implements Statement {
     SQLExceptionUtil.throwIfError(response.getState());
 
     QueryId queryId = new QueryId(response.getQueryId());
-    if (response.getIsForwarded() && !queryId.isNull()) {
+
+    switch (response.getResultType()) {
+
+    case ENCLOSED:
+      return TajoClientUtil.createResultSet(tajoClient, response, fetchSize);
+
+    case FETCH:
       WaitingResultSet result = new WaitingResultSet(tajoClient, queryId, 
fetchSize);
       if (blockWait) {
         result.getSchema();
       }
       return result;
-    }
 
-    if (response.hasResultSet() || response.hasTableDesc()) {
-      return TajoClientUtil.createResultSet(tajoClient, response, fetchSize);
+    default:
+      return TajoClientUtil.createNullResultSet(queryId);
     }
-    return TajoClientUtil.createNullResultSet(queryId);
   }
 
   protected void checkConnection(String errorMsg) throws SQLException {
@@ -210,7 +217,7 @@ public class TajoStatement implements Statement {
     Map<String, String> variable = new HashMap<String, String>();
     variable.put(tokens[0].trim(), tokens[1].trim());
     client.updateSessionVariables(variable);
-    return TajoClientUtil.createNullResultSet();
+    return NULL_RESULT_SET;
   }
 
   private ResultSet unSetSessionVariable(TajoClient client, String sql) throws 
SQLException {
@@ -225,7 +232,7 @@ public class TajoStatement implements Statement {
     }
     client.unsetSessionVariables(Lists.newArrayList(key));
 
-    return TajoClientUtil.createNullResultSet();
+    return NULL_RESULT_SET;
   }
 
   @Override

Reply via email to