Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 055d5e0b8 -> bc2d4ba50
MAPREDUCE-7048. Uber AM can crash due to unknown task in statusUpdate. Contributed by Peter Bacsko (cherry picked from commit 87e2570a1419d3616de2de3b553108ad1a8af425) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc2d4ba5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc2d4ba5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc2d4ba5 Branch: refs/heads/branch-3.0 Commit: bc2d4ba5075fa8eebbb74a51da3aa1c5df7d180e Parents: 055d5e0 Author: Jason Lowe <jl...@apache.org> Authored: Mon Feb 12 13:21:09 2018 -0600 Committer: Jason Lowe <jl...@apache.org> Committed: Mon Feb 12 14:49:35 2018 -0600 ---------------------------------------------------------------------- .../java/org/apache/hadoop/mapred/Task.java | 16 ++-- .../java/org/apache/hadoop/mapred/TestTask.java | 89 ++++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2d4ba5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 9059258..bcc0351 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -199,6 +199,7 @@ abstract public class Task implements Writable, Configurable { protected SecretKey shuffleSecret; protected GcTimeUpdater gcUpdater; final 
AtomicBoolean mustPreempt = new AtomicBoolean(false); + private boolean uberized = false; //////////////////////////////////////////// // Constructors @@ -790,9 +791,6 @@ abstract public class Task implements Writable, Configurable { long taskProgressInterval = MRJobConfUtil. getTaskProgressReportInterval(conf); - boolean uberized = conf.getBoolean("mapreduce.task.uberized", - false); - while (!taskDone.get()) { synchronized (lock) { done = false; @@ -1220,11 +1218,17 @@ abstract public class Task implements Writable, Configurable { public void statusUpdate(TaskUmbilicalProtocol umbilical) throws IOException { int retries = MAX_RETRIES; + while (true) { try { if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) { - LOG.warn("Parent died. Exiting "+taskId); - System.exit(66); + if (uberized) { + LOG.warn("Task no longer available: " + taskId); + break; + } else { + LOG.warn("Parent died. Exiting " + taskId); + ExitUtil.terminate(66); + } } taskStatus.clearStatus(); return; @@ -1437,6 +1441,8 @@ abstract public class Task implements Writable, Configurable { NetUtils.addStaticResolution(name, resolvedName); } } + + uberized = conf.getBoolean("mapreduce.task.uberized", false); } public Configuration getConf() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2d4ba5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java new file mode 100644 index 0000000..500229c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java @@ -0,0 +1,89 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ExitUtil.ExitException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestTask { + @Mock + private TaskUmbilicalProtocol umbilical; + + @Mock + private AMFeedback feedback; + + private Task task; + + @Before + public void setup() { + task = new StubTask(); + ExitUtil.disableSystemExit(); + } + + @Test + public void testStatusUpdateDoesNotExitInUberMode() throws Exception { + setupTest(true); + + task.statusUpdate(umbilical); + } + + @Test(expected = ExitException.class) + public void testStatusUpdateExitsInNonUberMode() throws Exception { + setupTest(false); + + task.statusUpdate(umbilical); + } + + private void setupTest(boolean uberized) + throws IOException, InterruptedException { + Configuration conf = new Configuration(false); + 
conf.setBoolean("mapreduce.task.uberized", uberized); + task.setConf(conf); + when(umbilical.statusUpdate(any(TaskAttemptID.class), + any(TaskStatus.class))).thenReturn(feedback); + + // to avoid possible infinite loop + when(feedback.getTaskFound()).thenReturn(false, true); + } + + public class StubTask extends Task { + @Override + public void run(JobConf job, TaskUmbilicalProtocol umbilical) + throws IOException, ClassNotFoundException, InterruptedException { + // nop + } + + @Override + public boolean isMapTask() { + return false; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org