http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java deleted file mode 100644 index 1aba5fa..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.dag.impl; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; -import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; -import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEventHandler; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; -import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; -import org.apache.tez.dag.history.events.TaskFinishedEvent; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import com.google.common.collect.Lists; - -@SuppressWarnings({ "unchecked", "rawtypes" }) -public class TestTaskAttemptRecovery { - - private TaskAttemptImpl ta; - private EventHandler mockEventHandler; - private long creationTime = System.currentTimeMillis(); - private long allocationTime = creationTime + 5000; - private long startTime = allocationTime + 5000; - private long finishTime = startTime + 5000; - - private TezTaskAttemptID taId; - private String vertexName = "v1"; - - private AppContext mockAppContext; - private MockHistoryEventHandler mockHistoryEventHandler; - private Task mockTask; - private Vertex mockVertex; - - public static class MockHistoryEventHandler extends HistoryEventHandler { - - private List<DAGHistoryEvent> events; - - public MockHistoryEventHandler(AppContext context) { - super(context); - events = new ArrayList<DAGHistoryEvent>(); - } - - @Override - public void handle(DAGHistoryEvent event) { - events.add(event); - } - - @Override - public void handleCriticalEvent(DAGHistoryEvent event) throws IOException { - events.add(event); - } - - void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) { - int actualTimes = 0; - for (DAGHistoryEvent event : events) { - if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { - TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent(); - if (tfEvent.getTaskAttemptID().equals(taId) && - tfEvent.getState().equals(finalState)) { - actualTimes ++; - } - } - } - assertEquals(expectedTimes, actualTimes); - } - - void verifyTaskFinishedEvent(TezTaskID taskId, TaskState finalState, int expectedTimes) { - int actualTimes = 0; - for (DAGHistoryEvent event : events) { - if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_FINISHED) { - TaskFinishedEvent tfEvent = (TaskFinishedEvent)event.getHistoryEvent(); - if (tfEvent.getTaskID().equals(taskId) && tfEvent.getState().equals(finalState)) { - actualTimes ++; - } - } - } - assertEquals(expectedTimes, actualTimes); - } - } - - @Before - public void setUp() { - mockTask = mock(Task.class); - mockVertex = mock(Vertex.class); - when(mockTask.getVertex()).thenReturn(mockVertex); - mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class)) - .getTask(any(TezTaskID.class))) - .thenReturn(mockTask); - mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext); - when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler); - mockEventHandler = mock(EventHandler.class); - TezTaskID taskId = - TezTaskID.fromString("task_1407371892933_0001_1_00_000000"); - ta = - new TaskAttemptImpl(taskId, 0, mockEventHandler, - mock(TaskCommunicatorManagerInterface.class), new Configuration(), - new SystemClock(), mock(TaskHeartbeatHandler.class), - mockAppContext, false, Resource.newInstance(1, 1), - mock(ContainerContext.class), false, mockTask); - taId = ta.getID(); - } - - private void restoreFromTAStartEvent() { - TaskAttemptState recoveredState = - ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); - assertEquals(startTime, ta.getLaunchTime()); - assertEquals(TaskAttemptState.RUNNING, recoveredState); - } - - private void restoreFromTAFinishedEvent(TaskAttemptState state) { - String diag = "test_diag"; - TezCounters counters = mock(TezCounters.class); - TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1); - - TaskAttemptTerminationCause errorEnum = null; - if (state != TaskAttemptState.SUCCEEDED) { - errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR; - } - - long lastDataEventTime = 1024; - TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class); - List<DataEventDependencyInfo> events = Lists.newLinkedList(); - events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA)); - events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA)); - TaskAttemptState recoveredState = - ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - startTime, finishTime, state, errorEnum, diag, counters, events, creationTime, - causalId, allocationTime)); - assertEquals(causalId, ta.getCreationCausalAttempt()); - assertEquals(creationTime, ta.getCreationTime()); - assertEquals(allocationTime, ta.getAllocationTime()); - assertEquals(startTime, ta.getLaunchTime()); - assertEquals(finishTime, ta.getFinishTime()); - assertEquals(counters, ta.reportedStatus.counters); - assertEquals(1.0f, ta.reportedStatus.progress, 1e-6); - assertEquals(state, ta.reportedStatus.state); - assertEquals(1, ta.getDiagnostics().size()); - assertEquals(diag, ta.getDiagnostics().get(0)); - assertEquals(state, recoveredState); - assertEquals(events.size(), ta.lastDataEvents.size()); - assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp()); - assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId()); - if (state != TaskAttemptState.SUCCEEDED) { - assertEquals(errorEnum, ta.getTerminationCause()); - } else { - assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause()); - } - } - - private void verifyEvents(List<Event> events, Class<? extends Event> eventClass, - int expectedTimes) { - int actualTimes = 0; - for (Event event : events) { - if (eventClass.isInstance(event)) { - actualTimes ++; - } - } - assertEquals(expectedTimes, actualTimes); - } - - /** - * No any event to restore -> RecoverTransition - */ - @Test(timeout = 5000) - public void testTARecovery_NEW() { - ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER)); - assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState()); - - ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); - verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); - List<Event> events = eventCaptor.getAllValues(); - assertEquals(2, events.size()); - verifyEvents(events, TaskEventTAUpdate.class, 1); - // one for task killed - verifyEvents(events, DAGEventCounterUpdate.class, 1); - } - - /** - * restoreFromTAStartEvent -> RecoverTransition - */ - @Test(timeout = 5000) - public void testTARecovery_START() { - restoreFromTAStartEvent(); - - ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER)); - assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState()); - - ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); - verify(mockEventHandler, times(3)).handle(eventCaptor.capture()); - List<Event> events = eventCaptor.getAllValues(); - assertEquals(3, events.size()); - verifyEvents(events, TaskEventTAUpdate.class, 1); - // one for task launch, one for task killed - verifyEvents(events, DAGEventCounterUpdate.class, 2); - - mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1); - } - - /** - * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEED) - * -> RecoverTransition - */ - @Test(timeout = 5000) - public void testTARecovery_SUCCEED() { - restoreFromTAStartEvent(); - restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED); - - ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER)); - assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState()); - - ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); - verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); - List<Event> events = eventCaptor.getAllValues(); - assertEquals(2, events.size()); - // one for task launch, one for task succeeded - verifyEvents(events, DAGEventCounterUpdate.class, 2); - } - - /** - * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED) - * -> RecoverTransition - */ - @Test(timeout = 5000) - public void testTARecovery_KIILED() { - restoreFromTAStartEvent(); - restoreFromTAFinishedEvent(TaskAttemptState.KILLED); - - ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER)); - assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState()); - - ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); - verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); - List<Event> events = eventCaptor.getAllValues(); - assertEquals(2, events.size()); - // one for task launch, one for task killed - verifyEvents(events, DAGEventCounterUpdate.class, 2); - } - - /** - * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED) - * -> RecoverTransition - */ - @Test(timeout = 5000) - public void testTARecovery_FAILED() { - restoreFromTAStartEvent(); - restoreFromTAFinishedEvent(TaskAttemptState.FAILED); - - ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER)); - assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState()); - - ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); - verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); - List<Event> events = eventCaptor.getAllValues(); - assertEquals(2, events.size()); - // one for task launch, one for task killed - verifyEvents(events, DAGEventCounterUpdate.class, 2); - } - - /** - * restoreFromTAFinishedEvent ( killed before started) - */ - @Test(timeout = 5000) - public void testRecover_FINISH_BUT_NO_START() { - TaskAttemptState recoveredState = - ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - startTime, finishTime, TaskAttemptState.KILLED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskAttemptState.KILLED, recoveredState); - } -}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 24c9664..0414c99 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -163,7 +163,7 @@ public class TestTaskImpl { } private void scheduleTaskAttempt(TezTaskID taskId) { - mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint)); + mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false)); assertTaskScheduledState(); assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec()); assertEquals(locationHint, mockTask.getTaskLocationHint()); @@ -762,8 +762,7 @@ public class TestTaskImpl { boolean isRescheduled, Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, - appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), - schedCausalTA); + appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java deleted file mode 100644 index bea423a..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ /dev/null @@ -1,873 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.dag.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.client.VertexStatus.State; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; -import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.StateChangeNotifier; -import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; -import org.apache.tez.dag.app.dag.TaskStateInternal; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.event.DAGEventType; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.dag.app.dag.event.TaskEvent; -import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; -import org.apache.tez.dag.app.dag.event.TaskEventType; -import org.apache.tez.dag.app.dag.event.VertexEventType; -import org.apache.tez.dag.app.dag.impl.TestTaskAttemptRecovery.MockHistoryEventHandler; -import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; -import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; -import org.apache.tez.dag.history.events.TaskFinishedEvent; -import org.apache.tez.dag.history.events.TaskStartedEvent; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.OutputCommitter; -import org.apache.tez.runtime.api.OutputCommitterContext; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class TestTaskRecovery { - - private TaskImpl task; - private DrainDispatcher dispatcher; - - private int taskAttemptCounter = 0; - - private Configuration conf = new Configuration(); - private AppContext mockAppContext; - private MockHistoryEventHandler mockHistoryEventHandler; - private ApplicationId appId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - private TezDAGID dagId = TezDAGID.getInstance(appId, 1); - private TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); - private Vertex vertex; - private String vertexName = "v1"; - private long taskScheduledTime = 100L; - private long taskStartTime = taskScheduledTime + 100L; - private long taskFinishTime = taskStartTime + 100L; - private TaskAttemptEventHandler taEventHandler = - new TaskAttemptEventHandler(); - - private class TaskEventHandler implements EventHandler<TaskEvent> { - @Override - public void handle(TaskEvent event) { - task.handle(event); - } - } - - private class TaskAttemptEventHandler implements - EventHandler<TaskAttemptEvent> { - - private List<TaskAttemptEvent> events = Lists.newArrayList(); - - @Override - public void handle(TaskAttemptEvent event) { - events.add(event); - ((TaskAttemptImpl) task.getAttempt(event.getTaskAttemptID())) - .handle(event); - } - - public List<TaskAttemptEvent> getEvents() { - return events; - } - } - - private class TestOutputCommitter extends OutputCommitter { - - boolean recoverySupported = false; - boolean throwExceptionWhenRecovery = false; - - public TestOutputCommitter(OutputCommitterContext committerContext, - boolean recoverySupported, boolean throwExceptionWhenRecovery) { - super(committerContext); - this.recoverySupported = recoverySupported; - this.throwExceptionWhenRecovery = throwExceptionWhenRecovery; - } - - @Override - public void recoverTask(int taskIndex, int previousDAGAttempt) - throws Exception { - if (throwExceptionWhenRecovery) { - throw new Exception("fail recovery Task"); - } - } - - @Override - public boolean isTaskRecoverySupported() { - return recoverySupported; - } - - @Override - public void initialize() throws Exception { - - } - - @Override - public void setupOutput() throws Exception { - - } - - @Override - public void commitOutput() throws Exception { - - } - - @Override - public void abortOutput(State finalState) throws Exception { - - } - - } - - @Before - public void setUp() { - dispatcher = new DrainDispatcher(); - dispatcher.register(DAGEventType.class, mock(EventHandler.class)); - dispatcher.register(VertexEventType.class, mock(EventHandler.class)); - dispatcher.register(TaskEventType.class, new TaskEventHandler()); - dispatcher.register(TaskAttemptEventType.class, taEventHandler); - dispatcher.init(new Configuration()); - dispatcher.start(); - - vertex = mock(Vertex.class, RETURNS_DEEP_STUBS); - when(vertex.getProcessorDescriptor().getClassName()).thenReturn(""); - - mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))) - .thenReturn(vertex); - mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext); - when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler); - task = - new TaskImpl(vertexId, 0, dispatcher.getEventHandler(), - new Configuration(), mock(TaskCommunicatorManagerInterface.class), - new SystemClock(), mock(TaskHeartbeatHandler.class), - mockAppContext, false, Resource.newInstance(1, 1), - mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex); - - Map<String, OutputCommitter> committers = - new HashMap<String, OutputCommitter>(); - committers.put("out1", new TestOutputCommitter( - mock(OutputCommitterContext.class), true, false)); - when(task.getVertex().getOutputCommitters()).thenReturn(committers); - } - - private void restoreFromTaskStartEvent() { - TaskState recoveredState = - task.restoreFromEvent(new TaskStartedEvent(task.getTaskId(), - vertexName, taskScheduledTime, taskStartTime)); - assertEquals(TaskState.SCHEDULED, recoveredState); - assertEquals(0, task.getFinishedAttemptsCount()); - assertEquals(taskScheduledTime, task.scheduledTime); - assertEquals(0, task.getAttempts().size()); - } - - private void restoreFromFirstTaskAttemptStartEvent(TezTaskAttemptID taId) { - long taStartTime = taskStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(0, task.getFinishedAttemptsCount()); - assertEquals(taskScheduledTime, task.scheduledTime); - assertEquals(1, task.getAttempts().size()); - assertEquals(TaskAttemptStateInternal.NEW, - ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); - assertEquals(1, task.getUncompletedAttemptsCount()); - } - - /** - * New -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_New() { - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.NEW, task.getInternalState()); - } - - /** - * -> restoreFromTaskFinishEvent ( no TaskStartEvent ) - */ - @Test(timeout = 5000) - public void testRecovery_NoStartEvent() { - try { - task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName, - taskStartTime, taskFinishTime, null, TaskState.SUCCEEDED, "", - new TezCounters(), 0)); - fail("Should fail due to no TaskStartEvent before TaskFinishEvent"); - } catch (Throwable e) { - assertTrue(e.getMessage().contains( - "Finished Event seen but" - + " no Started Event was encountered earlier")); - } - } - - /** - * -> restoreFromTaskFinishEvent ( no TaskStartEvent ) - */ - @Test(timeout = 5000) - public void testRecoveryNewToKilled_NoStartEvent() { - task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName, - taskStartTime, taskFinishTime, null, TaskState.KILLED, "", - new TezCounters(), 0)); - } - - /** - * restoreFromTaskStartedEvent -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_Started() { - restoreFromTaskStartEvent(); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // new task attempt is scheduled - assertEquals(1, task.getAttempts().size()); - assertEquals(0, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (KILLED) -> - * RecoverTranstion - */ - @Test(timeout = 5000) - public void testRecovery_OnlyTAFinishedEvent_KILLED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null, 0, null, 0)); - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // wait for the second task attempt is scheduled - dispatcher.await(); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (FAILED) -> - * RecoverTranstion - */ - @Test(timeout = 5000) - public void testRecovery_OnlyTAFinishedEvent_FAILED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null, 0, null, 0)); - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // wait for the second task attempt is scheduled - dispatcher.await(); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // taskAttempt_1 is recovered to FAILED, and new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(1, task.failedAttempts); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> - * RecoverTranstion - */ - @Test(timeout = 5000) - public void testRecovery_OnlyTAFinishedEvent_SUCCEEDED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - try { - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null, 0, null, 0)); - fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)"); - } catch (TezUncheckedException e) { - assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover")); - } - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * RecoverTranstion - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // wait for the second task attempt is scheduled - dispatcher.await(); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_SUCCEEDED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (FAILED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_FAILED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(1, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(1, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_KILLED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> - * restoreFromTaskFinishedEvent -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_SUCCEEDED_Finished() { - - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - recoveredState = - task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), - vertexName, taskStartTime, taskFinishTime, taId, - TaskState.SUCCEEDED, "", new TezCounters(), 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(taId, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> - * restoreFromTaskAttemptFinishedEvent (Failed due to output_failure) - * restoreFromTaskFinishedEvent -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_SUCCEEDED_FAILED() { - - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure. - recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(1, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(1, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> - * restoreFromTaskAttemptFinishedEvent (KILLED due to node failed ) - * restoreFromTaskFinishedEvent -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_SUCCEEDED_KILLED() { - - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure. - recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_Commit_Failed_Recovery_Not_Supported() { - Map<String, OutputCommitter> committers = - new HashMap<String, OutputCommitter>(); - committers.put("out1", new TestOutputCommitter( - mock(OutputCommitterContext.class), false, false)); - when(task.getVertex().getOutputCommitters()).thenReturn(committers); - - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - // restoreFromTaskAttemptFinishedEvent (SUCCEEDED) - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_Commit_Failed_recover_fail() { - Map<String, OutputCommitter> committers = - new HashMap<String, OutputCommitter>(); - committers.put("out1", new TestOutputCommitter( - mock(OutputCommitterContext.class), true, true)); - when(task.getVertex().getOutputCommitters()).thenReturn(committers); - - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - - // restoreFromTaskAttemptFinishedEvent (SUCCEEDED) - long taStartTime = taskStartTime + 100L; - long taFinishTime = taStartTime + 100L; - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.SUCCEEDED, recoveredState); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(taId, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - @Test(timeout = 5000) - public void testRecovery_WithDesired_SUCCEEDED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.SUCCEEDED, - false)); - assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState()); - // no TA_Recovery event sent - assertEquals(0, taEventHandler.getEvents().size()); - } - - @Test(timeout = 5000) - public void testRecovery_WithDesired_FAILED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.FAILED, - false)); - assertEquals(TaskStateInternal.FAILED, task.getInternalState()); - // no TA_Recovery event sent - assertEquals(0, taEventHandler.getEvents().size()); - } - - @Test(timeout = 5000) - public void testRecovery_WithDesired_KILLED() { - restoreFromTaskStartEvent(); - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - restoreFromFirstTaskAttemptStartEvent(taId); - task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.KILLED, - false)); - assertEquals(TaskStateInternal.KILLED, task.getInternalState()); - // no TA_Recovery event sent - assertEquals(0, taEventHandler.getEvents().size()); - - } - - /** - * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent -> - * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition - */ - @Test(timeout = 5000) - public void testRecovery_OneTAStarted_Killed() { - restoreFromTaskStartEvent(); - - long taStartTime = taskStartTime + 100L; - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(TaskAttemptStateInternal.NEW, - ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); - assertEquals(1, task.getAttempts().size()); - assertEquals(0, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - long taFinishTime = taStartTime + 100L; - recoveredState = - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), null, 0, null, 0)); - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(TaskAttemptStateInternal.NEW, - ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); - assertEquals(1, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(0, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // wait for Task send TA_RECOVER to TA and TA complete the RecoverTransition - dispatcher.await(); - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - assertEquals(TaskAttemptStateInternal.KILLED, - ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); - // new task attempt is scheduled - assertEquals(2, task.getAttempts().size()); - assertEquals(1, task.getFinishedAttemptsCount()); - assertEquals(0, task.failedAttempts); - assertEquals(1, task.getUncompletedAttemptsCount()); - assertEquals(null, task.successfulAttempt); - } - - /** - * n = maxFailedAttempts, in the previous AM attempt, n task attempts are - * killed. When recovering, it should continue to be in running state and - * schedule a new task attempt. - */ - @Test(timeout = 5000) - public void testTaskRecovery_MultipleAttempts1() { - int maxFailedAttempts = - conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, - TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); - restoreFromTaskStartEvent(); - - for (int i = 0; i < maxFailedAttempts; ++i) { - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, null, "", null, null, 0, null, 0)); - } - assertEquals(maxFailedAttempts, task.getAttempts().size()); - assertEquals(0, task.failedAttempts); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // if the previous task attempt is killed, it should not been take into - // account when checking whether exceed the max attempts - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - // schedule a new task attempt - assertEquals(maxFailedAttempts + 1, task.getAttempts().size()); - } - - /** - * n = maxFailedAttempts, in the previous AM attempt, n task attempts are - * failed. When recovering, it should transit to failed because # of - * failed_attempt is exceeded. - */ - @Test(timeout = 5000) - public void testTaskRecovery_MultipleAttempts2() { - int maxFailedAttempts = - conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, - TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); - restoreFromTaskStartEvent(); - - for (int i = 0; i < maxFailedAttempts; ++i) { - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0)); - } - assertEquals(maxFailedAttempts, task.getAttempts().size()); - assertEquals(maxFailedAttempts, task.failedAttempts); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // it should transit to failed because of the failed task attempt in the - // last application attempt. - assertEquals(TaskStateInternal.FAILED, task.getInternalState()); - assertEquals(maxFailedAttempts, task.getAttempts().size()); - } - - /** - * n = maxFailedAttempts, in the previous AM attempt, n-1 task attempts are - * killed. And last task attempt is still in running state. When recovering, - * the last attempt should transit to killed and task is still in running - * state and new task attempt is scheduled. - */ - @Test(timeout = 5000) - public void testTaskRecovery_MultipleAttempts3() throws InterruptedException { - int maxFailedAttempts = - conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, - TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); - restoreFromTaskStartEvent(); - - for (int i = 0; i < maxFailedAttempts - 1; ++i) { - TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); - task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); - task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0)); - } - assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); - assertEquals(maxFailedAttempts - 1, task.failedAttempts); - - TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); - TaskState recoveredState = - task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, - vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "")); - - assertEquals(TaskState.RUNNING, recoveredState); - assertEquals(TaskAttemptStateInternal.NEW, - ((TaskAttemptImpl) task.getAttempt(newTaskAttemptId)) - .getInternalState()); - assertEquals(maxFailedAttempts, task.getAttempts().size()); - - task.handle(new TaskEventRecoverTask(task.getTaskId())); - // wait until task attempt receive the Recover event from task - dispatcher.await(); - - assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); - assertEquals(TaskAttemptStateInternal.KILLED, - ((TaskAttemptImpl) (task.getAttempt(newTaskAttemptId))) - .getInternalState()); - assertEquals(maxFailedAttempts - 1, task.failedAttempts); - - // new task attempt is added - assertEquals(maxFailedAttempts + 1, task.getAttempts().size()); - } - - private TezTaskAttemptID getNewTaskAttemptID(TezTaskID taskId) { - return TezTaskAttemptID.getInstance(taskId, taskAttemptCounter++); - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 035de32..11c2bf1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -137,8 +137,10 @@ import org.apache.tez.dag.app.dag.event.CallableEventType; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; @@ -3459,11 +3461,8 @@ public class TestVertexImpl { ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); - - dispatcher.getEventHandler().handle( - new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( - new TaskAttemptFailedEvent("Failed"), new EventMetaData( - EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID()))))); + ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + "diag", TaskAttemptTerminationCause.APPLICATION_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause()); @@ -3496,10 +3495,8 @@ public class TestVertexImpl { ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); - dispatcher.getEventHandler().handle( - new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( - new TaskAttemptFailedEvent("Failed"), new EventMetaData( - EventProducerConsumerType.INPUT, v.getName(), null, ta.getID()))))); + ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause()); @@ -3533,10 +3530,8 @@ public class TestVertexImpl { ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); - dispatcher.getEventHandler().handle( - new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( - new TaskAttemptFailedEvent("Failed"), new EventMetaData( - EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID()))))); + ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause()); @@ -6355,17 +6350,17 @@ public class TestVertexImpl { v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent1))); dispatcher.await(); assertTrue(v3.pendingTaskEvents.size() != 0); - ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); - verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); - verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); +// ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); +// verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); +// verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null))); dispatcher.await(); assertTrue(v3.pendingTaskEvents.size() == 0); // recovery events is not only handled one time - argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); - verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); - verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); +// argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); +// verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); +// verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); } private void verifyHistoryEvents(List<DAGHistoryEvent> events, HistoryEventType eventType, int expectedTimes) {
