http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java deleted file mode 100644 index 6e0d9fd..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import com.google.protobuf.RpcCallback; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; -import org.apache.tajo.master.querymaster.QueryUnitAttempt; -import org.apache.tajo.master.container.TajoContainerId; - -public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent { - private final QueryUnitAttemptScheduleContext context; - private final QueryUnitAttempt queryUnitAttempt; - - public QueryUnitAttemptScheduleEvent(EventType eventType, ExecutionBlockId executionBlockId, - QueryUnitAttemptScheduleContext context, QueryUnitAttempt queryUnitAttempt) { - super(eventType, executionBlockId); - this.context = context; - this.queryUnitAttempt = queryUnitAttempt; - } - - public QueryUnitAttempt getQueryUnitAttempt() { - return queryUnitAttempt; - } - - public QueryUnitAttemptScheduleContext getContext() { - return context; - } - - public static class QueryUnitAttemptScheduleContext { - private TajoContainerId containerId; - private String host; - private RpcCallback<QueryUnitRequestProto> callback; - - public QueryUnitAttemptScheduleContext() { - - } - - public QueryUnitAttemptScheduleContext(TajoContainerId containerId, - String host, - RpcCallback<QueryUnitRequestProto> callback) { - this.containerId = containerId; - this.host = host; - this.callback = callback; - } - - public TajoContainerId getContainerId() { - return containerId; - } - - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public RpcCallback<QueryUnitRequestProto> getCallback() { - return callback; - } - - public void setCallback(RpcCallback<QueryUnitRequestProto> callback) { - this.callback = callback; - } - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java index 8003ef3..79b6e2e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java @@ -30,7 +30,7 @@ public enum SubQueryEventType { SQ_KILL, SQ_LAUNCH, - // Producer: QueryUnit + // Producer: Task SQ_TASK_COMPLETED, SQ_FAILED, http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java index 0502534..816bc48 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java @@ -18,22 +18,22 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskId; import org.apache.tajo.master.TaskState; /** * Event Class: From Task to SubQuery */ public class SubQueryTaskEvent extends SubQueryEvent { - private QueryUnitId taskId; + private TaskId taskId; private TaskState state; - public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) { + public SubQueryTaskEvent(TaskId taskId, TaskState state) { super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED); this.taskId = taskId; this.state = state; } - public QueryUnitId getTaskId() { + public TaskId getTaskId() { return this.taskId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java index 3b9edcb..1611370 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java @@ -18,7 +18,7 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.container.TajoContainerId; @@ -26,7 +26,7 @@ public class TaskAttemptAssignedEvent extends TaskAttemptEvent { private final TajoContainerId cId; private final WorkerConnectionInfo workerConnectionInfo; - public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId, + public TaskAttemptAssignedEvent(TaskAttemptId id, TajoContainerId cId, WorkerConnectionInfo connectionInfo) { super(id, TaskAttemptEventType.TA_ASSIGNED); this.cId = cId; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java index f2df144..1b84de0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java @@ -19,18 +19,18 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> { - private final QueryUnitAttemptId id; + private final TaskAttemptId id; - public TaskAttemptEvent(QueryUnitAttemptId id, + public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType taskAttemptEventType) { super(taskAttemptEventType); this.id = id; } - public QueryUnitAttemptId getTaskAttemptId() { + public TaskAttemptId getTaskAttemptId() { return this.id; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java index 8f153af..3274ef7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java @@ -19,13 +19,13 @@ package org.apache.tajo.master.event; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; public class TaskAttemptScheduleEvent extends TaskAttemptEvent { private Configuration conf; public TaskAttemptScheduleEvent(final Configuration conf, - final QueryUnitAttemptId id, + final TaskAttemptId id, final TaskAttemptEventType taskAttemptEventType) { super(id, taskAttemptEventType); this.conf = conf; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java index d980e05..8c5f016 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java @@ -18,13 +18,13 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto; public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { private final TaskStatusProto status; - public TaskAttemptStatusUpdateEvent(final QueryUnitAttemptId id, + public TaskAttemptStatusUpdateEvent(final TaskAttemptId id, TaskStatusProto status) { super(id, TaskAttemptEventType.TA_UPDATE); this.status = status; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java new file mode 100644 index 0000000..91ef942 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import com.google.protobuf.RpcCallback; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.querymaster.TaskAttempt; +import org.apache.tajo.master.container.TajoContainerId; + +public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent { + private final TaskAttemptScheduleContext context; + private final TaskAttempt taskAttempt; + + public TaskAttemptToSchedulerEvent(EventType eventType, ExecutionBlockId executionBlockId, + TaskAttemptScheduleContext context, TaskAttempt taskAttempt) { + super(eventType, executionBlockId); + this.context = context; + this.taskAttempt = taskAttempt; + } + + public TaskAttempt getTaskAttempt() { + return taskAttempt; + } + + public TaskAttemptScheduleContext getContext() { + return context; + } + + public static class TaskAttemptScheduleContext { + private TajoContainerId containerId; + private String host; + private RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback; + + public TaskAttemptScheduleContext() { + + } + + public TaskAttemptScheduleContext(TajoContainerId containerId, + String host, + RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) { + this.containerId = containerId; + this.host = host; + this.callback = callback; + } + + public TajoContainerId getContainerId() { + return containerId; + } + + public void setContainerId(TajoContainerId containerId) { + this.containerId = containerId; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() { + return callback; + } + + public void setCallback(RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) { + this.callback = callback; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java index 3ee389a..20204aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java @@ -18,14 +18,14 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; public class TaskCompletionEvent extends TaskAttemptEvent { private TaskCompletionReport report; public TaskCompletionEvent(TaskCompletionReport report) { - super(new QueryUnitAttemptId(report.getId()), TaskAttemptEventType.TA_DONE); + super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_DONE); this.report = report; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java index 234491b..377a8e0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java @@ -19,17 +19,17 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskId; public class TaskEvent extends AbstractEvent<TaskEventType> { - private final QueryUnitId id; + private final TaskId id; - public TaskEvent(QueryUnitId id, TaskEventType taskEventType) { + public TaskEvent(TaskId id, TaskEventType taskEventType) { super(taskEventType); this.id = id; } - public QueryUnitId getTaskId() { + public TaskId getTaskId() { return id; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java index a4d9900..03888bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java @@ -18,19 +18,19 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport; public class TaskFatalErrorEvent extends TaskAttemptEvent { private final String message; public TaskFatalErrorEvent(TaskFatalErrorReport report) { - super(new QueryUnitAttemptId(report.getId()), + super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_FATAL_ERROR); this.message = report.getErrorMessage(); } - public TaskFatalErrorEvent(QueryUnitAttemptId attemptId, String message) { + public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) { super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR); this.message = message; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java index 9e8e3dd..3f72ed9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java @@ -21,7 +21,8 @@ package org.apache.tajo.master.event; import com.google.protobuf.RpcCallback; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType; import org.apache.tajo.master.container.TajoContainerId; @@ -35,12 +36,12 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { private final TajoContainerId containerId; private final ExecutionBlockId executionBlockId; - private final RpcCallback<QueryUnitRequestProto> callback; + private final RpcCallback<TaskRequestProto> callback; public TaskRequestEvent(int workerId, TajoContainerId containerId, ExecutionBlockId executionBlockId, - RpcCallback<QueryUnitRequestProto> callback) { + RpcCallback<TaskRequestProto> callback) { super(TaskRequestEventType.TASK_REQ); this.workerId = workerId; this.containerId = containerId; @@ -60,7 +61,7 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { return executionBlockId; } - public RpcCallback<QueryUnitRequestProto> getCallback() { + public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() { return this.callback; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java index 28654f0..a4f120c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java @@ -18,17 +18,17 @@ package org.apache.tajo.master.event; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; public class TaskTAttemptEvent extends TaskEvent { - private final QueryUnitAttemptId attemptId; - public TaskTAttemptEvent(QueryUnitAttemptId attemptId, + private final TaskAttemptId attemptId; + public TaskTAttemptEvent(TaskAttemptId attemptId, TaskEventType eventType) { - super(attemptId.getQueryUnitId(), eventType); + super(attemptId.getTaskId(), eventType); this.attemptId = attemptId; } - public QueryUnitAttemptId getTaskAttemptId() { + public TaskAttemptId getTaskAttemptId() { return attemptId; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index f4bd8a3..e7e2bc0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -122,7 +122,7 @@ public class QueryMasterManagerService extends CompositeService @Override public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request, - RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) { + RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) { try { ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId()); QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId()); @@ -144,15 +144,15 @@ public class QueryMasterManagerService extends CompositeService public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { try { - QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()); - QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId()); + QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); + TaskAttemptId attemptId = new TaskAttemptId(request.getId()); QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); if (queryMasterTask == null) { queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); } - SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()); - QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId()); - QueryUnitAttempt attempt = task.getAttempt(attemptId.getId()); + SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId()); + Task task = sq.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId.getId()); if(LOG.isDebugEnabled()){ LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); @@ -161,10 +161,10 @@ public class QueryMasterManagerService extends CompositeService if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { LOG.warn(attemptId + " Killed"); attempt.handle( - new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); + new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); } else { queryMasterTask.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request)); + new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); } done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { @@ -185,11 +185,11 @@ public class QueryMasterManagerService extends CompositeService RpcCallback<PrimitiveProtos.BoolProto> done) { try { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId())); + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); if (queryMasterTask != null) { queryMasterTask.handleTaskFailed(report); } else { - LOG.warn("No QueryMasterTask: " + new QueryUnitAttemptId(report.getId())); + LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId())); } done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { @@ -203,7 +203,7 @@ public class QueryMasterManagerService extends CompositeService RpcCallback<PrimitiveProtos.BoolProto> done) { try { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId())); + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); if (queryMasterTask != null) { queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 8f63416..9ab4f0a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -285,12 +285,12 @@ public class QueryMasterTask extends CompositeService { private class TaskEventDispatcher implements EventHandler<TaskEvent> { public void handle(TaskEvent event) { - QueryUnitId taskId = event.getTaskId(); + TaskId taskId = event.getTaskId(); if(LOG.isDebugEnabled()) { LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); } - QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()). - getQueryUnit(taskId); + Task task = query.getSubQuery(taskId.getExecutionBlockId()). + getTask(taskId); task.handle(event); } } @@ -298,10 +298,10 @@ public class QueryMasterTask extends CompositeService { private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> { public void handle(TaskAttemptEvent event) { - QueryUnitAttemptId attemptId = event.getTaskAttemptId(); - SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()); - QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId()); - QueryUnitAttempt attempt = task.getAttempt(attemptId); + TaskAttemptId attemptId = event.getTaskAttemptId(); + SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId()); + Task task = subQuery.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId); attempt.handle(event); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java deleted file mode 100644 index 75402c2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ /dev/null @@ -1,907 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.QueryUnitId; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.FragmentPair; -import org.apache.tajo.master.TaskState; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.DataLocation; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.Pair; -import org.apache.tajo.util.TajoIdUtils; -import org.apache.tajo.util.history.QueryUnitHistory; -import org.apache.tajo.worker.FetchImpl; - -import java.net.URI; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; - -public class QueryUnit implements EventHandler<TaskEvent> { - /** Class Logger */ - private static final Log LOG = LogFactory.getLog(QueryUnit.class); - - private final Configuration systemConf; - private QueryUnitId taskId; - private EventHandler eventHandler; - private StoreTableNode store = null; - private LogicalNode plan = null; - private List<ScanNode> scan; - - private Map<String, Set<FragmentProto>> fragMap; - private Map<String, Set<FetchImpl>> fetchMap; - - private int totalFragmentNum; - - private List<ShuffleFileOutput> shuffleFileOutputs; - private TableStats stats; - private final boolean isLeafTask; - private List<IntermediateEntry> intermediateData; - - private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts; - private final int maxAttempts = 3; - private Integer nextAttempt = -1; - private QueryUnitAttemptId lastAttemptId; - - private QueryUnitAttemptId successfulAttempt; - private String succeededHost; - private int succeededHostPort; - private int succeededPullServerPort; - - private int failedAttempts; - private int finishedAttempts; // finish are total of success, failed and killed - - private long launchTime; - private long finishTime; - - private List<DataLocation> dataLocations = Lists.newArrayList(); - - private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); - - private QueryUnitHistory finalQueryUnitHistory; - - protected static final StateMachineFactory - <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory = - new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW) - - // Transitions from NEW state - .addTransition(TaskState.NEW, TaskState.SCHEDULED, - TaskEventType.T_SCHEDULE, - new InitialScheduleTransition()) - .addTransition(TaskState.NEW, TaskState.KILLED, - TaskEventType.T_KILL, - new KillNewTaskTransition()) - - // Transitions from SCHEDULED state - .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, - TaskEventType.T_ATTEMPT_LAUNCHED, - new AttemptLaunchedTransition()) - .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, - TaskEventType.T_KILL, - new KillTaskTransition()) - - // Transitions from RUNNING state - .addTransition(TaskState.RUNNING, TaskState.RUNNING, - TaskEventType.T_ATTEMPT_LAUNCHED) - .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, - TaskEventType.T_ATTEMPT_SUCCEEDED, - new AttemptSucceededTransition()) - .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, - TaskEventType.T_KILL, - new KillTaskTransition()) - .addTransition(TaskState.RUNNING, - EnumSet.of(TaskState.RUNNING, TaskState.FAILED), - TaskEventType.T_ATTEMPT_FAILED, - new AttemptFailedOrRetryTransition()) - - // Transitions from KILL_WAIT state - .addTransition(TaskState.KILL_WAIT, TaskState.KILLED, - TaskEventType.T_ATTEMPT_KILLED, - ATTEMPT_KILLED_TRANSITION) - .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT, - TaskEventType.T_ATTEMPT_LAUNCHED, - new KillTaskTransition()) - .addTransition(TaskState.KILL_WAIT, TaskState.FAILED, - TaskEventType.T_ATTEMPT_FAILED, - new AttemptFailedTransition()) - .addTransition(TaskState.KILL_WAIT, TaskState.KILLED, - TaskEventType.T_ATTEMPT_SUCCEEDED, - ATTEMPT_KILLED_TRANSITION) - // Ignore-able transitions. - .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT, - EnumSet.of( - TaskEventType.T_KILL, - TaskEventType.T_SCHEDULE)) - - // Transitions from SUCCEEDED state - // Ignore-able transitions - .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED, - EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - // Transitions from FAILED state - // Ignore-able transitions - .addTransition(TaskState.FAILED, TaskState.FAILED, - EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - // Transitions from KILLED state - .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition()) - // Ignore-able transitions - .addTransition(TaskState.KILLED, TaskState.KILLED, - EnumSet.of( - TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - .installTopology(); - - private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine; - - - private final Lock readLock; - private final Lock writeLock; - private QueryUnitAttemptScheduleContext scheduleContext; - - public QueryUnit(Configuration conf, QueryUnitAttemptScheduleContext scheduleContext, - QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) { - this.systemConf = conf; - this.taskId = id; - this.eventHandler = eventHandler; - this.isLeafTask = isLeafTask; - scan = new ArrayList<ScanNode>(); - fetchMap = Maps.newHashMap(); - fragMap = Maps.newHashMap(); - shuffleFileOutputs = new ArrayList<ShuffleFileOutput>(); - attempts = Collections.emptyMap(); - lastAttemptId = null; - nextAttempt = -1; - failedAttempts = 0; - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - this.scheduleContext = scheduleContext; - - stateMachine = stateMachineFactory.make(this); - totalFragmentNum = 0; - } - - public boolean isLeafTask() { - return this.isLeafTask; - } - - public TaskState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - public TaskAttemptState getLastAttemptStatus() { - QueryUnitAttempt lastAttempt = getLastAttempt(); - if (lastAttempt != null) { - return lastAttempt.getState(); - } else { - return TaskAttemptState.TA_ASSIGNED; - } - } - - public QueryUnitHistory getQueryUnitHistory() { - if (finalQueryUnitHistory != null) { - if (finalQueryUnitHistory.getFinishTime() == 0) { - finalQueryUnitHistory = makeQueryUnitHistory(); - } - return finalQueryUnitHistory; - } else { - return makeQueryUnitHistory(); - } - } - - private QueryUnitHistory makeQueryUnitHistory() { - QueryUnitHistory queryUnitHistory = new QueryUnitHistory(); - - QueryUnitAttempt lastAttempt = getLastAttempt(); - if (lastAttempt != null) { - queryUnitHistory.setId(lastAttempt.getId().toString()); - queryUnitHistory.setState(lastAttempt.getState().toString()); - queryUnitHistory.setProgress(lastAttempt.getProgress()); - } - queryUnitHistory.setHostAndPort(succeededHost + ":" + succeededHostPort); - queryUnitHistory.setRetryCount(this.getRetryCount()); - queryUnitHistory.setLaunchTime(launchTime); - queryUnitHistory.setFinishTime(finishTime); - - queryUnitHistory.setNumShuffles(getShuffleOutpuNum()); - if (!getShuffleFileOutputs().isEmpty()) { - ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0); - if (queryUnitHistory.getNumShuffles() > 0) { - queryUnitHistory.setShuffleKey("" + shuffleFileOutputs.getPartId()); - queryUnitHistory.setShuffleFileName(shuffleFileOutputs.getFileName()); - } - } - - List<String> fragmentList = new ArrayList<String>(); - for (FragmentProto eachFragment : getAllFragments()) { - try { - Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment); - fragmentList.add(fragment.toString()); - } catch (Exception e) { - LOG.error(e.getMessage()); - fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage()); - } - } - queryUnitHistory.setFragments(fragmentList.toArray(new String[]{})); - - List<String[]> fetchList = new ArrayList<String[]>(); - for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) { - for (FetchImpl f : e.getValue()) { - for (URI uri : f.getSimpleURIs()){ - fetchList.add(new String[] {e.getKey(), uri.toString()}); - } - } - } - - queryUnitHistory.setFetchs(fetchList.toArray(new String[][]{})); - - List<String> dataLocationList = new ArrayList<String>(); - for(DataLocation eachLocation: getDataLocations()) { - dataLocationList.add(eachLocation.toString()); - } - - queryUnitHistory.setDataLocations(dataLocationList.toArray(new String[]{})); - return queryUnitHistory; - } - - public void setLogicalPlan(LogicalNode plan) { - this.plan = plan; - - LogicalNode node = plan; - ArrayList<LogicalNode> s = new ArrayList<LogicalNode>(); - s.add(node); - while (!s.isEmpty()) { - node = s.remove(s.size()-1); - if (node instanceof UnaryNode) { - UnaryNode unary = (UnaryNode) node; - s.add(s.size(), unary.getChild()); - } else if (node instanceof BinaryNode) { - BinaryNode binary = (BinaryNode) node; - s.add(s.size(), binary.getLeftChild()); - s.add(s.size(), binary.getRightChild()); - } else if (node instanceof ScanNode) { - scan.add((ScanNode)node); - } else if (node instanceof TableSubQueryNode) { - s.add(((TableSubQueryNode) node).getSubQuery()); - } - } - } - - private void addDataLocation(Fragment fragment) { - String[] hosts = fragment.getHosts(); - int[] diskIds = null; - if (fragment instanceof FileFragment) { - diskIds = ((FileFragment)fragment).getDiskIds(); - } - for (int i = 0; i < hosts.length; i++) { - dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i])); - } - } - - public void addFragment(Fragment fragment, boolean useDataLocation) { - Set<FragmentProto> fragmentProtos; - if (fragMap.containsKey(fragment.getTableName())) { - fragmentProtos = fragMap.get(fragment.getTableName()); - } else { - fragmentProtos = new HashSet<FragmentProto>(); - fragMap.put(fragment.getTableName(), fragmentProtos); - } - fragmentProtos.add(fragment.getProto()); - if (useDataLocation) { - addDataLocation(fragment); - } - totalFragmentNum++; - } - - public void addFragments(Collection<Fragment> fragments) { - for (Fragment eachFragment: fragments) { - addFragment(eachFragment, false); - } - } - - public void setFragment(FragmentPair[] fragmentPairs) { - for (FragmentPair eachFragmentPair : fragmentPairs) { - this.addFragment(eachFragmentPair.getLeftFragment(), true); - if (eachFragmentPair.getRightFragment() != null) { - this.addFragment(eachFragmentPair.getRightFragment(), true); - } - } - } - - public List<DataLocation> getDataLocations() { - return dataLocations; - } - - public String getSucceededHost() { - return succeededHost; - } - - public void addFetches(String tableId, Collection<FetchImpl> fetches) { - Set<FetchImpl> fetchSet; - if (fetchMap.containsKey(tableId)) { - fetchSet = fetchMap.get(tableId); - } else { - fetchSet = Sets.newHashSet(); - } - fetchSet.addAll(fetches); - fetchMap.put(tableId, fetchSet); - } - - public void setFetches(Map<String, Set<FetchImpl>> fetches) { - this.fetchMap.clear(); - this.fetchMap.putAll(fetches); - } - - public Collection<FragmentProto> getAllFragments() { - Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>(); - for (Set<FragmentProto> eachFragmentSet : fragMap.values()) { - fragmentProtos.addAll(eachFragmentSet); - } - return fragmentProtos; - } - - public LogicalNode getLogicalPlan() { - return this.plan; - } - - public QueryUnitId getId() { - return taskId; - } - - public Collection<FetchImpl> getFetchHosts(String tableId) { - return fetchMap.get(tableId); - } - - public Collection<Set<FetchImpl>> getFetches() { - return fetchMap.values(); - } - - public Map<String, Set<FetchImpl>> getFetchMap() { - return fetchMap; - } - - public Collection<FetchImpl> getFetch(ScanNode scan) { - return this.fetchMap.get(scan.getTableName()); - } - - public ScanNode[] getScanNodes() { - return this.scan.toArray(new ScanNode[scan.size()]); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append(plan.getType() + " \n"); - for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) { - builder.append(e.getKey()).append(" : "); - for (FragmentProto fragment : e.getValue()) { - builder.append(fragment).append(", "); - } - } - for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) { - builder.append(e.getKey()).append(" : "); - for (FetchImpl t : e.getValue()) { - for (URI uri : t.getURIs()){ - builder.append(uri).append(" "); - } - } - } - - return builder.toString(); - } - - public void setStats(TableStats stats) { - this.stats = stats; - } - - public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) { - this.shuffleFileOutputs = Collections.unmodifiableList(partitions); - } - - public TableStats getStats() { - return this.stats; - } - - public List<ShuffleFileOutput> getShuffleFileOutputs() { - return this.shuffleFileOutputs; - } - - public int getShuffleOutpuNum() { - return this.shuffleFileOutputs.size(); - } - - public QueryUnitAttempt newAttempt() { - QueryUnitAttempt attempt = new QueryUnitAttempt(scheduleContext, - QueryIdFactory.newQueryUnitAttemptId(this.getId(), ++nextAttempt), - this, eventHandler); - lastAttemptId = attempt.getId(); - return attempt; - } - - public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) { - return attempts.get(attemptId); - } - - public QueryUnitAttempt getAttempt(int attempt) { - return this.attempts.get(QueryIdFactory.newQueryUnitAttemptId(this.getId(), attempt)); - } - - public QueryUnitAttempt getLastAttempt() { - return getAttempt(this.lastAttemptId); - } - - public QueryUnitAttempt getSuccessfulAttempt() { - readLock.lock(); - try { - if (null == successfulAttempt) { - return null; - } - return attempts.get(successfulAttempt); - } finally { - readLock.unlock(); - } - } - - public int getRetryCount () { - return this.nextAttempt; - } - - public int getTotalFragmentNum() { - return totalFragmentNum; - } - - private static class InitialScheduleTransition implements - SingleArcTransition<QueryUnit, TaskEvent> { - - @Override - public void transition(QueryUnit task, TaskEvent taskEvent) { - task.addAndScheduleAttempt(); - } - } - - public long getLaunchTime() { - return launchTime; - } - - public long getFinishTime() { - return finishTime; - } - - @VisibleForTesting - public void setLaunchTime(long launchTime) { - this.launchTime = launchTime; - } - - @VisibleForTesting - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public long getRunningTime() { - if(finishTime > 0) { - return finishTime - launchTime; - } else { - return System.currentTimeMillis() - launchTime; - } - } - - // This is always called in the Write Lock - private void addAndScheduleAttempt() { - // Create new task attempt - QueryUnitAttempt attempt = newAttempt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Created attempt " + attempt.getId()); - } - switch (attempts.size()) { - case 0: - attempts = Collections.singletonMap(attempt.getId(), attempt); - break; - - case 1: - Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts - = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3); - newAttempts.putAll(attempts); - attempts = newAttempts; - attempts.put(attempt.getId(), attempt); - break; - - default: - attempts.put(attempt.getId(), attempt); - break; - } - - if (failedAttempts > 0) { - eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), - TaskAttemptEventType.TA_RESCHEDULE)); - } else { - eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), - TaskAttemptEventType.TA_SCHEDULE)); - } - } - - private void finishTask() { - this.finishTime = System.currentTimeMillis(); - finalQueryUnitHistory = makeQueryUnitHistory(); - } - - private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> { - - @Override - public void transition(QueryUnit task, TaskEvent taskEvent) { - task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED)); - } - } - - private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> { - - @Override - public void transition(QueryUnit task, TaskEvent taskEvent) { - task.finishTask(); - task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL)); - } - } - - private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{ - - @Override - public void transition(QueryUnit task, TaskEvent event) { - task.finishTask(); - task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED)); - } - } - - private static class AttemptSucceededTransition - implements SingleArcTransition<QueryUnit, TaskEvent>{ - - @Override - public void transition(QueryUnit task, - TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); - - task.successfulAttempt = attemptEvent.getTaskAttemptId(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); - task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); - - task.finishTask(); - task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED)); - } - } - - private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> { - @Override - public void transition(QueryUnit task, - TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); - task.launchTime = System.currentTimeMillis(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); - } - } - - private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> { - @Override - public void transition(QueryUnit task, TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - LOG.info("============================================================="); - LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); - LOG.info("============================================================="); - task.failedAttempts++; - task.finishedAttempts++; - - task.finishTask(); - task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED)); - } - } - - private static class AttemptFailedOrRetryTransition implements - MultipleArcTransition<QueryUnit, TaskEvent, TaskState> { - - @Override - public TaskState transition(QueryUnit task, TaskEvent taskEvent) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; - task.failedAttempts++; - task.finishedAttempts++; - boolean retry = task.failedAttempts < task.maxAttempts; - - LOG.info("===================================================================================="); - LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " + - "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<"); - LOG.info("===================================================================================="); - - if (retry) { - if (task.successfulAttempt == null) { - task.addAndScheduleAttempt(); - } - } else { - task.finishTask(); - task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED)); - return TaskState.FAILED; - } - - return task.getState(); - } - } - - @Override - public void handle(TaskEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getTaskId() + " of type " - + event.getType()); - } - - try { - writeLock.lock(); - TaskState oldState = getState(); - try { - stateMachine.doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() - , e); - eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()), - QueryEventType.INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if (LOG.isDebugEnabled()) { - if (oldState != getState()) { - LOG.debug(taskId + " Task Transitioned from " + oldState + " to " - + getState()); - } - } - } - - finally { - writeLock.unlock(); - } - } - - public void setIntermediateData(Collection<IntermediateEntry> partitions) { - this.intermediateData = new ArrayList<IntermediateEntry>(partitions); - } - - public List<IntermediateEntry> getIntermediateData() { - return this.intermediateData; - } - - public static class PullHost implements Cloneable { - String host; - int port; - int hashCode; - - public PullHost(String pullServerAddr, int pullServerPort){ - this.host = pullServerAddr; - this.port = pullServerPort; - this.hashCode = Objects.hashCode(host, port); - } - public String getHost() { - return host; - } - - public int getPort() { - return this.port; - } - - public String getPullAddress() { - return host + ":" + port; - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PullHost) { - PullHost other = (PullHost) obj; - return host.equals(other.host) && port == other.port; - } - - return false; - } - - @Override - public PullHost clone() throws CloneNotSupportedException { - PullHost newPullHost = (PullHost) super.clone(); - newPullHost.host = host; - newPullHost.port = port; - newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port); - return newPullHost; - } - - @Override - public String toString() { - return host + ":" + port; - } - } - - public static class IntermediateEntry { - ExecutionBlockId ebId; - int taskId; - int attemptId; - int partId; - PullHost host; - long volume; - List<Pair<Long, Integer>> pages; - List<Pair<Long, Pair<Integer, Integer>>> failureRowNums; - - public IntermediateEntry(IntermediateEntryProto proto) { - this.ebId = new ExecutionBlockId(proto.getEbId()); - this.taskId = proto.getTaskId(); - this.attemptId = proto.getAttemptId(); - this.partId = proto.getPartId(); - - String[] pullHost = proto.getHost().split(":"); - this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1])); - this.volume = proto.getVolume(); - - failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); - for (FailureIntermediateProto eachFailure: proto.getFailuresList()) { - - failureRowNums.add(new Pair(eachFailure.getPagePos(), - new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))); - } - - pages = new ArrayList<Pair<Long, Integer>>(); - for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) { - pages.add(new Pair(eachPage.getPos(), eachPage.getLength())); - } - } - - public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { - this.taskId = taskId; - this.attemptId = attemptId; - this.partId = partId; - this.host = host; - } - - public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) { - this.taskId = taskId; - this.attemptId = attemptId; - this.partId = partId; - this.host = host; - this.volume = volume; - } - - public ExecutionBlockId getEbId() { - return ebId; - } - - public void setEbId(ExecutionBlockId ebId) { - this.ebId = ebId; - } - - public int getTaskId() { - return this.taskId; - } - - public int getAttemptId() { - return this.attemptId; - } - - public int getPartId() { - return this.partId; - } - - public PullHost getPullHost() { - return this.host; - } - - public long getVolume() { - return this.volume; - } - - public long setVolume(long volume) { - return this.volume = volume; - } - - public List<Pair<Long, Integer>> getPages() { - return pages; - } - - public void setPages(List<Pair<Long, Integer>> pages) { - this.pages = pages; - } - - public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() { - return failureRowNums; - } - - @Override - public int hashCode() { - return Objects.hashCode(ebId, taskId, partId, attemptId, host); - } - - public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) { - List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>(); - - if (pages == null || pages.isEmpty()) { - return splits; - } - int pageSize = pages.size(); - - long currentOffset = -1; - long currentBytes = 0; - - long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume; - for (int i = 0; i < pageSize; i++) { - Pair<Long, Integer> eachPage = pages.get(i); - if (currentOffset == -1) { - currentOffset = eachPage.getFirst(); - } - if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) { - splits.add(new Pair(currentOffset, currentBytes)); - currentOffset = eachPage.getFirst(); - currentBytes = 0; - realSplitVolume = splitVolume; - } - - currentBytes += eachPage.getSecond(); - } - - //add last - if (currentBytes > 0) { - splits.add(new Pair(currentOffset, currentBytes)); - } - return splits; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java deleted file mode 100644 index d88173f..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; -import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; -import org.apache.tajo.master.querymaster.QueryUnit.PullHost; -import org.apache.tajo.master.container.TajoContainerId; - -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; - -public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { - - private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class); - - private final static int EXPIRE_TIME = 15000; - - private final QueryUnitAttemptId id; - private final QueryUnit queryUnit; - final EventHandler eventHandler; - - private TajoContainerId containerId; - private WorkerConnectionInfo workerConnectionInfo; - private int expire; - - private final Lock readLock; - private final Lock writeLock; - - private final List<String> diagnostics = new ArrayList<String>(); - - private final QueryUnitAttemptScheduleContext scheduleContext; - - private float progress; - private CatalogProtos.TableStatsProto inputStats; - private CatalogProtos.TableStatsProto resultStats; - - protected static final StateMachineFactory - <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - stateMachineFactory = new StateMachineFactory - <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - (TaskAttemptState.TA_NEW) - - // Transitions from TA_NEW state - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, - TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition()) - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, - TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition()) - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_KILL, - new TaskKilledCompleteTransition()) - - // Transitions from TA_UNASSIGNED state - .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED, - TaskAttemptEventType.TA_ASSIGNED, - new LaunchTransition()) - .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillUnassignedTaskTransition()) - - // Transitions from TA_ASSIGNED state - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED, - TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, - EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED), - TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new SucceededTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - - // Transitions from TA_RUNNING state - .addTransition(TaskAttemptState.TA_RUNNING, - EnumSet.of(TaskAttemptState.TA_RUNNING), - TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new SucceededTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_LOCAL_KILLED, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_ASSIGNED, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_SCHEDULE_CANCELED, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_DONE, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, - EnumSet.of( - TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - TaskAttemptEventType.TA_UPDATE)) - - // Transitions from TA_SUCCEEDED state - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_UPDATE) - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition()) - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - // Ignore-able transitions - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_KILL) - - // Transitions from TA_KILLED state - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE) - // Ignore-able transitions - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - EnumSet.of( - TaskAttemptEventType.TA_UPDATE)) - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - EnumSet.of( - TaskAttemptEventType.TA_LOCAL_KILLED, - TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_ASSIGNED, - TaskAttemptEventType.TA_DONE), - new TaskKilledCompleteTransition()) - .installTopology(); - - private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - stateMachine; - - - public QueryUnitAttempt(final QueryUnitAttemptScheduleContext scheduleContext, - final QueryUnitAttemptId id, final QueryUnit queryUnit, - final EventHandler eventHandler) { - this.scheduleContext = scheduleContext; - this.id = id; - this.expire = QueryUnitAttempt.EXPIRE_TIME; - this.queryUnit = queryUnit; - this.eventHandler = eventHandler; - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - - stateMachine = stateMachineFactory.make(this); - } - - public TaskAttemptState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - public QueryUnitAttemptId getId() { - return this.id; - } - - public boolean isLeafTask() { - return this.queryUnit.isLeafTask(); - } - - public QueryUnit getQueryUnit() { - return this.queryUnit; - } - - public WorkerConnectionInfo getWorkerConnectionInfo() { - return this.workerConnectionInfo; - } - - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - - public synchronized void setExpireTime(int expire) { - this.expire = expire; - } - - public synchronized void updateExpireTime(int period) { - this.setExpireTime(this.expire - period); - } - - public synchronized void resetExpireTime() { - this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME); - } - - public int getLeftTime() { - return this.expire; - } - - public float getProgress() { - return progress; - } - - public TableStats getInputStats() { - if (inputStats == null) { - return null; - } - - return new TableStats(inputStats); - } - - public TableStats getResultStats() { - if (resultStats == null) { - return null; - } - return new TableStats(resultStats); - } - - private void fillTaskStatistics(TaskCompletionReport report) { - this.progress = 1.0f; - - List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); - - if (report.getShuffleFileOutputsCount() > 0) { - this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList()); - - PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort()); - for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { - IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(), - getId().getId(), p.getPartId(), host, p.getVolume()); - partitions.add(entry); - } - } - this.getQueryUnit().setIntermediateData(partitions); - - if (report.hasInputStats()) { - this.inputStats = report.getInputStats(); - } - if (report.hasResultStats()) { - this.resultStats = report.getResultStats(); - this.getQueryUnit().setStats(new TableStats(resultStats)); - } - } - - private static class TaskAttemptScheduleTransition implements - SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - - @Override - public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { - taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent( - EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(), - taskAttempt.scheduleContext, taskAttempt)); - } - } - - private static class KillUnassignedTaskTransition implements - SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - - @Override - public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { - taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent( - EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(), - taskAttempt.scheduleContext, taskAttempt)); - } - } - - private static class LaunchTransition - implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - - @Override - public void transition(QueryUnitAttempt taskAttempt, - TaskAttemptEvent event) { - TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; - taskAttempt.containerId = castEvent.getContainerId(); - taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); - taskAttempt.eventHandler.handle( - new TaskTAttemptEvent(taskAttempt.getId(), - TaskEventType.T_ATTEMPT_LAUNCHED)); - } - } - - private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - - @Override - public void transition(QueryUnitAttempt taskAttempt, - TaskAttemptEvent event) { - taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(), - TaskEventType.T_ATTEMPT_KILLED)); - LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask"); - } - } - - private static class StatusUpdateTransition - implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> { - - @Override - public TaskAttemptState transition(QueryUnitAttempt taskAttempt, - TaskAttemptEvent event) { - TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event; - - taskAttempt.progress = updateEvent.getStatus().getProgress(); - taskAttempt.inputStats = updateEvent.getStatus().getInputStats(); - taskAttempt.resultStats = updateEvent.getStatus().getResultStats(); - - return TaskAttemptState.TA_RUNNING; - } - } - - private void addDiagnosticInfo(String diag) { - if (diag != null && !diag.equals("")) { - diagnostics.add(diag); - } - } - - private static class AlreadyAssignedTransition - implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{ - - @Override - public void transition(QueryUnitAttempt queryUnitAttempt, - TaskAttemptEvent taskAttemptEvent) { - } - } - - private static class AlreadyDoneTransition - implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{ - - @Override - public void transition(QueryUnitAttempt queryUnitAttempt, - TaskAttemptEvent taskAttemptEvent) { - } - } - - private static class SucceededTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - @Override - public void transition(QueryUnitAttempt taskAttempt, - TaskAttemptEvent event) { - TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); - - try { - taskAttempt.fillTaskStatistics(report); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); - } catch (Throwable t) { - taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); - taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); - } - } - } - - private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> { - - @Override - public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) { - taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId, - LocalTaskEventType.KILL)); - } - } - - private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{ - @Override - public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) { - TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); - taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); - LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() - + " >> " + errorEvent.errorMessage()); - } - } - - @Override - public void handle(TaskAttemptEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType()); - } - try { - writeLock.lock(); - TaskAttemptState oldState = getState(); - try { - stateMachine.doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() - , e); - eventHandler.handle( - new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(), - "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); - eventHandler.handle( - new SubQueryEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(), - SubQueryEventType.SQ_INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if (LOG.isDebugEnabled()) { - if (oldState != getState()) { - LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to " - + getState()); - } - } - } - - finally { - writeLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index a240ace..cf6b917 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -44,7 +44,7 @@ import org.apache.tajo.exception.InternalException; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.master.TaskSchedulerContext; -import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.master.querymaster.Task.IntermediateEntry; import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; @@ -717,7 +717,7 @@ public class Repartitioner { List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId()); for (ExecutionBlock childBlock : childBlocks) { SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); - for (QueryUnit qu : childExecSM.getQueryUnits()) { + for (Task qu : childExecSM.getTasks()) { for (IntermediateEntry p : qu.getIntermediateData()) { FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); fetch.addPart(p.getTaskId(), p.getAttemptId()); @@ -840,8 +840,8 @@ public class Repartitioner { // make FetchImpl per PullServer, PartId Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions); for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) { - Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue()); - for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) { + Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue()); + for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) { FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(), block.getId(), interm.getKey(), e.getValue()); @@ -1191,10 +1191,10 @@ public class Repartitioner { return hashed; } - public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) { - Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>(); + public static Map<Task.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) { + Map<Task.PullHost, List<IntermediateEntry>> hashed = new HashMap<Task.PullHost, List<IntermediateEntry>>(); - QueryUnit.PullHost host; + Task.PullHost host; for (IntermediateEntry entry : entries) { host = entry.getPullHost(); if (hashed.containsKey(host)) {
