MAPREDUCE-7020. Task timeout in uber mode can crash AM. Contributed by Peter Bacsko.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6eef3d7f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6eef3d7f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6eef3d7f Branch: refs/heads/YARN-1011 Commit: 6eef3d7f1a1e5e3f27fb3bf7596663640d786181 Parents: e990904 Author: Jason Lowe <[email protected]> Authored: Fri Jan 26 15:31:43 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Fri Jan 26 15:31:43 2018 -0600 ---------------------------------------------------------------------- .../apache/hadoop/mapred/TaskAttemptListenerImpl.java | 8 +++++--- .../hadoop/mapred/TestTaskAttemptListenerImpl.java | 6 ++++-- .../src/main/java/org/apache/hadoop/mapred/Task.java | 14 +++++++++++--- 3 files changed, 20 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eef3d7f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index b155af22..668d8ed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -369,14 +369,16 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = 
TypeConverter.toYarn(taskAttemptID); + AMFeedback feedback = new AMFeedback(); AtomicReference<TaskAttemptStatus> lastStatusRef = attemptIdToStatus.get(yarnAttemptID); if (lastStatusRef == null) { - throw new IllegalStateException("Status update was called" - + " with illegal TaskAttemptId: " + yarnAttemptID); + LOG.error("Status update was called with illegal TaskAttemptId: " + + yarnAttemptID); + feedback.setTaskFound(false); + return feedback; } - AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); // Propagating preemption to the task if TASK_PREEMPTION is enabled http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eef3d7f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 4ff6fb2..da7421b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -487,13 +487,15 @@ public class TestTaskAttemptListenerImpl { assertEquals(Phase.REDUCE, status.phase); } - @Test(expected = IllegalStateException.class) + @Test public void testStatusUpdateFromUnregisteredTask() throws IOException, InterruptedException{ configureMocks(); startListener(false); - listener.statusUpdate(attemptID, firstReduceStatus); + AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus); + + assertFalse(feedback.getTaskFound()); } 
private void configureMocks() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eef3d7f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 87c9e16..5b98b35 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -855,6 +855,9 @@ abstract public class Task implements Writable, Configurable { long taskProgressInterval = MRJobConfUtil. getTaskProgressReportInterval(conf); + boolean uberized = conf.getBoolean("mapreduce.task.uberized", + false); + while (!taskDone.get()) { synchronized (lock) { done = false; @@ -893,9 +896,14 @@ abstract public class Task implements Writable, Configurable { // if Task Tracker is not aware of our task ID (probably because it died and // came back up), kill ourselves if (!taskFound) { - LOG.warn("Parent died. Exiting "+taskId); - resetDoneFlag(); - System.exit(66); + if (uberized) { + taskDone.set(true); + break; + } else { + LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); + System.exit(66); + } } // Set a flag that says we should preempt this is read by --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
