http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index 299952e..5e1ccc1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -25,44 +25,44 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TajoProtos; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.resource.NodeResource; -import org.apache.tajo.resource.NodeResources; -import org.apache.tajo.worker.event.*; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; +import org.apache.tajo.worker.event.NodeResourceEvent; +import org.apache.tajo.worker.event.TaskStartEvent; +import org.apache.tajo.ResourceProtos.TaskRequestProto; import java.io.IOException; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * TaskExecutor uses a number of threads equal to the number of slots available for running tasks on the Worker */ -public class TaskExecutor extends AbstractService implements EventHandler<TaskExecutorEvent> { +public class TaskExecutor extends AbstractService implements EventHandler<TaskStartEvent> { private static final Log LOG = LogFactory.getLog(TaskExecutor.class); - private final TaskManager taskManager; - private final EventHandler rmEventHandler; + private final TajoWorker.WorkerContext workerContext; private final Map<TaskAttemptId, NodeResource> allocatedResourceMap; private final BlockingQueue<Task> taskQueue; private final AtomicInteger runningTasks; - private ThreadPoolExecutor fetcherExecutor; + private ExecutorService fetcherThreadPool; private ExecutorService threadPool; private TajoConf tajoConf; private volatile boolean isStopped; - public TaskExecutor(TaskManager taskManager, EventHandler rmEventHandler) { + public TaskExecutor(TajoWorker.WorkerContext workerContext) { super(TaskExecutor.class.getName()); - this.taskManager = taskManager; - this.rmEventHandler = rmEventHandler; + this.workerContext = workerContext; this.allocatedResourceMap = Maps.newConcurrentMap(); this.runningTasks = new AtomicInteger(); this.taskQueue = new LinkedBlockingQueue<Task>(); @@ -70,12 +70,8 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx @Override protected void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - this.tajoConf = (TajoConf) conf; - this.taskManager.getDispatcher().register(TaskExecutorEvent.EventType.class, this); + this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); super.serviceInit(conf); } @@ -85,12 +81,9 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx this.threadPool = Executors.newFixedThreadPool(nThreads, new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build()); - //TODO move to tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM); - int maxFetcherThreads = Runtime.getRuntime().availableProcessors() * 2; - this.fetcherExecutor = new ThreadPoolExecutor(Math.min(nThreads, maxFetcherThreads), - maxFetcherThreads, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(true)); + int maxFetcherThreads = tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM); + this.fetcherThreadPool = Executors.newFixedThreadPool(nThreads, + new ThreadFactoryBuilder().setNameFormat("Fetcher executor #%d").build()); for (int i = 0; i < nThreads; i++) { @@ -106,7 +99,7 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx isStopped = true; threadPool.shutdown(); - fetcherExecutor.shutdown(); + fetcherThreadPool.shutdown(); super.serviceStop(); } @@ -131,19 +124,28 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx return task; } - @SuppressWarnings("unchecked") protected void stopTask(TaskAttemptId taskId) { runningTasks.decrementAndGet(); - rmEventHandler.handle(new NodeResourceDeallocateEvent(allocatedResourceMap.remove(taskId))); + releaseResource(taskId); + } + + @SuppressWarnings("unchecked") + protected void releaseResource(TaskAttemptId taskId) { + NodeResource resource = allocatedResourceMap.remove(taskId); + + if(resource != null) { + workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle( + new NodeResourceDeallocateEvent(resource, NodeResourceEvent.ResourceType.TASK)); + } } protected ExecutorService getFetcherExecutor() { - return fetcherExecutor; + return fetcherThreadPool; } protected Task createTask(ExecutionBlockContext executionBlockContext, - TajoWorkerProtocol.TaskRequestProto taskRequest) throws IOException { + TaskRequestProto taskRequest) throws IOException { Task task = null; TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); if (executionBlockContext.getTasks().containsKey(taskAttemptId)) { @@ -158,37 +160,34 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx } @Override - public void handle(TaskExecutorEvent event) { - - if (event instanceof TaskStartEvent) { - TaskStartEvent startEvent = (TaskStartEvent) event; - allocatedResourceMap.put(startEvent.getTaskId(), startEvent.getAllocatedResource()); - - ExecutionBlockContext context = taskManager.getExecutionBlockContext( - startEvent.getTaskId().getTaskId().getExecutionBlockId()); - - try { - Task task = createTask(context, startEvent.getTaskRequest()); - if (task != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() + - ", allocated resource: " + startEvent.getAllocatedResource()); - } - taskQueue.put(task); - runningTasks.incrementAndGet(); - context.getWorkerContext().getWorkerSystemMetrics() - .histogram("tasks", "running").update(runningTasks.get()); - } else { - LOG.warn("Release duplicate task resource: " + startEvent.getAllocatedResource()); - stopTask(startEvent.getTaskId()); - } - } catch (InterruptedException e) { - if (!isStopped) { - LOG.fatal(e.getMessage(), e); + public void handle(TaskStartEvent event) { + + allocatedResourceMap.put(event.getTaskAttemptId(), event.getAllocatedResource()); + + ExecutionBlockContext context = workerContext.getTaskManager().getExecutionBlockContext( + event.getTaskAttemptId().getTaskId().getExecutionBlockId()); + + try { + Task task = createTask(context, event.getTaskRequest()); + if (task != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() + + ", allocated resource: " + event.getAllocatedResource()); } - } catch (IOException e) { - stopTask(startEvent.getTaskId()); + taskQueue.put(task); + runningTasks.incrementAndGet(); + context.getWorkerContext().getWorkerSystemMetrics() + .histogram("tasks", "running").update(runningTasks.get()); + } else { + LOG.warn("Release duplicate task resource: " + event.getAllocatedResource()); + stopTask(event.getTaskAttemptId()); + } + } catch (InterruptedException e) { + if (!isStopped) { + LOG.fatal(e.getMessage(), e); } + } catch (Exception e) { + stopTask(event.getTaskAttemptId()); } } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java index c2432eb..52b0d0b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java @@ -29,8 +29,8 @@ import java.util.Collections; import java.util.List; import static org.apache.tajo.TajoProtos.TaskAttemptState; -import static org.apache.tajo.ipc.TajoWorkerProtocol.FetcherHistoryProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; +import static org.apache.tajo.ResourceProtos.FetcherHistoryProto; +import static org.apache.tajo.ResourceProtos.TaskHistoryProto; /** * The history class for Task processing. http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 7697458..d77c583 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -37,15 +37,17 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.*; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.serder.LogicalNodeDeserializer; @@ -65,8 +67,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import static org.apache.tajo.ResourceProtos.*; public class TaskImpl implements Task { private static final Log LOG = LogFactory.getLog(TaskImpl.class); @@ -466,6 +467,7 @@ public class TaskImpl implements Task { @Override public void cleanup() { + // history store in memory while running stage TaskHistory taskHistory = createTaskHistory(); executionBlockContext.addTaskHistory(getId().getTaskId(), taskHistory); executionBlockContext.getTasks().remove(getId()); @@ -485,6 +487,7 @@ public class TaskImpl implements Task { stopScriptExecutors(); } + @Override public TaskHistory createTaskHistory() { TaskHistory taskHistory = null; try { @@ -508,16 +511,12 @@ public class TaskImpl implements Task { int i = 0; FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); for (Fetcher fetcher : fetcherRunners) { - // TODO store the fetcher histories - if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { - builder.setStartTime(fetcher.getStartTime()); - builder.setFinishTime(fetcher.getFinishTime()); - builder.setFileLength(fetcher.getFileLen()); - builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); - builder.setState(fetcher.getState()); - - taskHistory.addFetcherHistory(builder.build()); - } + builder.setStartTime(fetcher.getStartTime()); + builder.setFinishTime(fetcher.getFinishTime()); + builder.setFileLength(fetcher.getFileLen()); + builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); + builder.setState(fetcher.getState()); + taskHistory.addFetcherHistory(builder.build()); if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; } taskHistory.setFinishedFetchCount(i); @@ -529,6 +528,10 @@ public class TaskImpl implements Task { return taskHistory; } + public List<Fetcher> getFetchers() { + return fetcherRunners; + } + public int hashCode() { return context.hashCode(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index 7990a72..f518fd3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -29,12 +29,24 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.*; import java.io.IOException; -import java.util.*; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.tajo.ResourceProtos.ExecutionBlockListProto; +import static org.apache.tajo.ResourceProtos.ExecutionBlockContextRequest; +import static org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse; /** * A TaskManager is responsible for managing executionBlock resource and tasks. @@ -45,26 +57,23 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan private final TajoWorker.WorkerContext workerContext; private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap; private final Dispatcher dispatcher; - private final EventHandler rmEventHandler; + private TaskExecutor executor; - private TajoConf tajoConf; + public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){ + this(dispatcher, workerContext, null); + } - public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) { + public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor) { super(TaskManager.class.getName()); this.dispatcher = dispatcher; this.workerContext = workerContext; this.executionBlockContextMap = Maps.newHashMap(); - this.rmEventHandler = rmEventHandler; + this.executor = executor; } @Override protected void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - - this.tajoConf = (TajoConf)conf; dispatcher.register(TaskManagerEvent.EventType.class, this); super.serviceInit(conf); } @@ -87,22 +96,51 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan return workerContext; } - protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { + protected TaskExecutor getTaskExecutor() { + if (executor == null) { + executor = workerContext.getTaskExecuor(); + } + return executor; + } + + public int getRunningTasks() { + return workerContext.getTaskExecuor().getRunningTasks(); + } + + protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, + String queryMasterHostAndPort) { + + LOG.info("QueryMaster Address:" + queryMasterHostAndPort); + + AsyncRpcClient client = null; try { - ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), null, request); + InetSocketAddress address = NetUtils.createSocketAddr(queryMasterHostAndPort); + ExecutionBlockContextRequest.Builder request = ExecutionBlockContextRequest.newBuilder(); + request.setExecutionBlockId(executionBlockId.getProto()) + .setWorker(getWorkerContext().getConnectionInfo().getProto()); + + client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true); + QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<ExecutionBlockContextResponse>(); + stub.getExecutionBlockContext(callback.getController(), request.build(), callback); + + ExecutionBlockContextResponse contextProto = + callback.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client); context.init(); return context; } catch (Throwable e) { + RpcClientManager.cleanup(client); LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); } } protected void stopExecutionBlock(ExecutionBlockContext context, - TajoWorkerProtocol.ExecutionBlockListProto cleanupList) { + ExecutionBlockListProto cleanupList) { - if(context != null){ + if (context != null) { try { context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId()); context.sendShuffleReport(); @@ -127,22 +165,72 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan @Override public void handle(TaskManagerEvent event) { - LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType()); - if (event instanceof ExecutionBlockStartEvent) { + if(LOG.isDebugEnabled()) { + LOG.debug("======================== Processing " + event + " of type " + event.getType()); + } + + switch (event.getType()) { + case TASK_START: { + //receive event from NodeResourceManager + TaskStartEvent taskStartEvent = TUtil.checkTypeAndGet(event, TaskStartEvent.class); + try { + if (!executionBlockContextMap.containsKey(taskStartEvent.getExecutionBlockId())) { + ExecutionBlockContext context = createExecutionBlock(taskStartEvent.getExecutionBlockId(), + taskStartEvent.getTaskRequest().getQueryMasterHostAndPort()); - //receive event from NodeResourceManager - if(!executionBlockContextMap.containsKey(event.getExecutionBlockId())) { - ExecutionBlockContext context = createExecutionBlock(((ExecutionBlockStartEvent) event).getRequestProto()); - executionBlockContextMap.put(context.getExecutionBlockId(), context); - } else { - LOG.warn("Already initialized ExecutionBlock: " + event.getExecutionBlockId()); + executionBlockContextMap.put(context.getExecutionBlockId(), context); + LOG.info("Running ExecutionBlocks: " + executionBlockContextMap.size() + + ", running tasks:" + getRunningTasks() + ", availableResource: " + + workerContext.getNodeResourceManager().getAvailableResource()); + } + getTaskExecutor().handle(taskStartEvent); + } catch (Exception e) { + getTaskExecutor().releaseResource(taskStartEvent.getTaskAttemptId()); + getWorkerContext().getTaskManager().getDispatcher().getEventHandler() + .handle(new ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e)); + } + break; + } + case EB_STOP: { + //receive event from QueryMaster + ExecutionBlockStopEvent executionBlockStopEvent = TUtil.checkTypeAndGet(event, ExecutionBlockStopEvent.class); + workerContext.getNodeResourceManager().getDispatcher().getEventHandler() + .handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); + stopExecutionBlock(executionBlockContextMap.remove(executionBlockStopEvent.getExecutionBlockId()), + executionBlockStopEvent.getCleanupList()); + break; + } + case QUERY_STOP: { + QueryStopEvent queryStopEvent = TUtil.checkTypeAndGet(event, QueryStopEvent.class); + + //cleanup failure ExecutionBlock + for (ExecutionBlockId ebId : executionBlockContextMap.keySet()) { + if (ebId.getQueryId().equals(queryStopEvent.getQueryId())) { + try { + executionBlockContextMap.remove(ebId).stop(); + } catch (Exception e) { + LOG.fatal(e.getMessage(), e); + } + } + } + workerContext.cleanup(queryStopEvent.getQueryId().toString()); + break; } - } else if (event instanceof ExecutionBlockStopEvent) { - //receive event from QueryMaster - rmEventHandler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); - stopExecutionBlock(executionBlockContextMap.remove(event.getExecutionBlockId()), - ((ExecutionBlockStopEvent) event).getCleanupList()); + case EB_FAIL: { + ExecutionBlockErrorEvent errorEvent = TUtil.checkTypeAndGet(event, ExecutionBlockErrorEvent.class); + LOG.error(errorEvent.getError().getMessage(), errorEvent.getError()); + ExecutionBlockContext context = executionBlockContextMap.remove(errorEvent.getExecutionBlockId()); + + if (context != null) { + context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId()); + getWorkerContext().getTaskHistoryWriter().flushTaskHistories(); + context.stop(); + } + break; + } + default: + break; } } @@ -158,17 +246,14 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan return null; } - public List<TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) throws IOException { - List<TaskHistory> histories = new ArrayList<TaskHistory>(); - ExecutionBlockContext context = executionBlockContextMap.get(executionblockId); - if (context != null) { - histories.addAll(context.getTaskHistories().values()); - } - //TODO get List<TaskHistory> from HistoryReader - return histories; + public List<org.apache.tajo.util.history.TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) + throws IOException { + + return getWorkerContext().getHistoryReader().getTaskHistory(executionblockId.getQueryId().toString(), + executionblockId.toString()); } - public TaskHistory getTaskHistory(TaskId taskId) { + public TaskHistory getTaskHistory(TaskId taskId) throws IOException { TaskHistory history = null; ExecutionBlockContext context = executionBlockContextMap.get(taskId.getExecutionBlockId()); if (context != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java deleted file mode 100644 index 207b47e..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.container.TajoContainerIdPBImpl; -import org.apache.tajo.master.container.TajoConverterUtils; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NullCallback; - -import java.util.concurrent.*; - -import static org.apache.tajo.ipc.TajoWorkerProtocol.*; - -/** - * The driver class for Tajo Task processing. - */ -@Deprecated -public class TaskRunner extends AbstractService { - /** class logger */ - private static final Log LOG = LogFactory.getLog(TaskRunner.class); - - private TajoConf systemConf; - - private volatile boolean stopped = false; - private Path baseDirPath; - - private TajoContainerId containerId; - - // for Fetcher - private ExecutorService fetchLauncher; - - // A thread to receive each assigned query unit and execute the query unit - private Thread taskLauncher; - - // Contains the object references related for TaskRunner - private ExecutionBlockContext executionBlockContext; - - private long finishTime; - - private TaskRunnerHistory history; - - public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) { - super(TaskRunner.class.getName()); - - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory fetcherFactory = builder.setNameFormat("Fetcher executor #%d").build(); - this.systemConf = executionBlockContext.getConf(); - this.fetchLauncher = Executors.newFixedThreadPool( - systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory); - try { - this.containerId = TajoConverterUtils.toTajoContainerId(containerId); - this.executionBlockContext = executionBlockContext; - this.history = executionBlockContext.createTaskRunnerHistory(this); - this.history.setState(getServiceState()); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - // TODO this is expensive. we should change to unique id - public String getId() { - return getId(getContext().getExecutionBlockId(), containerId); - } - - public TajoContainerId getContainerId(){ - return containerId; - } - - public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) { - return executionBlockId + "," + containerId; - } - - public TaskRunnerHistory getHistory(){ - return history; - } - - public Path getTaskBaseDir(){ - return baseDirPath; - } - - public ExecutorService getFetchLauncher() { - return fetchLauncher; - } - - @Override - public void init(Configuration conf) { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("conf should be a TajoConf Type."); - } - this.systemConf = (TajoConf)conf; - - try { - // the base dir for an output dir - baseDirPath = getContext().createBaseDir(); - LOG.info("TaskRunner basedir is created (" + baseDirPath +")"); - } catch (Throwable t) { - t.printStackTrace(); - LOG.error(t, t); - } - super.init(conf); - this.history.setState(getServiceState()); - } - - @Override - public void start() { - super.start(); - history.setStartTime(getStartTime()); - this.history.setState(getServiceState()); - run(); - } - - @Override - public void stop() { - if(isStopped()) { - return; - } - this.finishTime = System.currentTimeMillis(); - this.history.setFinishTime(finishTime); - // If this flag become true, taskLauncher will be terminated. - - LOG.info("Stop TaskRunner: " + getId()); - synchronized (this) { - this.stopped = true; - - fetchLauncher.shutdown(); - fetchLauncher = null; - - notifyAll(); - } - - super.stop(); - this.history.setState(getServiceState()); - } - - public long getFinishTime() { - return finishTime; - } - - public ExecutionBlockContext getContext() { - return executionBlockContext; - } - - static void fatalError(QueryMasterProtocolService.Interface qmClientService, - TaskAttemptId taskAttemptId, String message) { - if (message == null) { - message = "No error message"; - } - TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() - .setId(taskAttemptId.getProto()) - .setErrorMessage(message); - - qmClientService.fatalError(null, builder.build(), NullCallback.get()); - } - - public void run() { - LOG.info("TaskRunner startup"); - try { - - taskLauncher = new Thread(new Runnable() { - - @Override - public void run() { - int receivedNum = 0; - CallFuture<TaskRequestProto> callFuture = null; - TaskRequestProto taskRequest = null; - - while(!stopped && !executionBlockContext.isStopped()) { - QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub(); - - try { - if (callFuture == null) { - callFuture = new CallFuture<TaskRequestProto>(); - LOG.info("Request GetTask: " + getId()); - GetTaskRequestProto request = GetTaskRequestProto.newBuilder() - .setExecutionBlockId(getExecutionBlockId().getProto()) - .setContainerId(((TajoContainerIdPBImpl) containerId).getProto()) - .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId()) - .build(); - - qmClientService.getTask(callFuture.getController(), request, callFuture); - } - try { - // wait for an assigning task for 3 seconds - taskRequest = callFuture.get(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - if(stopped) { - break; - } - } catch (TimeoutException te) { - if(stopped) { - break; - } - // if there has been no assigning task for a given period, - // TaskRunner will retry to request an assigning task. - if (LOG.isDebugEnabled()) { - LOG.info("Retry assigning task:" + getId()); - } - continue; - } catch (ExecutionException ee) { - if(!getContext().isStopped()){ - LOG.error(ee.getMessage(), ee); - } else { - /* EB is stopped */ - break; - } - } - - if (taskRequest != null) { - // QueryMaster can send the terminal signal to TaskRunner. - // If TaskRunner receives the terminal signal, TaskRunner will be terminated - // immediately. - if (taskRequest.getShouldDie()) { - LOG.info("Received ShouldDie flag:" + getId()); - break; - } else { - getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); - LOG.info("Accumulated Received Task: " + (++receivedNum)); - - TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); - if (getContext().getTasks().containsKey(taskAttemptId)) { - LOG.error("Duplicate Task Attempt: " + taskAttemptId); - fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId); - continue; - } - - LOG.info("Initializing: " + taskAttemptId); - Task task = null; - try { - task = new LegacyTaskImpl(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, - new TaskRequestImpl(taskRequest)); - getContext().getTasks().put(taskAttemptId, task); - - task.init(); - if (task.hasFetchPhase()) { - task.fetch(); // The fetch is performed in an asynchronous way. - } - // task.run() is a blocking call. - task.run(); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - fatalError(qmClientService, taskAttemptId, t.getMessage()); - } finally { - if(task != null) { - task.cleanup(); - } - - callFuture = null; - taskRequest = null; - } - } - } - } catch (Throwable t) { - LOG.fatal(t.getMessage(), t); - } - } - stop(); - //notify to TaskRunnerManager - getContext().stopTaskRunner(getId()); - } - }); - taskLauncher.start(); - } catch (Throwable t) { - LOG.fatal("Unhandled exception. Starting shutdown.", t); - } - } - - /** - * @return true if a stop has been requested. - */ - public boolean isStopped() { - return this.stopped; - } - - public ExecutionBlockId getExecutionBlockId() { - return getContext().getExecutionBlockId(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java deleted file mode 100644 index 16d32d4..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.base.Objects; -import com.google.common.collect.Maps; -import org.apache.hadoop.service.Service; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.container.TajoConverterUtils; - -import java.util.Collections; -import java.util.Map; - -import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto; - -/** - * The history class for TaskRunner processing. - */ -@Deprecated -public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { - - private Service.STATE state; - private TajoContainerId containerId; - private long startTime; - private long finishTime; - private ExecutionBlockId executionBlockId; - private Map<TaskAttemptId, TaskHistory> taskHistoryMap = null; - - public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) { - init(); - this.containerId = containerId; - this.executionBlockId = executionBlockId; - } - - public TaskRunnerHistory(TaskRunnerHistoryProto proto) { - this.state = Service.STATE.valueOf(proto.getState()); - this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId()); - this.startTime = proto.getStartTime(); - this.finishTime = proto.getFinishTime(); - this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId()); - this.taskHistoryMap = Maps.newTreeMap(); - for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) { - TaskHistory taskHistory = new TaskHistory(taskHistoryProto); - taskHistoryMap.put(taskHistory.getTaskAttemptId(), taskHistory); - } - } - - private void init() { - this.taskHistoryMap = Maps.newHashMap(); - } - - public int size() { - return this.taskHistoryMap.size(); - } - - @Override - public int hashCode() { - return Objects.hashCode(containerId, executionBlockId, state, startTime, - finishTime, taskHistoryMap.size()); - } - - @Override - public boolean equals(Object o) { - if (o instanceof TaskRunnerHistory) { - TaskRunnerHistory other = (TaskRunnerHistory) o; - return getProto().equals(other.getProto()); - } - return false; - } - - @Override - public TaskRunnerHistoryProto getProto() { - TaskRunnerHistoryProto.Builder builder = TaskRunnerHistoryProto.newBuilder(); - builder.setContainerId(containerId.toString()); - builder.setState(state.toString()); - builder.setExecutionBlockId(executionBlockId.getProto()); - builder.setStartTime(startTime); - builder.setFinishTime(finishTime); - for (TaskHistory taskHistory : taskHistoryMap.values()){ - builder.addTaskHistories(taskHistory.getProto()); - } - return builder.build(); - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - public long getFinishTime() { - return finishTime; - } - - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } - - public Service.STATE getState() { - return state; - } - - public void setState(Service.STATE state) { - this.state = state; - } - - public TajoContainerId getContainerId() { - return containerId; - } - - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - - public TaskHistory getTaskHistory(TaskAttemptId taskAttemptId) { - return taskHistoryMap.get(taskAttemptId); - } - - public Map<TaskAttemptId, TaskHistory> getTaskHistoryMap() { - return Collections.unmodifiableMap(taskHistoryMap); - } - - public void addTaskHistory(TaskAttemptId taskAttemptId, TaskHistory taskHistory) { - taskHistoryMap.put(taskAttemptId, taskHistory); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java deleted file mode 100644 index d18a262..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ /dev/null @@ -1,238 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.worker.event.TaskRunnerEvent; -import org.apache.tajo.worker.event.TaskRunnerStartEvent; -import org.apache.tajo.worker.event.TaskRunnerStopEvent; - -import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; - -@Deprecated -public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> { - private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class); - - private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap = Maps.newConcurrentMap(); - private final ConcurrentMap<String, TaskRunner> taskRunnerMap = Maps.newConcurrentMap(); - private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap(); - private TajoWorker.WorkerContext workerContext; - private TajoConf tajoConf; - private AtomicBoolean stop = new AtomicBoolean(false); - private FinishedTaskCleanThread finishedTaskCleanThread; - private Dispatcher dispatcher; - - public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) { - super(TaskRunnerManager.class.getName()); - - this.workerContext = workerContext; - this.dispatcher = dispatcher; - } - - public TajoWorker.WorkerContext getWorkerContext() { - return workerContext; - } - - @Override - public void init(Configuration conf) { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - tajoConf = (TajoConf)conf; - dispatcher.register(TaskRunnerEvent.EventType.class, this); - super.init(tajoConf); - } - - @Override - public void start() { - finishedTaskCleanThread = new FinishedTaskCleanThread(); - finishedTaskCleanThread.start(); - super.start(); - } - - @Override - public void stop() { - if(stop.getAndSet(true)) { - return; - } - - synchronized(taskRunnerMap) { - for(TaskRunner eachTaskRunner: taskRunnerMap.values()) { - if(!eachTaskRunner.isStopped()) { - eachTaskRunner.stop(); - } - } - } - for(ExecutionBlockContext context: executionBlockContextMap.values()) { - context.stop(); - } - - if(finishedTaskCleanThread != null) { - finishedTaskCleanThread.interrupt(); - } - - super.stop(); - } - - public void stopTaskRunner(String id) { - LOG.info("Stop Task:" + id); - TaskRunner taskRunner = taskRunnerMap.remove(id); - taskRunner.stop(); - } - - public Collection<TaskRunner> getTaskRunners() { - return Collections.unmodifiableCollection(taskRunnerMap.values()); - } - - public Collection<TaskRunnerHistory> getExecutionBlockHistories() { - return Collections.unmodifiableCollection(taskRunnerHistoryMap.values()); - } - - public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) { - return taskRunnerHistoryMap.get(taskRunnerId); - } - - public TaskRunner getTaskRunner(String taskRunnerId) { - return taskRunnerMap.get(taskRunnerId); - } - - public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) { - ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId()); - if (context != null) { - return context.getTask(taskAttemptId); - } - return null; - } - - public TaskHistory getTaskHistoryByTaskAttemptId(TaskAttemptId quAttemptId) { - synchronized (taskRunnerHistoryMap) { - for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) { - TaskHistory taskHistory = history.getTaskHistory(quAttemptId); - if (taskHistory != null) return taskHistory; - } - } - - return null; - } - - public int getNumTasks() { - return taskRunnerMap.size(); - } - - @Override - public void handle(TaskRunnerEvent event) { - LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType()); - if (event instanceof TaskRunnerStartEvent) { - TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event; - ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId()); - - if(context == null){ - try { - context = new ExecutionBlockContext(getWorkerContext(), this, startEvent.getRequest()); - context.init(); - } catch (Throwable e) { - LOG.fatal(e.getMessage(), e); - throw new RuntimeException(e); - } - executionBlockContextMap.put(event.getExecutionBlockId(), context); - } - - TaskRunner taskRunner = new TaskRunner(context, startEvent.getRequest().getContainerId()); - LOG.info("Start TaskRunner:" + taskRunner.getId()); - taskRunnerMap.put(taskRunner.getId(), taskRunner); - taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory()); - - taskRunner.init(context.getConf()); - taskRunner.start(); - - } else if (event instanceof TaskRunnerStopEvent) { - ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId()); - if(executionBlockContext != null){ - try { - executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId()); - executionBlockContext.sendShuffleReport(); - workerContext.getTaskHistoryWriter().flushTaskHistories(); - } catch (Exception e) { - LOG.fatal(e.getMessage(), e); - throw new RuntimeException(e); - } finally { - executionBlockContext.stop(); - } - } - LOG.info("Stopped execution block:" + event.getExecutionBlockId()); - } - } - - public EventHandler getEventHandler(){ - return dispatcher.getEventHandler(); - } - - public TajoConf getTajoConf() { - return tajoConf; - } - - class FinishedTaskCleanThread extends Thread { - //TODO if history size is large, the historyMap should remove immediately - public void run() { - int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD); - LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); - while(!stop.get()) { - try { - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - break; - } - try { - long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l; - cleanExpiredFinishedQueryMasterTask(expireTime); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - } - - private void cleanExpiredFinishedQueryMasterTask(long expireTime) { - synchronized(taskRunnerHistoryMap) { - List<String> expiredIds = new ArrayList<String>(); - for(Map.Entry<String, TaskRunnerHistory> entry: taskRunnerHistoryMap.entrySet()) { - /* If a task runner are abnormal termination, the finished time will be zero. */ - long finishedTime = Math.max(entry.getValue().getStartTime(), entry.getValue().getFinishTime()); - if(finishedTime < expireTime) { - expiredIds.add(entry.getKey()); - } - } - - for(String eachId: expiredIds) { - taskRunnerHistoryMap.remove(eachId); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java deleted file mode 100644 index 050e2b5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; -import org.apache.tajo.ipc.TajoResourceTrackerProtocol; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.storage.DiskDeviceInfo; -import org.apache.tajo.storage.DiskMountInfo; -import org.apache.tajo.storage.DiskUtil; - -import java.io.File; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; - -/** - * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc. - */ -@Deprecated -public class WorkerHeartbeatService extends AbstractService { - /** class logger */ - private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class); - - private final TajoWorker.WorkerContext context; - private TajoConf systemConf; - private RpcClientManager connectionManager; - private WorkerHeartbeatThread thread; - private static final float HDFS_DATANODE_STORAGE_SIZE; - - static { - HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize(); - } - - public WorkerHeartbeatService(TajoWorker.WorkerContext context) { - super(WorkerHeartbeatService.class.getSimpleName()); - this.context = context; - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - this.systemConf = (TajoConf) conf; - - this.connectionManager = RpcClientManager.getInstance(); - thread = new WorkerHeartbeatThread(); - super.serviceInit(conf); - } - - @Override - public void serviceStart() throws Exception { - thread.start(); - super.serviceStart(); - } - - @Override - public void serviceStop() throws Exception { - if(thread.stopped.getAndSet(true)){ - return; - } - - synchronized (thread) { - thread.notifyAll(); - } - - super.serviceStop(); - } - - class WorkerHeartbeatThread extends Thread { - private volatile AtomicBoolean stopped = new AtomicBoolean(false); - ServerStatusProto.System systemInfo; - List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList(); - float workerDiskSlots; - int workerMemoryMB; - List<DiskDeviceInfo> diskDeviceInfos; - - public WorkerHeartbeatThread() { - int workerCpuCoreNum; - - boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED); - - try { - diskDeviceInfos = DiskUtil.getDiskDeviceInfos(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - - if(dedicatedResource) { - float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO); - int totalMemory = getTotalMemoryMB(); - workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio); - workerCpuCoreNum = Runtime.getRuntime().availableProcessors(); - - if(diskDeviceInfos == null) { - workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal; - } else { - workerDiskSlots = diskDeviceInfos.size(); - } - } else { - workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); - workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); - workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); - - if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) { - workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE; - } - } - - systemInfo = ServerStatusProto.System.newBuilder() - .setAvailableProcessors(workerCpuCoreNum) - .setFreeMemoryMB(0) - .setMaxMemoryMB(0) - .setTotalMemoryMB(getTotalMemoryMB()) - .build(); - } - - public void run() { - LOG.info("Worker Resource Heartbeat Thread start."); - int sendDiskInfoCount = 0; - - while(!stopped.get()) { - if(sendDiskInfoCount == 0 && diskDeviceInfos != null) { - getDiskUsageInfos(); - } - ServerStatusProto.JvmHeap jvmHeap = - ServerStatusProto.JvmHeap.newBuilder() - .setMaxHeap(Runtime.getRuntime().maxMemory()) - .setFreeHeap(Runtime.getRuntime().freeMemory()) - .setTotalHeap(Runtime.getRuntime().totalMemory()) - .build(); - - ServerStatusProto serverStatus = ServerStatusProto.newBuilder() - .addAllDisk(diskInfos) - .setRunningTaskNum( - context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks()) - .setSystem(systemInfo) - .setDiskSlots(workerDiskSlots) - .setMemoryResourceMB(workerMemoryMB) - .setJvmHeap(jvmHeap) - .build(); - - NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder() - .setConnectionInfo(context.getConnectionInfo().getProto()) - .setServerStatus(serverStatus) - .build(); - - NettyClientBase rmClient = null; - try { - CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>(); - - ServiceTracker serviceTracker = context.getServiceTracker(); - rmClient = connectionManager.getClient(serviceTracker.getResourceTrackerAddress(), - TajoResourceTrackerProtocol.class, true); - TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); - resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); - - TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); - - QueryCoordinatorProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); - if(clusterResourceSummary.getNumWorkers() > 0) { - context.setNumClusterNodes(clusterResourceSummary.getNumWorkers()); - } - context.setClusterResource(clusterResourceSummary); - - } catch (InterruptedException e) { - break; - } catch (TimeoutException te) { - LOG.warn("Heartbeat response is being delayed.", te); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - - try { - if(!stopped.get()){ - synchronized (thread){ - thread.wait(10 * 1000); - } - } - } catch (InterruptedException e) { - break; - } - sendDiskInfoCount++; - - if(sendDiskInfoCount > 10) { - sendDiskInfoCount = 0; - } - } - - LOG.info("Worker Resource Heartbeat Thread stopped."); - } - - private void getDiskUsageInfos() { - diskInfos.clear(); - for(DiskDeviceInfo eachDevice: diskDeviceInfos) { - List<DiskMountInfo> mountInfos = eachDevice.getMountInfos(); - if(mountInfos != null) { - for(DiskMountInfo eachMount: mountInfos) { - File eachFile = new File(eachMount.getMountPath()); - diskInfos.add(ServerStatusProto.Disk.newBuilder() - .setAbsolutePath(eachFile.getAbsolutePath()) - .setTotalSpace(eachFile.getTotalSpace()) - .setFreeSpace(eachFile.getFreeSpace()) - .setUsableSpace(eachFile.getUsableSpace()) - .build()); - } - } - } - } - } - - public static int getTotalMemoryMB() { - javax.management.MBeanServer mBeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer(); - long max = 0; - Object maxObject = null; - try { - javax.management.ObjectName osName = new javax.management.ObjectName("java.lang:type=OperatingSystem"); - if (!System.getProperty("java.vendor").startsWith("IBM")) { - maxObject = mBeanServer.getAttribute(osName, "TotalPhysicalMemorySize"); - } else { - maxObject = mBeanServer.getAttribute(osName, "TotalPhysicalMemory"); - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - } - if (maxObject != null) { - max = ((Long)maxObject).longValue(); - } - return ((int) (max / (1024 * 1024))); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java new file mode 100644 index 0000000..dfc54ab --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.ExecutionBlockId; + +public class ExecutionBlockErrorEvent extends TaskManagerEvent { + + private ExecutionBlockId executionBlockId; + private Throwable error; + + public ExecutionBlockErrorEvent(ExecutionBlockId executionBlockId, Throwable e) { + super(EventType.EB_FAIL); + this.executionBlockId = executionBlockId; + this.error = e; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } + + public Throwable getError() { + return error; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java deleted file mode 100644 index 85d74e2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol; - -public class ExecutionBlockStartEvent extends TaskManagerEvent { - private TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto; - - public ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto) { - super(EventType.EB_START, new ExecutionBlockId(requestProto.getExecutionBlockId())); - this.requestProto = requestProto; - } - - public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() { - return requestProto; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java index 2b967ab..a1dfe50 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java @@ -20,18 +20,24 @@ package org.apache.tajo.worker.event; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ResourceProtos.ExecutionBlockListProto; public class ExecutionBlockStopEvent extends TaskManagerEvent { - private TajoWorkerProtocol.ExecutionBlockListProto cleanupList; + private ExecutionBlockListProto cleanupList; + private ExecutionBlockId executionBlockId; public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto executionBlockId, - TajoWorkerProtocol.ExecutionBlockListProto cleanupList) { - super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId)); + ExecutionBlockListProto cleanupList) { + super(EventType.EB_STOP); this.cleanupList = cleanupList; + this.executionBlockId = new ExecutionBlockId(executionBlockId); } - public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() { + public ExecutionBlockListProto getCleanupList() { return cleanupList; } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java index 9a1c106..0ee0836 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java @@ -21,26 +21,26 @@ package org.apache.tajo.worker.event; import com.google.protobuf.RpcCallback; -import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto; +import static org.apache.tajo.ResourceProtos.BatchAllocationRequest; +import static org.apache.tajo.ResourceProtos.BatchAllocationResponse; public class NodeResourceAllocateEvent extends NodeResourceEvent { - private BatchAllocationRequestProto request; - private RpcCallback<BatchAllocationResponseProto> callback; + private BatchAllocationRequest request; + private RpcCallback<BatchAllocationResponse> callback; - public NodeResourceAllocateEvent(BatchAllocationRequestProto request, - RpcCallback<BatchAllocationResponseProto> callback) { - super(EventType.ALLOCATE); + public NodeResourceAllocateEvent(BatchAllocationRequest request, + RpcCallback<BatchAllocationResponse> callback) { + super(EventType.ALLOCATE, ResourceType.TASK); this.callback = callback; this.request = request; } - public BatchAllocationRequestProto getRequest() { + public BatchAllocationRequest getRequest() { return request; } - public RpcCallback<BatchAllocationResponseProto> getCallback() { + public RpcCallback<BatchAllocationResponse> getCallback() { return callback; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java index 31d9229..d8841a2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java @@ -25,12 +25,12 @@ public class NodeResourceDeallocateEvent extends NodeResourceEvent { private NodeResource resource; - public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { - this(new NodeResource(proto)); + public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto, ResourceType resourceType) { + this(new NodeResource(proto), resourceType); } - public NodeResourceDeallocateEvent(NodeResource resource) { - super(EventType.DEALLOCATE); + public NodeResourceDeallocateEvent(NodeResource resource, ResourceType resourceType) { + super(EventType.DEALLOCATE, resourceType); this.resource = resource; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java index 6fd2e0d..c12551f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java @@ -29,7 +29,19 @@ public class NodeResourceEvent extends AbstractEvent<NodeResourceEvent.EventType DEALLOCATE } - public NodeResourceEvent(EventType eventType) { + public enum ResourceType { + QUERY_MASTER, + TASK + } + + private ResourceType resourceType; + + public NodeResourceEvent(EventType eventType, ResourceType resourceType) { super(eventType); + this.resourceType = resourceType; + } + + public ResourceType getResourceType() { + return resourceType; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java new file mode 100644 index 0000000..4422d4d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + + +import com.google.protobuf.RpcCallback; +import org.apache.tajo.ResourceProtos.AllocationResourceProto; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; + +public class QMResourceAllocateEvent extends NodeResourceEvent { + + private AllocationResourceProto request; + private RpcCallback<PrimitiveProtos.BoolProto> callback; + + public QMResourceAllocateEvent(AllocationResourceProto request, + RpcCallback<PrimitiveProtos.BoolProto> callback) { + super(EventType.ALLOCATE, ResourceType.QUERY_MASTER); + this.callback = callback; + this.request = request; + } + + public AllocationResourceProto getRequest() { + return request; + } + + public RpcCallback<PrimitiveProtos.BoolProto> getCallback() { + return callback; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java new file mode 100644 index 0000000..892db92 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.QueryId; + +public class QueryStopEvent extends TaskManagerEvent { + + + private QueryId queryId; + public QueryStopEvent(QueryId queryId) { + super(EventType.QUERY_STOP); + this.queryId = queryId; + } + + public QueryId getQueryId() { + return queryId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java deleted file mode 100644 index c609c67..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.TaskAttemptId; - -public class TaskExecutorEvent extends AbstractEvent<TaskExecutorEvent.EventType> { - - // producer: NodeResourceManager, consumer: TaskExecutorEvent - public enum EventType { - START, - KILL, - ABORT - } - - private TaskAttemptId taskAttemptId; - - public TaskExecutorEvent(EventType eventType, - TaskAttemptId taskAttemptId) { - super(eventType); - this.taskAttemptId = taskAttemptId; - } - - public TaskAttemptId getTaskId() { - return taskAttemptId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java index 39b541b..7225e70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java @@ -19,25 +19,23 @@ package org.apache.tajo.worker.event; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; public class TaskManagerEvent extends AbstractEvent<TaskManagerEvent.EventType> { // producer: NodeResourceManager, consumer: TaskManager public enum EventType { - EB_START, - EB_STOP + TASK_START, + TASK_KILL, + TASK_ABORT, + + //cleanup events + EB_STOP, + EB_FAIL, + QUERY_STOP } - private ExecutionBlockId executionBlockId; - public TaskManagerEvent(EventType eventType, - ExecutionBlockId executionBlockId) { - super(eventType); - this.executionBlockId = executionBlockId; - } - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; + public TaskManagerEvent(EventType eventType) { + super(eventType); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java deleted file mode 100644 index 7175251..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; - -@Deprecated -public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> { - public enum EventType { - START, - STOP - } - - protected final ExecutionBlockId executionBlockId; - - public TaskRunnerEvent(EventType eventType, - ExecutionBlockId executionBlockId) { - super(eventType); - this.executionBlockId = executionBlockId; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java deleted file mode 100644 index 9406794..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.plan.serder.PlanProto; -@Deprecated -public class TaskRunnerStartEvent extends TaskRunnerEvent { - - private final TajoWorkerProtocol.RunExecutionBlockRequestProto request; - - public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { - super(EventType.START, new ExecutionBlockId(request.getExecutionBlockId())); - this.request = request; - } - - public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() { - return request; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java deleted file mode 100644 index 297f30c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.tajo.ExecutionBlockId; - -@Deprecated -public class TaskRunnerStopEvent extends TaskRunnerEvent { - - public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) { - super(EventType.STOP, executionBlockId); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java index f60e7c4..1fb0c49 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java @@ -18,20 +18,24 @@ package org.apache.tajo.worker.event; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.resource.NodeResource; -import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; -public class TaskStartEvent extends TaskExecutorEvent { +import static org.apache.tajo.ResourceProtos.TaskRequestProto; + +public class TaskStartEvent extends TaskManagerEvent { private NodeResource allocatedResource; private TaskRequestProto taskRequest; + private TaskAttemptId taskAttemptId; public TaskStartEvent(TaskRequestProto taskRequest, NodeResource allocatedResource) { - super(EventType.START, new TaskAttemptId(taskRequest.getId())); + super(EventType.TASK_START); this.taskRequest = taskRequest; this.allocatedResource = allocatedResource; + this.taskAttemptId = new TaskAttemptId(taskRequest.getId()); } public NodeResource getAllocatedResource() { @@ -41,4 +45,12 @@ public class TaskStartEvent extends TaskExecutorEvent { public TaskRequestProto getTaskRequest() { return taskRequest; } + + public TaskAttemptId getTaskAttemptId() { + return taskAttemptId; + } + + public ExecutionBlockId getExecutionBlockId() { + return taskAttemptId.getTaskId().getExecutionBlockId(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java index 349fa4c..19d8b28 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java @@ -35,7 +35,7 @@ import javax.ws.rs.core.UriInfo; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.NodeStatus; import org.apache.tajo.ws.rs.JerseyResourceDelegate; import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; @@ -99,11 +99,11 @@ public class ClusterResource { JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); MasterContext masterContext = context.get(masterContextKey); - Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers(); + Map<Integer, NodeStatus> workerMap = masterContext.getResourceManager().getNodes(); List<WorkerResponse> workerList = new ArrayList<WorkerResponse>(); - for (Worker worker: workerMap.values()) { - workerList.add(new WorkerResponse(worker)); + for (NodeStatus nodeStatus : workerMap.values()) { + workerList.add(new WorkerResponse(nodeStatus)); } Map<String, List<WorkerResponse>> workerResponseMap = new HashMap<String, List<WorkerResponse>>();
