Repository: tez Updated Branches: refs/heads/branch-0.7 37ae6f561 -> d55cf45f2
TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d55cf45f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d55cf45f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d55cf45f Branch: refs/heads/branch-0.7 Commit: d55cf45f20e674e00f09fd6bdf66f6a7ba04c499 Parents: 37ae6f5 Author: Jason Lowe <[email protected]> Authored: Thu Mar 17 19:08:55 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Thu Mar 17 19:08:55 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/records/TaskAttemptTerminationCause.java | 3 ++- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 10 +++++++++- .../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 17 +++++++++++------ .../tez/dag/app/dag/impl/TestTaskRecovery.java | 17 +++++++++++++++++ 5 files changed, 40 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bb7a56d..676d640 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures. TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch TEZ-3140. Reduce AM memory usage during serialization http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java index c8396de..d0c6798 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java @@ -23,6 +23,7 @@ public enum TaskAttemptTerminationCause { TERMINATED_BY_CLIENT, // Killed by client command TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown + TERMINATED_AT_RECOVERY, // Killed in recovery, due to can not recover running task attempt INTERNAL_PREEMPTION, // Killed by Tez to makes space for higher pri work EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded @@ -42,5 +43,5 @@ public enum TaskAttemptTerminationCause { CONTAINER_STOPPED, // Container stopped or released by Tez NODE_FAILED, // Node for the container failed NODE_DISK_ERROR, // Disk failed on the node runnign the task - + } http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index aa1b39c..9b8fd80 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1633,7 +1633,15 @@ public class TaskAttemptImpl implements TaskAttempt, MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { @Override public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) { - if (attempt.leafVertex) { + boolean fromRecovery = false; + if (event instanceof TaskAttemptEventTerminationCauseEvent) { + TaskAttemptEventTerminationCauseEvent termEvent = + (TaskAttemptEventTerminationCauseEvent) event; + if (termEvent.getTerminationCause() == TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY) { + fromRecovery = true; + } + } + if (!fromRecovery && attempt.leafVertex) { return TaskAttemptStateInternal.SUCCEEDED; } // TODO - TEZ-834. This assumes that the outputs were on that node http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index e9ef69f..ad678d7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -35,6 +35,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1199,7 +1200,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { if (task.successfulAttempt != null) { //Found successful attempt //Recover data - boolean recoveredData = true; + String recoverErrorMsg = null; if (task.getVertex().getOutputCommitters() != null && !task.getVertex().getOutputCommitters().isEmpty()) { for (Entry<String, OutputCommitter> entry @@ -1209,28 +1210,32 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { + ", output=" + entry.getKey()); OutputCommitter committer = entry.getValue(); if (!committer.isTaskRecoverySupported()) { - LOG.info("Task recovery not supported by committer" - + ", failing task attempt" + recoverErrorMsg = "Task recovery not supported by committer" + + ", failing task attempt"; + LOG.info(recoverErrorMsg + ", taskId=" + task.getTaskId() + ", attemptId=" + task.successfulAttempt + ", output=" + entry.getKey()); - recoveredData = false; break; } try { committer.recoverTask(task.getTaskId().getId(), task.appContext.getApplicationAttemptId().getAttemptId()-1); } catch (Exception e) { + recoverErrorMsg = "Task recovery failed by committer: " + + ExceptionUtils.getStackTrace(e); LOG.warn("Task recovery failed by committer" + ", taskId=" + task.getTaskId() + ", attemptId=" + task.successfulAttempt + ", output=" + entry.getKey(), e); - recoveredData = false; break; } } } - if (!recoveredData) { + if (recoverErrorMsg != null) { + task.eventHandler.handle( + new TaskAttemptEventKillRequest(task.successfulAttempt, recoverErrorMsg, + TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY)); task.successfulAttempt = null; } else { LOG.info("Recovered a successful attempt" http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index 52421ba..fbf815d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -54,6 +54,7 @@ import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; @@ -623,6 +624,7 @@ public class TestTaskRecovery { assertEquals(taId, task.successfulAttempt); task.handle(new TaskEventRecoverTask(task.getTaskId())); + dispatcher.await(); assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); // new task attempt is scheduled assertEquals(2, task.getAttempts().size()); @@ -630,6 +632,13 @@ public class TestTaskRecovery { assertEquals(0, task.failedAttempts); assertEquals(1, task.getUncompletedAttemptsCount()); assertEquals(null, task.successfulAttempt); + // verify kill event is sent to original task attempt + assertEquals(2, taEventHandler.getEvents().size()); + TaskAttemptEvent taEvent = taEventHandler.getEvents().get(1); + assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, taEvent.getType()); + assertEquals(taId, taEvent.getTaskAttemptID()); + TaskAttemptEventKillRequest taKillEvent = (TaskAttemptEventKillRequest) taEvent; + assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taKillEvent.getTerminationCause()); } /** @@ -663,6 +672,7 @@ public class TestTaskRecovery { assertEquals(taId, task.successfulAttempt); task.handle(new TaskEventRecoverTask(task.getTaskId())); + dispatcher.await(); assertEquals(TaskStateInternal.RUNNING, task.getInternalState()); // new task attempt is scheduled assertEquals(2, task.getAttempts().size()); @@ -670,6 +680,13 @@ public class TestTaskRecovery { assertEquals(0, task.failedAttempts); assertEquals(1, task.getUncompletedAttemptsCount()); assertEquals(null, task.successfulAttempt); + // verify kill event is sent to original task attempt + assertEquals(2, taEventHandler.getEvents().size()); + TaskAttemptEvent taEvent = taEventHandler.getEvents().get(1); + assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, taEvent.getType()); + assertEquals(taId, taEvent.getTaskAttemptID()); + TaskAttemptEventKillRequest taKillEvent = (TaskAttemptEventKillRequest) taEvent; + assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taKillEvent.getTerminationCause()); } @Test(timeout = 5000)
