Repository: hadoop Updated Branches: refs/heads/branch-2.7 6e62b6fd8 -> 1918da8fc
MAPREDUCE-6513. Job got hanged forever when one NM unstable for some time. Contributed by Varun Saxena & Wangda Tan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1918da8f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1918da8f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1918da8f Branch: refs/heads/branch-2.7 Commit: 1918da8fc9d9c8264f13e35db7a6633aa0dcdcb2 Parents: 6e62b6f Author: Jian He <[email protected]> Authored: Thu May 12 15:11:34 2016 -0700 Committer: Jian He <[email protected]> Committed: Thu May 12 15:14:08 2016 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/event/TaskAttemptKillEvent.java | 15 +- .../app/job/event/TaskTAttemptKilledEvent.java | 40 +++++ .../mapreduce/v2/app/job/impl/JobImpl.java | 4 +- .../v2/app/job/impl/TaskAttemptImpl.java | 67 +++++--- .../mapreduce/v2/app/job/impl/TaskImpl.java | 25 ++- .../v2/app/rm/RMContainerAllocator.java | 4 +- .../hadoop/mapreduce/v2/app/TestMRApp.java | 51 +++++- .../v2/app/job/impl/TestTaskAttempt.java | 163 ++++++++++++++++--- .../mapreduce/v2/app/job/impl/TestTaskImpl.java | 80 +++++++-- 10 files changed, 388 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 56edf5a..eee49a9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -81,6 +81,9 @@ Release 2.7.3 - UNRELEASED MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer resource requests (Wangda Tan via jlowe) + MAPREDUCE-6513. Job got hanged forever when one NM unstable for some + time. 
Contributed by Varun Saxena & Wangda Tan + Release 2.7.2 - 2016-01-25 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java index 9bcc838..767ef0d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java @@ -24,14 +24,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; public class TaskAttemptKillEvent extends TaskAttemptEvent { private final String message; + // Next map attempt will be rescheduled(i.e. 
updated in ask with higher + // priority equivalent to that of a fast fail map) + private final boolean rescheduleAttempt; public TaskAttemptKillEvent(TaskAttemptId attemptID, - String message) { + String message, boolean rescheduleAttempt) { super(attemptID, TaskAttemptEventType.TA_KILL); this.message = message; + this.rescheduleAttempt = rescheduleAttempt; + } + + public TaskAttemptKillEvent(TaskAttemptId attemptID, + String message) { + this(attemptID, message, false); } public String getMessage() { return message; } + + public boolean getRescheduleAttempt() { + return rescheduleAttempt; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java new file mode 100644 index 0000000..897444d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +/** + * Task Attempt killed event. + */ +public class TaskTAttemptKilledEvent extends TaskTAttemptEvent { + + // Next map attempt will be rescheduled(i.e. updated in ask with + // higher priority equivalent to that of a fast fail map) + private final boolean rescheduleAttempt; + + public TaskTAttemptKilledEvent(TaskAttemptId id, boolean rescheduleAttempt) { + super(id, TaskEventType.T_ATTEMPT_KILLED); + this.rescheduleAttempt = rescheduleAttempt; + } + + public boolean getRescheduleAttempt() { + return rescheduleAttempt; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index b349ca8..c235e8e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1342,7 +1342,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, if (TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); - eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + // Kill the attempt and indicate that next map attempt should be + // rescheduled (i.e. considered as a fast fail map). + eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true)); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index b27a86c..813010d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -97,6 +97,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; @@ -184,6 +185,7 @@ public abstract class TaskAttemptImpl implements private int httpPort; private Locality locality; private Avataar avataar; + private boolean rescheduleNextAttempt = false; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -1260,6 +1262,16 @@ public abstract class TaskAttemptImpl implements } //always called in write lock + private boolean getRescheduleNextAttempt() { + return rescheduleNextAttempt; + } + + //always called in write lock + private void setRescheduleNextAttempt(boolean reschedule) { + rescheduleNextAttempt = reschedule; + } + + //always called in write lock private void setFinishTime() { //set the finish time only if launch time is set if (launchTime != 0) { @@ -1611,9 +1623,8 @@ public abstract class TaskAttemptImpl implements TaskEventType.T_ATTEMPT_FAILED)); break; case KILLED: - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, false)); break; default: LOG.error("Task final state is not FAILED or KILLED: " + finalState); @@ -1851,7 +1862,6 @@ public abstract class TaskAttemptImpl implements // not setting a finish time since it was set on success assert (taskAttempt.getFinishTime() != 0); - assert (taskAttempt.getLaunchTime() != 0); taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); @@ -1859,8 +1869,13 @@ public abstract class TaskAttemptImpl implements taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId 
.getTaskId().getJobId(), tauce)); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); + boolean rescheduleNextTaskAttempt = false; + if (event instanceof TaskAttemptKillEvent) { + rescheduleNextTaskAttempt = + ((TaskAttemptKillEvent)event).getRescheduleAttempt(); + } + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, rescheduleNextTaskAttempt)); return TaskAttemptStateInternal.KILLED; } } @@ -1891,9 +1906,8 @@ public abstract class TaskAttemptImpl implements ((TaskAttemptKillEvent) event).getMessage()); } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt())); } } @@ -1907,22 +1921,33 @@ public abstract class TaskAttemptImpl implements // for it taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); - + sendContainerCleanup(taskAttempt, event); + // Store reschedule flag so that after clean up is completed, new + // attempt is scheduled/rescheduled based on it. 
if (event instanceof TaskAttemptKillEvent) { - taskAttempt.addDiagnosticInfo( - ((TaskAttemptKillEvent) event).getMessage()); + taskAttempt.setRescheduleNextAttempt( + ((TaskAttemptKillEvent)event).getRescheduleAttempt()); } + } + } - taskAttempt.reportedStatus.progress = 1.0f; - taskAttempt.updateProgressSplits(); - //send the cleanup event to containerLauncher - taskAttempt.eventHandler.handle(new ContainerLauncherEvent( - taskAttempt.attemptId, - taskAttempt.container.getId(), StringInterner - .weakIntern(taskAttempt.container.getNodeId().toString()), - taskAttempt.container.getContainerToken(), - ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); + @SuppressWarnings("unchecked") + private static void sendContainerCleanup(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); } + + taskAttempt.reportedStatus.progress = 1.0f; + taskAttempt.updateProgressSplits(); + //send the cleanup event to containerLauncher + taskAttempt.eventHandler.handle( + new ContainerLauncherEvent(taskAttempt.attemptId, + taskAttempt.container.getId(), StringInterner + .weakIntern(taskAttempt.container.getNodeId().toString()), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); } private void addDiagnosticInfo(String diag) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index ca81059..55b626d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -594,10 +595,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { // This is always called in the Write Lock private void addAndScheduleAttempt(Avataar avataar) { + addAndScheduleAttempt(avataar, false); + } + + // This is always called in the Write Lock + private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) { TaskAttempt attempt = addAttempt(avataar); inProgressAttempts.add(attempt.getID()); //schedule the nextAttemptNumber - if (failedAttempts.size() > 0) { + if (failedAttempts.size() > 0 || reschedule) { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RESCHEDULE)); } else { @@ -968,7 +974,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); if (task.successfulAttempt == null) { - task.addAndScheduleAttempt(Avataar.VIRGIN); + boolean 
rescheduleNewAttempt = false; + if (event instanceof TaskTAttemptKilledEvent) { + rescheduleNewAttempt = + ((TaskTAttemptKilledEvent)event).getRescheduleAttempt(); + } + task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt); } if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) { task.commitAttempt = null; @@ -1175,7 +1186,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. - task.addAndScheduleAttempt(Avataar.VIRGIN); + boolean rescheduleNextTaskAttempt = false; + if (event instanceof TaskTAttemptKilledEvent) { + // Decide whether to reschedule next task attempt. If true, this + // typically indicates that a successful map attempt was killed on an + // unusable node being reported. + rescheduleNextTaskAttempt = + ((TaskTAttemptKilledEvent)event).getRescheduleAttempt(); + } + task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNextTaskAttempt); return TaskStateInternal.SCHEDULED; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d40b120..261c67b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -887,9 +887,11 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.info("Killing taskAttempt:" + tid + " because it is running on unusable node:" + taskAttemptNodeId); + // If map (i == 0), reschedule the next task attempt. + boolean rescheduleNextAttempt = (i == 0); eventHandler.handle(new TaskAttemptKillEvent(tid, "TaskAttempt killed because it ran on unusable node" - + taskAttemptNodeId)); + + taskAttemptNodeId, rescheduleNextAttempt)); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index eb6b932..eaf1070 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Supplier; import org.apache.hadoop.test.GenericTestUtils; @@ -56,13 +57,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import 
org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; +import org.mockito.Mockito; /** * Tests the state machine of MR App. @@ -201,13 +208,18 @@ public class TestMRApp { @Test public void testUpdatedNodes() throws Exception { int runCount = 0; + Dispatcher disp = Mockito.spy(new AsyncDispatcher()); MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), - true, ++runCount); + true, ++runCount, disp); Configuration conf = new Configuration(); // after half of the map completion, reduce will start conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); // uberization forces full slowstart (1.0), so disable that conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + + ContainerAllocEventHandler handler = new ContainerAllocEventHandler(); + disp.register(ContainerAllocator.EventType.class, handler); + final Job job1 = app.submit(conf); app.waitForState(job1, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size()); @@ -285,6 +297,12 @@ public class TestMRApp { events = job1.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 2 more completion events for killed", 4, events.length); + // 2 map task attempts which were killed above should be 
requested from + // container allocator with the previous map task marked as failed. If + // this happens allocator will request the container for this mapper from + // RM at a higher priority of 5(i.e. with a priority equivalent to that of + // a fast fail map). + handler.waitForFailedMapContainerReqEvents(2); // all maps must be back to running app.waitForState(mapTask1, TaskState.RUNNING); @@ -324,7 +342,7 @@ public class TestMRApp { // rerun // in rerun the 1st map will be recovered from previous run app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false, - ++runCount); + ++runCount, (Dispatcher)new AsyncDispatcher()); conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); @@ -420,6 +438,32 @@ public class TestMRApp { app.waitForState(job2, JobState.SUCCEEDED); } + private final class ContainerAllocEventHandler + implements EventHandler<ContainerAllocatorEvent> { + private AtomicInteger failedMapContainerReqEventCnt = new AtomicInteger(0); + @Override + public void handle(ContainerAllocatorEvent event) { + if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ && + ((ContainerRequestEvent)event).getEarlierAttemptFailed()) { + failedMapContainerReqEventCnt.incrementAndGet(); + } + } + public void waitForFailedMapContainerReqEvents(int count) + throws InterruptedException { + // Bound the wait so a missing or extra request fails the test instead + // of hanging it forever. + long deadline = System.currentTimeMillis() + 30000; + int seen = failedMapContainerReqEventCnt.get(); + while (seen != count && System.currentTimeMillis() < deadline) { + Thread.sleep(50); + seen = failedMapContainerReqEventCnt.get(); + } + Assert.assertEquals("Unexpected number of failed map container requests", + count, seen); + failedMapContainerReqEventCnt.set(0); + } + } + private static void waitFor(Supplier<Boolean> predicate, int checkIntervalMillis, int checkTotalMillis) throws InterruptedException { try { @@ -590,9 +634,17 @@ public class TestMRApp { } private final class MRAppWithHistory extends MRApp { + private Dispatcher dispatcher; public MRAppWithHistory(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart, int
startCount, + Dispatcher disp) { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); + this.dispatcher = disp; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 3d1facf..4baa85b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -18,22 +18,6 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -71,6 +55,10 @@ import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; @@ -87,9 +75,25 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ @@ -944,7 +948,46 @@ public class TestTaskAttempt{ + " Task attempt finish time is not the same ", finishTime, Long.valueOf(taImpl.getFinishTime())); } - + + private void containerKillBeforeAssignment(boolean scheduleAttempt) + throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + ApplicationId appId = ApplicationId.newInstance(1, 2); + JobId jobId 
= MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1, + mock(TaskSplitMetaInfo.class), new JobConf(), + mock(TaskAttemptListener.class), mock(Token.class), + new Credentials(), new SystemClock(), + mock(AppContext.class)); + if (scheduleAttempt) { + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_SCHEDULE)); + } + taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILLED", + taImpl.getInternalState(), TaskAttemptStateInternal.KILLED); + assertFalse("InternalError occurred", eventHandler.internalError); + TaskEvent event = eventHandler.lastTaskEvent; + assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType()); + // In NEW state, new map attempt should not be rescheduled. 
+ assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt()); + } + + @Test + public void testContainerKillOnNew() throws Exception { + containerKillBeforeAssignment(false); + } + + @Test + public void testContainerKillOnUnassigned() throws Exception { + containerKillBeforeAssignment(true); + } + @Test public void testContainerKillAfterAssigned() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); @@ -994,7 +1037,7 @@ public class TestTaskAttempt{ taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL)); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } @@ -1051,7 +1094,7 @@ public class TestTaskAttempt{ TaskAttemptEventType.TA_KILL)); assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } @@ -1112,16 +1155,96 @@ public class TestTaskAttempt{ TaskAttemptEventType.TA_KILL)); assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } + @Test + public void testKillMapTaskAfterSuccess() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", + taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP); + + 
taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + // Send a map task attempt kill event indicating next map attempt has to be + // rescheduled + taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILLED", + taImpl.getInternalState(), TaskAttemptStateInternal.KILLED); + assertFalse("InternalError occurred", eventHandler.internalError); + TaskEvent event = eventHandler.lastTaskEvent; + assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType()); + // Verify the attempt forwarded to TaskImpl the same reschedule flag it + // received in the task attempt kill event. + assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt()); + } + + private TaskAttemptImpl createTaskAttemptImpl( + MockEventHandler eventHandler) { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); +
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + return taImpl; + } + public static class MockEventHandler implements EventHandler { public boolean internalError; + public TaskEvent lastTaskEvent; @Override public void handle(Event event) { + if (event instanceof TaskEvent) { + lastTaskEvent = (TaskEvent)event; + } if (event instanceof JobEvent) { JobEvent je = ((JobEvent) event); if (JobEventType.INTERNAL_ERROR == je.getType()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1918da8f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index ae8a797..5f56f9c 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -50,13 +50,17 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.util.Clock; @@ -89,7 +93,8 @@ public class TestTaskImpl { private int taskCounter = 0; private final int partition = 1; - private InlineDispatcher dispatcher; + private InlineDispatcher dispatcher; + private MockTaskAttemptEventHandler taskAttemptEventHandler; private List<MockTaskAttemptImpl> taskAttempts; private class MockTaskImpl extends TaskImpl { @@ -234,7 +239,10 @@ public class TestTaskImpl { taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); - taskAttempts = new ArrayList<MockTaskAttemptImpl>(); + taskAttempts = new 
ArrayList<MockTaskAttemptImpl>(); + + taskAttemptEventHandler = new MockTaskAttemptEventHandler(); + dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); } private MockTaskImpl createMockTask(TaskType taskType) { @@ -271,8 +279,12 @@ public class TestTaskImpl { } private void killScheduledTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + killScheduledTaskAttempt(attemptId, false); + } + + private void killScheduledTaskAttempt(TaskAttemptId attemptId, + boolean reschedule) { + mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); assertTaskScheduledState(); } @@ -301,11 +313,15 @@ public class TestTaskImpl { } private void killRunningTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + killRunningTaskAttempt(attemptId, false); + } + + private void killRunningTaskAttempt(TaskAttemptId attemptId, + boolean reschedule) { + mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); assertTaskRunningState(); } - + private void failRunningTaskAttempt(TaskAttemptId attemptId) { mockTask.handle(new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_FAILED)); @@ -334,7 +350,7 @@ public class TestTaskImpl { } /** - * {@link TaskState#KILL_WAIT} + * {@link TaskStateInternal#KILL_WAIT} */ private void assertTaskKillWaitState() { assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState()); @@ -398,10 +414,12 @@ public class TestTaskImpl { */ public void testKillScheduledTaskAttempt() { LOG.info("--- START: testKillScheduledTaskAttempt ---"); - mockTask = createMockTask(TaskType.MAP); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); - killScheduledTaskAttempt(getLastAttempt().getAttemptId()); + killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + 
taskAttemptEventHandler.lastTaskAttemptEvent.getType()); } @Test @@ -424,11 +442,13 @@ public class TestTaskImpl { */ public void testKillRunningTaskAttempt() { LOG.info("--- START: testKillRunningTaskAttempt ---"); - mockTask = createMockTask(TaskType.MAP); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); - killRunningTaskAttempt(getLastAttempt().getAttemptId()); + killRunningTaskAttempt(getLastAttempt().getAttemptId(), true); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + taskAttemptEventHandler.lastTaskAttemptEvent.getType()); } @Test @@ -446,6 +466,28 @@ public class TestTaskImpl { assertTaskSucceededState(); } + @Test + /** + * Kill map attempt for succeeded map task + * {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED} + */ + public void testKillAttemptForSuccessfulTask() { + LOG.info("--- START: testKillAttemptForSuccessfulTask ---"); + mockTask = createMockTask(TaskType.MAP); + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + assertTaskSucceededState(); + mockTask.handle( + new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true)); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + taskAttemptEventHandler.lastTaskAttemptEvent.getType()); + assertTaskScheduledState(); + } + @Test public void testTaskProgress() { LOG.info("--- START: testTaskProgress ---"); @@ -703,8 +745,8 @@ public class TestTaskImpl { assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt = taskAttempts.get(3); taskAttempt.setState(TaskAttemptState.KILLED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(new 
TaskTAttemptKilledEvent(taskAttempt.getAttemptId(), + false)); assertEquals(TaskState.FAILED, mockTask.getState()); } @@ -750,4 +792,14 @@ public class TestTaskImpl { Counters taskCounters = mockTask.getCounters(); assertEquals("wrong counters for task", specAttemptCounters, taskCounters); } + + public static class MockTaskAttemptEventHandler implements EventHandler { + public TaskAttemptEvent lastTaskAttemptEvent; + @Override + public void handle(Event event) { + if (event instanceof TaskAttemptEvent) { + lastTaskAttemptEvent = (TaskAttemptEvent)event; + } + } + }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
