Updated Branches: refs/heads/TEZ-1 be6d4bc09 -> 8298190dd
TEZ-111. Create tests for DAGImpl. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8298190d Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8298190d Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8298190d Branch: refs/heads/TEZ-1 Commit: 8298190ddb513652ea65f72a5104f6b3b3320c86 Parents: be6d4bc Author: Hitesh Shah <[email protected]> Authored: Mon May 20 14:49:43 2013 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon May 20 14:49:43 2013 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/event/DAGEventType.java | 17 +- .../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 45 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 76 +- .../apache/tez/dag/app/dag/impl/TestDAGImpl.java | 626 +++++++++++++++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 167 +++- 5 files changed, 814 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java index 20f14b2..14c2f30 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java @@ -30,24 +30,17 @@ public enum DAGEventType { DAG_INIT, DAG_START, - //Producer:Task - /* - JOB_TASK_COMPLETED, - JOB_MAP_TASK_RESCHEDULED, - JOB_TASK_ATTEMPT_COMPLETED, - */ - //Producer: Vertex - DAG_VERTEX_INITED, - DAG_VERTEX_STARTED, DAG_VERTEX_COMPLETED, + + //Producer: TaskImpl DAG_SCHEDULER_UPDATE, - - //Producer:Job + + //Producer:Dag DAG_COMPLETED, //Producer:Any component DAG_DIAGNOSTIC_UPDATE, INTERNAL_ERROR, - DAG_COUNTER_UPDATE, + DAG_COUNTER_UPDATE, } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 66f9c36..5f47818 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -117,7 +117,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final TaskAttemptListener taskAttemptListener; private final TaskHeartbeatHandler taskHeartbeatHandler; private final Object tasksSyncHandle = new Object(); - + private DAGScheduler dagScheduler; private final EventHandler eventHandler; @@ -129,7 +129,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final AppContext appContext; volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>(); - private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>(); + private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>(); private TezCounters dagCounters = new TezCounters(); private Object fullCountersLock = new Object(); private TezCounters fullCounters = null; @@ -150,7 +150,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = new CounterUpdateTransition(); - private static final DAGSchedulerUpdateTransition + private static final DAGSchedulerUpdateTransition DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition(); protected static final @@ -211,8 +211,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(DAGState.RUNNING, DAGState.RUNNING, DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) - .addTransition(DAGState.RUNNING, DAGState.RUNNING, - DAGEventType.DAG_SCHEDULER_UPDATE, + .addTransition(DAGState.RUNNING, DAGState.RUNNING, + DAGEventType.DAG_SCHEDULER_UPDATE, DAG_SCHEDULER_UPDATE_TRANSITION) .addTransition( DAGState.RUNNING, @@ -332,7 +332,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.jobPlan = jobPlan; this.conf = conf; this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>"; - + this.userName = appUserName; // TODO Metrics //this.metrics = metrics; @@ -374,7 +374,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, public TezConfiguration getConf() { return conf; } - + @Override public DAGPlan getJobPlan() { return jobPlan; @@ -508,7 +508,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, readLock.unlock(); } } - + // monitoring apis @Override public DAGStatusBuilder getDAGStatus() { @@ -552,11 +552,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } return vertex.getVertexStatus(); } - - + + protected void startRootVertices() { for (Vertex v : vertices.values()) { if (v.getInputVerticesCount() == 0) { + LOG.info("DEBUG: Starting root vertex " + v.getName()); eventHandler.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_START)); } @@ -605,7 +606,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, writeLock.unlock(); } } - + @Private public DAGState getInternalState() { readLock.lock(); @@ -727,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, public String getUserName() { return userName; } - + @Override public String getQueueName() { return queueName; @@ -862,7 +863,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList()); Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList()); - + // setup the dag for (Vertex v : dag.vertices.values()) { parseVertexEdges(dag, edgePlans, v); @@ -888,10 +889,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) { TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId); - + VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId); VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan(vertexPlan.getTaskLocationHintList()); - + return new VertexImpl( vertexId, vertexPlan, vertexName, dag.conf, dag.eventHandler, dag.taskAttemptListener, @@ -912,18 +913,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, for(String inEdgeId : vertexPlan.getInEdgeIdList()){ EdgePlan edgePlan = edgePlans.get(inEdgeId); - Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName()); + Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName()); EdgeProperty edgeProp = dag.edges.get(inEdgeId); inVertices.put(inVertex, edgeProp); } - + for(String outEdgeId : vertexPlan.getOutEdgeIdList()){ EdgePlan edgePlan = edgePlans.get(outEdgeId); - Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName()); + Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName()); EdgeProperty edgeProp = dag.edges.get(outEdgeId); outVertices.put(outVertex, edgeProp); } - + vertex.setInputVertices(inVertices); vertex.setOutputVertices(outVertices); } @@ -931,9 +932,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, protected void setup(DAGImpl job) throws IOException { job.initTime = job.clock.getTime(); String dagIdString = job.dagId.toString(); - + dagIdString.replace("application", "job"); - + // TODO remove - TEZ-71 String user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -1165,7 +1166,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } } - + private static class DAGSchedulerUpdateTransition implements SingleArcTransition<DAGImpl, DAGEvent> { @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 20d13cd..ce4c609 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -141,7 +141,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // TODO Metrics //private final MRAppMetrics metrics; private final AppContext appContext; - + private boolean lazyTasksCopyNeeded = false; volatile Map<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>(); private Object fullCountersLock = new Object(); @@ -154,7 +154,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private int numStartedSourceVertices = 0; private int distanceFromRoot = 0; - + private List<TezDependentTaskCompletionEvent> sourceTaskAttemptCompletionEvents; private final List<String> diagnostics = new ArrayList<String>(); @@ -167,7 +167,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, List<InputSpec> inputSpecList; List<OutputSpec> outputSpecList; - + private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); private static final TaskAttemptCompletedEventTransition @@ -200,7 +200,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, .addTransition(VertexState.INITED, VertexState.INITED, VertexEventType.V_SOURCE_VERTEX_STARTED, new SourceVertexStartedTransition()) - .addTransition(VertexState.INITED, VertexState.RUNNING, + .addTransition(VertexState.INITED, VertexState.RUNNING, VertexEventType.V_START, new StartTransition()) @@ -220,7 +220,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) .addTransition (VertexState.RUNNING, - EnumSet.of(VertexState.RUNNING, VertexState.KILLED, + EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED), VertexEventType.V_TASK_COMPLETED, new TaskCompletedTransition()) @@ -335,14 +335,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private Token<JobTokenIdentifier> jobToken; private final TezVertexID vertexId; //runtime assigned id. - private final VertexPlan vertexPlan; + private final VertexPlan vertexPlan; private final String vertexName; private final String processorName; private Map<Vertex, EdgeProperty> sourceVertices; private Map<Vertex, EdgeProperty> targetVertices; - + private VertexScheduler vertexScheduler; private VertexOutputCommitter committer; @@ -351,8 +351,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private Map<String, LocalResource> localResources; private Map<String, String> environment; private final String javaOpts; - - public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, + + public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName, TezConfiguration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, @@ -384,13 +384,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.jobToken = jobToken; this.committer = new NullVertexOutputCommitter(); this.vertexLocationHint = vertexLocationHint; - + this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig()); - this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null; + this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null; this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList()); this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList()); this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null; - + // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); @@ -404,12 +404,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public TezVertexID getVertexId() { return vertexId; } - + @Override public VertexPlan getVertexPlan() { return vertexPlan; } - + @Override public int getDistanceFromRoot() { return distanceFromRoot; @@ -521,7 +521,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.readLock.unlock(); } } - + @Override public ProgressBuilder getVertexProgress() { this.readLock.lock(); @@ -537,7 +537,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.readLock.unlock(); } } - + @Override public VertexStatusBuilder getVertexStatus() { this.readLock.lock(); @@ -631,7 +631,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, writeLock.unlock(); } } - + private VertexState getInternalState() { readLock.lock(); try { @@ -710,7 +710,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.abortVertex(VertexStatus.State.FAILED); return vertex.finished(VertexState.FAILED); } - + if(vertex.succeededTaskCount == vertex.tasks.size()) { try { if (!vertex.committed.getAndSet(true)) { @@ -721,9 +721,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e); return vertex.finished(VertexState.FAILED); } - return vertex.finished(VertexState.SUCCEEDED); + return vertex.finished(VertexState.SUCCEEDED); } - + if (vertex.completedTaskCount == vertex.tasks.size()) { // this means the vertex has some killed tasks assert vertex.killedTaskCount > 0; @@ -731,9 +731,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.abortVertex(VertexStatus.State.KILLED); return vertex.finished(VertexState.KILLED); } - + //return the current state, Vertex not finished yet - return vertex.getInternalState(); + return vertex.getInternalState(); } VertexState finished(VertexState finalState) { @@ -774,9 +774,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // TODODAGAM // TODO: Splits? - + vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks(); - + /* TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); job.numMapTasks = taskSplitMetaInfo.length; @@ -794,8 +794,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // create the Tasks but don't start them yet createTasks(vertex); - - + + boolean hasBipartite = false; if (vertex.sourceVertices != null) { @@ -806,10 +806,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } } - + if (hasBipartite) { // setup vertex scheduler - // TODO this needs to consider data size and perhaps API. + // TODO this needs to consider data size and perhaps API. // Currently implicitly BIPARTITE is the only edge type vertex.vertexScheduler = new BipartiteSlowStartVertexScheduler( vertex, @@ -907,7 +907,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public void transition(VertexImpl vertex, VertexEvent event) { - VertexEventSourceVertexStarted startEvent = + VertexEventSourceVertexStarted startEvent = (VertexEventSourceVertexStarted) event; int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; if(vertex.distanceFromRoot < distanceFromRoot) { @@ -916,8 +916,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.numStartedSourceVertices++; if (vertex.numStartedSourceVertices == vertex.sourceVertices.size()) { // Consider inlining this. - LOG.info("Starting vertex: " + vertex.getVertexId() + - " with name: " + vertex.getName() + + LOG.info("Starting vertex: " + vertex.getVertexId() + + " with name: " + vertex.getName() + " with distanceFromRoot: " + vertex.distanceFromRoot ); vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, VertexEventType.V_START)); @@ -1040,10 +1040,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, //eventId is equal to index in the arraylist tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size()); vertex.sourceTaskAttemptCompletionEvents.add(tce); - // TODO this needs to be ordered/grouped by source vertices or else - // my tasks will not know which events are for which vertices' tasks. This + // TODO this needs to be ordered/grouped by source vertices or else + // my tasks will not know which events are for which vertices' tasks. This // differentiation was not needed for MR because there was only 1 M stage. - // if the tce is sent to the task then a solution could be to add vertex + // if the tce is sent to the task then a solution could be to add vertex // name to the tce // need to send vertex name and task index in that vertex @@ -1060,7 +1060,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); } - + vertex.vertexScheduler.onSourceTaskCompleted(attemptId); } } @@ -1073,7 +1073,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, TezDependentTaskCompletionEvent tce = ((VertexEventTaskAttemptCompleted) event).getCompletionEvent(); - // TODO this should only be sent for successful events? looks like all + // TODO this should only be sent for successful events? looks like all // need to be sent in the existing shuffle code // Notify all target vertices if (vertex.targetVertices != null) { @@ -1245,7 +1245,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public TezDAGID getDAGId() { return getDAG().getID(); } - + @Override public ApplicationAttemptId getApplicationAttemptId() { return appContext.getApplicationAttemptId(); @@ -1259,7 +1259,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public DAG getDAG() { return appContext.getDAG(); } - + @VisibleForTesting String getProcessorName() { return this.processorName; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java new file mode 100644 index 0000000..3adcd8b --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.SystemClock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeConnectionPattern; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType; +import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; +import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; +import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; +import org.apache.tez.dag.app.dag.event.DAGFinishEvent; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventType; +import org.apache.tez.dag.app.dag.event.VertexEvent; +import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; +import org.apache.tez.dag.app.dag.event.VertexEventType; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.avro.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.engine.common.security.JobTokenSecretManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class TestDAGImpl { + + private static final Log LOG = LogFactory.getLog(TestDAGImpl.class); + private DAGPlan dagPlan; + private TezDAGID dagId; + private TezConfiguration conf; + private DrainDispatcher dispatcher; + private Credentials fsTokens; + private AppContext appContext; + private ApplicationAttemptId appAttemptId; + private DAGImpl dag; + private VertexEventDispatcher vertexEventDispatcher; + private DagEventDispatcher dagEventDispatcher; + private TaskAttemptListener taskAttemptListener; + private TaskHeartbeatHandler thh; + private Clock clock = new SystemClock(); + private JobTokenSecretManager jobTokenSecretManager; + private DAGFinishEventHandler dagFinishEventHandler; + + private class DagEventDispatcher implements EventHandler<DAGEvent> { + @Override + public void handle(DAGEvent event) { + dag.handle(event); + } + } + + private class HistoryHandler implements EventHandler<DAGHistoryEvent> { + @Override + public void handle(DAGHistoryEvent event) { + } + } + + private class TaskEventHandler implements EventHandler<TaskEvent> { + @Override + public void handle(TaskEvent event) { + } + } + + private class VertexEventDispatcher + implements EventHandler<VertexEvent> { + + @SuppressWarnings("unchecked") + @Override + public void handle(VertexEvent event) { + Vertex vertex = dag.getVertex(event.getVertexId()); + ((EventHandler<VertexEvent>) vertex).handle(event); + } + } + + private class DAGFinishEventHandler + implements EventHandler<DAGFinishEvent> { + public int dagFinishEvents = 0; + + @Override + public void handle(DAGFinishEvent event) { + ++dagFinishEvents; + } + } + + private DAGPlan createTestDAGPlan() { + LOG.info("Setting up dag plan"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("testverteximpl") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host1") + .addRack("rack1") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .addOutEdgeId("e1") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host2") + .addRack("rack2") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x2.y2") + .build() + ) + .addOutEdgeId("e2") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex3") + .setType(PlanVertexType.NORMAL) + .setProcessorName("x3.y3") + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host3") + .addRack("rack3") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("foo") + .setTaskModule("x3.y3") + .build() + ) + .addInEdgeId("e1") + .addInEdgeId("e2") + .addOutEdgeId("e3") + .addOutEdgeId("e4") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex4") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host4") + .addRack("rack4") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x4.y4") + .build() + ) + .addInEdgeId("e3") + .addOutEdgeId("e5") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex5") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host5") + .addRack("rack5") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x5.y5") + .build() + ) + .addInEdgeId("e4") + .addOutEdgeId("e6") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex6") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host6") + .addRack("rack6") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x6.y6") + .build() + ) + .addInEdgeId("e5") + .addInEdgeId("e6") + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i3_v1") + .setInputVertexName("vertex1") + .setOutputClass("o1") + .setOutputVertexName("vertex3") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e1") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i3_v2") + .setInputVertexName("vertex2") + .setOutputClass("o2") + .setOutputVertexName("vertex3") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e2") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i4_v3") + .setInputVertexName("vertex3") + .setOutputClass("o3_v4") + .setOutputVertexName("vertex4") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e3") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i5_v3") + .setInputVertexName("vertex3") + .setOutputClass("o3_v5") + .setOutputVertexName("vertex5") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e4") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i6_v4") + .setInputVertexName("vertex4") + .setOutputClass("o4") + .setOutputVertexName("vertex6") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e5") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setInputClass("i6_v5") + .setInputVertexName("vertex5") + .setOutputClass("o5") + .setOutputVertexName("vertex6") + .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE) + .setId("e6") + .setSourceType(PlanEdgeSourceType.STABLE) + .build() + ) + .build(); + + return dag; + } + + @Before + public void setup() { + conf = new TezConfiguration(); + appAttemptId = BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(100, 1), 1); + dagId = new TezDAGID(appAttemptId.getApplicationId(), 1); + Assert.assertNotNull(dagId); + dagPlan = createTestDAGPlan(); + dispatcher = new DrainDispatcher(); + fsTokens = new Credentials(); + jobTokenSecretManager = new JobTokenSecretManager(); + appContext = mock(AppContext.class); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(dagId).when(appContext).getDAGID(); + dag = new DAGImpl(dagId, appAttemptId, conf, dagPlan, + dispatcher.getEventHandler(), taskAttemptListener, + jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext); + doReturn(dag).when(appContext).getDAG(); + vertexEventDispatcher = new VertexEventDispatcher(); + dispatcher.register(VertexEventType.class, vertexEventDispatcher); + dagEventDispatcher = new DagEventDispatcher(); + dispatcher.register(DAGEventType.class, dagEventDispatcher); + dispatcher.register(HistoryEventType.class, + new HistoryHandler()); + dagFinishEventHandler = new DAGFinishEventHandler(); + dispatcher.register(DAGFinishEvent.Type.class, dagFinishEventHandler); + dispatcher.register(TaskEventType.class, new TaskEventHandler()); + dispatcher.init(conf); + dispatcher.start(); + } + + @After + public void teardown() { + dagPlan = null; + dag = null; + dispatcher.await(); + dispatcher.stop(); + } + + private void initDAG(DAGImpl dag) { + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); + Assert.assertEquals(DAGState.INITED, dag.getState()); + } + + private void startDAG(DAGImpl dag) { + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START)); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + } + + @Test + public void testDAGInit() { + initDAG(dag); + Assert.assertEquals(6, dag.getTotalVertices()); + } + + @Test + public void testDAGStart() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + for (int i = 0 ; i < 6; ++i ) { + TezVertexID vId = new TezVertexID(dagId, i); + Vertex v = dag.getVertex(vId); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + if (i < 2) { + Assert.assertEquals(0, v.getDistanceFromRoot()); + } else if (i == 2) { + Assert.assertEquals(1, v.getDistanceFromRoot()); + } else if ( i > 2 && i < 5) { + Assert.assertEquals(2, v.getDistanceFromRoot()); + } else if (i == 5) { + Assert.assertEquals(3, v.getDistanceFromRoot()); + } + } + + for (int i = 0 ; i < 6; ++i ) { + TezVertexID vId = new TezVertexID(dagId, i); + LOG.info("Distance from root: v" + i + ":" + + dag.getVertex(vId).getDistanceFromRoot()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testVertexCompletion() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + TezVertexID vId = new TezVertexID(dagId, 1); + Vertex v = dag.getVertex(vId); + ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted( + new TezTaskID(vId, 0), TaskState.SUCCEEDED)); + ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted( + new TezTaskID(vId, 1), TaskState.SUCCEEDED)); + dispatcher.await(); + + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertEquals(1, dag.getSuccessfulVertices()); + } + + public void testKillStartedDAG() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.await(); + + Assert.assertEquals(DAGState.KILLED, dag.getState()); + for (int i = 0 ; i < 6; ++i ) { + TezVertexID vId = new TezVertexID(dagId, i); + Vertex v = dag.getVertex(vId); + Assert.assertEquals(VertexState.KILLED, v.getState()); + } + + } + + @SuppressWarnings("unchecked") + @Test + public void testKillRunningDAG() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + TezVertexID vId1 = new TezVertexID(dagId, 1); + Vertex v1 = dag.getVertex(vId1); + ((EventHandler<VertexEvent>) v1).handle(new VertexEventTaskCompleted( + new TezTaskID(vId1, 0), TaskState.SUCCEEDED)); + TezVertexID vId0 = new TezVertexID(dagId, 0); + Vertex v0 = dag.getVertex(vId0); + ((EventHandler<VertexEvent>) v0).handle(new VertexEventTaskCompleted( + new TezTaskID(vId0, 0), TaskState.SUCCEEDED)); + dispatcher.await(); + + Assert.assertEquals(VertexState.SUCCEEDED, v0.getState()); + Assert.assertEquals(VertexState.RUNNING, v1.getState()); + + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.await(); + + Assert.assertEquals(DAGState.KILL_WAIT, dag.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v0.getState()); + Assert.assertEquals(VertexState.KILL_WAIT, v1.getState()); + for (int i = 2 ; i < 6; ++i ) { + TezVertexID vId = new TezVertexID(dagId, i); + Vertex v = dag.getVertex(vId); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); + } + Assert.assertEquals(1, dag.getSuccessfulVertices()); + } + + @Test + public void testInvalidEvent() { + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START)); + dispatcher.await(); + Assert.assertEquals(DAGState.ERROR, dag.getState()); + } + + @Test + @Ignore + public void testVertexSuccessfulCompletionUpdates() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + for (int i = 0; i < 6; ++i) { + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); + } + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + Assert.assertEquals(1, dag.getSuccessfulVertices()); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 2), VertexState.SUCCEEDED)); + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 3), VertexState.SUCCEEDED)); + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 4), VertexState.SUCCEEDED)); + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 5), VertexState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(DAGState.SUCCEEDED, dag.getState()); + Assert.assertEquals(6, dag.getSuccessfulVertices()); + } + + @Test + @Ignore + public void testVertexFailureHandling() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 2), VertexState.FAILED)); + dispatcher.await(); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(2, dag.getSuccessfulVertices()); + + // Expect running vertices to be killed on first failure + for (int i = 3; i < 6; ++i) { + TezVertexID vId = new TezVertexID(dagId, i); + Vertex v = dag.getVertex(vId); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); + } + } + + @Test + @Ignore + public void testDAGKill() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + + for (int i = 2; i < 6; ++i) { + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, i), VertexState.SUCCEEDED)); + } + dispatcher.await(); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(6, dag.getSuccessfulVertices()); + Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); + } + + @Test + public void testDAGKillPending() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + + for (int i = 2; i < 5; ++i) { + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, i), VertexState.SUCCEEDED)); + } + dispatcher.await(); + Assert.assertEquals(DAGState.KILL_WAIT, dag.getState()); + + dag.handle(new DAGEventVertexCompleted( + new TezVertexID(dagId, 5), VertexState.KILLED)); + dispatcher.await(); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(5, dag.getSuccessfulVertices()); + Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); + } + + @Test + public void testDiagnosticUpdates() { + // FIXME need to implement + } + + @Test + public void testCounterUpdates() { + // FIXME need to implement + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index fe6ae14..f78a7b8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -62,6 +62,8 @@ import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; @@ -115,7 +117,7 @@ public class TestVertexImpl { public int abortCounter = 0; private boolean throwError; private boolean throwErrorOnAbort; - + public CountingVertexOutputCommitter(boolean throwError, boolean throwOnAbort) { this.throwError = throwError; @@ -125,7 +127,7 @@ public class TestVertexImpl { public CountingVertexOutputCommitter() { this(false, false); } - + @Override public void init(VertexContext context) throws IOException { ++initCounter; @@ -150,12 +152,26 @@ public class TestVertexImpl { if (throwErrorOnAbort) { throw new IOException("I can throwz exceptions in abort"); } - } + } } - + + private class TaskEventHandler implements EventHandler<TaskEvent> { + @Override + public void handle(TaskEvent event) { + } + } + private class DagEventDispatcher implements EventHandler<DAGEvent> { + public Map<DAGEventType, Integer> eventCount = + new HashMap<DAGEventType, Integer>(); + @Override public void handle(DAGEvent event) { + int count = 1; + if (eventCount.containsKey(event.getType())) { + count = eventCount.get(event.getType()) + 1; + } + eventCount.put(event.getType(), count); } } @@ -164,7 +180,7 @@ public class TestVertexImpl { public void handle(DAGHistoryEvent event) { } } - + private class VertexEventDispatcher implements EventHandler<VertexEvent> { @@ -421,7 +437,7 @@ public class TestVertexImpl { Map<Vertex, EdgeProperty> outVertices = new HashMap<Vertex, EdgeProperty>(); - + for(String inEdgeId : vertexPlan.getInEdgeIdList()){ EdgePlan edgePlan = edgePlans.get(inEdgeId); Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName()); @@ -469,6 +485,7 @@ public class TestVertexImpl { dispatcher.register(DAGEventType.class, dagEventDispatcher); dispatcher.register(HistoryEventType.class, new HistoryHandler()); + dispatcher.register(TaskEventType.class, new TaskEventHandler()); dispatcher.init(conf); dispatcher.start(); } @@ -500,10 +517,10 @@ public class TestVertexImpl { if (checkKillWait) { Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); } else { - Assert.assertEquals(VertexState.KILLED, v.getState()); + Assert.assertEquals(VertexState.KILLED, v.getState()); } } - + private void startVertex(VertexImpl v, boolean checkRunningState) { Assert.assertEquals(VertexState.INITED, v.getState()); @@ -522,10 +539,10 @@ public class TestVertexImpl { VertexImpl v3 = vertices.get("vertex3"); initVertex(v3); - + Assert.assertEquals("x3.y3", v3.getProcessorName()); Assert.assertEquals("foo", v3.getJavaOpts()); - + Assert.assertEquals(2, v3.getInputSpecList().size()); Assert.assertEquals(2, v3.getInputVerticesCount()); Assert.assertEquals(2, v3.getOutputVerticesCount()); @@ -547,7 +564,7 @@ public class TestVertexImpl { .getInputClassName()) || "i3_v2".equals(v3.getInputSpecList().get(1) .getInputClassName())); - + Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0) .getVertexName()) || "vertex5".equals(v3.getOutputSpecList().get(0) @@ -606,7 +623,7 @@ public class TestVertexImpl { v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); - + v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); @@ -632,7 +649,7 @@ public class TestVertexImpl { StringUtils.join(",", v.getDiagnostics()).toLowerCase(); Assert.assertTrue(diagnostics.contains("task failed " + t1.toString())); } - + @Test public void testVertexWithNoTasks() { // FIXME a vertex with no tasks should not be allowed @@ -644,14 +661,14 @@ public class TestVertexImpl { } @Test - public void testVertexKill() { + public void testVertexKillDiagnostics() { VertexImpl v1 = vertices.get("vertex1"); killVertex(v1, false); String diagnostics = StringUtils.join(",", v1.getDiagnostics()).toLowerCase(); Assert.assertTrue(diagnostics.contains( "vertex received kill in new state")); - + VertexImpl v2 = vertices.get("vertex2"); initVertex(v2); killVertex(v2, false); @@ -672,30 +689,73 @@ public class TestVertexImpl { } @Test - public void testKilledTasksHandling() { + public void testVertexKillPending() { VertexImpl v = vertices.get("vertex2"); initVertex(v); + VertexImpl v3 = vertices.get("vertex3"); + initVertex(v3); + startVertex(v); - TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); - TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); + v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); + + v.handle(new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); - v.handle(new VertexEventTaskCompleted(t1, TaskState.KILLED)); + v.handle(new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 1), TaskState.KILLED)); dispatcher.await(); - Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(VertexState.KILLED, v.getState()); + } + + @Test + @Ignore + public void testVertexKill() { + VertexImpl v = vertices.get("vertex2"); + initVertex(v); + VertexImpl v3 = vertices.get("vertex3"); + initVertex(v3); - v.handle(new VertexEventTaskCompleted(t2, TaskState.KILLED)); + startVertex(v); + + v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); + + v.handle(new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); + + v.handle(new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); } @Test + @Ignore + public void testKilledTasksHandling() { + VertexImpl v = vertices.get("vertex2"); + initVertex(v); + startVertex(v); + + TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); + TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); + + v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED)); + dispatcher.await(); + Assert.assertEquals(VertexState.FAILED, v.getState()); + Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState()); + } + + @Test public void testVertexCommitterInit() { VertexImpl v2 = vertices.get("vertex2"); initVertex(v2); Assert.assertTrue(v2.getVertexOutputCommitter() instanceof NullVertexOutputCommitter); - + VertexImpl v6 = vertices.get("vertex6"); initVertex(v6); Assert.assertTrue(v6.getVertexOutputCommitter() @@ -708,13 +768,13 @@ public class TestVertexImpl { initVertex(v2); Assert.assertTrue(v2.getVertexScheduler() instanceof ImmediateStartVertexScheduler); - + VertexImpl v6 = vertices.get("vertex6"); initVertex(v6); Assert.assertTrue(v6.getVertexScheduler() instanceof BipartiteSlowStartVertexScheduler); } - + @Test public void testVertexTaskFailure() { VertexImpl v = vertices.get("vertex2"); @@ -730,7 +790,7 @@ public class TestVertexImpl { v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); - + v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED)); v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED)); dispatcher.await(); @@ -765,9 +825,9 @@ public class TestVertexImpl { public void testDiagnostics() { // FIXME need to test diagnostics in various cases } - + @Test - public void testTaskAttemptCompletionEvents() { + public void testTaskAttemptCompletionEvents() { // FIXME need to test handling of task attempt events } @@ -821,7 +881,7 @@ public class TestVertexImpl { v4.handle(new VertexEventTaskAttemptCompleted(cEvt3)); v5.handle(new VertexEventTaskAttemptCompleted(cEvt4)); v5.handle(new VertexEventTaskAttemptCompleted(cEvt5)); - v5.handle(new VertexEventTaskAttemptCompleted(cEvt6)); + v5.handle(new VertexEventTaskAttemptCompleted(cEvt6)); v4.handle(new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED)); v4.handle(new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED)); @@ -850,7 +910,9 @@ public class TestVertexImpl { v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); - + Assert.assertEquals(1, + dagEventDispatcher.eventCount.get( + DAGEventType.DAG_VERTEX_COMPLETED).intValue()); } @Test @@ -861,7 +923,7 @@ public class TestVertexImpl { CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(); v.setVertexOutputCommitter(committer); - + startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); @@ -873,14 +935,14 @@ public class TestVertexImpl { // v.handle(new VertexEventTaskReschedule(t1)); v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); - Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(0, committer.commitCounter); - + v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); - Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); - + } @Test @@ -890,7 +952,7 @@ public class TestVertexImpl { CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(); v.setVertexOutputCommitter(committer); - + startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); @@ -900,23 +962,23 @@ public class TestVertexImpl { v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); - Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); - + v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); - Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); Assert.assertEquals(0, committer.abortCounter); Assert.assertEquals(0, committer.initCounter); // already done in init Assert.assertEquals(0, committer.setupCounter); // already done in init } - + @Test public void testCommitterInitAndSetup() { // FIXME need to add a test for this } - + @Test public void testTaskAttemptFetchFailureHandling() { // FIXME needs testing @@ -929,7 +991,7 @@ public class TestVertexImpl { CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(true, true); v.setVertexOutputCommitter(committer); - + startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); @@ -938,13 +1000,28 @@ public class TestVertexImpl { v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); - Assert.assertEquals(VertexState.FAILED, v.getState()); + Assert.assertEquals(VertexState.FAILED, v.getState()); Assert.assertEquals(1, committer.commitCounter); - + // FIXME need to verify whether abort needs to be called if commit fails Assert.assertEquals(0, committer.abortCounter); Assert.assertEquals(0, committer.initCounter); // already done in init - Assert.assertEquals(0, committer.setupCounter); // already done in init + Assert.assertEquals(0, committer.setupCounter); // already done in init + } + + @Test + public void testHistoryEventGeneration() { + } + + @Test + public void testInvalidEvent() { + VertexImpl v = vertices.get("vertex2"); + v.handle(new VertexEvent(v.getVertexId(), + VertexEventType.V_START)); + dispatcher.await(); + Assert.assertEquals(VertexState.ERROR, v.getState()); + Assert.assertEquals(1, + dagEventDispatcher.eventCount.get( + DAGEventType.INTERNAL_ERROR).intValue()); } - }
