MAPREDUCE-7166. map-only job should ignore node-lost event when the task has already succeeded. Contributed by Lei Li.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/499c70ed Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/499c70ed Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/499c70ed Branch: refs/heads/HDFS-12943 Commit: 499c70eda5f6315cf7e13a4b3bbefc7e187bc457 Parents: d963575 Author: Akira Ajisaka <[email protected]> Authored: Thu Dec 20 10:09:50 2018 +0900 Committer: Akira Ajisaka <[email protected]> Committed: Thu Dec 20 10:09:50 2018 +0900 ---------------------------------------------------------------------- .../v2/app/job/impl/TaskAttemptImpl.java | 15 ++++ .../v2/app/job/impl/TestTaskAttempt.java | 84 +++++++++++++++++++- 2 files changed, 95 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/499c70ed/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 63e7456..d912b60 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -2196,6 +2196,14 @@ public abstract class TaskAttemptImpl implements taskAttempt.getID().toString()); return TaskAttemptStateInternal.SUCCEEDED; } + if 
(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP + && taskAttempt.conf.getNumReduceTasks() == 0) { + // same reason as above for map only job after map task has succeeded. + // ignore this for map only tasks + LOG.info("Ignoring killed event for successful map only task attempt" + + taskAttempt.getID().toString()); + return TaskAttemptStateInternal.SUCCEEDED; + } if(event instanceof TaskAttemptKillEvent) { TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; //add to diagnostic @@ -2246,6 +2254,13 @@ public abstract class TaskAttemptImpl implements LOG.info("Ignoring killed event for successful reduce task attempt" + taskAttempt.getID().toString()); return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; + } else if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP + && taskAttempt.conf.getNumReduceTasks() == 0) { + // same reason as above for map only job after map task has succeeded. + // ignore this for map only tasks + LOG.info("Ignoring killed event for successful map only task attempt" + + taskAttempt.getID().toString()); + return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; } else { // Store reschedule flag so that after clean up is completed, new // attempt is scheduled/rescheduled based on it. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499c70ed/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index d62a6cc..11f16a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -1323,6 +1323,42 @@ public class TestTaskAttempt{ } @Test + public void testKillMapOnlyTaskWhileSuccessFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", + TaskAttemptState.SUCCEEDED, taImpl.getState()); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + taImpl.getInternalState()); + + // If the map only task is killed when it is in SUCCESS_FINISHING_CONTAINER + // state, the state will move to SUCCESS_CONTAINER_CLEANUP + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in SUCCEEDED state", + TaskAttemptState.SUCCEEDED, taImpl.getState()); + assertEquals("Task attempt's internal state is not " + 
+ "SUCCESS_CONTAINER_CLEANUP", + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + taImpl.getInternalState()); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt is not in SUCCEEDED state", + TaskAttemptState.SUCCEEDED, taImpl.getState()); + assertEquals("Task attempt's internal state is not SUCCEEDED state", + TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState()); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test public void testKillMapTaskAfterSuccess() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); @@ -1340,7 +1376,7 @@ public class TestTaskAttempt{ TaskAttemptEventType.TA_CONTAINER_CLEANED)); // Send a map task attempt kill event indicating next map attempt has to be // reschedule - taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true)); + taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(), "", true)); assertEquals("Task attempt is not in KILLED state", taImpl.getState(), TaskAttemptState.KILLED); assertEquals("Task attempt's internal state is not KILLED", @@ -1354,6 +1390,34 @@ public class TestTaskAttempt{ } @Test + public void testKillMapOnlyTaskAfterSuccess() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", + TaskAttemptState.SUCCEEDED, taImpl.getState()); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + taImpl.getInternalState()); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + // Succeeded + taImpl.handle(new 
TaskAttemptKillEvent(taImpl.getID(),"", true)); + assertEquals("Task attempt is not in SUCCEEDED state", + TaskAttemptState.SUCCEEDED, taImpl.getState()); + assertEquals("Task attempt's internal state is not SUCCEEDED", + TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState()); + assertFalse("InternalError occurred", eventHandler.internalError); + TaskEvent event = eventHandler.lastTaskEvent; + assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, event.getType()); + } + + @Test public void testKillMapTaskWhileFailFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); @@ -1765,8 +1829,8 @@ public class TestTaskAttempt{ thenReturn(taskAttemptFinishingMonitor); } - private TaskAttemptImpl createTaskAttemptImpl( - MockEventHandler eventHandler) { + private TaskAttemptImpl createCommonTaskAttemptImpl( + MockEventHandler eventHandler, JobConf jobConf) { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); @@ -1778,7 +1842,6 @@ public class TestTaskAttempt{ TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); - JobConf jobConf = new JobConf(); jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); jobConf.setBoolean("fs.file.impl.disable.cache", true); jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); @@ -1813,6 +1876,19 @@ public class TestTaskAttempt{ return taImpl; } + private TaskAttemptImpl createTaskAttemptImpl( + MockEventHandler eventHandler) { + JobConf jobConf = new JobConf(); + return createCommonTaskAttemptImpl(eventHandler, jobConf); + } + + private TaskAttemptImpl createMapOnlyTaskAttemptImpl( + MockEventHandler eventHandler) { + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.NUM_REDUCES, 0); + return createCommonTaskAttemptImpl(eventHandler, jobConf); + } + public 
static class MockEventHandler implements EventHandler { public boolean internalError; public TaskEvent lastTaskEvent; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
