Repository: tajo Updated Branches: refs/heads/master a1d5aeee7 -> c294d88e1
TAJO-1701: Remove forward or non-forward query concept in TajoClient. Closes #645 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c294d88e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c294d88e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c294d88e Branch: refs/heads/master Commit: c294d88e12216a0ccee695957d01b51d57edeb87 Parents: a1d5aee Author: Hyunsik Choi <[email protected]> Authored: Fri Jul 24 15:45:05 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jul 24 15:45:05 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../java/org/apache/tajo/cli/tsql/TajoCli.java | 45 +++++++++------- .../org/apache/tajo/client/QueryClientImpl.java | 43 +++++---------- .../org/apache/tajo/client/TajoClientUtil.java | 5 +- tajo-client/src/main/proto/ClientProtos.proto | 26 +++++---- .../org/apache/tajo/master/GlobalEngine.java | 1 - .../org/apache/tajo/master/QueryManager.java | 1 + .../tajo/master/TajoMasterClientService.java | 1 - .../apache/tajo/master/exec/QueryExecutor.java | 55 +++++++++++++------- .../tajo/webapp/QueryExecutorServlet.java | 39 +++++++------- .../org/apache/tajo/jdbc/TajoStatement.java | 21 +++++--- 11 files changed, 133 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4403673..287c371 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1701: Remove forward or non-forward query concept in TajoClient. + (hyunsik) + TAJO-1651: Too long fetcher default retries. (jinho) TAJO-1700: Add better exception handling in TajoMasterClientService. http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index bc5fa7a..c6409f1 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -31,9 +31,9 @@ import org.apache.tajo.cli.tsql.ParsedResult.StatementType; import org.apache.tajo.cli.tsql.SimpleParser.ParsingState; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; -import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; @@ -42,7 +42,10 @@ import java.io.*; import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class TajoCli { public static final String ERROR_PREFIX = "ERROR: "; @@ -491,18 +494,21 @@ public class TajoCli { ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json); if (response == null) { onError("response is null", null); + } else if (ReturnStateUtil.isSuccess(response.getState())) { - if (response.getIsForwarded()) { + + switch (response.getResultType()) { + case FETCH: QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); - } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - displayFormatter.printMessage(sout, "OK"); - wasError = true; - } else { - localQueryCompleted(response, startTime); - } + break; + case ENCLOSED: + localQueryCompleted(response, startTime); + break; + default: + displayFormatter.printMessage(sout, "OK"); } + } else { if (ReturnStateUtil.isError(response.getState())) { onError(response.getState().getMessage(), null); @@ -521,17 +527,21 @@ public class TajoCli { } if (response != null) { + if (ReturnStateUtil.isSuccess(response.getState())) { - if (response.getIsForwarded()) { + + switch (response.getResultType()) { + case FETCH: QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); - } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - displayFormatter.printMessage(sout, "OK"); - } else { - localQueryCompleted(response, startTime); - } + break; + case ENCLOSED: + localQueryCompleted(response, startTime); + break; + default: + displayFormatter.printMessage(sout, "OK"); } + } else { if (ReturnStateUtil.isError(response.getState())) { onError(response.getState().getMessage(), null); @@ -585,7 +595,6 @@ public class TajoCli { int initRetries = 0; int progressRetries = 0; while (true) { - // TODO - configurable status = client.getQueryStatus(queryId); if(TajoClientUtil.isQueryWaitingForSchedule(status.getState())) { Thread.sleep(Math.min(20 * initRetries, 1000)); http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 80a49c2..aeca72b 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -183,24 +183,13 @@ public class QueryClientImpl implements QueryClient { QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded()) { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return this.createNullResultSet(queryId); - } else { + switch (response.getResultType()) { + case ENCLOSED: + return TajoClientUtil.createResultSet(this, response, defaultFetchRows); + case FETCH: return this.getQueryResultAndWait(queryId); - } - - } else { - // If a non-forwarded insert into query - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) { + default: return this.createNullResultSet(queryId); - } else { - if (response.hasResultSet() || response.hasTableDesc()) { - return TajoClientUtil.createResultSet(this, response, defaultFetchRows); - } else { - return this.createNullResultSet(queryId); - } - } } } @@ -211,22 +200,14 @@ public class QueryClientImpl implements QueryClient { throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded()) { - - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return this.createNullResultSet(queryId); - } else { - return this.getQueryResultAndWait(queryId); - } - - } else { - - if (response.hasResultSet() || response.hasTableDesc()) { - return TajoClientUtil.createResultSet(this, response, defaultFetchRows); - } else { - return this.createNullResultSet(queryId); - } + switch (response.getResultType()) { + case ENCLOSED: + return TajoClientUtil.createResultSet(this, response, defaultFetchRows); + case FETCH: + return this.getQueryResultAndWait(queryId); + default: + return this.createNullResultSet(queryId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java index 90894fe..358f1a0 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -106,9 +106,8 @@ public class TajoClientUtil { } } - public static ResultSet createNullResultSet() { - return new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null); - } + public static final ResultSet NULL_RESULT_SET = + new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null); public static ResultSet createNullResultSet(QueryId queryId) { return new TajoMemoryResultSet(queryId, new Schema(), null, 0, null); http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 021cfe7..ecfbbd9 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -121,18 +121,26 @@ message SerializedResultSet { message SubmitQueryResponse { required ReturnState state = 1; - optional QueryIdProto queryId = 2; - optional string userName = 3; - optional bool isForwarded = 4 [default = false]; - optional string queryMasterHost = 5; - optional int32 queryMasterPort = 6; + enum ResultType { + NO_RESULT = 0; // this query does not have any result. + ENCLOSED = 1; // the response encloses the query result. + FETCH = 2; // the query result should be fetched + } + + optional ResultType result_type = 2; + + optional QueryIdProto queryId = 3; + optional string userName = 4; + + optional string queryMasterHost = 6; + optional int32 queryMasterPort = 7; - optional SerializedResultSet resultSet = 7; - optional TableDescProto tableDesc = 8; - optional int32 maxRowNum = 9; + optional SerializedResultSet resultSet = 8; + optional TableDescProto tableDesc = 9; + optional int32 maxRowNum = 10; - optional KeyValueSetProto sessionVars = 12; + optional KeyValueSetProto sessionVars = 11; } message GetQueryStatusResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 8bfe65a..9cd20b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -203,7 +203,6 @@ public class GlobalEngine extends AbstractService { SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME)); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setIsForwarded(true); responseBuilder.setState(ReturnStateUtil.returnError(t)); return responseBuilder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 9a9ec50..2578c9f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -239,6 +239,7 @@ public class QueryManager extends CompositeService { if (queryInProgress == null) { queryInProgress = runningQueries.get(queryId); } + return queryInProgress; } http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index baf1320..78fc0f5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -309,7 +309,6 @@ public class TajoMasterClientService extends AbstractService { return ClientProtos.SubmitQueryResponse.newBuilder() .setState(returnError(t)) .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()) - .setIsForwarded(true) .setUserName(context.getConf().getVar(ConfVars.USERNAME)) .build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index ceb3c4a..42e5f61 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -40,7 +40,9 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec; import org.apache.tajo.engine.planner.physical.InsertRowsExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; +import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.ResultType; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.QueryManager; import org.apache.tajo.master.TajoMaster; @@ -94,7 +96,6 @@ public class QueryExecutor { LogicalPlan plan) throws Exception { SubmitQueryResponse.Builder response = SubmitQueryResponse.newBuilder(); - response.setIsForwarded(false); response.setUserName(queryContext.get(SessionVars.USERNAME)); LogicalRootNode rootNode = plan.getRootBlock().getRoot(); @@ -105,12 +106,12 @@ public class QueryExecutor { } else if (PlannerUtil.checkIfDDLPlan(rootNode)) { ddlExecutor.execute(queryContext, plan); - response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - response.setState(OK); + response.setState(OK); + response.setResultType(ResultType.NO_RESULT); } else if (plan.isExplain()) { // explain query - execExplain(plan, queryContext, plan.isExplainGlobal(), response); + execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response); } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) { execQueryOnVirtualTable(queryContext, session, sql, plan, response); @@ -121,7 +122,7 @@ public class QueryExecutor { // NonFromQuery indicates a form of 'select a, x+y;' } else if (PlannerUtil.checkIfNonFromQuery(plan)) { - execNonFromQuery(queryContext, plan, response); + execNonFromQuery(queryContext, session, sql, plan, response); } else { // it requires distributed execution. So, the query is forwarded to a query master. executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response); @@ -162,9 +163,9 @@ public class QueryExecutor { response.setState(OK); } - public void execExplain(LogicalPlan plan, QueryContext queryContext, boolean isGlobal, - SubmitQueryResponse.Builder response) - throws Exception { + public void execExplain(Session session, String query, LogicalPlan plan, QueryContext queryContext, boolean isGlobal, + SubmitQueryResponse.Builder response) throws Exception { + String explainStr; boolean isTest = queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED); if (isTest) { @@ -188,7 +189,7 @@ public class QueryExecutor { schema.addColumn("explain", TajoDataTypes.Type.TEXT); RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); - ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); + SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); VTuple tuple = new VTuple(1); String[] lines = explainStr.split("\n"); @@ -202,10 +203,14 @@ public class QueryExecutor { serializedResBuilder.setSchema(schema.getProto()); serializedResBuilder.setBytesNum(bytesNum); + QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, + (LogicalRootNode) plan.getRootBlock().getRoot()); + response.setState(OK); + response.setQueryId(queryInfo.getQueryId().getProto()); + response.setResultType(ResultType.ENCLOSED); response.setResultSet(serializedResBuilder.build()); response.setMaxRowNum(lines.length); - response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); } public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan, @@ -215,16 +220,22 @@ public class QueryExecutor { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); maxRow = (int) limitNode.getFetchFirstNum(); } - QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId()); + QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, + (LogicalRootNode) plan.getRootBlock().getRoot()); - NonForwardQueryResultScanner queryResultScanner = - new NonForwardQueryResultSystemScanner(context, plan, queryId, session.getSessionId(), maxRow); + NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultSystemScanner( + context, + plan, + queryInfo.getQueryId(), + session.getSessionId(), + maxRow); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); response.setState(OK); - response.setQueryId(queryId.getProto()); + response.setQueryId(queryInfo.getQueryId().getProto()); + response.setResultType(ResultType.ENCLOSED); response.setMaxRowNum(maxRow); response.setTableDesc(queryResultScanner.getTableDesc().getProto()); } @@ -261,11 +272,12 @@ public class QueryExecutor { response.setState(OK); response.setQueryId(queryInfo.getQueryId().getProto()); + response.setResultType(ResultType.ENCLOSED); response.setMaxRowNum(maxRow); response.setTableDesc(desc.getProto()); } - public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) + public void execNonFromQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); @@ -291,15 +303,19 @@ public class QueryExecutor { Schema schema = PlannerUtil.targetToSchema(targets); RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); byte[] serializedBytes = encoder.toBytes(outTuple); - ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); + SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); serializedResBuilder.setSchema(schema.getProto()); serializedResBuilder.setBytesNum(serializedBytes.length); + QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, + (LogicalRootNode) plan.getRootBlock().getRoot()); + responseBuilder.setState(OK); + responseBuilder.setResultType(ResultType.ENCLOSED); + responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); responseBuilder.setResultSet(serializedResBuilder); responseBuilder.setMaxRowNum(1); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); } } finally { // stop script executor @@ -461,6 +477,7 @@ public class QueryExecutor { // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. responseBuilder.setMaxRowNum(-1); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultType(ResultType.NO_RESULT); responseBuilder.setState(OK); } catch (Throwable t) { throw new RuntimeException(t); @@ -495,9 +512,9 @@ public class QueryExecutor { queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode); - responseBuilder.setIsForwarded(true); - responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); responseBuilder.setState(OK); + responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); + responseBuilder.setResultType(ResultType.FETCH); if (queryInfo.getQueryMasterHost() != null) { responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 1df8e7a..24534b0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -308,30 +308,17 @@ public class QueryExecutorServlet extends HttpServlet { } public void run() { + startTime = System.currentTimeMillis(); + try { - if (!tajoClient.getCurrentDatabase().equals(database)) + if (!tajoClient.getCurrentDatabase().equals(database)) { tajoClient.selectDatabase(database); + } response = tajoClient.executeQuery(query); - if (response == null) { - LOG.error("Internal Error: SubmissionResponse is NULL"); - error = new Exception("Internal Error: SubmissionResponse is NULL"); - - } else if (isSuccess(response.getState())) { - if (response.getIsForwarded()) { - queryId = new QueryId(response.getQueryId()); - getQueryResult(queryId); - } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - } else { - getSimpleQueryResult(response); - } - - progress.set(100); - } - } else if (isError(response.getState())) { + if (isError(response.getState())) { StringBuffer errorMessage = new StringBuffer(response.getState().getMessage()); String modifiedMessage; @@ -345,7 +332,23 @@ public class QueryExecutorServlet extends HttpServlet { modifiedMessage = modifiedMessage.replaceAll(lineSeparator, "<br/>"); error = new Exception(modifiedMessage); + + } else { + + switch (response.getResultType()) { + case ENCLOSED: + getSimpleQueryResult(response); + break; + case FETCH: + queryId = new QueryId(response.getQueryId()); + getQueryResult(queryId); + break; + default:; + } + + progress.set(100); } + } catch (Exception e) { LOG.error(e.getMessage(), e); error = e; http://git-wip-us.apache.org/repos/asf/tajo/blob/c294d88e/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index 60f7ca3..ebb19e4 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@ -23,12 +23,15 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.ClientProtos; import java.sql.*; import java.util.HashMap; import java.util.Map; +import static org.apache.tajo.client.TajoClientUtil.NULL_RESULT_SET; + public class TajoStatement implements Statement { protected JdbcConnection conn; protected TajoClient tajoClient; @@ -161,18 +164,22 @@ public class TajoStatement implements Statement { SQLExceptionUtil.throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded() && !queryId.isNull()) { + + switch (response.getResultType()) { + + case ENCLOSED: + return TajoClientUtil.createResultSet(tajoClient, response, fetchSize); + + case FETCH: WaitingResultSet result = new WaitingResultSet(tajoClient, queryId, fetchSize); if (blockWait) { result.getSchema(); } return result; - } - if (response.hasResultSet() || response.hasTableDesc()) { - return TajoClientUtil.createResultSet(tajoClient, response, fetchSize); + default: + return TajoClientUtil.createNullResultSet(queryId); } - return TajoClientUtil.createNullResultSet(queryId); } protected void checkConnection(String errorMsg) throws SQLException { @@ -210,7 +217,7 @@ public class TajoStatement implements Statement { Map<String, String> variable = new HashMap<String, String>(); variable.put(tokens[0].trim(), tokens[1].trim()); client.updateSessionVariables(variable); - return TajoClientUtil.createNullResultSet(); + return NULL_RESULT_SET; } private ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException { @@ -225,7 +232,7 @@ public class TajoStatement implements Statement { } client.unsetSessionVariables(Lists.newArrayList(key)); - return TajoClientUtil.createNullResultSet(); + return NULL_RESULT_SET; } @Override
