Repository: tajo Updated Branches: refs/heads/master 7a1ac28df -> cb9793b99
TAJO-1228: TajoClient should communicate with only TajoMaster without TajoWorker. Closes #317 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cb9793b9 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cb9793b9 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cb9793b9 Branch: refs/heads/master Commit: cb9793b990f1c882e3371a44e6c3f28fe913c1a7 Parents: 7a1ac28 Author: Hyunsik Choi <[email protected]> Authored: Mon Jan 5 17:44:14 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Jan 5 17:44:14 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/client/QueryClientImpl.java | 99 +++------------ .../apache/tajo/client/SessionConnection.java | 19 +-- tajo-client/src/main/proto/ClientProtos.proto | 22 ++-- .../main/proto/QueryMasterClientProtocol.proto | 4 - .../tajo/master/TajoMasterClientService.java | 18 +-- .../master/querymaster/QueryInProgress.java | 66 +++++----- .../tajo/master/querymaster/QueryInfo.java | 41 +++++- .../master/querymaster/QueryJobManager.java | 5 + .../tajo/master/querymaster/QueryMaster.java | 6 +- .../tajo/worker/TajoWorkerClientService.java | 127 ------------------- .../src/main/proto/TajoMasterProtocol.proto | 7 +- .../org/apache/tajo/TajoTestingCluster.java | 24 +--- 13 files changed, 135 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b0b1685..8a4ffe1 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,9 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1228: TajoClient should communicate with only TajoMaster without + TajoWorker. (hyunsik) + TAJO-1176: Implements queryable virtual tables for catalog information (jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index f923965..bab3518 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -36,7 +36,6 @@ import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.ServerCallable; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; import java.io.IOException; @@ -95,19 +94,7 @@ public class QueryClientImpl implements QueryClient { @Override public void closeQuery(QueryId queryId) { - if(connection.queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - try { - qmClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMaster = qmClient.getStub(); - queryMaster.closeQuery(null, queryId.getProto()); - } catch (Exception e) { - LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connection.connPool.closeConnection(qmClient); - connection.queryMasterMap.remove(queryId); - } - } + // nothing to do } @Override @@ -318,63 +305,21 @@ public class QueryClientImpl implements QueryClient { ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder(); builder.setQueryId(queryId.getProto()); - ClientProtos.GetQueryStatusResponse res = null; - - if(connection.queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - - try { - - qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId), - QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(qmClient); - } - - } else { + GetQueryStatusResponse res = null; - NettyClientBase tmClient = null; - - try { - tmClient = connection.getTajoMasterConnection(false); - connection.checkSessionAndGet(tmClient); - builder.setSessionId(connection.sessionId); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - - res = tajoMasterService.getQueryStatus(null, builder.build()); - - String queryMasterHost = res.getQueryMasterHost(); - - if(queryMasterHost != null && !queryMasterHost.isEmpty()) { - NettyClientBase qmClient = null; - - try { - - InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); - qmClient = connection.connPool.getConnection( - qmAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - - connection.queryMasterMap.put(queryId, qmAddr); + NettyClientBase tmClient = null; + try { + tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + builder.setSessionId(connection.sessionId); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(qmClient); - } - } + res = tajoMasterService.getQueryStatus(null, builder.build()); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); - } + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(tmClient); } return new QueryStatus(res); } @@ -404,29 +349,25 @@ public class QueryClientImpl implements QueryClient { return null; } - NettyClientBase client = null; + NettyClientBase tmClient = null; try { - InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId); - if(queryMasterAddr == null) { - LOG.warn("No Connection to QueryMaster for " + queryId); - return null; - } - - client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); + tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder(); builder.setQueryId(queryId.getProto()); - GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build()); + builder.setSessionId(connection.sessionId); + GetQueryResultResponse response = tajoMasterService.getQueryResult(null,builder.build()); return response; } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } finally { - connection.connPool.releaseConnection(client); + connection.connPool.releaseConnection(tmClient); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index c849f2d..1bc8050 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -21,7 +21,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; @@ -46,7 +46,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; @@ -59,8 +58,6 @@ public class SessionConnection implements Closeable { private final TajoConf conf; - final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>(); - final InetSocketAddress tajoMasterAddr; final RpcConnectionPool connPool; @@ -117,23 +114,11 @@ public class SessionConnection implements Closeable { return Collections.unmodifiableMap(sessionVarsCache); } - public <T> T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, - ConnectTimeoutException, ClassNotFoundException { - InetSocketAddress addr = queryMasterMap.get(queryId); - return connPool.getConnection(addr, protocolClass, asyncMode).getStub(); - } - public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); } - public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode) - throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - InetSocketAddress addr = queryMasterMap.get(queryId); - return connPool.getConnection(addr, protocolClass, asyncMode); - } - public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { return connPool.getConnection(addr, protocolClass, asyncMode); @@ -321,8 +306,6 @@ public class SessionConnection implements Closeable { if(connPool != null) { connPool.shutdown(); } - - queryMasterMap.clear(); } protected InetSocketAddress getTajoMasterAddr() { http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index a741268..a9f5498 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -87,7 +87,7 @@ message GetQueryResultRequest { message GetQueryResultResponse { optional TableDescProto tableDesc = 1; optional string errorMessage = 2; - required string tajoUserName = 3; + optional string tajoUserName = 3; } message QueryIdRequest { @@ -242,15 +242,17 @@ message FunctionResponse { message QueryInfoProto { required string queryId = 1; optional string sql = 2; - optional QueryState queryState = 3; - optional float progress = 4; - optional int64 startTime = 5; - optional int64 finishTime = 6; - optional string lastMessage = 7; - optional string hostNameOfQM = 8; - optional int32 queryMasterPort = 9; - optional int32 queryMasterClientPort = 10; - optional int32 queryMasterInfoPort = 11; + optional KeyValueSetProto contextVars= 3; + optional QueryState queryState = 4; + optional float progress = 5; + optional int64 startTime = 6; + optional int64 finishTime = 7; + optional string lastMessage = 8; + optional string hostNameOfQM = 9; + optional int32 queryMasterPort = 10; + optional int32 queryMasterClientPort = 11; + optional int32 queryMasterInfoPort = 12; + optional TableDescProto resultDesc = 13; } message StageHistoryProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/QueryMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto index 3d8d70b..0b11566 100644 --- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto @@ -28,9 +28,5 @@ import "PrimitiveProtos.proto"; import "ClientProtos.proto"; service QueryMasterClientProtocolService { - rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto); - rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse); - rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse); - rpc closeQuery(QueryIdProto) returns (BoolProto); rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index c413b65..249d335 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -345,6 +345,7 @@ public class TajoMasterClientService extends AbstractService { } GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder(); + builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); // If we cannot the QueryInfo instance from the finished list, // the query result was expired due to timeout. @@ -354,20 +355,16 @@ public class TajoMasterClientService extends AbstractService { return builder.build(); } - try { - //TODO After implementation Tajo's user security feature, Should be modified. - builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); - } catch (IOException e) { - LOG.warn("Can't get current user name"); - } switch (queryInfo.getQueryState()) { case QUERY_SUCCEEDED: - // TODO check this logic needed - //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto()); + if (queryInfo.hasResultdesc()) { + builder.setTableDesc(queryInfo.getResultDesc().getProto()); + } break; case QUERY_FAILED: case QUERY_ERROR: builder.setErrorMessage("Query " + queryId + " is failed"); + break; default: builder.setErrorMessage("Query " + queryId + " is still running"); } @@ -479,6 +476,11 @@ public class TajoMasterClientService extends AbstractService { if (queryInfo != null) { builder.setResultCode(ResultCode.OK); builder.setState(queryInfo.getQueryState()); + + boolean isCreateTable = queryInfo.getQueryContext().isCreateTable(); + boolean isInsert = queryInfo.getQueryContext().isInsert(); + builder.setHasResult(!(isCreateTable || isInsert)); + builder.setProgress(queryInfo.getProgress()); builder.setSubmitTime(queryInfo.getStartTime()); if(queryInfo.getQueryMasterHost() != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index e361c7f..ca0bd72 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -24,13 +24,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -39,6 +36,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; @@ -57,8 +55,6 @@ public class QueryInProgress extends CompositeService { private Session session; - private QueryContext queryContext; - private TajoAsyncDispatcher dispatcher; private LogicalRootNode plan; @@ -75,8 +71,6 @@ public class QueryInProgress extends CompositeService { private QueryMasterProtocolService queryMasterRpcClient; - private ContainerProtocol.TajoContainerIdProto qmContainerId; - public QueryInProgress( TajoMaster.MasterContext masterContext, Session session, @@ -85,11 +79,10 @@ public class QueryInProgress extends CompositeService { super(QueryInProgress.class.getName()); this.masterContext = masterContext; this.session = session; - this.queryContext = queryContext; this.queryId = queryId; this.plan = plan; - queryInfo = new QueryInfo(queryId, sql, jsonExpr); + queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); queryInfo.setStartTime(System.currentTimeMillis()); } @@ -145,7 +138,7 @@ public class QueryInProgress extends CompositeService { } if(queryMasterRpc != null) { - RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc); + RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); } masterContext.getHistoryWriter().appendHistory(queryInfo); @@ -212,7 +205,7 @@ public class QueryInProgress extends CompositeService { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); queryMasterRpc = - RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true); + RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } @@ -235,8 +228,8 @@ public class QueryInProgress extends CompositeService { QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder(); builder.setQueryId(queryId.getProto()) + .setQueryContext(queryInfo.getQueryContext().getProto()) .setSession(session.getProto()) - .setQueryContext(queryContext.getProto()) .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); @@ -267,28 +260,37 @@ public class QueryInProgress extends CompositeService { private void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); - this.queryInfo.setQueryState(queryInfo.getQueryState()); - this.queryInfo.setProgress(queryInfo.getProgress()); - this.queryInfo.setFinishTime(queryInfo.getFinishTime()); - if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { - this.queryInfo.setLastMessage(queryInfo.getLastMessage()); - LOG.info(queryId + queryInfo.getLastMessage()); - } - if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { - //TODO needed QueryMaster's detail status(failed before or after launching worker) - //queryMasterStopped.set(true); - LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); - } + // to avoid partial update by different heartbeats + synchronized (this.queryInfo) { - if(!querySubmitted.get()) { - getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo)); - } + // terminal state will let client to retrieve a query result + // So, we must set the query result before changing query state + if (isFinishState(queryInfo.getQueryState())) { + if (queryInfo.hasResultdesc()) { + this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + } + } + + this.queryInfo.setQueryState(queryInfo.getQueryState()); + this.queryInfo.setProgress(queryInfo.getProgress()); + this.queryInfo.setFinishTime(queryInfo.getFinishTime()); - if(isFinishState(this.queryInfo.getQueryState())) { - getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo)); + // Update diagnosis message + if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { + this.queryInfo.setLastMessage(queryInfo.getLastMessage()); + LOG.info(queryId + queryInfo.getLastMessage()); + } + + // if any error occurs, print outs the error message + if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { + LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); + } + + + if (isFinishState(this.queryInfo.getQueryState())) { + stop(); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 00b95ac..559fc14 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -22,7 +22,9 @@ package org.apache.tajo.master.querymaster; import com.google.gson.annotations.Expose; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; import org.apache.tajo.json.GsonObject; import org.apache.tajo.util.TajoIdUtils; @@ -31,15 +33,17 @@ import org.apache.tajo.util.history.History; public class QueryInfo implements GsonObject, History { private QueryId queryId; @Expose + private QueryContext context; + @Expose private String sql; @Expose - private TajoProtos.QueryState queryState; + private volatile TajoProtos.QueryState queryState; @Expose - private float progress; + private volatile float progress; @Expose - private long startTime; + private volatile long startTime; @Expose - private long finishTime; + private volatile long finishTime; @Expose private String lastMessage; @Expose @@ -52,18 +56,22 @@ public class QueryInfo implements GsonObject, History { private int queryMasterInfoPort; @Expose private String queryIdStr; + @Expose + private volatile TableDesc resultDesc; private String jsonExpr; public QueryInfo(QueryId queryId) { - this(queryId, null, null); + this(queryId, null, null, null); } - public QueryInfo(QueryId queryId, String sql, String jsonExpr) { + public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) { this.queryId = queryId; this.queryIdStr = queryId.toString(); + this.context = queryContext; this.sql = sql; this.jsonExpr = jsonExpr; + this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT; } @@ -71,6 +79,10 @@ public class QueryInfo implements GsonObject, History { return queryId; } + public QueryContext getQueryContext() { + return context; + } + public String getSql() { return sql; } @@ -147,6 +159,18 @@ public class QueryInfo implements GsonObject, History { this.progress = progress; } + public void setResultDesc(TableDesc result) { + this.resultDesc = result; + } + + public boolean hasResultdesc() { + return resultDesc != null; + } + + public TableDesc getResultDesc() { + return resultDesc; + } + @Override public String toString() { return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster=" @@ -182,6 +206,7 @@ public class QueryInfo implements GsonObject, History { builder.setQueryId(queryId.toString()) .setQueryState(queryState) + .setContextVars(context.getProto()) .setProgress(progress) .setStartTime(startTime) .setFinishTime(finishTime) @@ -189,6 +214,10 @@ public class QueryInfo implements GsonObject, History { .setQueryMasterClientPort(queryMasterClientPort) .setQueryMasterInfoPort(queryMasterInfoPort); + if (resultDesc != null) { + builder.setResultDesc(resultDesc.getProto()); + } + if (sql != null) { builder.setSql(sql); } http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index ddbd3e1..34a0d01 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; @@ -300,6 +301,10 @@ public class QueryJobManager extends CompositeService { queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime()); } + if (queryHeartbeat.hasResultDesc()) { + queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc())); + } + return queryInfo; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 7ddd787..7623026 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -454,10 +454,12 @@ public class QueryMaster extends CompositeService implements EventHandler { TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); - builder.setState(queryMasterTask.getState()); builder.setQueryId(queryMasterTask.getQueryId().getProto()); - + builder.setState(queryMasterTask.getState()); if (queryMasterTask.getQuery() != null) { + if (queryMasterTask.getQuery().getResultDesc() != null) { + builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); + } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 0f4a60c..1c83110 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -24,31 +24,20 @@ import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.history.QueryHistory; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collection; public class TajoWorkerClientService extends AbstractService { private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class); @@ -79,15 +68,12 @@ public class TajoWorkerClientService extends AbstractService { this.serviceHandler = new TajoWorkerClientProtocolServiceHandler(); // init RPC Server in constructor cause Heartbeat Thread use bindAddr - // Setup RPC server try { - // TODO initial port num is value of config and find unused port with sequence InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port); if (initIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initIsa); } - // TODO blocking/non-blocking?? int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM); this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum); this.rpcServer.start(); @@ -124,119 +110,6 @@ public class TajoWorkerClientService extends AbstractService { public class TajoWorkerClientProtocolServiceHandler implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface { - @Override - public PrimitiveProtos.BoolProto updateSessionVariables( - RpcController controller, - ClientProtos.UpdateSessionVariableRequest request) throws ServiceException { - return null; - } - - private boolean hasResultTableDesc(QueryContext queryContext) { - return !(queryContext.isCreateTable() || queryContext.isInsert()); - } - - @Override - public ClientProtos.GetQueryResultResponse getQueryResult( - RpcController controller, - ClientProtos.GetQueryResultRequest request) throws ServiceException { - QueryId queryId = new QueryId(request.getQueryId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - - ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder(); - try { - builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); - } catch (IOException e) { - LOG.warn("Can't get current user name"); - } - - if(queryMasterTask == null || queryMasterTask.getQuery() == null) { - builder.setErrorMessage("No Query for " + queryId); - } else { - switch (queryMasterTask.getState()) { - case QUERY_SUCCEEDED: -// if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) { - builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); - //} - break; - case QUERY_FAILED: - case QUERY_ERROR: - builder.setErrorMessage("Query " + queryId + " is failed"); - default: - builder.setErrorMessage("Query " + queryId + " is still running"); - } - } - return builder.build(); - } - - @Override - public ClientProtos.GetQueryStatusResponse getQueryStatus( - RpcController controller, - ClientProtos.GetQueryStatusRequest request) throws ServiceException { - ClientProtos.GetQueryStatusResponse.Builder builder - = ClientProtos.GetQueryStatusResponse.newBuilder(); - QueryId queryId = new QueryId(request.getQueryId()); - - builder.setQueryId(request.getQueryId()); - - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - builder.setResultCode(ClientProtos.ResultCode.OK); - builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED); - } else { - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId); - - builder.setResultCode(ClientProtos.ResultCode.OK); - builder.setQueryMasterHost(bindAddr.getHostName()); - builder.setQueryMasterPort(bindAddr.getPort()); - - if (queryMasterTask == null) { - queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - } - if (queryMasterTask == null) { - builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED); - return builder.build(); - } - - builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())); - - queryMasterTask.touchSessionTime(); - Query query = queryMasterTask.getQuery(); - - if (query != null) { - builder.setState(queryMasterTask.getState()); - builder.setProgress(query.getProgress()); - builder.setSubmitTime(query.getAppSubmitTime()); - if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { - builder.setFinishTime(query.getFinishTime()); - } else { - builder.setFinishTime(System.currentTimeMillis()); - } - } - Collection<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = queryMasterTask.getDiagnostics(); - if(!diagnostics.isEmpty()) { - TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next(); - builder.setErrorMessage(firstError.getErrorMessage()); - builder.setErrorTrace(firstError.getErrorTrace()); - } - - if (queryMasterTask.isInitError()) { - Throwable initError = queryMasterTask.getInitError(); - builder.setErrorMessage( - initError.getMessage() == null ? initError.getClass().getName() : initError.getMessage()); - builder.setErrorTrace(StringUtils.stringifyException(initError)); - builder.setState(queryMasterTask.getState()); - } - } - return builder.build(); - } - - @Override - public PrimitiveProtos.BoolProto closeQuery ( - RpcController controller, - TajoIdProtos.QueryIdProto request) throws ServiceException { - final QueryId queryId = new QueryId(request); - LOG.info("Stop Query:" + queryId); - return BOOL_TRUE; - } @Override public GetQueryHistoryResponse getQueryHistory(RpcController controller, QueryIdRequest request) throws ServiceException { http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/proto/TajoMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index e5eab4f..cc83e47 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -66,9 +66,10 @@ message TajoHeartbeat { required WorkerConnectionInfoProto connectionInfo = 1; optional QueryIdProto queryId = 2; optional QueryState state = 3; - optional string statusMessage = 4; - optional float queryProgress = 5; - optional int64 queryFinishTime = 6; + optional TableDescProto resultDesc = 4; + optional string statusMessage = 5; + optional float queryProgress = 6; + optional int64 queryFinishTime = 7; } message TajoHeartbeatResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 5ff637c..841be45 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -44,8 +44,8 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.querymaster.*; import org.apache.tajo.master.querymaster.Query; -import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.querymaster.StageState; import org.apache.tajo.master.rm.TajoWorkerResourceManager; @@ -781,14 +781,16 @@ public class TajoTestingCluster { } public void waitForQueryRunning(QueryId queryId, int delay) throws Exception { - QueryMasterTask qmt = null; + QueryInProgress qip = null; int i = 0; - while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) { + while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) { try { Thread.sleep(delay); - if(qmt == null){ - qmt = getQueryMasterTask(queryId); + if(qip == null){ + + TajoMaster master = getMaster(); + qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId); } } catch (InterruptedException e) { } @@ -824,16 +826,4 @@ public class TajoTestingCluster { } } } - - public QueryMasterTask getQueryMasterTask(QueryId queryId) { - QueryMasterTask qmt = null; - for (TajoWorker worker : getTajoWorkers()) { - qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId); - if (qmt != null) { - break; - } - } - - return qmt; - } }
