http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 0c8d8ce..b4ed5fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -18,7 +18,6 @@ package org.apache.tajo.master; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.map.LRUMap; @@ -30,12 +29,15 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.ResourceProtos; +import org.apache.tajo.ResourceProtos.AllocationResourceProto; +import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; +import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.scheduler.SimpleFifoScheduler; +import org.apache.tajo.master.scheduler.QuerySchedulingInfo; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; @@ -54,14 +56,13 @@ import java.util.concurrent.atomic.AtomicLong; */ public class QueryManager extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryManager.class.getName()); + private static final String EMPTY_QM_HOSTNAME = ""; // TajoMaster Context private final TajoMaster.MasterContext masterContext; private AsyncDispatcher dispatcher; - private SimpleFifoScheduler scheduler; - private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); @@ -85,7 +86,6 @@ public class QueryManager extends CompositeService { this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler()); - this.scheduler = new SimpleFifoScheduler(this); } catch (Exception e) { LOG.error("Failed to init service " + getName() + " by exception " + e, e); } @@ -95,18 +95,15 @@ public class QueryManager extends CompositeService { @Override public void serviceStop() throws Exception { - synchronized(runningQueries) { - for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stopProgress(); - } + for(QueryInProgress eachQueryInProgress: runningQueries.values()) { + eachQueryInProgress.stopProgress(); } - this.scheduler.stop(); + super.serviceStop(); } @Override public void serviceStart() throws Exception { - this.scheduler.start(); super.serviceStart(); } @@ -115,28 +112,25 @@ public class QueryManager extends CompositeService { } public Collection<QueryInProgress> getSubmittedQueries() { - synchronized (submittedQueries){ - return Collections.unmodifiableCollection(submittedQueries.values()); - } + return Collections.unmodifiableCollection(submittedQueries.values()); } public Collection<QueryInProgress> getRunningQueries() { - synchronized (runningQueries){ - return Collections.unmodifiableCollection(runningQueries.values()); - } + return Collections.unmodifiableCollection(runningQueries.values()); } public synchronized Collection<QueryInfo> getFinishedQueries() { + Set<QueryInfo> result = Sets.newTreeSet(); + synchronized (historyCache) { + result.addAll(historyCache.values()); + } + try { - Set<QueryInfo> result = Sets.newTreeSet(); result.addAll(this.masterContext.getHistoryReader().getQueries(null)); - synchronized (historyCache) { - result.addAll(historyCache.values()); - } return result; } catch (Throwable e) { LOG.error(e, e); - return Lists.newArrayList(); + return result; } } @@ -171,6 +165,9 @@ public class QueryManager extends CompositeService { return queryInProgress.getQueryInfo(); } + /** + * submit query to scheduler + */ public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql, String jsonExpr, LogicalRootNode plan) throws Exception { @@ -178,35 +175,29 @@ public class QueryManager extends CompositeService { QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, jsonExpr, plan); - synchronized (submittedQueries) { - queryInProgress.getQueryInfo().setQueryMaster(""); - submittedQueries.put(queryInProgress.getQueryId(), queryInProgress); - } + queryInProgress.getQueryInfo().setQueryMaster(EMPTY_QM_HOSTNAME); + submittedQueries.put(queryInProgress.getQueryId(), queryInProgress); + //TODO implement scheduler queue + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo("default", queryContext.getUser(), + queryInProgress.getQueryId(), 1, queryInProgress.getQueryInfo().getStartTime()); - scheduler.addQuery(queryInProgress); + masterContext.getResourceManager().submitQuery(querySchedulingInfo); return queryInProgress.getQueryInfo(); } - public QueryInfo startQueryJob(QueryId queryId) throws Exception { - - QueryInProgress queryInProgress; + /** + * Can start query or not + */ + public boolean startQueryJob(QueryId queryId, AllocationResourceProto allocation) { - synchronized (submittedQueries) { - queryInProgress = submittedQueries.remove(queryId); - } - - synchronized (runningQueries) { + if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) { + QueryInProgress queryInProgress = submittedQueries.remove(queryId); runningQueries.put(queryInProgress.getQueryId(), queryInProgress); - } - - if (queryInProgress.startQueryMaster()) { dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInProgress.getQueryInfo())); - } else { - masterContext.getQueryJobManager().stopQuery(queryInProgress.getQueryId()); + return true; } - - return queryInProgress.getQueryInfo(); + return false; } class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> { @@ -221,10 +212,10 @@ public class QueryManager extends CompositeService { } if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { - queryInProgress.submitQueryToMaster(); + queryInProgress.submitToQueryMaster(); } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - scheduler.removeQuery(queryInProgress.getQueryId()); + queryInProgress.kill(); stopQuery(queryInProgress.getQueryId()); @@ -236,14 +227,10 @@ public class QueryManager extends CompositeService { public QueryInProgress getQueryInProgress(QueryId queryId) { QueryInProgress queryInProgress; - synchronized (submittedQueries) { - queryInProgress = submittedQueries.get(queryId); - } + queryInProgress = submittedQueries.get(queryId); if (queryInProgress == null) { - synchronized (runningQueries) { - queryInProgress = runningQueries.get(queryId); - } + queryInProgress = runningQueries.get(queryId); } return queryInProgress; } @@ -253,13 +240,8 @@ public class QueryManager extends CompositeService { QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { queryInProgress.stopProgress(); - synchronized(submittedQueries) { - submittedQueries.remove(queryId); - } - - synchronized(runningQueries) { - runningQueries.remove(queryId); - } + submittedQueries.remove(queryId); + runningQueries.remove(queryId); QueryInfo queryInfo = queryInProgress.getQueryInfo(); synchronized (historyCache) { @@ -304,8 +286,8 @@ public class QueryManager extends CompositeService { return executedQuerySize.get(); } - public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat( - QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) { + public synchronized TajoHeartbeatResponse.ResponseCommand queryHeartbeat( + TajoHeartbeatRequest queryHeartbeat) { QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId())); if(queryInProgress == null) { return null; @@ -317,7 +299,7 @@ public class QueryManager extends CompositeService { return null; } - private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) { + private QueryInfo makeQueryInfoFromHeartbeat(ResourceProtos.TajoHeartbeatRequest queryHeartbeat) { QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId())); WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java deleted file mode 100644 index 2aac005..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.event.TaskFatalErrorEvent; -import org.apache.tajo.master.rm.TajoWorkerContainer; -import org.apache.tajo.master.rm.TajoWorkerContainerId; -import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.worker.TajoWorker; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class TajoContainerProxy extends ContainerProxy { - private final QueryContext queryContext; - private final TajoWorker.WorkerContext workerContext; - private final String planJson; - - public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context, - Configuration conf, TajoContainer container, - QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) { - super(context, conf, executionBlockId, container); - this.queryContext = queryContext; - this.workerContext = context.getQueryMasterContext().getWorkerContext(); - this.planJson = planJson; - } - - @Override - public synchronized void launch(ContainerLaunchContext containerLaunchContext) { - context.getResourceAllocator().addContainer(containerId, this); - - this.hostName = container.getNodeId().getHost(); - this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort(); - this.state = ContainerState.RUNNING; - - if (LOG.isDebugEnabled()) { - LOG.debug("Launch Container:" + executionBlockId + "," + containerId.getId() + "," + - container.getId() + "," + container.getNodeId() + ", pullServer=" + port); - } - - assignExecutionBlock(executionBlockId, container); - } - - /** - * It sends a kill RPC request to a corresponding worker. - * - * @param taskAttemptId The TaskAttemptId to be killed. - */ - public void killTaskAttempt(TaskAttemptId taskAttemptId) { - NettyClientBase tajoWorkerRpc = null; - try { - InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); - } catch (Throwable e) { - /* Worker RPC failure */ - context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); - } - } - - private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) { - NettyClientBase tajoWorkerRpc; - try { - - InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - - PlanProto.ShuffleType shuffleType = - context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType(); - - TajoWorkerProtocol.RunExecutionBlockRequestProto request = - TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto()) - .setNodeId(container.getNodeId().toString()) - .setContainerId(container.getId().toString()) - .setQueryOutputPath(context.getStagingDir().toString()) - .setQueryContext(queryContext.getProto()) - .setPlanJson(planJson) - .setShuffleType(shuffleType) - .build(); - - tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public synchronized void stopContainer() { - if (LOG.isDebugEnabled()) { - LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerId + ", state:" + this.state); - } - if(isCompletelyDone()) { - LOG.info("Container already stopped:" + containerId); - return; - } - if(this.state == ContainerState.PREP) { - this.state = ContainerState.KILLED_BEFORE_LAUNCH; - } else { - try { - releaseWorkerResource(context, executionBlockId, Arrays.asList(containerId)); - context.getResourceAllocator().removeContainer(containerId); - } catch (Throwable t) { - // ignore the cleanup failure - String message = "cleanup failed for container " - + this.containerId + " : " - + StringUtils.stringifyException(t); - LOG.warn(message); - } finally { - this.state = ContainerState.DONE; - } - } - } - - public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, - ExecutionBlockId executionBlockId, - List<TajoContainerId> containerIds) throws Exception { - List<ContainerProtocol.TajoContainerIdProto> containerIdProtos = - new ArrayList<ContainerProtocol.TajoContainerIdProto>(); - - for(TajoContainerId eachContainerId: containerIds) { - containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); - } - - RpcClientManager manager = RpcClientManager.getInstance(); - NettyClientBase tmClient = null; - - ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - - QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.releaseWorkerResource(null, - QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .addAllContainerIds(containerIdProtos) - .build(), - NullCallback.get()); - - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index e1e85dd..9327c59 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -21,14 +21,10 @@ package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; @@ -42,8 +38,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.function.FunctionSignature; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.master.rm.WorkerResourceManager; +import org.apache.tajo.master.rm.TajoResourceManager; import org.apache.tajo.metrics.CatalogMetricsGaugeSet; import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet; import org.apache.tajo.rpc.RpcChannelFactory; @@ -68,7 +63,6 @@ import java.io.*; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -121,7 +115,7 @@ public class TajoMaster extends CompositeService { private QueryCoordinatorService tajoMasterService; private SessionManager sessionManager; - private WorkerResourceManager resourceManager; + private TajoResourceManager resourceManager; //Web Server private StaticHttpServer webServer; private TajoRestService restServer; @@ -157,66 +151,59 @@ public class TajoMaster extends CompositeService { } @Override - public void serviceInit(Configuration _conf) throws Exception { - if (!(_conf instanceof TajoConf)) { - throw new IllegalArgumentException("_conf should be a TajoConf type."); - } - this.systemConf = (TajoConf) _conf; + public void serviceInit(Configuration conf) throws Exception { + + this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); context = new MasterContext(systemConf); clock = new SystemClock(); - try { - RackResolver.init(systemConf); + RackResolver.init(systemConf); - RpcClientManager rpcManager = RpcClientManager.getInstance(); - rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); - rpcManager.setTimeoutSeconds( - systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); + RpcClientManager rpcManager = RpcClientManager.getInstance(); + rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); + rpcManager.setTimeoutSeconds( + systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); - initResourceManager(); + initResourceManager(); - this.dispatcher = new AsyncDispatcher(); - addIfService(dispatcher); + this.dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); // check the system directory and create if they are not created. checkAndInitializeSystemDirectories(); diagnoseTajoMaster(); - catalogServer = new CatalogServer(loadFunctions()); - addIfService(catalogServer); - catalog = new LocalCatalogWrapper(catalogServer, systemConf); + catalogServer = new CatalogServer(loadFunctions()); + addIfService(catalogServer); + catalog = new LocalCatalogWrapper(catalogServer, systemConf); - sessionManager = new SessionManager(dispatcher); - addIfService(sessionManager); + sessionManager = new SessionManager(dispatcher); + addIfService(sessionManager); - globalEngine = new GlobalEngine(context); - addIfService(globalEngine); + globalEngine = new GlobalEngine(context); + addIfService(globalEngine); - queryManager = new QueryManager(context); - addIfService(queryManager); + queryManager = new QueryManager(context); + addIfService(queryManager); - tajoMasterClientService = new TajoMasterClientService(context); - addIfService(tajoMasterClientService); + tajoMasterClientService = new TajoMasterClientService(context); + addIfService(tajoMasterClientService); - tajoMasterService = new QueryCoordinatorService(context); - addIfService(tajoMasterService); - - restServer = new TajoRestService(context); - addIfService(restServer); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw e; - } + tajoMasterService = new QueryCoordinatorService(context); + addIfService(tajoMasterService); + restServer = new TajoRestService(context); + addIfService(restServer); + // Try to start up all services in TajoMaster. // If anyone is failed, the master prints out the errors and immediately should shutdowns try { super.serviceInit(systemConf); } catch (Throwable t) { t.printStackTrace(); - System.exit(1); + Runtime.getRuntime().halt(-1); } LOG.info("Tajo Master is initialized."); } @@ -235,10 +222,7 @@ public class TajoMaster extends CompositeService { } private void initResourceManager() throws Exception { - Class<WorkerResourceManager> resourceManagerClass = (Class<WorkerResourceManager>) - systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class); - Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class); - resourceManager = constructor.newInstance(context); + resourceManager = new TajoResourceManager(context); addIfService(resourceManager); } @@ -391,39 +375,19 @@ public class TajoMaster extends CompositeService { } @Override - public void stop() { - if (haService != null) { - try { - haService.delete(); - } catch (Exception e) { - LOG.error(e, e); - } - } - - if (restServer != null) { - try { - restServer.stop(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } + public void serviceStop() throws Exception { + if (haService != null) haService.delete(); - if (webServer != null) { - try { - webServer.stop(); - } catch (Exception e) { - LOG.error(e, e); - } - } + if (restServer != null) restServer.stop(); + + if (webServer != null) webServer.stop(); IOUtils.cleanup(LOG, catalogServer); - if(systemMetrics != null) { - systemMetrics.stop(); - } + if (systemMetrics != null) systemMetrics.stop(); - if(pauseMonitor != null) pauseMonitor.stop(); - super.stop(); + if (pauseMonitor != null) pauseMonitor.stop(); + super.serviceStop(); LOG.info("Tajo Master main thread exiting"); } @@ -463,7 +427,7 @@ public class TajoMaster extends CompositeService { return queryManager; } - public WorkerResourceManager getResourceManager() { + public TajoResourceManager getResourceManager() { return resourceManager; } @@ -601,7 +565,7 @@ public class TajoMaster extends CompositeService { try { TajoMaster master = new TajoMaster(); - TajoConf conf = new TajoConf(new YarnConfiguration()); + TajoConf conf = new TajoConf(); master.init(conf); master.start(); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 7dbe815..e0332d5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -47,8 +47,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; import org.apache.tajo.master.exec.NonForwardQueryResultScanner; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.master.rm.NodeStatus; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.PartitionedTableScanNode; import org.apache.tajo.plan.logical.ScanNode; @@ -95,32 +94,27 @@ public class TajoMasterClientService extends AbstractService { } @Override - public void start() { + public void serviceStart() throws Exception { // start the rpc server String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS); InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr); int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM); - try { - server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum); - } catch (Exception e) { - LOG.error(e); - throw new RuntimeException(e); - } + server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum); server.start(); bindAddress = NetUtils.getConnectAddress(server.getListenAddress()); this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress)); + super.serviceStart(); LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress); - super.start(); } @Override - public void stop() { + public void serviceStop() throws Exception { if (server != null) { server.shutdown(); } - super.stop(); + super.serviceStop(); } public InetSocketAddress getBindAddress() { @@ -590,7 +584,7 @@ public class TajoMasterClientService extends AbstractService { QueryManager queryManager = context.getQueryJobManager(); QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId); - QueryInfo queryInfo = null; + QueryInfo queryInfo; if (queryInProgress == null) { queryInfo = context.getQueryJobManager().getFinishedQuery(queryId); } else { @@ -610,9 +604,6 @@ public class TajoMasterClientService extends AbstractService { return builder.build(); } - /** - * It is invoked by TajoContainerProxy. - */ @Override public BoolProto killQuery(RpcController controller, QueryIdRequest request) throws ServiceException { try { @@ -642,31 +633,20 @@ public class TajoMasterClientService extends AbstractService { context.getSessionManager().touch(request.getSessionId().getId()); GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder(); - Map<Integer, Worker> workers = context.getResourceManager().getWorkers(); - - List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet()); - Collections.sort(wokerKeys); - - WorkerResourceInfo.Builder workerBuilder - = WorkerResourceInfo.newBuilder(); - - for(Worker worker: workers.values()) { - WorkerResource workerResource = worker.getResource(); - - workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto()); - workerBuilder.setDiskSlots(workerResource.getDiskSlots()); - workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots()); - workerBuilder.setMemoryMB(workerResource.getMemoryMB()); - workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime()); - workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB()); - workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots()); - workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots()); - workerBuilder.setWorkerStatus(worker.getState().toString()); - workerBuilder.setMaxHeap(workerResource.getMaxHeap()); - workerBuilder.setFreeHeap(workerResource.getFreeHeap()); - workerBuilder.setTotalHeap(workerResource.getTotalHeap()); - workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks()); - workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks()); + List<NodeStatus> nodeStatusList = new ArrayList<NodeStatus>(context.getResourceManager().getRMContext().getNodes().values()); + Collections.sort(nodeStatusList); + + WorkerResourceInfo.Builder workerBuilder = WorkerResourceInfo.newBuilder(); + + for(NodeStatus nodeStatus : nodeStatusList) { + workerBuilder.setConnectionInfo(nodeStatus.getConnectionInfo().getProto()); + workerBuilder.setAvailableResource(nodeStatus.getAvailableResource().getProto()); + workerBuilder.setTotalResource(nodeStatus.getTotalResourceCapability().getProto()); + + workerBuilder.setLastHeartbeat(nodeStatus.getLastHeartbeatTime()); + workerBuilder.setWorkerStatus(nodeStatus.getState().toString()); + workerBuilder.setNumRunningTasks(nodeStatus.getNumRunningTasks()); + workerBuilder.setNumQueryMasterTasks(nodeStatus.getNumRunningQueryMaster()); builder.addWorkerList(workerBuilder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java deleted file mode 100644 index c1c6522..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; -import org.apache.tajo.master.container.TajoContainer; - -import java.util.Collection; - -public class TaskRunnerGroupEvent extends AbstractEvent<EventType> { - public enum EventType { - CONTAINER_REMOTE_LAUNCH, - CONTAINER_REMOTE_CLEANUP - } - - protected final ExecutionBlockId executionBlockId; - protected final Collection<TajoContainer> containers; - public TaskRunnerGroupEvent(EventType eventType, - ExecutionBlockId executionBlockId, - Collection<TajoContainer> containers) { - super(eventType); - this.executionBlockId = executionBlockId; - this.containers = containers; - } - - public Collection<TajoContainer> getContainers() { - return containers; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java deleted file mode 100644 index 9086e65..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.hadoop.yarn.event.EventHandler; - -public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> { - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java index 78d4978..0159a21 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java @@ -111,6 +111,10 @@ public class WorkerConnectionInfo implements ProtoObject<WorkerConnectionInfoPro return this.getHost() + ":" + this.getPeerRpcPort(); } + public String getHostAndQMPort() { + return this.getHost() + ":" + this.getQueryMasterPort(); + } + @Override public WorkerConnectionInfoProto getProto() { WorkerConnectionInfoProto.Builder builder = WorkerConnectionInfoProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java deleted file mode 100644 index 77562b5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.container; - - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.util.Records; - -/** - * This class is borrowed from the following source code : - * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/Container.java - * - * <p><code>TajoContainer</code> represents an allocated resource in the cluster. - * </p> - * - * <p>The <code>ResourceManager</code> is the sole authority to allocate any - * <code>TajoContainer</code> to applications. The allocated <code>TajoContainer</code> - * is always on a single node and has a unique {@link org.apache.tajo.master.container.TajoContainerId}. It has - * a specific amount of {@link org.apache.hadoop.yarn.api.records.Resource} allocated.</p> - * - * <p>It includes details such as: - * <ul> - * <li>{@link org.apache.tajo.master.container.TajoContainerId} for the container, which is globally unique.</li> - * <li> - * {@link org.apache.hadoop.yarn.api.records.NodeId} of the node on which it is allocated. - * </li> - * <li>HTTP uri of the node.</li> - * <li>{@link org.apache.hadoop.yarn.api.records.Resource} allocated to the container.</li> - * <li>{@link org.apache.hadoop.yarn.api.records.Priority} at which the container was allocated.</li> - * <li> - * TajoContainer {@link org.apache.hadoop.yarn.api.records.Token} of the container, used to securely verify - * authenticity of the allocation. - * </li> - * </ul> - * </p> - * - * <p>Typically, an <code>ApplicationMaster</code> receives the - * <code>TajoContainer</code> from the <code>ResourceManager</code> during - * resource-negotiation and then talks to the <code>NodeManager</code> to - * start/stop containers.</p> - * - * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) - * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) - * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#stopContainers(org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest) - */ -@Public -@Stable -public abstract class TajoContainer implements Comparable<TajoContainer> { - - @Private - @Unstable - public static TajoContainer newInstance(TajoContainerId containerId, NodeId nodeId, - String nodeHttpAddress, Resource resource, Priority priority, - Token containerToken) { - TajoContainer container = Records.newRecord(TajoContainer.class); - container.setId(containerId); - container.setNodeId(nodeId); - container.setNodeHttpAddress(nodeHttpAddress); - container.setResource(resource); - container.setPriority(priority); - container.setContainerToken(containerToken); - return container; - } - - /** - * Get the globally unique identifier for the container. - * @return globally unique identifier for the container - */ - @Public - @Stable - public abstract TajoContainerId getId(); - - @Private - @Unstable - public abstract void setId(TajoContainerId id); - - /** - * Get the identifier of the node on which the container is allocated. - * @return identifier of the node on which the container is allocated - */ - @Public - @Stable - public abstract NodeId getNodeId(); - - @Private - @Unstable - public abstract void setNodeId(NodeId nodeId); - - /** - * Get the http uri of the node on which the container is allocated. - * @return http uri of the node on which the container is allocated - */ - @Public - @Stable - public abstract String getNodeHttpAddress(); - - @Private - @Unstable - public abstract void setNodeHttpAddress(String nodeHttpAddress); - - /** - * Get the <code>Resource</code> allocated to the container. - * @return <code>Resource</code> allocated to the container - */ - @Public - @Stable - public abstract Resource getResource(); - - @Private - @Unstable - public abstract void setResource(Resource resource); - - /** - * Get the <code>Priority</code> at which the <code>TajoContainer</code> was - * allocated. - * @return <code>Priority</code> at which the <code>TajoContainer</code> was - * allocated - */ - @Public - @Stable - public abstract Priority getPriority(); - - @Private - @Unstable - public abstract void setPriority(Priority priority); - - /** - * Get the <code>TajoContainerToken</code> for the container. - * <p><code>TajoContainerToken</code> is the security token used by the framework - * to verify authenticity of any <code>TajoContainer</code>.</p> - * - * <p>The <code>ResourceManager</code>, on container allocation provides a - * secure token which is verified by the <code>NodeManager</code> on - * container launch.</p> - * - * <p>Applications do not need to care about <code>TajoContainerToken</code>, they - * are transparently handled by the framework - the allocated - * <code>TajoContainer</code> includes the <code>TajoContainerToken</code>.</p> - * - * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) - * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) - * - * @return <code>TajoContainerToken</code> for the container - */ - @Public - @Stable - public abstract Token getContainerToken(); - - @Private - @Unstable - public abstract void setContainerToken(Token containerToken); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java deleted file mode 100644 index 7bc27c6..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.container; - -import java.text.NumberFormat; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; - -/** - * This class is borrowed from the following source code : - * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java - * - * <p><code>TajoContainerId</code> represents a globally unique identifier - * for a {@link org.apache.tajo.master.container.TajoContainer} in the cluster.</p> - */ -@Public -@Stable -public abstract class TajoContainerId implements Comparable<TajoContainerId>{ - - @Private - @Unstable - public static TajoContainerId newInstance(ApplicationAttemptId appAttemptId, - int containerId) { - TajoContainerId id = new TajoContainerIdPBImpl(); - id.setId(containerId); - id.setApplicationAttemptId(appAttemptId); - id.build(); - return id; - } - - /** - * Get the <code>ApplicationAttemptId</code> of the application to which the - * <code>Container</code> was assigned. - * <p> - * Note: If containers are kept alive across application attempts via - * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)} - * the <code>TajoContainerId</code> does not necessarily contain the current - * running application attempt's <code>ApplicationAttemptId</code> This - * container can be allocated by previously exited application attempt and - * managed by the current running attempt thus have the previous application - * attempt's <code>ApplicationAttemptId</code>. - * </p> - * - * @return <code>ApplicationAttemptId</code> of the application to which the - * <code>Container</code> was assigned - */ - @Public - @Stable - public abstract ApplicationAttemptId getApplicationAttemptId(); - - @Private - @Unstable - protected abstract void setApplicationAttemptId(ApplicationAttemptId atId); - - /** - * Get the identifier of the <code>TajoContainerId</code>. - * @return identifier of the <code>TajoContainerId</code> - */ - @Public - @Stable - public abstract int getId(); - - @Private - @Unstable - protected abstract void setId(int id); - - - // TODO: fail the app submission if attempts are more than 10 or something - private static final ThreadLocal<NumberFormat> appAttemptIdFormat = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(2); - return fmt; - } - }; - // TODO: Why thread local? - // ^ NumberFormat instances are not threadsafe - private static final ThreadLocal<NumberFormat> containerIdFormat = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(6); - return fmt; - } - }; - - @Override - public int hashCode() { - // Generated by eclipse. - final int prime = 435569; - int result = 7507; - result = prime * result + getId(); - result = prime * result + getApplicationAttemptId().hashCode(); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TajoContainerId other = (TajoContainerId) obj; - if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId())) - return false; - if (this.getId() != other.getId()) - return false; - return true; - } - - @Override - public int compareTo(TajoContainerId other) { - if (this.getApplicationAttemptId().compareTo( - other.getApplicationAttemptId()) == 0) { - return this.getId() - other.getId(); - } else { - return this.getApplicationAttemptId().compareTo( - other.getApplicationAttemptId()); - } - - } - - @Override - public String toString() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(4); - - StringBuilder sb = new StringBuilder(); - sb.append("container_"); - ApplicationId appId = getApplicationAttemptId().getApplicationId(); - sb.append(appId.getClusterTimestamp()).append("_"); - sb.append(fmt.format(appId.getId())) - .append("_"); - sb.append( - appAttemptIdFormat.get().format( - getApplicationAttemptId().getAttemptId())).append("_"); - sb.append(containerIdFormat.get().format(getId())); - return sb.toString(); - } - - protected abstract void build(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java deleted file mode 100644 index cf9e012..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.container; - - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; - -import com.google.common.base.Preconditions; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.master.container.TajoContainerId; - -/** - * This class is borrowed from the following source code : - * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java - * - */ -@Private -@Unstable -public class TajoContainerIdPBImpl extends TajoContainerId { - ContainerProtocol.TajoContainerIdProto proto = null; - ContainerProtocol.TajoContainerIdProto.Builder builder = null; - private ApplicationAttemptId applicationAttemptId = null; - - public TajoContainerIdPBImpl() { - builder = ContainerProtocol.TajoContainerIdProto.newBuilder(); - } - - public TajoContainerIdPBImpl(ContainerProtocol.TajoContainerIdProto proto) { - this.proto = proto; - this.applicationAttemptId = convertFromProtoFormat(proto.getAppAttemptId()); - } - - public ContainerProtocol.TajoContainerIdProto getProto() { - return proto; - } - - @Override - public int getId() { - Preconditions.checkNotNull(proto); - return proto.getId(); - } - - @Override - protected void setId(int id) { - Preconditions.checkNotNull(builder); - builder.setId((id)); - } - - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return this.applicationAttemptId; - } - - @Override - protected void setApplicationAttemptId(ApplicationAttemptId atId) { - if (atId != null) { - Preconditions.checkNotNull(builder); - builder.setAppAttemptId(convertToProtoFormat(atId)); - } - this.applicationAttemptId = atId; - } - - private ApplicationAttemptIdPBImpl convertFromProtoFormat( - ApplicationAttemptIdProto p) { - return new ApplicationAttemptIdPBImpl(p); - } - - private ApplicationAttemptIdProto convertToProtoFormat( - ApplicationAttemptId t) { - return ((ApplicationAttemptIdPBImpl)t).getProto(); - } - - @Override - protected void build() { - proto = builder.build(); - builder = null; - } -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java deleted file mode 100644 index 88c4823..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.master.container; - - -import static org.apache.hadoop.yarn.util.StringHelper._split; - -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; - - -/** - * This class is borrowed from the following source code : - * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java - * - * This class contains a set of utilities which help converting data structures - * from/to 'serializableFormat' to/from hadoop/nativejava data structures. - * - */ -@Private -public class TajoConverterUtils { - - public static final String CONTAINER_PREFIX = "container"; - - private static ApplicationAttemptId toApplicationAttemptId( - Iterator<String> it) throws NumberFormatException { - ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()), - Integer.parseInt(it.next())); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next())); - return appAttemptId; - } - - public static String toString(TajoContainerId cId) { - return cId == null ? null : cId.toString(); - } - - public static TajoContainerId toTajoContainerId(String containerIdStr) { - Iterator<String> it = _split(containerIdStr).iterator(); - if (!it.next().equals(CONTAINER_PREFIX)) { - throw new IllegalArgumentException("Invalid TajoContainerId prefix: " - + containerIdStr); - } - try { - ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); - TajoContainerId containerId = - TajoContainerId.newInstance(appAttemptID, Integer.parseInt(it.next())); - return containerId; - } catch (NumberFormatException n) { - throw new IllegalArgumentException("Invalid TajoContainerId: " - + containerIdStr, n); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java deleted file mode 100644 index c3a9a59..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; - -public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEventType> { - - private final ExecutionBlockId executionBlockId; - private final Priority priority; - private final Resource resource; - private final boolean isLeafQuery; - private final int requiredNum; - private final float progress; - - public ContainerAllocationEvent(ContainerAllocatorEventType eventType, - ExecutionBlockId executionBlockId, - Priority priority, - Resource resource, - int requiredNum, - boolean isLeafQuery, float progress) { - super(eventType); - this.executionBlockId = executionBlockId; - this.priority = priority; - this.resource = resource; - this.requiredNum = requiredNum; - this.isLeafQuery = isLeafQuery; - this.progress = progress; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } - - public Priority getPriority() { - return priority; - } - - public int getRequiredNum() { - return requiredNum; - } - - public boolean isLeafQuery() { - return isLeafQuery; - } - - public Resource getCapability() { - return resource; - } - - public float getProgress() { - return progress; - } - - public Resource getResource() { - return resource; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java deleted file mode 100644 index 183aeb5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -public enum ContainerAllocatorEventType { - // producer: TaskAttempt, consumer: ContainerAllocator - CONTAINER_REQ, - CONTAINER_DEALLOCATE, - CONTAINER_FAILED -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java deleted file mode 100644 index 723ac1a..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.master.event.ContainerEvent.EventType; - -public class ContainerEvent extends AbstractEvent<EventType> { - public enum EventType { - CONTAINER_LAUNCHED, - CONTAINER_STOPPED - } - - private final ContainerId cId; - - public ContainerEvent(EventType eventType, ContainerId cId) { - super(eventType); - this.cId = cId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java deleted file mode 100644 index c34b174..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tajo.ExecutionBlockId; - -import java.util.Map; - -public class GrouppedContainerAllocatorEvent - extends ContainerAllocationEvent { - private final Map<String, Integer> requestMap; - - public GrouppedContainerAllocatorEvent(ContainerAllocatorEventType eventType, - ExecutionBlockId executionBlockId, - Priority priority, - Resource resource, - Map<String, Integer> requestMap, - boolean isLeafQuery, float progress) { - super(eventType, executionBlockId, priority, - resource, requestMap.size(), isLeafQuery, progress); - this.requestMap = requestMap; - } - - public Map<String, Integer> getRequestMap() { - return this.requestMap; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java index 5cf9887..5a36ba9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java @@ -20,27 +20,26 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.master.container.TajoContainerId; /** * This event is sent to a running TaskAttempt on a worker. */ public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> { private final TaskAttemptId taskAttemptId; - private final TajoContainerId containerId; + private final int workerId; - public LocalTaskEvent(TaskAttemptId taskAttemptId, TajoContainerId containerId, + public LocalTaskEvent(TaskAttemptId taskAttemptId, int workerId, LocalTaskEventType eventType) { super(eventType); this.taskAttemptId = taskAttemptId; - this.containerId = containerId; + this.workerId = workerId; } public TaskAttemptId getTaskAttemptId() { return taskAttemptId; } - public TajoContainerId getContainerId() { - return containerId; + public int getWorkerId() { + return workerId; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java index 3a387fa..9ce7f09 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java @@ -21,6 +21,7 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.QueryId; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.resource.NodeResource; import org.apache.tajo.session.Session; /** @@ -36,15 +37,17 @@ public class QueryStartEvent extends AbstractEvent { private final QueryContext queryContext; private final String jsonExpr; private final String logicalPlanJson; + private final NodeResource allocation; public QueryStartEvent(QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, - String logicalPlanJson) { + String logicalPlanJson, NodeResource allocation) { super(EventType.QUERY_START); this.queryId = queryId; this.session = session; this.queryContext = queryContext; this.jsonExpr = jsonExpr; this.logicalPlanJson = logicalPlanJson; + this.allocation = allocation; } public QueryId getQueryId() { @@ -67,6 +70,10 @@ public class QueryStartEvent extends AbstractEvent { return logicalPlanJson; } + public NodeResource getAllocation() { + return allocation; + } + @Override public String toString() { return getClass().getName() + "," + getType() + "," + queryId; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java deleted file mode 100644 index 0d29e44..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.container.TajoContainer; - -import java.util.List; - -public class StageContainerAllocationEvent extends StageEvent { - private List<TajoContainer> allocatedContainer; - - public StageContainerAllocationEvent(final ExecutionBlockId id, - List<TajoContainer> allocatedContainer) { - super(id, StageEventType.SQ_CONTAINER_ALLOCATED); - this.allocatedContainer = allocatedContainer; - } - - public List<TajoContainer> getAllocatedContainer() { - return this.allocatedContainer; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java index 763d426..d9beaa8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java @@ -26,9 +26,7 @@ public enum StageEventType { // Producer: Query SQ_INIT, SQ_START, - SQ_CONTAINER_ALLOCATED, SQ_KILL, - SQ_LAUNCH, // Producer: Task SQ_TASK_COMPLETED, http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java index 924fb59..8a3dcb0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java @@ -19,20 +19,20 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ResourceProtos.ExecutionBlockReport; /** * Event Class: From {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage */ public class StageShuffleReportEvent extends StageEvent { - private TajoWorkerProtocol.ExecutionBlockReport report; + private ExecutionBlockReport report; - public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) { + public StageShuffleReportEvent(ExecutionBlockId executionBlockId, ExecutionBlockReport report) { super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT); this.report = report; } - public TajoWorkerProtocol.ExecutionBlockReport getReport() { + public ExecutionBlockReport getReport() { return report; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java index 1611370..08ef805 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java @@ -20,23 +20,16 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.container.TajoContainerId; public class TaskAttemptAssignedEvent extends TaskAttemptEvent { - private final TajoContainerId cId; private final WorkerConnectionInfo workerConnectionInfo; - public TaskAttemptAssignedEvent(TaskAttemptId id, TajoContainerId cId, + public TaskAttemptAssignedEvent(TaskAttemptId id, WorkerConnectionInfo connectionInfo) { super(id, TaskAttemptEventType.TA_ASSIGNED); - this.cId = cId; this.workerConnectionInfo = connectionInfo; } - public TajoContainerId getContainerId() { - return cId; - } - public WorkerConnectionInfo getWorkerConnectionInfo(){ return workerConnectionInfo; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java index e35b154..f59b50b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java @@ -33,6 +33,7 @@ public enum TaskAttemptEventType { //Producer:Scheduler TA_ASSIGNED, + TA_ASSIGN_CANCEL, TA_SCHEDULE_CANCELED, //Producer:Scheduler http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java index 8c5f016..a9af288 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto; +import org.apache.tajo.ResourceProtos.TaskStatusProto; public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { private final TaskStatusProto status; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java index 5a016fb..6799ce1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java @@ -20,9 +20,8 @@ package org.apache.tajo.master.event; import com.google.protobuf.RpcCallback; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ResourceProtos.TaskRequestProto; import org.apache.tajo.querymaster.TaskAttempt; -import org.apache.tajo.master.container.TajoContainerId; public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent { private final TaskAttemptScheduleContext context; @@ -44,30 +43,13 @@ public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent { } public static class TaskAttemptScheduleContext { - private TajoContainerId containerId; private String host; - private RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback; + private RpcCallback<TaskRequestProto> callback; public TaskAttemptScheduleContext() { } - public TaskAttemptScheduleContext(TajoContainerId containerId, - String host, - RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) { - this.containerId = containerId; - this.host = host; - this.callback = callback; - } - - public TajoContainerId getContainerId() { - return containerId; - } - - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - public String getHost() { return host; } @@ -76,11 +58,11 @@ public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent { this.host = host; } - public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() { + public RpcCallback<TaskRequestProto> getCallback() { return callback; } - public void setCallback(RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) { + public void setCallback(RpcCallback<TaskRequestProto> callback) { this.callback = callback; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java index 20204aa..66275b1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; +import org.apache.tajo.ResourceProtos.TaskCompletionReport; public class TaskCompletionEvent extends TaskAttemptEvent { private TaskCompletionReport report; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java index 03888bd..d50fcb8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport; +import org.apache.tajo.ResourceProtos.TaskFatalErrorReport; public class TaskFatalErrorEvent extends TaskAttemptEvent { private final String message; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java index 3f72ed9..495eaf2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java @@ -18,13 +18,10 @@ package org.apache.tajo.master.event; -import com.google.protobuf.RpcCallback; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; +import org.apache.tajo.ResourceProtos.AllocationResourceProto; import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType; -import org.apache.tajo.master.container.TajoContainerId; public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { @@ -32,36 +29,28 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { TASK_REQ } - private final int workerId; - private final TajoContainerId containerId; + private final AllocationResourceProto responseProto; private final ExecutionBlockId executionBlockId; - - private final RpcCallback<TaskRequestProto> callback; + private final int workerId; public TaskRequestEvent(int workerId, - TajoContainerId containerId, - ExecutionBlockId executionBlockId, - RpcCallback<TaskRequestProto> callback) { + AllocationResourceProto responseProto, + ExecutionBlockId executionBlockId) { super(TaskRequestEventType.TASK_REQ); this.workerId = workerId; - this.containerId = containerId; + this.responseProto = responseProto; this.executionBlockId = executionBlockId; - this.callback = callback; - } - - public int getWorkerId() { - return this.workerId; - } - - public TajoContainerId getContainerId() { - return this.containerId; } public ExecutionBlockId getExecutionBlockId() { return executionBlockId; } - public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() { - return this.callback; + public AllocationResourceProto getResponseProto() { + return responseProto; + } + + public int getWorkerId() { + return workerId; } }
