Repository: hadoop
Updated Branches:
  refs/heads/trunk 312e57b95 -> a32e0138f
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/trunk
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu Aug 10 15:17:36 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  35 ++++-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++++++++++++++----
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +-
 .../src/main/resources/mapred-default.xml       |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;

   private Credentials jobCredentials;
   private Token<JobTokenIdentifier> jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     this.maxFetchFailuresNotifications = conf.getInt(
         MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
         MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+    this.finishJobWhenReducersDone = conf.getBoolean(
+        MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+        MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }

   protected StateMachine<JobStateInternal, JobEventType, JobEvent>
       getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             TimeUnit.MILLISECONDS);
         return JobStateInternal.FAIL_WAIT;
       }
-
+
+      checkReadyForCompletionWhenAllReducersDone(job);
+
       return job.checkReadyForCommit();
     }

@@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       }
       job.metrics.killedTask(task);
     }
+
+    /** Improvement: if all reducers have finished, we check if we have
+        restarted mappers that are still running. This can happen in a
+        situation when a node becomes UNHEALTHY and mappers are rescheduled.
+        See MAPREDUCE-6870 for details */
+    private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+      if (job.finishJobWhenReducersDone) {
+        int totalReduces = job.getTotalReduces();
+        int completedReduces = job.getCompletedReduces();
+
+        if (totalReduces > 0 && totalReduces == completedReduces
+            && !job.completingJob) {
+
+          for (TaskId mapTaskId : job.mapTasks) {
+            MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+            if (!task.isFinished()) {
+              LOG.info("Killing map task " + task.getID());
+              job.eventHandler.handle(
+                  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+            }
+          }
+
+          job.completingJob = true;
+        }
+      }
+    }
   }

   // Transition class for handling jobs with no tasks
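
For readers skimming the diff, the new transition logic above boils down to a
simple gate: only when the feature flag is enabled, at least one reducer
exists, every reducer has completed, and the job is not already completing are
the still-running (typically rescheduled) map tasks sent T_KILL. The following
is a framework-free sketch of that gate using hypothetical stand-in types, not
the real JobImpl/Task classes:

    import java.util.ArrayList;
    import java.util.List;

    class ReducerCompletionGate {

      /** Hypothetical stand-in for a map task; not a Hadoop class. */
      static class MapTaskStub {
        final String id;
        final boolean finished;
        MapTaskStub(String id, boolean finished) {
          this.id = id;
          this.finished = finished;
        }
      }

      /**
       * Returns the ids of map tasks that would be killed. Mirrors the
       * conditions in checkReadyForCompletionWhenAllReducersDone: feature
       * enabled, at least one reducer, all reducers complete, and the job
       * not already marked as completing.
       */
      static List<String> mapTasksToKill(boolean finishJobWhenReducersDone,
          int totalReduces, int completedReduces, boolean completingJob,
          List<MapTaskStub> mapTasks) {
        List<String> toKill = new ArrayList<>();
        if (!finishJobWhenReducersDone || completingJob
            || totalReduces == 0 || totalReduces != completedReduces) {
          return toKill;
        }
        for (MapTaskStub task : mapTasks) {
          if (!task.finished) {
            toKill.add(task.id);
          }
        }
        return toKill;
      }
    }
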

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 2147ec1..1827ce4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -564,33 +564,13 @@ public class TestJobImpl {
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);

     // replace the tasks with spied versions to return the right attempts
-    Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
-    List<NodeReport> nodeReports = new ArrayList<NodeReport>();
-    Map<NodeReport,TaskId> nodeReportsToTaskIds =
-        new HashMap<NodeReport,TaskId>();
-    for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
-      TaskId taskId = e.getKey();
-      Task task = e.getValue();
-      if (taskId.getTaskType() == TaskType.MAP) {
-        // add an attempt to the task to simulate nodes
-        NodeId nodeId = mock(NodeId.class);
-        TaskAttempt attempt = mock(TaskAttempt.class);
-        when(attempt.getNodeId()).thenReturn(nodeId);
-        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-        when(attempt.getID()).thenReturn(attemptId);
-        // create a spied task
-        Task spied = spy(task);
-        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
-        spiedTasks.put(taskId, spied);
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.UNHEALTHY, nodeReports);

-        // create a NodeReport based on the node id
-        NodeReport report = mock(NodeReport.class);
-        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
-        when(report.getNodeId()).thenReturn(nodeId);
-        nodeReports.add(report);
-        nodeReportsToTaskIds.put(report, taskId);
-      }
-    }
     // replace the tasks with the spied tasks
     job.tasks.putAll(spiedTasks);

@@ -641,6 +621,82 @@ public class TestJobImpl {
     commitHandler.stop();
   }

+  @Test
+  public void testJobCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(true);
+  }
+
+  @Test
+  public void testJobNotCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(false);
+  }
+
+  private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
+      throws InterruptedException, BrokenBarrierException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    final List<TaskEvent> killedEvents =
+        Collections.synchronizedList(new ArrayList<TaskEvent>());
+    dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
+      @Override
+      public void handle(TaskEvent event) {
+        if (event.getType() == TaskEventType.T_KILL) {
+          killedEvents.add(event);
+        }
+      }
+    });
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+
+    // replace the tasks with spied versions to return the right attempts
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.RUNNING, nodeReports);
+
+    // replace the tasks with the spied tasks
+    job.tasks.putAll(spiedTasks);
+
+    // finish reducer
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.REDUCE) {
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+      }
+    }
+
+    dispatcher.await();
+
+    /*
+     * StubbedJob cannot finish in this test - we'd have to generate the
+     * necessary events in this test manually, but that wouldn't add too
+     * much value. Instead, we validate the T_KILL events.
+     */
+    if (killMappers) {
+      Assert.assertEquals("Number of killed events", 2, killedEvents.size());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
+          killedEvents.get(0).getTaskID().toString());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
+          killedEvents.get(1).getTaskID().toString());
+    } else {
+      Assert.assertEquals("Number of killed events", 0, killedEvents.size());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
     t.testJobNoTasks();
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
     Assert.assertEquals(state, job.getInternalState());
   }

+  private void createSpiedMapTasks(Map<NodeReport, TaskId>
+      nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
+      NodeState nodeState, List<NodeReport> nodeReports) {
+    for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
+      TaskId taskId = e.getKey();
+      Task task = e.getValue();
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // add an attempt to the task to simulate nodes
+        NodeId nodeId = mock(NodeId.class);
+        TaskAttempt attempt = mock(TaskAttempt.class);
+        when(attempt.getNodeId()).thenReturn(nodeId);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        when(attempt.getID()).thenReturn(attemptId);
+        // create a spied task
+        Task spied = spy(task);
+        Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
+        attemptMap.put(attemptId, attempt);
+        when(spied.getAttempts()).thenReturn(attemptMap);
+        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+        spiedTasks.put(taskId, spied);
+
+        // create a NodeReport based on the node id
+        NodeReport report = mock(NodeReport.class);
+        when(report.getNodeState()).thenReturn(nodeState);
+        when(report.getNodeId()).thenReturn(nodeId);
+        nodeReports.add(report);
+        nodeReportsToTaskIds.put(report, taskId);
+      }
+    }
+  }
+
   private static class JobSubmittedEventHandler implements
       EventHandler<JobHistoryEvent> {


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index cfc1bcc..2023ba3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -431,7 +431,7 @@ public interface MRJobConfig {
   public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
   public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
-
+
   public static final String JOB_RUNNING_MAP_LIMIT = "mapreduce.job.running.map.limit";
   public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
@@ -1033,4 +1033,8 @@ public interface MRJobConfig {
   String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";

   String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
+
+  String FINISH_JOB_WHEN_REDUCERS_DONE =
+      "mapreduce.job.finish-when-all-reducers-done";
+  boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
 }
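
Because the new keys are exposed through MRJobConfig, a job that relies on
mappers continuing to run after the last reducer finishes can opt out of the
new behavior explicitly. A minimal driver sketch follows; the class name and
job name are illustrative only and not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class FinishWhenReducersDoneDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Defaults to true after this change; set to false to keep the
        // pre-MAPREDUCE-6870 behavior of waiting for all mappers.
        conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, false);
        // The string form works the same way:
        // conf.setBoolean("mapreduce.job.finish-when-all-reducers-done", false);
        Job job = Job.getInstance(conf, "finish-when-reducers-done-example");
        // ... configure mapper, reducer, input and output paths as usual ...
      }
    }

For drivers that go through ToolRunner/GenericOptionsParser, the same switch
can also be passed per submission on the command line, e.g.
-Dmapreduce.job.finish-when-all-reducers-done=false.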

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 101aa07..ee9b906 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1126,6 +1126,14 @@
 </property>

 <property>
+  <name>mapreduce.job.finish-when-all-reducers-done</name>
+  <value>true</value>
+  <description>Specifies whether the job should complete once all reducers
+  have finished, regardless of whether there are still running mappers.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.job.token.tracking.ids.enabled</name>
   <value>false</value>
   <description>Whether to write tracking ids of tokens to


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org