Repository: tajo Updated Branches: refs/heads/master 809cba375 -> 307c6c9f5
TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager. Closes #334 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/307c6c9f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/307c6c9f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/307c6c9f Branch: refs/heads/master Commit: 307c6c9f558ae96ffa2f7351ffa3dc5788ba4dbb Parents: 809cba3 Author: Hyunsik Choi <[email protected]> Authored: Fri Jan 9 14:47:47 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jan 9 14:47:47 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/master/QueryJobManager.java | 59 +++++++------ .../apache/tajo/master/TajoMasterService.java | 7 -- .../master/rm/TajoWorkerResourceManager.java | 2 +- .../tajo/master/rm/WorkerResourceManager.java | 2 +- .../master/scheduler/SimpleFifoScheduler.java | 3 +- .../tajo/querymaster/QueryInProgress.java | 87 ++------------------ .../apache/tajo/querymaster/QueryJobEvent.java | 5 +- .../apache/tajo/querymaster/QueryMaster.java | 2 +- .../tajo/querymaster/QueryMasterTask.java | 47 +---------- .../src/main/proto/TajoMasterProtocol.proto | 1 - .../apache/tajo/querymaster/TestKillQuery.java | 2 +- 12 files changed, 53 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 369dbda..35670d7 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,9 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1282: Cleanup the relationship of QueryInProgress and + QueryJobManager. (hyunsik) + TAJO-1258: Close() for classes derived from FileAppender should be robust. (Jongyoung Park via jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java index c9b8711..6a8da27 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java @@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.scheduler.SimpleFifoScheduler; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.master.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +/** + * QueryJobManager manages all scheduled and running queries. + * It receives all Query related events and routes them to each QueryInProgress. + */ public class QueryJobManager extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); @@ -69,7 +72,7 @@ public class QueryJobManager extends CompositeService { } @Override - public void init(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { try { this.dispatcher = new AsyncDispatcher(); addService(this.dispatcher); @@ -81,24 +84,24 @@ public class QueryJobManager extends CompositeService { catchException(null, e); } - super.init(conf); + super.serviceInit(conf); } @Override - public void stop() { + public void serviceStop() throws Exception { synchronized(runningQueries) { for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stop(); + eachQueryInProgress.stopProgress(); } } this.scheduler.stop(); - super.stop(); + super.serviceStop(); } @Override - public void start() { + public void serviceStart() throws Exception { this.scheduler.start(); - super.start(); + super.serviceStart(); } public EventHandler getEventHandler() { @@ -164,39 +167,42 @@ public class QueryJobManager extends CompositeService { runningQueries.put(queryInProgress.getQueryId(), queryInProgress); } - addService(queryInProgress); - queryInProgress.init(getConfig()); - queryInProgress.start(); - - if (!queryInProgress.startQueryMaster()) { + if (queryInProgress.startQueryMaster()) { + dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, + queryInProgress.getQueryInfo())); + } else { stopQuery(queryId); } return queryInProgress.getQueryInfo(); } - public TajoMaster.MasterContext getMasterContext() { - return masterContext; - } - class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> { + @Override public void handle(QueryJobEvent event) { QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); - if(queryInProgress == null) { + + + if (queryInProgress == null) { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + + if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { + queryInProgress.submmitQueryToMaster(); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { stopQuery(event.getQueryInfo().getQueryId()); - } else if (queryInProgress.isStarted()) { - queryInProgress.getEventHandler().handle(event); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - + queryInProgress.kill(); stopQuery(queryInProgress.getQueryId()); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { + queryInProgress.heartbeat(event.getQueryInfo()); } } } @@ -219,7 +225,7 @@ public class QueryJobManager extends CompositeService { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { - queryInProgress.stop(); + queryInProgress.stopProgress(); synchronized(submittedQueries) { submittedQueries.remove(queryId); } @@ -245,7 +251,6 @@ public class QueryJobManager extends CompositeService { avgExecutionTime.set(executionTime); } executedQuerySize.incrementAndGet(); - removeService(queryInProgress); } else { LOG.warn("No QueryInProgress while query stopping: " + queryId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index a7df206..02bdfa1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -136,13 +136,6 @@ public class TajoMasterService extends AbstractService { } @Override - public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request, - RpcCallback<BoolProto> done) { - context.getQueryJobManager().stopQuery(new QueryId(request)); - done.run(BOOL_TRUE); - } - - @Override public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request, RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) { http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index c4200d5..9f2a3d5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -526,7 +526,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } @Override - public void stopQueryMaster(QueryId queryId) { + public void releaseQueryMaster(QueryId queryId) { if(!rmContext.getQueryMasterContainer().containsKey(queryId)) { LOG.warn("No QueryMaster resource info for " + queryId); return; http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index b237cc5..79ec0ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -80,7 +80,7 @@ public interface WorkerResourceManager extends Service { * * @param queryId QueryId to be stopped */ - public void stopQueryMaster(QueryId queryId); + public void releaseQueryMaster(QueryId queryId); /** * http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java index bd8ca28..a091ed5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java @@ -58,7 +58,8 @@ public class SimpleFifoScheduler implements Scheduler { LOG.info("Size of Fifo queue is " + qSize); } - QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, + queryInProgress.getQueryInfo().getStartTime()); boolean result = pool.add(querySchedulingInfo); if (getRunningQueries().size() == 0) wakeupProcessor(); return result; http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java index bda2ec1..f83f244 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java @@ -20,11 +20,7 @@ package org.apache.tajo.querymaster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; @@ -35,12 +31,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; @@ -48,15 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; -public class QueryInProgress extends CompositeService { +public class QueryInProgress { private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); private QueryId queryId; private Session session; - private AsyncDispatcher dispatcher; - private LogicalRootNode plan; private AtomicBoolean querySubmitted = new AtomicBoolean(false); @@ -76,7 +70,7 @@ public class QueryInProgress extends CompositeService { Session session, QueryContext queryContext, QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) { - super(QueryInProgress.class.getName()); + this.masterContext = masterContext; this.session = session; this.queryId = queryId; @@ -86,23 +80,14 @@ public class QueryInProgress extends CompositeService { queryInfo.setStartTime(System.currentTimeMillis()); } - @Override - public void init(Configuration conf) { - dispatcher = new AsyncDispatcher(); - this.addService(dispatcher); - - dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); - super.init(conf); - } - public synchronized void kill() { + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); if(queryMasterRpcClient != null){ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); } } - @Override - public void stop() { + public void stopProgress() { if(stopped.getAndSet(true)) { return; } @@ -110,52 +95,15 @@ public class QueryInProgress extends CompositeService { LOG.info("========================================================="); LOG.info("Stop query:" + queryId); - masterContext.getResourceManager().stopQueryMaster(queryId); - - long startTime = System.currentTimeMillis(); - while(true) { - try { - if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) { - LOG.info(queryId + " QueryMaster stopped"); - break; - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - break; - } - - try { - synchronized (this){ - wait(100); - } - } catch (InterruptedException e) { - break; - } - if(System.currentTimeMillis() - startTime > 60 * 1000) { - LOG.warn("Failed to stop QueryMaster:" + queryId); - break; - } - } + masterContext.getResourceManager().releaseQueryMaster(queryId); if(queryMasterRpc != null) { RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); } masterContext.getHistoryWriter().appendHistory(queryInfo); - super.stop(); - } - - @Override - public void start() { - super.start(); } - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - - public boolean startQueryMaster() { try { LOG.info("Initializing QueryInProgress for QueryID=" + queryId); @@ -173,8 +121,6 @@ public class QueryInProgress extends CompositeService { queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); - getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); - return true; } catch (Exception e) { catchException(e); @@ -182,23 +128,6 @@ public class QueryInProgress extends CompositeService { } } - class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> { - @Override - public void handle(QueryJobEvent queryJobEvent) { - if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { - heartbeat(queryJobEvent.getQueryInfo()); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { - QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId); - queryInProgress.getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { - submmitQueryToMaster(); - } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - kill(); - } - } - } - private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); @@ -207,7 +136,7 @@ public class QueryInProgress extends CompositeService { queryMasterRpcClient = queryMasterRpc.getStub(); } - private synchronized void submmitQueryToMaster() { + public synchronized void submmitQueryToMaster() { if(querySubmitted.get()) { return; } @@ -256,7 +185,7 @@ public class QueryInProgress extends CompositeService { return !stopped.get() && this.querySubmitted.get(); } - private void heartbeat(QueryInfo queryInfo) { + public void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); // to avoid partial update by different heartbeats http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java index 1a1f2ff..27eb2b6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java @@ -35,12 +35,9 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> { } public enum Type { - QUERY_JOB_START, + QUERY_MASTER_START, QUERY_JOB_HEARTBEAT, - QUERY_JOB_FINISH, QUERY_JOB_STOP, - QUERY_MASTER_START, - QUERY_MASTER_STOP, QUERY_JOB_KILL } } http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 76df397..02760a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -473,7 +473,7 @@ public class QueryMaster extends CompositeService implements EventHandler { public void handle(QueryStartEvent event) { LOG.info("Start QueryStartEventHandler:" + event.getQueryId()); QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext, - event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson()); + event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr()); synchronized(queryMasterTasks) { queryMasterTasks.put(event.getQueryId(), queryMasterTask); http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index bab5903..fd52488 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -41,13 +41,10 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -58,8 +55,7 @@ import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; @@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService { private Query query; - private MasterPlan masterPlan; - private String jsonExpr; - private String logicalPlanJson; - private AsyncDispatcher dispatcher; private final long querySubmitTime; @@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService { new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, - QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, - String logicalPlanJson) { + QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) { super(QueryMasterTask.class.getName()); this.queryMasterContext = queryMasterContext; @@ -133,7 +124,6 @@ public class QueryMasterTask extends CompositeService { this.session = session; this.queryContext = queryContext; this.jsonExpr = jsonExpr; - this.logicalPlanJson = logicalPlanJson; this.querySubmitTime = System.currentTimeMillis(); } @@ -198,42 +188,11 @@ public class QueryMasterTask extends CompositeService { LOG.fatal(t.getMessage(), t); } - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - - super.stop(); - - //TODO change report to tajo master if (queryMetrics != null) { queryMetrics.report(new MetricsConsoleReporter()); } + super.stop(); LOG.info("Stopped QueryMasterTask:" + queryId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/proto/TajoMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index cc83e47..bc73596 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -143,6 +143,5 @@ service TajoMasterProtocolService { rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc stopQueryMaster(QueryIdProto) returns (BoolProto); rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index a125196..bd899cd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -90,7 +90,7 @@ public class TestKillQuery { QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson()); + queryId, session, defaultContext, expr.toJson()); queryMasterTask.init(conf); queryMasterTask.getQueryTaskContext().getDispatcher().start();
