Author: ddas
Date: Mon Sep 8 21:44:06 2008
New Revision: 693360
URL: http://svn.apache.org/viewvc?rev=693360&view=rev
Log:
HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler
implementations and moves it to the JobTracker. Contributed by Amareshwari
Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 8 21:44:06 2008
@@ -484,6 +484,10 @@
HADOOP-3963. libhdfs does not exit on its own, instead it returns error
to the caller and behaves as a true library. (Pete Wyckoff via dhruba)
+ HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler
+ implementations and moves it to the JobTracker.
+ (Amareshwari Sriramadasu via ddas)
+
Release 0.18.1 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Sep 8 21:44:06 2008
@@ -804,6 +804,11 @@
* @return true/false
*/
private synchronized boolean canLaunchCleanupTask() {
+ // check if the job is running
+ if (status.getRunState() != JobStatus.RUNNING) {
+ return false;
+ }
+ // check if cleanup task has been launched already.
if (launchedCleanup) {
return false;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Mon Sep 8 21:44:06 2008
@@ -73,34 +73,6 @@
0.01f);
}
- protected Task getCleanupTask(int numMaps, int numReduces,
- int maxMapTasks, int maxReduceTasks,
- TaskTrackerStatus taskTracker,
- int numTaskTrackers,
- Collection<JobInProgress> jobQueue)
- throws IOException {
- Task t = null;
- if (numMaps < maxMapTasks) {
- for (JobInProgress job : jobQueue) {
- t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
- taskTrackerManager.getNumberOfUniqueHosts(), true);
- if (t != null) {
- return t;
- }
- }
- }
- if (numReduces < maxReduceTasks) {
- for (JobInProgress job : jobQueue) {
- t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
- taskTrackerManager.getNumberOfUniqueHosts(), false);
- if (t != null) {
- return t;
- }
- }
- }
- return t;
- }
-
@Override
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
@@ -119,17 +91,6 @@
int numMaps = taskTracker.countMapTasks();
int numReduces = taskTracker.countReduceTasks();
-
- // cleanup task has the highest priority, it should be
- // launched as soon as the job is done.
- synchronized (jobQueue) {
- Task t = getCleanupTask(numMaps, numReduces, maxCurrentMapTasks,
- maxCurrentReduceTasks, taskTracker, numTaskTrackers, jobQueue);
- if (t != null) {
- return Collections.singletonList(t);
- }
- }
-
//
// Compute average map and reduce task numbers across pool
//
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Sep 8 21:44:06 2008
@@ -1221,7 +1221,10 @@
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
- List<Task> tasks = taskScheduler.assignTasks(taskTrackerStatus);
+ List<Task> tasks = getCleanupTask(taskTrackerStatus);
+ if (tasks == null ) {
+ tasks = taskScheduler.assignTasks(taskTrackerStatus);
+ }
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
@@ -1457,6 +1460,43 @@
return null;
}
+ private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker) // Picks at most one job cleanup task for this tracker: a singleton list, or null if no job offers one.
+ throws IOException { // NOTE(review): presumably propagated from JobInProgress.obtainCleanupTask (not shown here) — confirm
+ int maxMapTasks = taskTracker.getMaxMapTasks();
+ int maxReduceTasks = taskTracker.getMaxReduceTasks();
+ int numMaps = taskTracker.countMapTasks();
+ int numReduces = taskTracker.countReduceTasks();
+ int numTaskTrackers = getClusterStatus().getTaskTrackers(); // computed once, outside the per-job loops
+ int numUniqueHosts = getNumberOfUniqueHosts();
+ // Cleanup is tried before normal scheduling: the heartbeat only calls taskScheduler.assignTasks when this returns null.
+ Task t = null;
+ synchronized (jobs) { // hold the jobs lock while iterating jobs.values()
+ if (numMaps < maxMapTasks) { // first try to use a free map slot on this tracker
+ for (Iterator<JobInProgress> it = jobs.values().iterator(); // NOTE(review): order follows the jobs map, not the scheduler's jobQueue order used before this change
+ it.hasNext();) {
+ JobInProgress job = it.next(); // every job is visited; presumably obtainCleanupTask filters non-RUNNING jobs via canLaunchCleanupTask — confirm
+ t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+ numUniqueHosts, true); // NOTE(review): true presumably requests a map-slot cleanup task — confirm in JobInProgress
+ if (t != null) {
+ return Collections.singletonList(t); // at most one cleanup task is handed out per call
+ }
+ }
+ }
+ if (numReduces < maxReduceTasks) { // otherwise try to use a free reduce slot
+ for (Iterator<JobInProgress> it = jobs.values().iterator();
+ it.hasNext();) {
+ JobInProgress job = it.next();
+ t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+ numUniqueHosts, false); // NOTE(review): false presumably requests a reduce-slot cleanup task — confirm
+ if (t != null) {
+ return Collections.singletonList(t);
+ }
+ }
+ }
+ }
+ return null; // no job had a cleanup task for this tracker; caller falls back to the scheduler
+ }
+
/**
* Grab the local fs name
*/
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Mon Sep 8 21:44:06 2008
@@ -74,16 +74,6 @@
final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
- // check if cleanup task can be launched
- synchronized (jobQueue) {
- task = getCleanupTask(mapTasksNumber, reduceTasksNumber,
- maximumMapTasksNumber, maximumReduceTasksNumber,
- taskTracker, numTaskTrackers, jobQueue);
- if (task != null) {
- return Collections.singletonList(task);
- }
- }
-
/*
* Statistics about the whole cluster. Most are approximate because of
* concurrency