Repository: hadoop Updated Branches: refs/heads/branch-2.7 8e675d93d -> 3f5c67df7
MAPREDUCE-6937. Backport MAPREDUCE-6870 to branch-2 while preserving compatibility. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f5c67df Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f5c67df Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f5c67df Branch: refs/heads/branch-2.7 Commit: 3f5c67df7707f700f2df379af32ba1bea607d28b Parents: 8e675d9 Author: Haibo Chen <[email protected]> Authored: Wed Aug 30 09:39:50 2017 -0700 Committer: Haibo Chen <[email protected]> Committed: Wed Aug 30 09:39:50 2017 -0700 ---------------------------------------------------------------------- .../mapreduce/v2/app/job/impl/JobImpl.java | 40 +++++- .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++++++++++++++---- .../apache/hadoop/mapreduce/MRJobConfig.java | 11 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 170 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f5c67df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4553f10..2476e7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -643,6 +643,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + /** Whether the job should be finished when all reducers are completed, + regardless of having running mappers */ + private boolean finishJobWhenReducersDone; + /** True if the job's mappers were already sent the T_KILL event */ + private boolean completingJob = false; private Credentials jobCredentials; private Token<JobTokenIdentifier> jobToken; @@ -714,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); + this.finishJobWhenReducersDone = conf.getBoolean( + MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, + MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { @@ -2014,7 +2022,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2045,6 +2055,34 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. + See MAPREDUCE-6870 for details + @param job The MapReduce job + */ + private void checkReadyForCompletionWhenAllReducersDone(final JobImpl job) { + if (job.finishJobWhenReducersDone) { + int totalReduces = job.getTotalReduces(); + int completedReduces = job.getCompletedReduces(); + + if (totalReduces > 0 && totalReduces == completedReduces + && !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { + MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); + if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); + } + } + + job.completingJob = true; + } + } + } } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f5c67df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 1a2dcd0..e7f4d79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -562,33 +562,13 @@ public class TestJobImpl { dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); // replace the tasks with spied versions to return the right attempts - Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>(); - List<NodeReport> nodeReports = new ArrayList<NodeReport>(); - Map<NodeReport,TaskId> nodeReportsToTaskIds = - new HashMap<NodeReport,TaskId>(); - for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) { - TaskId taskId = e.getKey(); - Task task = e.getValue(); - if (taskId.getTaskType() == TaskType.MAP) { - // add an attempt to the task to simulate nodes - NodeId nodeId = mock(NodeId.class); - TaskAttempt attempt = mock(TaskAttempt.class); - when(attempt.getNodeId()).thenReturn(nodeId); - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); - when(attempt.getID()).thenReturn(attemptId); - // create a spied task - Task spied = spy(task); - doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); - spiedTasks.put(taskId, spied); + Map<TaskId, Task> spiedTasks = new HashMap<>(); + List<NodeReport> nodeReports = new ArrayList<>(); + Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>(); + + createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job, + NodeState.UNHEALTHY, nodeReports); - // create a NodeReport based on the node id - NodeReport report = mock(NodeReport.class); - when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY); - when(report.getNodeId()).thenReturn(nodeId); - nodeReports.add(report); - nodeReportsToTaskIds.put(report, taskId); - } - } // replace the tasks with the spied tasks job.tasks.putAll(spiedTasks); @@ -638,6 +618,82 @@ public class TestJobImpl { commitHandler.stop(); } + @Test + public void testJobNCompletedWhenAllReducersAreFinished() + throws Exception { + testJobCompletionWhenReducersAreFinished(true); + } + + @Test + public void testJobNotCompletedWhenAllReducersAreFinished() + throws Exception { + testJobCompletionWhenReducersAreFinished(false); + } + + private void testJobCompletionWhenReducersAreFinished(boolean killMappers) + throws InterruptedException, BrokenBarrierException { + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + final List<TaskEvent> killedEvents = + Collections.synchronizedList(new ArrayList<TaskEvent>()); + dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() { + @Override + public void handle(TaskEvent event) { + if (event.getType() == TaskEventType.T_KILL) { + killedEvents.add(event); + } + } + }); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); + + // replace the tasks with spied versions to return the right attempts + Map<TaskId, Task> spiedTasks = new HashMap<>(); + List<NodeReport> nodeReports = new ArrayList<>(); + Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>(); + + createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job, + NodeState.RUNNING, nodeReports); + + // replace the tasks with the spied tasks + job.tasks.putAll(spiedTasks); + + // finish reducer + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.REDUCE) { + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + } + } + + dispatcher.await(); + + /* + * StubbedJob cannot finish in this test - we'd have to generate the + * necessary events in this test manually, but that wouldn't add too + * much value. Instead, we validate the T_KILL events. + */ + if (killMappers) { + Assert.assertEquals("Number of killed events", 2, killedEvents.size()); + Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000", + killedEvents.get(0).getTaskID().toString()); + Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001", + killedEvents.get(1).getTaskID().toString()); + } else { + Assert.assertEquals("Number of killed events", 0, killedEvents.size()); + } + } + public static void main(String[] args) throws Exception { TestJobImpl t = new TestJobImpl(); t.testJobNoTasks(); @@ -985,6 +1041,37 @@ public class TestJobImpl { Assert.assertEquals(state, job.getInternalState()); } + private void createSpiedMapTasks(Map<NodeReport, TaskId> + nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job, + NodeState nodeState, List<NodeReport> nodeReports) { + for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) { + TaskId taskId = e.getKey(); + Task task = e.getValue(); + if (taskId.getTaskType() == TaskType.MAP) { + // add an attempt to the task to simulate nodes + NodeId nodeId = mock(NodeId.class); + TaskAttempt attempt = mock(TaskAttempt.class); + when(attempt.getNodeId()).thenReturn(nodeId); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + when(attempt.getID()).thenReturn(attemptId); + // create a spied task + Task spied = spy(task); + Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>(); + attemptMap.put(attemptId, attempt); + when(spied.getAttempts()).thenReturn(attemptMap); + doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); + spiedTasks.put(taskId, spied); + + // create a NodeReport based on the node id + NodeReport report = mock(NodeReport.class); + when(report.getNodeState()).thenReturn(nodeState); + when(report.getNodeId()).thenReturn(nodeId); + nodeReports.add(report); + nodeReportsToTaskIds.put(report, taskId); + } + } + } + private static class JobSubmittedEventHandler implements EventHandler<JobHistoryEvent> { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f5c67df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 09e1002..6636838 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -387,7 +387,7 @@ public interface MRJobConfig { public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " "; - + public static final String JOB_RUNNING_MAP_LIMIT = "mapreduce.job.running.map.limit"; public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0; @@ -920,4 +920,13 @@ public interface MRJobConfig { * A comma-separated list of properties whose value will be redacted. */ String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties"; + + /** + * Specifies whether the job should complete once all reducers + * have finished, regardless of whether there are still running mappers. + */ + String FINISH_JOB_WHEN_REDUCERS_DONE = + "mapreduce.job.finish-when-all-reducers-done"; + + boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f5c67df/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index d01dd5f..da4710e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1462,6 +1462,14 @@ </property> <property> + <name>mapreduce.job.finish-when-all-reducers-done</name> + <value>false</value> + <description>Specifies whether the job should complete once all reducers + have finished, regardless of whether there are still running mappers. + </description> +</property> + +<property> <name>mapreduce.job.token.tracking.ids.enabled</name> <value>false</value> <description>Whether to write tracking ids of tokens to --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
