Author: omalley
Date: Fri Mar 4 03:51:41 2011
New Revision: 1077205
URL: http://svn.apache.org/viewvc?rev=1077205&view=rev
Log:
commit a182596b1671a65468ffb90c63c1232e18c6f184
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Tue Feb 23 23:04:04 2010 +0530
MAPREDUCE:1398 from
https://issues.apache.org/jira/secure/attachment/12436724/mr-1398-y20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1398. Fix TaskLauncher to stop waiting for slots on a TIP
+ that is killed / failed. (Amareshwari Sriramadasu via yhemanth)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLauncher.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077205&r1=1077204&r2=1077205&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Fri Mar 4 03:51:41 2011
@@ -384,14 +384,7 @@ public class TaskTracker
if (action instanceof KillJobAction) {
purgeJob((KillJobAction) action);
} else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
- }
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
+ processKillTaskAction((KillTaskAction) action);
} else {
LOG.error("Non-delete action given to cleanup thread: "
+ action);
@@ -403,6 +396,15 @@ public class TaskTracker
}
}, "taskCleanup");
+ void processKillTaskAction(KillTaskAction killAction) throws IOException {
+ TaskInProgress tip;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+ purgeTask(tip, false);
+ }
+
public TaskController getTaskController() {
return taskController;
}
@@ -706,7 +708,7 @@ public class TaskTracker
getReduceUserLogRetainSize()));
getTaskLogsMonitor().start();
- this.indexCache = new IndexCache(this.fConf);
+ setIndexCache(new IndexCache(this.fConf));
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
@@ -1460,6 +1462,10 @@ public class TaskTracker
private long previousUpdate = 0;
+ void setIndexCache(IndexCache cache) {
+ this.indexCache = cache;
+ }
+
/**
* Build and transmit the heart beat to the JobTracker
* @param now current time
@@ -1950,7 +1956,7 @@ public class TaskTracker
}
}
- private class TaskLauncher extends Thread {
+ class TaskLauncher extends Thread {
private IntWritable numFreeSlots;
private final int maxSlots;
private List<TaskInProgress> tasksToLaunch;
@@ -1984,6 +1990,18 @@ public class TaskTracker
}
}
+ void notifySlots() {
+ synchronized (numFreeSlots) {
+ numFreeSlots.notifyAll();
+ }
+ }
+
+ int getNumWaitingTasksToLaunch() {
+ synchronized (tasksToLaunch) {
+ return tasksToLaunch.size();
+ }
+ }
+
public void run() {
while (!Thread.interrupted()) {
try {
@@ -2001,12 +2019,33 @@ public class TaskTracker
}
//wait for free slots to run
synchronized (numFreeSlots) {
+ boolean canLaunch = true;
while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+ //Make sure that there is no kill task action for this task!
+ //We are not locking tip here, because it would reverse the
+ //locking order!
+ //Also, Lock for the tip is not required here! because :
+ // 1. runState of TaskStatus is volatile
+ // 2. Any notification is not missed because notification is
+ // synchronized on numFreeSlots. So, while we are doing the check,
+ // if the tip is half way through the kill(), we don't miss
+ // notification for the following wait().
+ if (!tip.canBeLaunched()) {
+ //got killed externally while still in the launcher queue
+ LOG.info("Not blocking slots for " + task.getTaskID()
+ + " as it got killed externally. Task's state is "
+ + tip.getRunState());
+ canLaunch = false;
+ break;
+ }
LOG.info("TaskLauncher : Waiting for " +
task.getNumSlotsRequired() +
" to launch " + task.getTaskID() + ", currently we have " +
numFreeSlots.get() + " free slots");
numFreeSlots.wait();
}
+ if (!canLaunch) {
+ continue;
+ }
LOG.info("In TaskLauncher, current free slots : " +
numFreeSlots.get()+
" and trying to launch "+tip.getTask().getTaskID() +
" which needs " + task.getNumSlotsRequired() + " slots");
@@ -2015,10 +2054,10 @@ public class TaskTracker
}
synchronized (tip) {
//to make sure that there is no kill task action for this
- if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
- tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
- tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
+ if (!tip.canBeLaunched()) {
//got killed externally while still in the launcher queue
+ LOG.info("Not launching task " + task.getTaskID() + " as it got"
+ + " killed externally. Task's state is " + tip.getRunState());
addFreeSlots(task.getNumSlotsRequired());
continue;
}
@@ -2058,7 +2097,7 @@ public class TaskTracker
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
*/
- private void startNewTask(TaskInProgress tip) {
+ void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
} catch (Throwable e) {
@@ -2344,6 +2383,13 @@ public class TaskTracker
return this.taskStatus.inTaskCleanupPhase();
}
+ // checks if state has been changed for the task to be launched
+ boolean canBeLaunched() {
+ return (getRunState() == TaskStatus.State.UNASSIGNED ||
+ getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ getRunState() == TaskStatus.State.KILLED_UNCLEAN);
+ }
+
/**
* The task is reporting its progress
*/
@@ -2774,6 +2820,11 @@ public class TaskTracker
launcher.addFreeSlots(task.getNumSlotsRequired());
}
slotTaken = false;
+ } else {
+ // wake up the launcher. it may be waiting to block slots for this task.
+ if (launcher != null) {
+ launcher.notifySlots();
+ }
}
}
@@ -3648,7 +3699,7 @@ public class TaskTracker
}
}
- private void setTaskMemoryManagerEnabledFlag() {
+ void setTaskMemoryManagerEnabledFlag() {
if (!ProcfsBasedProcessTree.isAvailable()) {
LOG.info("ProcessTree implementation is missing on this system. "
+ "TaskMemoryManager is disabled.");
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLauncher.java?rev=1077205&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLauncher.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLauncher.java
Fri Mar 4 03:51:41 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+
+/**
+ * Tests {@link TaskLauncher}
+ */
+public class TestTaskLauncher {
+ private static int expectedLaunchAttemptId = 1;
+
+ private static class MyTaskTracker extends TaskTracker {
+ // override startNewTask just to set the runState,
+ // not to launch the task really
+ @Override
+ void startNewTask(TaskInProgress tip) {
+ assertEquals(expectedLaunchAttemptId, tip.getTask().getTaskID().getId());
+ tip.getStatus().setRunState(TaskStatus.State.RUNNING);
+ }
+ }
+
+ /**
+ * Tests the case "task waiting to be launched is killed externally".
+ *
+ * Launches a task which will wait for ever to get slots. Kill the
+ * task and see if launcher is able to come out of the wait and pickup a
+ * another task.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testExternalKillForLaunchTask() throws IOException {
+ // setup a TaskTracker
+ JobConf ttConf = new JobConf();
+ ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 4);
+ TaskTracker tt = new MyTaskTracker();
+ tt.runningJobs = new TreeMap<JobID, TaskTracker.RunningJob>();
+ tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+ tt.setIndexCache(new IndexCache(ttConf));
+ tt.setTaskMemoryManagerEnabledFlag();
+
+ // start map-task launcher with four slots
+ TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
+ mapLauncher.start();
+
+ // launch a task which requires five slots
+ String jtId = "test";
+ TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, true, 0, 0);
+ Task task = new MapTask(null, attemptID, 0, null, 5);
+ mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
+ // verify that task is added to runningTasks
+ TaskInProgress killTip = tt.runningTasks.get(attemptID);
+ assertNotNull(killTip);
+
+ // wait for a while for launcher to pick up the task
+ // this loop waits atmost for 30 seconds
+ for (int i = 0; i < 300; i++) {
+ if (mapLauncher.getNumWaitingTasksToLaunch() == 0) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ }
+ assertEquals("Launcher didnt pick up the task " + attemptID + "to launch",
+ 0, mapLauncher.getNumWaitingTasksToLaunch());
+
+ // Now, that launcher has picked up the task, it waits until all five slots
+ // are available. i.e. it waits for-ever
+ // lets kill the task so that map launcher comes out
+ tt.processKillTaskAction(new KillTaskAction(attemptID));
+ assertEquals(TaskStatus.State.KILLED, killTip.getRunState());
+
+ // launch another attempt which requires only one slot
+ TaskAttemptID runningAttemptID = new TaskAttemptID(jtId, 1, true,
+ 0, expectedLaunchAttemptId);
+ mapLauncher.addToTaskQueue(new LaunchTaskAction(new MapTask(null,
+ runningAttemptID, 0, null, 1)));
+ TaskInProgress runningTip = tt.runningTasks.get(runningAttemptID);
+ assertNotNull(runningTip);
+
+ // wait for a while for the task to be launched
+ // this loop waits at most for 30 seconds
+ for (int i = 0; i < 300; i++) {
+ if (runningTip.getRunState().equals(TaskStatus.State.RUNNING)) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ }
+
+ // verify that the task went to running
+ assertEquals(TaskStatus.State.RUNNING, runningTip.getRunState());
+ }
+
+}