http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java deleted file mode 100644 index e389d64..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ /dev/null @@ -1,1340 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.dag.impl; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.api.records.DAGProtos; -import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; -import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; -import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; -import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType; -import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType; -import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; -import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; -import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; -import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; -import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; -import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.ClusterInfo; -import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; -import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.app.dag.VertexState; -import org.apache.tez.dag.app.dag.VertexTerminationCause; -import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; -import org.apache.tez.dag.app.dag.event.DAGEvent; -import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; -import org.apache.tez.dag.app.dag.event.DAGEventType; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.dag.app.dag.event.TaskEvent; -import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; -import org.apache.tez.dag.app.dag.event.TaskEventType; -import org.apache.tez.dag.app.dag.event.VertexEvent; -import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; -import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; -import org.apache.tez.dag.app.dag.event.VertexEventType; -import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; -import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.DAGInitializedEvent; -import org.apache.tez.dag.history.events.DAGStartedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; -import org.apache.tez.dag.history.events.VertexFinishedEvent; -import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexStartedEvent; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; -import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto; -import org.apache.tez.runtime.api.OutputCommitter; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class TestVertexRecovery { - - private static final Logger LOG = LoggerFactory.getLogger(TestVertexRecovery.class); - - private DrainDispatcher dispatcher; - - private AppContext mockAppContext; - private ApplicationId appId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - private DAGImpl dag; - private TezDAGID dagId = TezDAGID.getInstance(appId, 1); - private String user = "user"; - - private long initRequestedTime = 100L; - private long initedTime = initRequestedTime + 100L; - - /* - * v1 v2 \ / v3 - */ - private DAGPlan createDAGPlan() { - DAGPlan dag = - DAGPlan - .newBuilder() - .setName("testverteximpl") - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex1") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host1") - .addRack("rack1").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(1) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("").setTaskModule("x1.y1").build()) - .addOutEdgeId("e1") - .addOutputs( - DAGProtos.RootInputLeafOutputProto - .newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("output").build()) - .setName("outputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName( - CountingOutputCommitter.class.getName()))) - .build()) - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex2") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host2") - .addRack("rack2").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(2) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("").setTaskModule("x2.y2").build()) - .addOutEdgeId("e2").build()) - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex3") - .setType(PlanVertexType.NORMAL) - .setProcessorDescriptor( - TezEntityDescriptorProto.newBuilder().setClassName( - "x3.y3")) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host3") - .addRack("rack3").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(2) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("foo").setTaskModule("x3.y3").build()) - .addInEdgeId("e1") - .addInEdgeId("e2") - .addOutputs( - DAGProtos.RootInputLeafOutputProto - .newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("output").build()) - .setName("outputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName( - CountingOutputCommitter.class.getName()))) - .build() - - ) - - .addEdge( - EdgePlan - .newBuilder() - .setEdgeDestination( - TezEntityDescriptorProto.newBuilder().setClassName( - "i3_v1")) - .setInputVertexName("vertex1") - .setEdgeSource( - TezEntityDescriptorProto.newBuilder() - .setClassName("o1")) - .setOutputVertexName("vertex3") - .setDataMovementType( - PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1") - .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) - .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) - .build()) - .addEdge( - EdgePlan - .newBuilder() - .setEdgeDestination( - TezEntityDescriptorProto.newBuilder().setClassName( - "i3_v2")) - .setInputVertexName("vertex2") - .setEdgeSource( - TezEntityDescriptorProto.newBuilder() - .setClassName("o2")) - .setOutputVertexName("vertex3") - .setDataMovementType( - PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2") - .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) - .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) - .build()).build(); - - return dag; - } - - private DAGPlan createDAGPlanSingleVertex() { - DAGPlan dag = - DAGPlan - .newBuilder() - .setName("testverteximpl") - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex1") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host1") - .addRack("rack1").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(-1) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("").setTaskModule("x1.y1").build()) - .addInputs(RootInputLeafOutputProto.newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("input").build()) - .setName("inputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName("inputinitlizer")) - .build()) - .addOutputs( - DAGProtos.RootInputLeafOutputProto - .newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("output").build()) - .setName("outputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName( - CountingOutputCommitter.class.getName()))) - .build()).build(); - return dag; - } - - /* - * v1 - * | - * v2 - */ - private DAGPlan createDAGPlanMR() { - DAGPlan dag = - DAGPlan - .newBuilder() - .setName("testverteximpl") - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex1") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host1") - .addRack("rack1").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(1) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("").setTaskModule("x1.y1").build()) - .addOutEdgeId("e1") - .addOutputs( - DAGProtos.RootInputLeafOutputProto - .newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("output").build()) - .setName("outputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName( - CountingOutputCommitter.class.getName()))) - .build()) - .addVertex( - VertexPlan - .newBuilder() - .setName("vertex2") - .setType(PlanVertexType.NORMAL) - .setProcessorDescriptor( - TezEntityDescriptorProto.newBuilder().setClassName( - "x2.y2")) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder().addHost("host2") - .addRack("rack2").build()) - .setTaskConfig( - PlanTaskConfiguration.newBuilder().setNumTasks(2) - .setVirtualCores(4).setMemoryMb(1024) - .setJavaOpts("foo").setTaskModule("x2.y2").build()) - .addInEdgeId("e1") - .addOutputs( - DAGProtos.RootInputLeafOutputProto - .newBuilder() - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("output").build()) - .setName("outputx") - .setControllerDescriptor( - TezEntityDescriptorProto - .newBuilder() - .setClassName( - CountingOutputCommitter.class.getName()))) - .build() - - ) - .addEdge( - EdgePlan - .newBuilder() - .setEdgeDestination( - TezEntityDescriptorProto.newBuilder().setClassName( - "i2_v1")) - .setInputVertexName("vertex1") - .setEdgeSource( - TezEntityDescriptorProto.newBuilder() - .setClassName("o1")) - .setOutputVertexName("vertex2") - .setDataMovementType( - PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1") - .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) - .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) - .build()) - .build(); - - return dag; - } - - class DAGEventHandler implements EventHandler<DAGEvent> { - @Override - public void handle(DAGEvent event) { - dag.handle(event); - } - } - - class VertexEventHanlder implements EventHandler<VertexEvent> { - - private List<VertexEvent> events = new ArrayList<VertexEvent>(); - - @Override - public void handle(VertexEvent event) { - events.add(event); - ((VertexImpl) dag.getVertex(event.getVertexId())).handle(event); - } - - public List<VertexEvent> getEvents() { - return this.events; - } - } - - class TaskEventHandler implements EventHandler<TaskEvent> { - - private List<TaskEvent> events = new ArrayList<TaskEvent>(); - - @Override - public void handle(TaskEvent event) { - events.add(event); - ((TaskImpl) dag.getVertex(event.getTaskID().getVertexID()).getTask( - event.getTaskID())).handle(event); - } - - public List<TaskEvent> getEvents() { - return events; - } - } - - class TaskAttemptEventHandler implements EventHandler<TaskAttemptEvent> { - - @Override - public void handle(TaskAttemptEvent event) { - // TezTaskID taskId = event.getTaskAttemptID().getTaskID(); - // ((TaskAttemptImpl) vertex1.getTask(taskId).getAttempt( - // event.getTaskAttemptID())).handle(event); - } - } - - private DAGEventHandler dagEventHandler; - private VertexEventHanlder vertexEventHandler; - private TaskEventHandler taskEventHandler; - - @Before - public void setUp() throws IOException { - - dispatcher = new DrainDispatcher(); - dispatcher.register(DAGAppMasterEventType.class, mock(EventHandler.class)); - dagEventHandler = new DAGEventHandler(); - dispatcher.register(DAGEventType.class, dagEventHandler); - vertexEventHandler = new VertexEventHanlder(); - dispatcher.register(VertexEventType.class, vertexEventHandler); - taskEventHandler = new TaskEventHandler(); - dispatcher.register(TaskEventType.class, taskEventHandler); - dispatcher.register(TaskAttemptEventType.class, - new TaskAttemptEventHandler()); - dispatcher.init(new Configuration()); - dispatcher.start(); - - mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - - DAGPlan dagPlan = createDAGPlan(); - dag = - new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), - new Credentials(), new SystemClock(), user, - mock(TaskHeartbeatHandler.class), mockAppContext); - when(mockAppContext.getCurrentDAG()).thenReturn(dag); - ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); - doReturn(clusterInfo).when(mockAppContext).getClusterInfo(); - - dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null)); - dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName")); - LOG.info("finish setUp"); - } - - /** - * vertex1(New) -> StartRecoveryTransition(SUCCEEDED) - */ - @Test(timeout = 5000) - public void testRecovery_Desired_SUCCEEDED() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), - "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.SUCCEEDED)); - dispatcher.await(); - assertEquals(VertexState.SUCCEEDED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); - assertEquals(vertex1.numTasks, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - - } - - @Test(timeout = 5000) - public void testRecovery_SetParallelism() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - int oldNumTasks = 10; - VertexState recoveredState = vertex1 - .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", - initRequestedTime, initedTime, oldNumTasks, "", null)); - assertEquals(VertexState.INITED, recoveredState); - recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 - .getVertexId(), 5, null, null, null, oldNumTasks)); - assertEquals(5, vertex1.getTotalTasks()); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.SUCCEEDED)); - dispatcher.await(); - assertEquals(VertexState.SUCCEEDED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); - assertEquals(vertex1.numTasks, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - } - - @Test(timeout = 5000) - public void testRecovery_SetParallelismMultiple() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - int oldNumTasks = 10; - VertexState recoveredState = vertex1 - .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", - initRequestedTime, initedTime, oldNumTasks, "", null)); - assertEquals(VertexState.INITED, recoveredState); - recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 - .getVertexId(), 5, null, null, null, oldNumTasks)); - assertEquals(5, vertex1.getTotalTasks()); - recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 - .getVertexId(), 7, null, null, null, 5)); - assertEquals(7, vertex1.getTotalTasks()); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.SUCCEEDED)); - dispatcher.await(); - assertEquals(VertexState.SUCCEEDED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); - assertEquals(vertex1.numTasks, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - - } - - - /** - * vertex1(New) -> StartRecoveryTransition(SUCCEEDED) - * @throws IOException - */ - @Test(timeout = 5000) - public void testRecovery_Desired_SUCCEEDED_OnlySummaryLog() throws IOException { - DAGPlan dagPlan = createDAGPlanSingleVertex(); - dag = - new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), - new Credentials(), new SystemClock(), user, - mock(TaskHeartbeatHandler.class), mockAppContext); - when(mockAppContext.getCurrentDAG()).thenReturn(dag); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); - - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexFinishedEvent vertexFinishEvent = new VertexFinishedEvent(); - vertexFinishEvent.fromSummaryProtoStream(SummaryEventProto.newBuilder() - .setDagId(dag.getID().toString()) - .setEventType(HistoryEventType.VERTEX_FINISHED.ordinal()) - .setTimestamp(100L) - .setEventPayload(VertexFinishStateProto.newBuilder() - .setNumTasks(2) - .setState(VertexState.SUCCEEDED.ordinal()) - .setVertexId(vertex1.getVertexId().toString()).build().toByteString()) - .build()); - VertexState recoveredState = vertex1.restoreFromEvent(vertexFinishEvent); - // numTasks is recovered from summary log - assertEquals(2, vertex1.numTasks); - assertEquals(VertexState.SUCCEEDED, recoveredState); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.SUCCEEDED)); - dispatcher.await(); - assertEquals(VertexState.SUCCEEDED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); - assertEquals(vertex1.numTasks, vertex1.completedTaskCount); - } - - /** - * vertex1(New) -> StartRecoveryTransition(FAILED) - */ - @Test(timeout = 5000) - public void testRecovery_Desired_FAILED() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), - "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.FAILED)); - dispatcher.await(); - assertEquals(VertexState.FAILED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.failedTaskCount); - assertEquals(0, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - } - - /** - * vertex1(New) -> StartRecoveryTransition(KILLED) - */ - @Test(timeout = 5000) - public void testRecovery_Desired_KILLED() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), - "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.KILLED)); - dispatcher.await(); - assertEquals(VertexState.KILLED, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.killedTaskCount); - assertEquals(0, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - } - - /** - * vertex1(New) -> StartRecoveryTransition(ERROR) - */ - @Test(timeout = 5000) - public void testRecovery_Desired_ERROR() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), - "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.ERROR)); - dispatcher.await(); - assertEquals(VertexState.ERROR, vertex1.getState()); - assertEquals(vertex1.numTasks, vertex1.failedTaskCount); - assertEquals(0, vertex1.completedTaskCount); - // recover its task - assertTaskRecoveredEventSent(vertex1); - - // vertex3 is still in NEW, when the desiredState is - // Completed State, each vertex recovery by itself, not depend on its parent - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - assertEquals(VertexState.NEW, vertex3.getState()); - // no VertexEvent pass to downstream vertex - assertEquals(0, vertexEventHandler.getEvents().size()); - } - - private TezEvent createTezEvent() { - return new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)), - new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", null, - null)); - } - - /** - * vertex1(New) -> restoreFromDataMovementEvent -> StartRecoveryTransition - */ - @Test(timeout = 5000) - public void testRecovery_New_Desired_RUNNING() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - VertexState recoveredState = - vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex1.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.NEW, recoveredState); - assertEquals(1, vertex1.recoveredEvents.size()); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - - // InputDataInformationEvent is removed - assertEquals(0, vertex1.recoveredEvents.size()); - // V_INIT and V_START is sent - assertEquals(VertexState.RUNNING, vertex1.getState()); - - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - - } - - private void assertTaskRecoveredEventSent(VertexImpl vertex) { - int sentNum = 0; - for (TaskEvent event : taskEventHandler.getEvents()) { - if (event.getType() == TaskEventType.T_RECOVER) { - TaskEventRecoverTask recoverEvent = (TaskEventRecoverTask)event; - if (recoverEvent.getTaskID().getVertexID().equals(vertex.getVertexId())){ - sentNum++; - } - } - } - assertEquals("expect " + vertex.getTotalTasks() - + " TaskEventTaskRecover sent for vertex:" + vertex.getVertexId() + - "but actuall sent " + sentNum, vertex.getTotalTasks(), sentNum); - } - - private void assertOutputCommitters(VertexImpl vertex){ - assertTrue(vertex.getOutputCommitters() != null); - for (OutputCommitter c : vertex.getOutputCommitters().values()) { - CountingOutputCommitter committer = (CountingOutputCommitter) c; - assertEquals(0, committer.abortCounter); - assertEquals(0, committer.commitCounter); - assertEquals(1, committer.initCounter); - assertEquals(1, committer.setupCounter); - } - } - - private void restoreFromInitializedEvent(VertexImpl vertex) { - long initTimeRequested = 100L; - long initedTime = initTimeRequested + 100L; - VertexState recoveredState = - vertex.restoreFromEvent(new VertexInitializedEvent(vertex - .getVertexId(), "vertex1", initTimeRequested, initedTime, vertex.getTotalTasks(), - "", null)); - assertEquals(VertexState.INITED, recoveredState); - assertEquals(vertex.getTotalTasks(), vertex.getTasks().size()); - assertEquals(initTimeRequested, vertex.initTimeRequested); - assertEquals(initedTime, vertex.initedTime); - } - - /** - * restoreFromVertexInitializedEvent -> StartRecoveryTransition - */ - @Test(timeout = 5000) - public void testRecovery_Inited_Desired_RUNNING() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - - VertexState recoveredState = - vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex1.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.INITED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - - // InputDataInformationEvent is removed - assertEquals(0, vertex1.recoveredEvents.size()); - assertEquals(VertexState.RUNNING, vertex1.getState()); - // task recovered event is sent - assertTaskRecoveredEventSent(vertex1); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - } - - /** - * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent -> - * StartRecoveryTransition - */ - @Test(timeout = 5000) - public void testRecovery_Started_Desired_RUNNING() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - - long startTimeRequested = initedTime + 100L; - long startedTime = startTimeRequested + 100L; - VertexState recoveredState = - vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - startTimeRequested, startedTime)); - assertEquals(VertexState.RUNNING, recoveredState); - assertEquals(startTimeRequested, vertex1.startTimeRequested); - assertEquals(startedTime, vertex1.startedTime); - - recoveredState = - vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex1.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.RUNNING, recoveredState); - assertEquals(1, vertex1.recoveredEvents.size()); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - - // InputDataInformationEvent is removed - assertEquals(0, vertex1.recoveredEvents.size()); - assertEquals(VertexState.RUNNING, vertex1.getState()); - // task recovered event is sent - assertTaskRecoveredEventSent(vertex1); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - } - - /** - * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent -> - * restoreFromVertexFinishedEvent -> StartRecoveryTransition - */ - @Test(timeout = 5000) - public void testRecovery_Finished_Desired_RUNNING() { - // v1: initFromInitializedEvent - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - - // v1: initFromStartedEvent - long startRequestedTime = initedTime + 100L; - long startTime = startRequestedTime + 100L; - VertexState recoveredState = - vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - startRequestedTime, startTime)); - assertEquals(VertexState.RUNNING, recoveredState); - - // v1: initFromFinishedEvent - long finishTime = startTime + 100L; - recoveredState = - vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), - "vertex1", 1, initRequestedTime, initedTime, startRequestedTime, - startTime, finishTime, VertexState.SUCCEEDED, "", - new TezCounters(), new VertexStats(), null)); - assertEquals(finishTime, vertex1.finishTime); - assertEquals(VertexState.SUCCEEDED, recoveredState); - assertEquals(false, vertex1.recoveryCommitInProgress); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - - // InputDataInformationEvent is removed - assertEquals(0, vertex1.recoveredEvents.size()); - assertEquals(VertexState.RUNNING, vertex1.getState()); - // task recovered event is sent - assertTaskRecoveredEventSent(vertex1); - - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - } - - /** - * vertex1 (New) -> StartRecoveryTransition <br> - * vertex2 (New) -> StartRecoveryTransition <br> - * vertex3 (New) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_RecoveringFromNew() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(1, vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - VertexState recoveredState = - vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex3.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.NEW, recoveredState); - assertEquals(1, vertex3.recoveredEvents.size()); - - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex2.getState()); - // no OutputCommitter for vertex2 - assertNull(vertex2.getOutputCommitters()); - - // v3 go to RUNNING because v1 and v2 both start - assertEquals(VertexState.RUNNING, vertex3.getState()); - assertEquals(2, vertex3.numRecoveredSourceVertices); - assertEquals(2, vertex3.numInitedSourceVertices); - assertEquals(2, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - // RootInputDataInformation is removed - assertEquals(0, vertex3.recoveredEvents.size()); - - // verify OutputCommitter is initialized - assertOutputCommitters(vertex3); - - } - - /** - * vertex1 (New) -> StartRecoveryTransition <br> - * vertex2 (New) -> RecoveryTransition <br> - */ - @Test - public void testMRDAG() { - DAGPlan dagPlan = createDAGPlanMR(); - dag = - new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), - new Credentials(), new SystemClock(), user, - mock(TaskHeartbeatHandler.class), mockAppContext); - when(mockAppContext.getCurrentDAG()).thenReturn(dag); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); - - VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1"); - VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2"); - assertEquals(VertexState.NEW, vertex1.getState()); - assertEquals(VertexState.NEW, vertex1.getState()); - - // vertex1 handle RecoveryEvent at the state of NEW - // vertex 2 handle SourceVertexRecoveryEvent at the state of NEW - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(1, vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - assertEquals(VertexState.RUNNING, vertex2.getState()); - } - - @Test(timeout = 5000) - public void testRecovery_VertexManagerErrorOnRecovery() { - // In order to simulate the behavior that VertexManagerError happens in recovering stage, need to start the recovering from - // vertex and disable the the eventhandling of DAG (use mock here). - dispatcher = new DrainDispatcher(); - dispatcher.register(DAGEventType.class, mock(EventHandler.class)); - vertexEventHandler = new VertexEventHanlder(); - dispatcher.register(VertexEventType.class, vertexEventHandler); - taskEventHandler = new TaskEventHandler(); - dispatcher.register(TaskEventType.class, taskEventHandler); - dispatcher.register(TaskAttemptEventType.class, - new TaskAttemptEventHandler()); - dispatcher.init(new Configuration()); - dispatcher.start(); - mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - DAGPlan dagPlan = createDAGPlan(); - dag = - new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), - new Credentials(), new SystemClock(), user, - mock(TaskHeartbeatHandler.class), mockAppContext); - when(mockAppContext.getCurrentDAG()).thenReturn(dag); - ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); - doReturn(clusterInfo).when(mockAppContext).getClusterInfo(); - dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null)); - dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName")); - LOG.info("finish setUp"); - - /////////////////// Start the recover //////////////////////// - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - VertexState recoveredState = - vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3 - .getVertexId(), "vertex3", initRequestedTime, initedTime, 0, "", - null)); - assertEquals(VertexState.INITED, recoveredState); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - vertex3.handle(new VertexEventManagerUserCodeError(vertex3.getVertexId(), - new AMUserCodeException(Source.VertexManager, new TezUncheckedException("test")))); - - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - restoreFromInitializedEvent(vertex2); - vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex2.getState()); - - // v3 FAILED due to user code error - assertEquals(VertexState.FAILED, vertex3.getState()); - Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertex3.getTerminationCause()); - assertEquals(2, vertex3.numRecoveredSourceVertices); - } - - - /** - * vertex1 (New) -> restoreFromInitialized -> StartRecoveryTransition<br> - * vertex2 (New) -> restoreFromInitialized -> StartRecoveryTransition<br> - * vertex3 (New) -> restoreFromVertexInitedEvent -> RecoverTransition<br> - */ - @Test(timeout = 5000) - public void testRecovery_RecoveringFromInited() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - VertexState recoveredState = - vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex3.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.NEW, recoveredState); - assertEquals(1, vertex3.recoveredEvents.size()); - recoveredState = - vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3 - .getVertexId(), "vertex3", initRequestedTime, initedTime, 2, "", - null)); - assertEquals(VertexState.INITED, recoveredState); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - restoreFromInitializedEvent(vertex2); - vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex2.getState()); - - // v3 go to RUNNING because v1 and v2 both start - assertEquals(VertexState.RUNNING, vertex3.getState()); - assertEquals(2, vertex3.numRecoveredSourceVertices); - // numInitedSourceVertices is wrong but doesn't matter because v3 has - // already initialized - assertEquals(2, vertex3.numInitedSourceVertices); - assertEquals(2, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - // RootInputDataInformation is removed - assertEquals(0, vertex3.recoveredEvents.size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex3); - // 1 for vertex1, 2 for vertex2, the second 2 for vertex3 - assertTaskRecoveredEventSent(vertex1); - assertTaskRecoveredEventSent(vertex2); - assertTaskRecoveredEventSent(vertex3); - } - - /** - * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * StartRecoveryTransition <br> - * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> StartRecoveryTransition <br> - * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_RecoveringFromRunning() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(1, vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - recoveredState = - vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex3.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.NEW, recoveredState); - assertEquals(1, vertex3.recoveredEvents.size()); - recoveredState = - vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3 - .getVertexId(), "vertex3", initRequestedTime, initedTime, vertex3.getTotalTasks(), "", - null)); - assertEquals(VertexState.INITED, recoveredState); - recoveredState = - vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - recoveredState = vertex2.restoreFromEvent(new VertexInitializedEvent(vertex2.getVertexId(), - "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - - vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex2.getState()); - - // v3 go to RUNNING because v1 and v2 both start - assertEquals(VertexState.RUNNING, vertex3.getState()); - assertEquals(2, vertex3.numRecoveredSourceVertices); - assertEquals(2, vertex3.numInitedSourceVertices); - assertEquals(2, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - // RootInputDataInformation is removed - assertEquals(0, vertex3.recoveredEvents.size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex3); - - assertTaskRecoveredEventSent(vertex1); - assertTaskRecoveredEventSent(vertex2); - assertTaskRecoveredEventSent(vertex3); - } - - /** - * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished -> StartRecoveryTransition<br> - * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished -> StartRecoveryTransition<br> - * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_RecoveringFromSUCCEEDED() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - - recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), - "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.SUCCEEDED, recoveredState); - - vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex1.getState()); - assertEquals(1, vertex1.getTasks().size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex1); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - recoveredState = - vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent( - vertex3.getVertexId(), Lists.newArrayList(createTezEvent()))); - assertEquals(VertexState.NEW, recoveredState); - assertEquals(1, vertex3.recoveredEvents.size()); - restoreFromInitializedEvent(vertex3); - recoveredState = - vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - // wait for recovery of vertex2 - assertEquals(VertexState.RECOVERING, vertex3.getState()); - assertEquals(1, vertex3.numRecoveredSourceVertices); - assertEquals(1, vertex3.numInitedSourceVertices); - assertEquals(1, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - recoveredState = vertex2.restoreFromEvent(new VertexInitializedEvent(vertex2.getVertexId(), - "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", null)); - assertEquals(VertexState.INITED, recoveredState); - recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(), - VertexState.RUNNING)); - dispatcher.await(); - assertEquals(VertexState.RUNNING, vertex2.getState()); - - // v3 go to RUNNING because v1 and v2 both start - assertEquals(VertexState.RUNNING, vertex3.getState()); - assertEquals(2, vertex3.numRecoveredSourceVertices); - assertEquals(2, vertex3.numInitedSourceVertices); - assertEquals(2, vertex3.numStartedSourceVertices); - assertEquals(1, vertex3.getDistanceFromRoot()); - // RootInputDataInformation is removed - assertEquals(0, vertex3.recoveredEvents.size()); - // verify OutputCommitter is initialized - assertOutputCommitters(vertex3); - - assertTaskRecoveredEventSent(vertex1); - assertTaskRecoveredEventSent(vertex2); - assertTaskRecoveredEventSent(vertex3); - } - - /** - * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished (KILLED) - * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished (KILLED) - * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished (KILLED) - */ - @Test(timeout = 5000) - public void testRecovery_KilledBeforeTaskStarted() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), - "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.KILLED, recoveredState); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - restoreFromInitializedEvent(vertex2); - recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - recoveredState = vertex2.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), - "vertex2", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.KILLED, recoveredState); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - restoreFromInitializedEvent(vertex3); - recoveredState = vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(), - "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.KILLED, recoveredState); - - // start the recovering, send RecoverEvent to its root vertices (v1, v2) - dag.handle(new DAGEventRecoverEvent(dag.getID(), null)); - dispatcher.await(); - // recover v1 to KILLED directly and also its tasks are recovered to KILLED - assertEquals(VertexState.KILLED, vertex1.getState()); - for (Task task : vertex1.tasks.values()) { - assertEquals(TaskState.KILLED, task.getState()); - } - // recover v2 to KILLED directly and also its tasks are recovered to KILLED - assertEquals(VertexState.KILLED, vertex2.getState()); - for (Task task : vertex2.tasks.values()) { - assertEquals(TaskState.KILLED, task.getState()); - } - // recover v3 to KILLED directly and also its tasks are recovered to KILLED - assertEquals(VertexState.KILLED, vertex3.getState()); - for (Task task : vertex3.tasks.values()) { - assertEquals(TaskState.KILLED, task.getState()); - } - } - - /** - * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished (FAILED) - * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted - * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> - * restoreFromVertexFinished (FAILED) - */ - @Test(timeout = 5000) - public void testRecovery_FailedBeforeTaskStarted() { - VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); - restoreFromInitializedEvent(vertex1); - VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), - "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.FAILED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.FAILED, recoveredState); - - VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); - restoreFromInitializedEvent(vertex2); - recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - assertEquals(VertexState.RUNNING, recoveredState); - - VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); - restoreFromInitializedEvent(vertex3); - recoveredState = vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), - initRequestedTime + 100L, initRequestedTime + 200L)); - recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(), - "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L, - initRequestedTime + 400L, initRequestedTime + 500L, - VertexState.FAILED, "", new TezCounters(), new VertexStats(), null)); - assertEquals(VertexState.FAILED, recoveredState); - - // start the recovering from DAG - dag.handle(new DAGEventRecoverEvent(dag.getID(), null)); - dispatcher.await(); - // recover v1 to KILLED directly and also its tasks are recovered to KILLED - assertEquals(VertexState.FAILED, vertex1.getState()); - for (Task task : vertex1.tasks.values()) { - assertEquals(TaskState.FAILED, task.getState()); - } - // recover v2 to KILLED finally due to v1/v3 failed will cause dag failed which result in - // dag kill all its vertices - assertEquals(VertexState.KILLED, vertex2.getState()); - - // recover v3 to KILLED directly and also its tasks are recovered to KILLED - assertEquals(VertexState.FAILED, vertex3.getState()); - for (Task task : vertex3.tasks.values()) { - assertEquals(TaskState.FAILED, task.getState()); - } - assertEquals(DAGState.FAILED, dag.getState()); - } -}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 57a849b..14aed3d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -61,24 +61,29 @@ import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; + public class TestHistoryEventsProtoConversion { private static final Logger LOG = LoggerFactory.getLogger( @@ -270,10 +275,12 @@ public class TestHistoryEventsProtoConversion { } private void testVertexInitializedEvent() throws Exception { + List<TezEvent> initGeneratedEvents = Lists.newArrayList( + new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap(new byte[0])), null)); VertexInitializedEvent event = new VertexInitializedEvent( TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), - "vertex1", 1000l, 15000l, 100, "procName", null); + "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents); VertexInitializedEvent deserializedEvent = (VertexInitializedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); @@ -286,6 +293,11 @@ public class TestHistoryEventsProtoConversion { Assert.assertEquals(event.getAdditionalInputs(), deserializedEvent.getAdditionalInputs()); Assert.assertNull(deserializedEvent.getProcessorName()); + Assert.assertEquals(1, event.getInitGeneratedEvents().size()); + Assert.assertEquals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT, + event.getInitGeneratedEvents().get(0).getEventType()); + Assert.assertEquals(event.getInitGeneratedEvents().size(), + deserializedEvent.getInitGeneratedEvents().size()); logEvents(event, deserializedEvent); } @@ -304,85 +316,72 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } - private void testVertexParallelismUpdatedEvent() throws Exception { - { - InputSpecUpdate rootInputSpecUpdateBulk = InputSpecUpdate - .createAllTaskInputSpecUpdate(2); - InputSpecUpdate rootInputSpecUpdatePerTask = InputSpecUpdate - .createPerTaskInputSpecUpdate(Lists.newArrayList(1, 2, 3)); - Map<String, InputSpecUpdate> rootInputSpecUpdates = new HashMap<String, InputSpecUpdate>(); - rootInputSpecUpdates.put("input1", rootInputSpecUpdateBulk); - rootInputSpecUpdates.put("input2", rootInputSpecUpdatePerTask); - VertexParallelismUpdatedEvent event = - new VertexParallelismUpdatedEvent( - TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), - 100, null, null, rootInputSpecUpdates, 1); - VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent) + private void testVertexReconfigureDoneEvent() throws Exception { + VertexLocationHint vertexLocationHint = VertexLocationHint.create(new ArrayList<TaskLocationHint>()); + InputSpecUpdate rootInputSpecUpdateBulk = InputSpecUpdate + .createAllTaskInputSpecUpdate(2); + InputSpecUpdate rootInputSpecUpdatePerTask = InputSpecUpdate + .createPerTaskInputSpecUpdate(Lists.newArrayList(1, 2, 3)); + Map<String, InputSpecUpdate> rootInputSpecUpdates = new HashMap<String, InputSpecUpdate>(); + rootInputSpecUpdates.put("input1", rootInputSpecUpdateBulk); + rootInputSpecUpdates.put("input2", rootInputSpecUpdatePerTask); + + Map<String, EdgeProperty> sourceEdgeManagers + = new HashMap<String, EdgeProperty>(); + // add standard and custom edge + sourceEdgeManagers.put("foo", EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); + sourceEdgeManagers.put("foo1", EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1") + .setUserPayload( + UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)), + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); + + VertexConfigurationDoneEvent event = + new VertexConfigurationDoneEvent( + TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), + 100, 2, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, true); + VertexConfigurationDoneEvent deserializedEvent = (VertexConfigurationDoneEvent) testProtoConversion(event); - Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); - Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks()); - Assert.assertEquals(event.getSourceEdgeProperties(), - deserializedEvent.getSourceEdgeProperties()); - Assert.assertEquals(event.getVertexLocationHint(), - deserializedEvent.getVertexLocationHint()); - Assert.assertEquals(event.getRootInputSpecUpdates().size(), deserializedEvent - .getRootInputSpecUpdates().size()); - InputSpecUpdate deserializedBulk = deserializedEvent.getRootInputSpecUpdates().get("input1"); - InputSpecUpdate deserializedPerTask = deserializedEvent.getRootInputSpecUpdates().get("input2"); - Assert.assertEquals(rootInputSpecUpdateBulk.isForAllWorkUnits(), - deserializedBulk.isForAllWorkUnits()); - Assert.assertEquals(rootInputSpecUpdateBulk.getAllNumPhysicalInputs(), - deserializedBulk.getAllNumPhysicalInputs()); - Assert.assertEquals(rootInputSpecUpdatePerTask.isForAllWorkUnits(), - deserializedPerTask.isForAllWorkUnits()); - Assert.assertEquals(rootInputSpecUpdatePerTask.getAllNumPhysicalInputs(), - deserializedPerTask.getAllNumPhysicalInputs()); - logEvents(event, deserializedEvent); - } - { - Map<String, EdgeProperty> sourceEdgeManagers - = new LinkedHashMap<String, EdgeProperty>(); - // add standard and custom edge - sourceEdgeManagers.put("foo", EdgeProperty.create(DataMovementType.SCATTER_GATHER, - DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, - OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); - sourceEdgeManagers.put("foo1", EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1") - .setUserPayload( - UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)), - DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, - OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); - VertexParallelismUpdatedEvent event = - new VertexParallelismUpdatedEvent( - TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), - 100, VertexLocationHint.create(Arrays.asList(TaskLocationHint.createTaskLocationHint( - new HashSet<String>(Arrays.asList("h1")), - new HashSet<String>(Arrays.asList("r1"))))), - sourceEdgeManagers, null, 1); - - VertexParallelismUpdatedEvent deserializedEvent = - (VertexParallelismUpdatedEvent) testProtoConversion(event); - Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); - Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks()); - Assert.assertEquals(event.getSourceEdgeProperties().size(), deserializedEvent - .getSourceEdgeProperties().size()); - Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(), - deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType()); - Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo") - .getEdgeManagerDescriptor()); - Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(), - deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType()); - Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor() - .getUserPayload().getVersion(), deserializedEvent.getSourceEdgeProperties().get("foo1") - .getEdgeManagerDescriptor().getUserPayload().getVersion()); - Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1") - .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), deserializedEvent - .getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload() - .deepCopyAsArray()); - Assert.assertEquals(event.getVertexLocationHint(), deserializedEvent.getVertexLocationHint()); - logEvents(event, deserializedEvent); - } + Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); + Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks()); + Assert.assertEquals(event.isSetParallelismCalled(), deserializedEvent.isSetParallelismCalled()); + // vertexLocationHint + Assert.assertEquals(event.getVertexLocationHint(), + deserializedEvent.getVertexLocationHint()); + // rootInputSpec + Assert.assertEquals(event.getRootInputSpecUpdates().size(), deserializedEvent + .getRootInputSpecUpdates().size()); + InputSpecUpdate deserializedBulk = deserializedEvent.getRootInputSpecUpdates().get("input1"); + InputSpecUpdate deserializedPerTask = deserializedEvent.getRootInputSpecUpdates().get("input2"); + Assert.assertEquals(rootInputSpecUpdateBulk.isForAllWorkUnits(), + deserializedBulk.isForAllWorkUnits()); + Assert.assertEquals(rootInputSpecUpdateBulk.getAllNumPhysicalInputs(), + deserializedBulk.getAllNumPhysicalInputs()); + Assert.assertEquals(rootInputSpecUpdatePerTask.isForAllWorkUnits(), + deserializedPerTask.isForAllWorkUnits()); + Assert.assertEquals(rootInputSpecUpdatePerTask.getAllNumPhysicalInputs(), + deserializedPerTask.getAllNumPhysicalInputs()); + // sourceEdgeManager + Assert.assertEquals(event.getSourceEdgeProperties().size(), deserializedEvent + .getSourceEdgeProperties().size()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(), + deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType()); + Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo") + .getEdgeManagerDescriptor()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(), + deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor() + .getUserPayload().getVersion(), deserializedEvent.getSourceEdgeProperties().get("foo1") + .getEdgeManagerDescriptor().getUserPayload().getVersion()); + Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1") + .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), deserializedEvent + .getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload() + .deepCopyAsArray()); + + logEvents(event, deserializedEvent); } private void testVertexFinishedEvent() throws Exception { @@ -500,7 +499,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null, null, null, 2048, + null, null, null, null, null, 2048, TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) @@ -536,7 +535,8 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events, 0, null, 0); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events, + null, 0, null, 0); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -594,35 +594,6 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } - private void testVertexDataMovementEventsGeneratedEvent() throws Exception { - VertexRecoverableEventsGeneratedEvent event; - try { - event = new VertexRecoverableEventsGeneratedEvent( - TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null); - fail("Invalid creation should have errored out"); - } catch (RuntimeException e) { - // Expected - } - long eventTime = 1024; - List<TezEvent> events = - Arrays.asList(new TezEvent(DataMovementEvent.create(1, null), - new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime)); - event = new VertexRecoverableEventsGeneratedEvent( - TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events); - VertexRecoverableEventsGeneratedEvent deserializedEvent = - (VertexRecoverableEventsGeneratedEvent) testProtoConversion(event); - Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); - Assert.assertEquals(1, - deserializedEvent.getTezEvents().size()); - Assert.assertEquals(event.getTezEvents().get(0).getEventType(), - deserializedEvent.getTezEvents().get(0).getEventType()); - Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(), - deserializedEvent.getTezEvents().get(0).getEventReceivedTime()); - logEvents(event, deserializedEvent); - } - private void testDAGCommitStartedEvent() throws Exception { DAGCommitStartedEvent event = new DAGCommitStartedEvent( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100l); @@ -643,15 +614,21 @@ public class TestHistoryEventsProtoConversion { } private void testVertexGroupCommitStartedEvent() throws Exception { + TezVertexID vertexId1 = TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 1); + TezVertexID vertexId2 = TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 2); + Collection<TezVertexID> vertexIds = Lists.newArrayList(vertexId1, vertexId2); VertexGroupCommitStartedEvent event = new VertexGroupCommitStartedEvent( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), - "fooGroup", 1000344l); + "fooGroup", vertexIds, 1000344l); { VertexGroupCommitStartedEvent deserializedEvent = (VertexGroupCommitStartedEvent) testProtoConversion(event); Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID()); Assert.assertEquals(event.getVertexGroupName(), deserializedEvent.getVertexGroupName()); + Assert.assertEquals(event.getVertexIds(), vertexIds); logEvents(event, deserializedEvent); } { @@ -664,15 +641,21 @@ public class TestHistoryEventsProtoConversion { } private void testVertexGroupCommitFinishedEvent() throws Exception { + TezVertexID vertexId1 = TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 1); + TezVertexID vertexId2 = TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 2); + Collection<TezVertexID> vertexIds = Lists.newArrayList(vertexId1, vertexId2); VertexGroupCommitFinishedEvent event = new VertexGroupCommitFinishedEvent( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), - "fooGroup", 1000344l); + "fooGroup", vertexIds, 1000344l); { VertexGroupCommitFinishedEvent deserializedEvent = (VertexGroupCommitFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID()); Assert.assertEquals(event.getVertexGroupName(), deserializedEvent.getVertexGroupName()); + Assert.assertEquals(event.getVertexIds(), vertexIds); logEvents(event, deserializedEvent); } { @@ -685,7 +668,7 @@ public class TestHistoryEventsProtoConversion { } - @Test(timeout = 5000) + @Test//(timeout = 5000) public void testDefaultProtoConversion() throws Exception { for (HistoryEventType eventType : HistoryEventType.values()) { switch (eventType) { @@ -716,8 +699,8 @@ public class TestHistoryEventsProtoConversion { case VERTEX_STARTED: testVertexStartedEvent(); break; - case VERTEX_PARALLELISM_UPDATED: - testVertexParallelismUpdatedEvent(); + case VERTEX_CONFIGURE_DONE: + testVertexReconfigureDoneEvent(); break; case VERTEX_FINISHED: testVertexFinishedEvent(); @@ -740,9 +723,6 @@ public class TestHistoryEventsProtoConversion { case CONTAINER_STOPPED: testContainerStoppedEvent(); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - testVertexDataMovementEventsGeneratedEvent(); - break; case DAG_COMMIT_STARTED: testDAGCommitStartedEvent(); break; http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index bcc3859..606fb85 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -59,12 +59,11 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.utils.DAGUtils; import org.apache.tez.dag.records.TaskAttemptTerminationCause; @@ -139,13 +138,13 @@ public class TestHistoryEventJsonConversion { break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), - random.nextInt(), "proc", null); + random.nextInt(), "proc", null, null); break; case VERTEX_STARTED: event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); break; - case VERTEX_PARALLELISM_UPDATED: - event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 10); + case VERTEX_CONFIGURE_DONE: + event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true); break; case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), @@ -165,7 +164,8 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null, 0, null, 0); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, + null, null, null, null, 0, null, 0); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -174,9 +174,6 @@ public class TestHistoryEventJsonConversion { case CONTAINER_STOPPED: event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - event = new VertexRecoverableEventsGeneratedEvent(); - break; case DAG_COMMIT_STARTED: event = new DAGCommitStartedEvent(); break; @@ -207,7 +204,7 @@ public class TestHistoryEventJsonConversion { } @Test(timeout = 5000) - public void testConvertVertexParallelismUpdatedEvent() throws JSONException { + public void testConvertVertexReconfigureDoneEvent() throws JSONException { TezVertexID vId = TezVertexID.getInstance( TezDAGID.getInstance( ApplicationId.newInstance(1l, 1), 1), 1); @@ -217,8 +214,8 @@ public class TestHistoryEventJsonConversion { edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class") .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))); - VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, - edgeMgrs, null, 10); + VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null, + edgeMgrs, null, true); JSONObject jsonObject = HistoryEventJsonConversion.convertToJson(event); Assert.assertNotNull(jsonObject); @@ -229,12 +226,11 @@ public class TestHistoryEventJsonConversion { Assert.assertEquals(1, events.length()); JSONObject evt = events.getJSONObject(0); - Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), + Assert.assertEquals(HistoryEventType.VERTEX_CONFIGURE_DONE.name(), evt.getString(ATSConstants.EVENT_TYPE)); JSONObject evtInfo = evt.getJSONObject(ATSConstants.EVENT_INFO); Assert.assertEquals(1, evtInfo.getInt(ATSConstants.NUM_TASKS)); - Assert.assertEquals(10, evtInfo.getInt(ATSConstants.OLD_NUM_TASKS)); Assert.assertNotNull(evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS)); JSONObject updatedEdgeMgrs = evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS); @@ -247,9 +243,6 @@ public class TestHistoryEventJsonConversion { Assert.assertEquals("In", updatedEdgeMgr.getString(DAGUtils.EDGE_DESTINATION_CLASS_KEY)); Assert.assertEquals("a.class", updatedEdgeMgr.getString(DAGUtils.EDGE_MANAGER_CLASS_KEY)); - JSONObject otherInfo = jsonObject.getJSONObject(ATSConstants.OTHER_INFO); - Assert.assertEquals(1, otherInfo.getInt(ATSConstants.NUM_TASKS)); - } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 5922100..281eaa9 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.commons.cli.Options; @@ -32,6 +33,7 @@ import org.apache.tez.client.CallerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.security.UserGroupInformation; @@ -263,4 +265,14 @@ public abstract class TezExampleBase extends Configured implements Tool { */ protected abstract int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws Exception; + + @Private + @VisibleForTesting + public ApplicationId getAppId() { + if (tezClientInternal == null) { + LOG.warn("TezClient is not initialized, return null for AppId"); + return null; + } + return tezClientInternal.getAppMasterApplicationId(); + } }
