http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java index 5c97ba8..16d32d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java @@ -36,6 +36,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto; /** * The history class for TaskRunner processing. */ +@Deprecated public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { private Service.STATE state;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 734a8a5..d18a262 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +@Deprecated public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> { private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class); @@ -154,14 +155,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< if(context == null){ try { - context = new ExecutionBlockContext(getTajoConf(), - getWorkerContext(), - this, - startEvent.getQueryContext(), - startEvent.getPlan(), - startEvent.getExecutionBlockId(), - startEvent.getQueryMaster(), - startEvent.getShuffleType()); + context = new ExecutionBlockContext(getWorkerContext(), this, startEvent.getRequest()); context.init(); } catch (Throwable e) { LOG.fatal(e.getMessage(), e); @@ -170,7 +164,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< executionBlockContextMap.put(event.getExecutionBlockId(), context); } - TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId()); + TaskRunner taskRunner = new TaskRunner(context, startEvent.getRequest().getContainerId()); LOG.info("Start TaskRunner:" + taskRunner.getId()); taskRunnerMap.put(taskRunner.getId(), taskRunner); taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory()); http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java new file mode 100644 index 0000000..85d74e2 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ipc.TajoWorkerProtocol; + +public class ExecutionBlockStartEvent extends TaskManagerEvent { + private TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto; + + public ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto) { + super(EventType.EB_START, new ExecutionBlockId(requestProto.getExecutionBlockId())); + this.requestProto = requestProto; + } + + public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() { + return requestProto; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java new file mode 100644 index 0000000..2b967ab --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.ipc.TajoWorkerProtocol; + +public class ExecutionBlockStopEvent extends TaskManagerEvent { + private TajoWorkerProtocol.ExecutionBlockListProto cleanupList; + + public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto executionBlockId, + TajoWorkerProtocol.ExecutionBlockListProto cleanupList) { + super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId)); + this.cleanupList = cleanupList; + } + + public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() { + return cleanupList; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java index 2f411e8..9a1c106 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java @@ -24,7 +24,7 @@ import com.google.protobuf.RpcCallback; import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto; import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto; -public class NodeResourceAllocateEvent extends NodeResourceManagerEvent { +public class NodeResourceAllocateEvent extends NodeResourceEvent { private BatchAllocationRequestProto request; private RpcCallback<BatchAllocationResponseProto> callback; http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java index a298d77..31d9229 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java @@ -21,7 +21,7 @@ package org.apache.tajo.worker.event; import org.apache.tajo.TajoProtos; import org.apache.tajo.resource.NodeResource; -public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { +public class NodeResourceDeallocateEvent extends NodeResourceEvent { private NodeResource resource; http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java new file mode 100644 index 0000000..6fd2e0d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NodeResourceEvent extends AbstractEvent<NodeResourceEvent.EventType> { + //consumer: NodeResourceManager + public enum EventType { + // producer: TajoWorkerManagerService + ALLOCATE, + // producer: TaskExecutor + DEALLOCATE + } + + public NodeResourceEvent(EventType eventType) { + super(eventType); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java deleted file mode 100644 index bcb3448..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.resource.NodeResource; - -public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> { - public enum EventType { - ALLOCATE, - DEALLOCATE - } - - public NodeResourceManagerEvent(EventType eventType) { - super(eventType); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java index 58ab74a..9eb8ae9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java @@ -22,19 +22,16 @@ import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.resource.NodeResource; public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> { - private final NodeResource resource; + // consumer: NodeStatusUpdater public enum EventType { + // producer: NodeResourceManager REPORT_RESOURCE, + // producer: TaskManager FLUSH_REPORTS } - public NodeStatusEvent(EventType eventType, NodeResource resource) { + public NodeStatusEvent(EventType eventType) { super(eventType); - this.resource = resource; - } - - public NodeResource getResource() { - return resource; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java new file mode 100644 index 0000000..c609c67 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.TaskAttemptId; + +public class TaskExecutorEvent extends AbstractEvent<TaskExecutorEvent.EventType> { + + // producer: NodeResourceManager, consumer: TaskExecutorEvent + public enum EventType { + START, + KILL, + ABORT + } + + private TaskAttemptId taskAttemptId; + + public TaskExecutorEvent(EventType eventType, + TaskAttemptId taskAttemptId) { + super(eventType); + this.taskAttemptId = taskAttemptId; + } + + public TaskAttemptId getTaskId() { + return taskAttemptId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java new file mode 100644 index 0000000..39b541b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.TaskAttemptId; + +public class TaskManagerEvent extends AbstractEvent<TaskManagerEvent.EventType> { + // producer: NodeResourceManager, consumer: TaskManager + public enum EventType { + EB_START, + EB_STOP + } + + private ExecutionBlockId executionBlockId; + + public TaskManagerEvent(EventType eventType, + ExecutionBlockId executionBlockId) { + super(eventType); + this.executionBlockId = executionBlockId; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java index aac8973..7175251 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java @@ -21,6 +21,7 @@ package org.apache.tajo.worker.event; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; +@Deprecated public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> { public enum EventType { START, http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java index 908afa2..9406794 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java @@ -20,48 +20,20 @@ package org.apache.tajo.worker.event; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; - +@Deprecated public class TaskRunnerStartEvent extends TaskRunnerEvent { - private final QueryContext queryContext; - private final WorkerConnectionInfo queryMaster; - private final String containerId; - private final String plan; - private final PlanProto.ShuffleType shuffleType; - - public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster, - ExecutionBlockId executionBlockId, - String containerId, - QueryContext context, - String plan, - PlanProto.ShuffleType shuffleType) { - super(EventType.START, executionBlockId); - this.queryMaster = queryMaster; - this.containerId = containerId; - this.queryContext = context; - this.plan = plan; - this.shuffleType = shuffleType; - } - - public WorkerConnectionInfo getQueryMaster() { - return queryMaster; - } - - public String getContainerId() { - return containerId; - } - - public QueryContext getQueryContext() { - return queryContext; - } + private final TajoWorkerProtocol.RunExecutionBlockRequestProto request; - public String getPlan() { - return plan; + public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { + super(EventType.START, new ExecutionBlockId(request.getExecutionBlockId())); + this.request = request; } - public PlanProto.ShuffleType getShuffleType() { - return shuffleType; + public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() { + return request; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java index c8ec20d..297f30c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java @@ -20,6 +20,7 @@ package org.apache.tajo.worker.event; import org.apache.tajo.ExecutionBlockId; +@Deprecated public class TaskRunnerStopEvent extends TaskRunnerEvent { public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) { http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java new file mode 100644 index 0000000..f60e7c4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.resource.NodeResource; +import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; + +public class TaskStartEvent extends TaskExecutorEvent { + + private NodeResource allocatedResource; + private TaskRequestProto taskRequest; + + public TaskStartEvent(TaskRequestProto taskRequest, + NodeResource allocatedResource) { + super(EventType.START, new TaskAttemptId(taskRequest.getId())); + this.taskRequest = taskRequest; + this.allocatedResource = allocatedResource; + } + + public NodeResource getAllocatedResource() { + return allocatedResource; + } + + public TaskRequestProto getTaskRequest() { + return taskRequest; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 2324596..715b1e6 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -208,6 +208,7 @@ message TaskAllocationRequestProto { message BatchAllocationRequestProto { required ExecutionBlockIdProto executionBlockId = 1; repeated TaskAllocationRequestProto taskRequest = 2; + optional RunExecutionBlockRequestProto executionBlockRequest = 3; //TODO should be refactored } message BatchAllocationResponseProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index eca7f6d..0cec3da 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -19,6 +19,7 @@ package org.apache.tajo.querymaster; import com.google.common.collect.Lists; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.tajo.*; @@ -33,16 +34,25 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.session.Session; +import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.history.HistoryWriter; +import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.worker.ExecutionBlockContext; -import org.apache.tajo.worker.Task; +import org.apache.tajo.worker.LegacyTaskImpl; +import org.apache.tajo.worker.TajoWorker; +import org.apache.tajo.worker.TaskRunnerManager; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -230,7 +240,7 @@ public class TestKillQuery { QueryId qid = LocalTajoTestingUtility.newQueryId(); ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1); TaskId tid = QueryIdFactory.newTaskId(eid); - TajoConf conf = new TajoConf(); + final TajoConf conf = new TajoConf(); TaskRequestImpl taskRequest = new TaskRequestImpl(); taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(), @@ -238,18 +248,37 @@ public class TestKillQuery { taskRequest.setInterQuery(); TaskAttemptId attemptId = new TaskAttemptId(tid, 1); + WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder + requestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder(); + + requestProto.setExecutionBlockId(eid.getProto()) + .setQueryMaster(queryMaster.getProto()) + .setNodeId(queryMaster.getHost()+":" + queryMaster.getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + TajoWorker.WorkerContext workerContext = new MockWorkerContext() { + @Override + public TajoConf getConf() { + return conf; + } + }; + ExecutionBlockContext context = - new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null); + new ExecutionBlockContext(workerContext, null, requestProto.build()); - org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId, + org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", CommonTestingUtil.getTestDir(), attemptId, conf, context, taskRequest); task.kill(); - assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState()); try { task.run(); - assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState()); } catch (Exception e) { - assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState()); } } @@ -271,4 +300,94 @@ public class TestKillQuery { super.dispatch(event); } } + + abstract class MockWorkerContext implements TajoWorker.WorkerContext { + + @Override + public QueryMaster getQueryMaster() { + return null; + } + + public abstract TajoConf getConf(); + + @Override + public ServiceTracker getServiceTracker() { + return null; + } + + @Override + public QueryMasterManagerService getQueryMasterManagerService() { + return null; + } + + @Override + public TaskRunnerManager getTaskRunnerManager() { + return null; + } + + @Override + public CatalogService getCatalog() { + return null; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + return null; + } + + @Override + public String getWorkerName() { + return null; + } + + @Override + public LocalDirAllocator getLocalDirAllocator() { + return null; + } + + @Override + public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() { + return null; + } + + @Override + public TajoSystemMetrics getWorkerSystemMetrics() { + return null; + } + + @Override + public HashShuffleAppenderManager getHashShuffleAppenderManager() { + return null; + } + + @Override + public HistoryWriter getTaskHistoryWriter() { + return null; + } + + @Override + public HistoryReader getHistoryReader() { + return null; + } + + @Override + public void cleanup(String strPath) { + + } + + @Override + public void cleanupTemporalDirectories() { + + } + + @Override + public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) { + + } + + @Override + public void setNumClusterNodes(int numClusterNodes) { + + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java new file mode 100644 index 0000000..9d4e1f3 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.ipc.TajoWorkerProtocol; + +import java.io.IOException; + +public class MockExecutionBlock extends ExecutionBlockContext { + + public MockExecutionBlock(TajoWorker.WorkerContext workerContext, + TajoWorkerProtocol.RunExecutionBlockRequestProto request) throws IOException { + super(workerContext, null, request); + } + + @Override + public void init() throws Throwable { + //skip + } + + @Override + public void fatalError(TaskAttemptId taskAttemptId, String message) { + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java new file mode 100644 index 0000000..18b9405 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.worker.event.NodeResourceEvent; + +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +public class MockNodeResourceManager extends NodeResourceManager { + + volatile boolean enableTaskHandlerEvent = true; + private final Semaphore barrier; + + public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, EventHandler taskEventHandler) { + super(dispatcher, taskEventHandler); + this.barrier = barrier; + } + + @Override + public void handle(NodeResourceEvent event) { + super.handle(event); + barrier.release(); + } + + @Override + protected void startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { + if(enableTaskHandlerEvent) { + super.startExecutionBlock(request); + } + } + + @Override + protected void startTask(TajoWorkerProtocol.TaskRequestProto request, NodeResource resource) { + if(enableTaskHandlerEvent) { + super.startTask(request, resource); + } + } + + /** + * skip task execution and deallocation for testing + * */ + public void setTaskHandlerEvent(boolean flag) { + enableTaskHandlerEvent = flag; + } + + protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> createTaskRequests( + ExecutionBlockId ebId, int memory, int size) { + + Queue<TajoWorkerProtocol.TaskAllocationRequestProto> + requestProtoList = new LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>(); + for (int i = 0; i < size; i++) { + + TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0); + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); + builder.setId(taskAttemptId.getProto()); + builder.setShouldDie(true); + builder.setOutputTable(""); + builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); + builder.setClusteredOutput(false); + + + requestProtoList.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder() + .setResource(NodeResources.createResource(memory).getProto()) + .setTaskRequest(builder.build()).build()); + } + return requestProtoList; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java index 2d7d0be..dfcfd4f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java @@ -39,9 +39,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater { private Map<Integer, NodeResource> resources = Maps.newHashMap(); private MockResourceTracker resourceTracker; - public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo, + public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) { - super(connectionInfo, resourceManager); + super(workerContext, resourceManager); this.barrier = barrier; this.resourceTracker = new MockResourceTracker(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java new file mode 100644 index 0000000..f62733f --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.worker.event.TaskExecutorEvent; + +import java.io.IOException; +import java.util.concurrent.Semaphore; + +public class MockTaskExecutor extends TaskExecutor { + + protected final Semaphore barrier; + + public MockTaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) { + super(taskManager, rmEventHandler); + this.barrier = barrier; + } + + @Override + public void handle(TaskExecutorEvent event) { + super.handle(event); + barrier.release(); + } + + @Override + protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) { + final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); + + //ignore status changed log + final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null) { + private TajoProtos.TaskAttemptState state; + + @Override + public TajoProtos.TaskAttemptState getState() { + return state; + } + + @Override + public void setState(TajoProtos.TaskAttemptState state) { + this.state = state; + } + }; + + return new Task() { + @Override + public void init() throws IOException { + + } + + @Override + public void fetch() { + + } + + @Override + public void run() throws Exception { + taskAttemptContext.stop(); + taskAttemptContext.setProgress(1.0f); + taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED); + } + + @Override + public void kill() { + + } + + @Override + public void abort() { + + } + + @Override + public void cleanup() { + + } + + @Override + public boolean hasFetchPhase() { + return false; + } + + @Override + public boolean isProgressChanged() { + return false; + } + + @Override + public boolean isStopped() { + return taskAttemptContext.isStopped(); + } + + @Override + public void updateProgress() { + + } + + @Override + public TaskAttemptContext getTaskContext() { + return taskAttemptContext; + } + + @Override + public ExecutionBlockContext getExecutionBlockContext() { + return context; + } + + @Override + public TajoWorkerProtocol.TaskStatusProto getReport() { + TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder(); + builder.setWorkerName("localhost:0"); + builder.setId(taskAttemptContext.getTaskId().getProto()) + .setProgress(taskAttemptContext.getProgress()) + .setState(taskAttemptContext.getState()); + + builder.setInputStats(new TableStats().getProto()); + return builder.build(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java new file mode 100644 index 0000000..678b063 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.worker.event.TaskManagerEvent; + +import java.io.IOException; +import java.util.concurrent.Semaphore; + +public class MockTaskManager extends TaskManager { + + private final Semaphore barrier; + + public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) { + super(dispatcher, workerContext, rmEventHandler); + this.barrier = barrier; + } + + @Override + protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { + try { + return new MockExecutionBlock(getWorkerContext(), request); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void stopExecutionBlock(ExecutionBlockContext context, + TajoWorkerProtocol.ExecutionBlockListProto cleanupList) { + //skip for testing + } + + @Override + public void handle(TaskManagerEvent event) { + super.handle(event); + barrier.release(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java new file mode 100644 index 0000000..e8c2b9c --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.querymaster.QueryMaster; +import org.apache.tajo.querymaster.QueryMasterManagerService; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.history.HistoryWriter; +import org.apache.tajo.util.metrics.TajoSystemMetrics; + +public abstract class MockWorkerContext implements TajoWorker.WorkerContext { + TajoSystemMetrics tajoSystemMetrics; + + @Override + public QueryMaster getQueryMaster() { + return null; + } + + public abstract TajoConf getConf(); + + @Override + public ServiceTracker getServiceTracker() { + return null; + } + + @Override + public QueryMasterManagerService getQueryMasterManagerService() { + return null; + } + + @Override + public TaskRunnerManager getTaskRunnerManager() { + return null; + } + + @Override + public CatalogService getCatalog() { + return null; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + return null; + } + + @Override + public String getWorkerName() { + return null; + } + + @Override + public LocalDirAllocator getLocalDirAllocator() { + return null; + } + + @Override + public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() { + return null; + } + + @Override + public TajoSystemMetrics getWorkerSystemMetrics() { + + if (tajoSystemMetrics == null) { + tajoSystemMetrics = new TajoSystemMetrics(getConf(), "test-file-group", "localhost"); + tajoSystemMetrics.start(); + } + return tajoSystemMetrics; + } + + @Override + public HashShuffleAppenderManager getHashShuffleAppenderManager() { + return null; + } + + @Override + public HistoryWriter getTaskHistoryWriter() { + return null; + } + + @Override + public HistoryReader getHistoryReader() { + return null; + } + + @Override + public void cleanup(String strPath) { + + } + + @Override + public void cleanupTemporalDirectories() { + + } + + @Override + public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) { + + } + + @Override + public void setNumClusterNodes(int numClusterNodes) { + + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index 513eb69..65627c1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -104,13 +104,13 @@ public class TestFetcher { @Test public void testAdjustFetchProcess() { - assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0); - assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0); - assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0); - assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0); - assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0); - assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0); - assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0); + assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(0, 0), 0); + assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(10, 10), 0); + assertEquals(0.05f, LegacyTaskImpl.adjustFetchProcess(10, 9), 0); + assertEquals(0.1f, LegacyTaskImpl.adjustFetchProcess(10, 8), 0); + assertEquals(0.25f, LegacyTaskImpl.adjustFetchProcess(10, 5), 0); + assertEquals(0.45f, LegacyTaskImpl.adjustFetchProcess(10, 1), 0); + assertEquals(0.5f, LegacyTaskImpl.adjustFetchProcess(10, 0), 0); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java index 7407acc..2cee7d0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -19,13 +19,15 @@ package org.apache.tajo.worker; import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.resource.NodeResources; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.event.NodeResourceAllocateEvent; @@ -42,9 +44,15 @@ import static org.junit.Assert.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; public class TestNodeResourceManager { - private NodeResourceManager resourceManager; - private MockNodeStatusUpdater statusUpdater; + private MockNodeResourceManager resourceManager; + private NodeStatusUpdater statusUpdater; + private TaskManager taskManager; + private TaskExecutor taskExecutor; private AsyncDispatcher dispatcher; + private AsyncDispatcher taskDispatcher; + private TajoWorker.WorkerContext workerContext; + + private CompositeService service; private int taskMemory; private TajoConf conf; @@ -61,29 +69,55 @@ public class TestNodeResourceManager { conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1); dispatcher = new AsyncDispatcher(); - dispatcher.init(conf); - dispatcher.start(); - - resourceManager = new NodeResourceManager(dispatcher); - resourceManager.init(conf); - resourceManager.start(); - - WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager); - statusUpdater.init(conf); - statusUpdater.start(); + taskDispatcher = new AsyncDispatcher(); + + workerContext = new MockWorkerContext() { + WorkerConnectionInfo workerConnectionInfo; + @Override + public TajoConf getConf() { + return conf; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + if (workerConnectionInfo == null) { + workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + } + return workerConnectionInfo; + } + }; + + taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler()); + taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler()); + resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, taskDispatcher.getEventHandler()); + statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager); + + service = new CompositeService("MockService") { + @Override + protected void serviceInit(Configuration conf) throws Exception { + addIfService(dispatcher); + addIfService(taskDispatcher); + addIfService(taskManager); + addIfService(taskExecutor); + addIfService(resourceManager); + addIfService(statusUpdater); + super.serviceInit(conf); + } + }; + + service.init(conf); + service.start(); } @After public void tearDown() { - resourceManager.stop(); - statusUpdater.stop(); - dispatcher.stop(); + service.stop(); } @Test public void testNodeResourceAllocateEvent() throws Exception { int requestSize = 4; + resourceManager.setTaskHandlerEvent(false); //skip task execution CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); @@ -91,14 +125,14 @@ public class TestNodeResourceManager { requestProto.setExecutionBlockId(ebId.getProto()); assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); - requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + // allocated all assertEquals(0, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManager.getAllocatedSize()); } @@ -106,6 +140,7 @@ public class TestNodeResourceManager { public void testNodeResourceCancellation() throws Exception { int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); int overSize = 10; + resourceManager.setTaskHandlerEvent(false); //skip task execution CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); @@ -113,18 +148,19 @@ public class TestNodeResourceManager { requestProto.setExecutionBlockId(ebId.getProto()); assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); - requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize)); + requestProto.addAllTaskRequest( + MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); assertEquals(overSize, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManager.getAllocatedSize()); } @Test public void testNodeResourceDeallocateEvent() throws Exception { int requestSize = 4; + resourceManager.setTaskHandlerEvent(false); //skip task execution CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); @@ -132,21 +168,20 @@ public class TestNodeResourceManager { requestProto.setExecutionBlockId(ebId.getProto()); assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); - requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); assertEquals(0, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManager.getAllocatedSize()); //deallocate for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) { // direct invoke handler for testing resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource())); } - assertEquals(0, resourceManager.getAllocatedSize()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); } @@ -154,12 +189,38 @@ public class TestNodeResourceManager { public void testParallelRequest() throws Exception { final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2; final int taskSize = 100000; + resourceManager.setTaskHandlerEvent(true); + final AtomicInteger totalComplete = new AtomicInteger(); final AtomicInteger totalCanceled = new AtomicInteger(); final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); - final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize); + final Queue<TaskAllocationRequestProto> + totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize); + + // first request with starting ExecutionBlock + TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder + ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder(); + ebRequestProto.setExecutionBlockId(ebId.getProto()) + .setQueryMaster(workerContext.getConnectionInfo().getProto()) + .setNodeId(workerContext.getConnectionInfo().getHost() + ":" + + workerContext.getConnectionInfo().getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + TaskAllocationRequestProto task = totalTasks.poll(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.addTaskRequest(task); + requestProto.setExecutionBlockId(ebId.getProto()); + requestProto.setExecutionBlockRequest(ebRequestProto.build()); + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + assertTrue(callFuture.get().getCancellationTaskCount() == 0); + totalComplete.incrementAndGet(); + // start parallel request ExecutorService executor = Executors.newFixedThreadPool(parallelCount); List<Future> futureList = Lists.newArrayList(); @@ -187,7 +248,6 @@ public class TestNodeResourceManager { totalCanceled.addAndGet(proto.getCancellationTaskCount()); } else { complete++; - dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource())); } } catch (Exception e) { fail(e.getMessage()); @@ -209,27 +269,4 @@ public class TestNodeResourceManager { executor.shutdown(); assertEquals(taskSize, totalComplete.get()); } - - protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) { - Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>(); - for (int i = 0; i < size; i++) { - - ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0); - - TajoWorkerProtocol.TaskRequestProto.Builder builder = - TajoWorkerProtocol.TaskRequestProto.newBuilder(); - builder.setId(taskAttemptId.getProto()); - builder.setShouldDie(true); - builder.setOutputTable(""); - builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); - builder.setClusteredOutput(false); - - - requestProtoList.add(TaskAllocationRequestProto.newBuilder() - .setResource(NodeResources.createResource(memory).getProto()) - .setTaskRequest(builder.build()).build()); - } - return requestProtoList; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java index fb3c77e..af40554 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.rm.Worker; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.event.NodeStatusEvent; import org.junit.After; @@ -37,18 +38,36 @@ public class TestNodeStatusUpdater { private MockNodeStatusUpdater statusUpdater; private AsyncDispatcher dispatcher; private TajoConf conf; + private TajoWorker.WorkerContext workerContext; + @Before public void setup() { conf = new TajoConf(); conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + workerContext = new MockWorkerContext() { + WorkerConnectionInfo workerConnectionInfo; + + @Override + public TajoConf getConf() { + return conf; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + if (workerConnectionInfo == null) { + workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + } + return workerConnectionInfo; + } + }; conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000); dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); - resourceManager = new NodeResourceManager(dispatcher); + resourceManager = new NodeResourceManager(dispatcher, null); resourceManager.init(conf); resourceManager.start(); } @@ -63,27 +82,25 @@ public class TestNodeStatusUpdater { @Test(timeout = 20000) public void testNodeMembership() throws Exception { CountDownLatch barrier = new CountDownLatch(1); - WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager); statusUpdater.init(conf); statusUpdater.start(); MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); barrier.await(); - assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId())); + assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId())); assertEquals(resourceManager.getTotalResource(), - resourceTracker.getTotalResource().get(worker.getId())); + resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId())); assertEquals(resourceManager.getAvailableResource(), - resourceTracker.getAvailableResource().get(worker.getId())); + resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId())); } @Test(timeout = 20000) public void testPing() throws Exception { CountDownLatch barrier = new CountDownLatch(2); - WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager); statusUpdater.init(conf); statusUpdater.start(); @@ -100,16 +117,29 @@ public class TestNodeStatusUpdater { @Test(timeout = 20000) public void testResourceReport() throws Exception { CountDownLatch barrier = new CountDownLatch(2); - WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager); statusUpdater.init(conf); statusUpdater.start(); + assertEquals(0, statusUpdater.getQueueSize()); for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) { - dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, - resourceManager.getAvailableResource())); + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE)); } barrier.await(); assertEquals(0, statusUpdater.getQueueSize()); } + + @Test(timeout = 20000) + public void testFlushResourceReport() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager); + statusUpdater.init(conf); + statusUpdater.start(); + + assertEquals(0, statusUpdater.getQueueSize()); + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); + + barrier.await(); + assertEquals(0, statusUpdater.getQueueSize()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java new file mode 100644 index 0000000..98b187b --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.*; +import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.querymaster.QueryMaster; +import org.apache.tajo.querymaster.QueryMasterManagerService; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.history.HistoryWriter; +import org.apache.tajo.util.metrics.TajoSystemMetrics; +import org.apache.tajo.worker.event.ExecutionBlockStartEvent; +import org.apache.tajo.worker.event.ExecutionBlockStopEvent; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import static org.junit.Assert.*; + +public class TestTaskExecutor { + + private NodeResourceManager resourceManager; + private NodeStatusUpdater statusUpdater; + private TaskManager taskManager; + private TaskExecutor taskExecutor; + private AsyncDispatcher dispatcher; + private AsyncDispatcher taskDispatcher; + private TajoWorker.WorkerContext workerContext; + + private CompositeService service; + private TajoConf conf; + private Semaphore barrier; + private Semaphore resourceManagerBarrier; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + dispatcher = new AsyncDispatcher(); + taskDispatcher = new AsyncDispatcher(); + + workerContext = new MockWorkerContext() { + WorkerConnectionInfo workerConnectionInfo; + + @Override + public TajoConf getConf() { + return conf; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + if (workerConnectionInfo == null) { + workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + } + return workerConnectionInfo; + } + }; + + barrier = new Semaphore(0); + resourceManagerBarrier = new Semaphore(0); + taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler()); + taskExecutor = new TaskExecutor(barrier, taskManager, dispatcher.getEventHandler()); + resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, taskDispatcher.getEventHandler()); + statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager); + + service = new CompositeService("MockService") { + @Override + protected void serviceInit(Configuration conf) throws Exception { + addIfService(dispatcher); + addIfService(taskDispatcher); + addIfService(taskManager); + addIfService(taskExecutor); + addIfService(resourceManager); + addIfService(statusUpdater); + super.serviceInit(conf); + } + + + @Override + protected void serviceStop() throws Exception { + workerContext.getWorkerSystemMetrics().stop(); + super.serviceStop(); + } + }; + + service.init(conf); + service.start(); + } + + @After + public void tearDown() { + service.stop(); + } + + @Test + public void testTaskRequest() throws Exception { + int requestSize = 1; + + RunExecutionBlockRequestProto.Builder + ebRequestProto = RunExecutionBlockRequestProto.newBuilder(); + QueryId qid = LocalTajoTestingUtility.newQueryId(); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1); + + ebRequestProto.setExecutionBlockId(ebId.getProto()) + .setQueryMaster(workerContext.getConnectionInfo().getProto()) + .setNodeId(workerContext.getConnectionInfo().getHost() + ":" + + workerContext.getConnectionInfo().getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.setExecutionBlockId(ebId.getProto()); + requestProto.setExecutionBlockRequest(ebRequestProto.build()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + //verify running task + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertEquals(1, taskExecutor.getRunningTasks()); + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertEquals(0, taskExecutor.getRunningTasks()); + assertEquals(1, taskExecutor.completeTasks); + + //verify the released resources + Thread.sleep(100); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + } + + @Test + public void testTaskException() throws Exception { + int requestSize = 1; + + RunExecutionBlockRequestProto.Builder + ebRequestProto = RunExecutionBlockRequestProto.newBuilder(); + QueryId qid = LocalTajoTestingUtility.newQueryId(); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1); + + ebRequestProto.setExecutionBlockId(ebId.getProto()) + .setQueryMaster(workerContext.getConnectionInfo().getProto()) + .setNodeId(workerContext.getConnectionInfo().getHost()+":" + + workerContext.getConnectionInfo().getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.setExecutionBlockId(ebId.getProto()); + requestProto.setExecutionBlockRequest(ebRequestProto.build()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize)); + + taskExecutor.throwException.set(true); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + //verify running task + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertEquals(1, taskExecutor.getRunningTasks()); + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertEquals(0, taskExecutor.getRunningTasks()); + assertEquals(0, taskExecutor.completeTasks); + + //verify the released resources + Thread.sleep(100); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + } + + class TaskExecutor extends MockTaskExecutor { + int completeTasks; + AtomicBoolean throwException = new AtomicBoolean(); + + public TaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) { + super(barrier, taskManager, rmEventHandler); + } + + @Override + protected void stopTask(TaskAttemptId taskId) { + super.stopTask(taskId); + super.barrier.release(); + } + + @Override + protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) { + final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); + final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null); + + return new Task() { + @Override + public void init() throws IOException { + + try { + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void fetch() { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void run() throws Exception { + Thread.sleep(50); + + if(throwException.get()) throw new RuntimeException(); + + taskAttemptContext.stop(); + taskAttemptContext.setProgress(1.0f); + taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED); + completeTasks++; + } + + @Override + public void kill() { + + } + + @Override + public void abort() { + + } + + @Override + public void cleanup() { + } + + @Override + public boolean hasFetchPhase() { + return false; + } + + @Override + public boolean isProgressChanged() { + return false; + } + + @Override + public boolean isStopped() { + return taskAttemptContext.isStopped(); + } + + @Override + public void updateProgress() { + + } + + @Override + public TaskAttemptContext getTaskContext() { + return taskAttemptContext; + } + + @Override + public ExecutionBlockContext getExecutionBlockContext() { + return context; + } + + @Override + public TajoWorkerProtocol.TaskStatusProto getReport() { + TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder(); + builder.setWorkerName("localhost:0"); + builder.setId(taskAttemptContext.getTaskId().getProto()) + .setProgress(taskAttemptContext.getProgress()) + .setState(taskAttemptContext.getState()); + + builder.setInputStats(new TableStats().getProto()); + return builder.build(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java new file mode 100644 index 0000000..8bca489 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.ExecutionBlockStartEvent; +import org.apache.tajo.worker.event.ExecutionBlockStopEvent; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.*; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import static org.junit.Assert.*; + +public class TestTaskManager { + + private NodeResourceManager resourceManager; + private NodeStatusUpdater statusUpdater; + private TaskManager taskManager; + private TaskExecutor taskExecutor; + private AsyncDispatcher dispatcher; + private AsyncDispatcher taskDispatcher; + private TajoWorker.WorkerContext workerContext; + + private CompositeService service; + private int taskMemory; + private TajoConf conf; + private Semaphore barrier; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + + taskMemory = 512; + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, + taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES)); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1); + + dispatcher = new AsyncDispatcher(); + taskDispatcher = new AsyncDispatcher(); + + workerContext = new MockWorkerContext() { + WorkerConnectionInfo workerConnectionInfo; + + @Override + public TajoConf getConf() { + return conf; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + if (workerConnectionInfo == null) { + workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + } + return workerConnectionInfo; + } + }; + barrier = new Semaphore(0); + taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext, dispatcher.getEventHandler()); + taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler()); + resourceManager = new NodeResourceManager(dispatcher, taskDispatcher.getEventHandler()); + statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager); + + service = new CompositeService("MockService") { + @Override + protected void serviceInit(Configuration conf) throws Exception { + addIfService(dispatcher); + addIfService(taskDispatcher); + addIfService(taskManager); + addIfService(taskExecutor); + addIfService(resourceManager); + addIfService(statusUpdater); + super.serviceInit(conf); + } + + + @Override + protected void serviceStop() throws Exception { + workerContext.getWorkerSystemMetrics().stop(); + super.serviceStop(); + } + }; + + service.init(conf); + service.start(); + } + + @After + public void tearDown() { + service.stop(); + } + + @Test(timeout = 10000) + public void testExecutionBlockStart() throws Exception { + int requestSize = 1; + + TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder + ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder(); + QueryId qid = LocalTajoTestingUtility.newQueryId(); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1); + + ebRequestProto.setExecutionBlockId(ebId.getProto()) + .setQueryMaster(workerContext.getConnectionInfo().getProto()) + .setNodeId(workerContext.getConnectionInfo().getHost() + ":" + + workerContext.getConnectionInfo().getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.setExecutionBlockId(ebId.getProto()); + requestProto.setExecutionBlockRequest(ebRequestProto.build()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertNotNull(taskManager.getExecutionBlockContext(ebId)); + assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId()); + } + + @Test(timeout = 10000) + public void testExecutionBlockStop() throws Exception { + + TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder + ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder(); + QueryId qid = LocalTajoTestingUtility.newQueryId(); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1); + + ebRequestProto.setExecutionBlockId(ebId.getProto()) + .setQueryMaster(workerContext.getConnectionInfo().getProto()) + .setNodeId(workerContext.getConnectionInfo().getHost()+":" + + workerContext.getConnectionInfo().getQueryMasterPort()) + .setContainerId("test") + .setQueryContext(new QueryContext(conf).getProto()) + .setPlanJson("test") + .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); + + taskDispatcher.getEventHandler().handle(new ExecutionBlockStartEvent(ebRequestProto.build())); + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertNotNull(taskManager.getExecutionBlockContext(ebId)); + assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId()); + + ExecutionBlockListProto.Builder ebList = ExecutionBlockListProto.newBuilder(); + taskDispatcher.getEventHandler().handle(new ExecutionBlockStopEvent(ebId.getProto(), ebList.build())); + assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS)); + assertNull(taskManager.getExecutionBlockContext(ebId)); + } +}
