Repository: tez Updated Branches: refs/heads/master 8e969abf9 -> 191447e09
TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/191447e0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/191447e0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/191447e0 Branch: refs/heads/master Commit: 191447e092e0432ebbc521113813e138c3e55818 Parents: 8e969ab Author: Jason Lowe <[email protected]> Authored: Thu Mar 17 19:00:46 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Thu Mar 17 19:00:46 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +++--- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 26 +++++++++++--------- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 7 +++--- 4 files changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 78228c4..5fd3856 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state TEZ-2936. Create ATS implementation that enables support for YARN-4265 (ATS v1.5) TEZ-3148. Invalid event TA_TEZ_EVENT_UPDATE on TaskAttempt. TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures. @@ -404,6 +405,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures. TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch TEZ-3140. Reduce AM memory usage during serialization http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6f44f3d..702c323 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -79,7 +79,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; @@ -172,7 +171,7 @@ public class TaskAttemptImpl implements TaskAttempt, private final Lock writeLock; protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; - private final TaskAttemptRecoveryData recoveryData; + private TaskAttemptRecoveryData recoveryData; private long launchTime = 0; private long finishTime = 0; private String trackerName; @@ -1663,7 +1662,10 @@ public class TaskAttemptImpl implements TaskAttempt, MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { @Override public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) { - if (attempt.leafVertex) { + boolean fromRecovery = (event instanceof RecoveryEvent + && ((RecoveryEvent) event).isFromRecovery()); + attempt.recoveryData = null; + if (!fromRecovery && attempt.leafVertex) { return TaskAttemptStateInternal.SUCCEEDED; } // TODO - TEZ-834. This assumes that the outputs were on that node http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index bdadf3f..32869a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -35,6 +35,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskEvent; @@ -982,10 +984,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private static class AttemptSucceededTransition implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { - private boolean recoverSuccessTaskAttempt(TaskImpl task) { + private String recoverSuccessTaskAttempt(TaskImpl task) { // Found successful attempt // Recover data - boolean recoveredData = true; + String errorMsg = null; if (task.getVertex().getOutputCommitters() != null && !task.getVertex().getOutputCommitters().isEmpty()) { for (Entry<String, OutputCommitter> entry @@ -995,28 +997,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { + ", output=" + entry.getKey()); OutputCommitter committer = entry.getValue(); if (!committer.isTaskRecoverySupported()) { - LOG.info("Task recovery not supported by committer" - + ", failing task attempt" + errorMsg = "Task recovery not supported by committer" + + ", failing task attempt"; + LOG.info(errorMsg + ", taskId=" + task.getTaskId() + ", attemptId=" + task.successfulAttempt + ", output=" + entry.getKey()); - recoveredData = false; break; } try { committer.recoverTask(task.getTaskId().getId(), task.appContext.getApplicationAttemptId().getAttemptId()-1); } catch (Exception e) { + errorMsg = "Task recovery failed by committer: " + + ExceptionUtils.getStackTrace(e); LOG.warn("Task recovery failed by committer" + ", taskId=" + task.getTaskId() + ", attemptId=" + task.successfulAttempt + ", output=" + entry.getKey(), e); - recoveredData = false; break; } } } - return recoveredData; + return errorMsg; } @Override @@ -1026,13 +1029,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // recovery. In that case just reschedule new attempt if numFailedAttempts does not exceeded maxFailedAttempts. if (task.recoveryData!= null && task.recoveryData.isTaskAttemptSucceeded(successTaId)) { - boolean recoveredData = recoverSuccessTaskAttempt(task); - if (!recoveredData) { - // Move this TA to KILLED (TEZ-2958) - LOG.info("Can not recovery the successful task attempt, schedule new task attempt," + String errorMsg = recoverSuccessTaskAttempt(task); + if (errorMsg != null) { + LOG.info("Can not recover the successful task attempt, schedule new task attempt," + "taskId=" + task.getTaskId()); task.successfulAttempt = null; task.addAndScheduleAttempt(successTaId); + task.eventHandler.handle(new TaskAttemptEventAttemptKilled(successTaId, + errorMsg, TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true)); return TaskStateInternal.RUNNING; } else { task.successfulAttempt = successTaId; http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 0a2613c..0b0af7b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -321,6 +321,7 @@ public class TestDAGRecovery { final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class); when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId()); + when(appContext.getClock()).thenReturn(new SystemClock()); Mockito.doAnswer(new Answer() { public ListenableFuture<Void> answer(InvocationOnMock invocation) { @@ -1052,7 +1053,7 @@ public class TestDAGRecovery { * V2's committer is not recovery supported */ @Test//(timeout=5000) - public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() { + public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() throws Exception{ initMockDAGRecoveryDataForTaskAttempt(); // set up v2 recovery data // ta1t1v2: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED) @@ -1097,8 +1098,8 @@ public class TestDAGRecovery { TaskImpl task = (TaskImpl)dag.getVertex(v2Id).getTask(t1v2Id); TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v2Id); - assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState()); - historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED); + assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState()); + historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED); assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); // new task attempt is scheduled assertEquals(2, task.getAttempts().size());
