Author: ddas
Date: Mon Feb 23 07:36:56 2009
New Revision: 746909
URL: http://svn.apache.org/viewvc?rev=746909&view=rev
Log:
Merge -r 746901:746902 and 746902:746903 from trunk onto 0.20 branch. Fixes
HADOOP-5285.
Added:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
- copied unchanged from r746903, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 07:36:56 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Mon Feb 23 07:36:56 2009
@@ -638,6 +638,13 @@
HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)
+ HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+ inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+ outside the TaskTracker and makes it a generic class that is used by the
+ JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+ references to completedJobStore outside the block where the JobTracker is locked.
+ (ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
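The new CleanupQueue.java is copied from trunk and its contents do not appear in this mail. As a rough sketch only, the generic class implied by the new call sites below (new CleanupQueue().addToQueue(conf, path) in JobInProgress, and directoryCleanupThread.addToQueue(fConf, ...) in TaskTracker) could look roughly like the inner class removed from TaskTracker further down, with the configuration passed per enqueued path instead of being held by the thread; the real file added in r746903 may differ:

// Sketch only: an approximation of the generic CleanupQueue implied by the new
// call sites in this diff (new CleanupQueue().addToQueue(conf, path)). The real
// CleanupQueue.java copied from r746903 is not shown in this mail and may differ.
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class CleanupQueue {

  // A path paired with the configuration used to resolve its FileSystem, so the
  // same queue can serve the TaskTracker (local dirs) and the JobTracker/
  // JobInProgress (paths on the job's output fs).
  private static class PathAndConf {
    final JobConf conf;
    final Path path;
    PathAndConf(JobConf conf, Path path) {
      this.conf = conf;
      this.path = path;
    }
  }

  private static final LinkedBlockingQueue<PathAndConf> queue =
    new LinkedBlockingQueue<PathAndConf>();

  // One shared daemon thread drains the queue and deletes each path recursively.
  private static class PathCleanupThread extends Thread {
    PathCleanupThread() {
      setName("Directory/File cleanup thread");
      setDaemon(true);
    }
    public void run() {
      while (true) {
        try {
          PathAndConf pc = queue.take();
          FileSystem fs = pc.path.getFileSystem(pc.conf);
          fs.delete(pc.path, true);
        } catch (IOException e) {
          // keep going; a real implementation would log the failed path
        } catch (InterruptedException ie) {
          return;
        }
      }
    }
  }

  private static final PathCleanupThread cleanupThread;
  static {
    cleanupThread = new PathCleanupThread();
    cleanupThread.start();
  }

  // Queue paths for asynchronous deletion; deletion happens on the cleanup thread.
  public void addToQueue(JobConf conf, Path... paths) {
    for (Path p : paths) {
      try {
        queue.put(new PathAndConf(conf, p));
      } catch (InterruptedException ie) {}
    }
  }
}

With this shape, the call sites in this diff differ only in which JobConf and which paths they pass; deletion never happens on the caller's thread.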
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 07:36:56 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 23 07:36:56 2009
@@ -978,35 +978,39 @@
/*
* Return task cleanup attempt if any, to run on a given tracker
*/
- public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts,
+ public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
boolean isMapSlot)
throws IOException {
- if (this.status.getRunState() != JobStatus.RUNNING ||
- jobFailed || jobKilled) {
- return null;
- }
-
- String taskTracker = tts.getTrackerName();
- if (!shouldRunOnTaskTracker(taskTracker)) {
+ if (!tasksInited.get()) {
return null;
}
- TaskAttemptID taskid = null;
- TaskInProgress tip = null;
- if (isMapSlot) {
- if (!mapCleanupTasks.isEmpty()) {
- taskid = mapCleanupTasks.remove(0);
- tip = maps[taskid.getTaskID().getId()];
+ synchronized (this) {
+ if (this.status.getRunState() != JobStatus.RUNNING ||
+ jobFailed || jobKilled) {
+ return null;
}
- } else {
- if (!reduceCleanupTasks.isEmpty()) {
- taskid = reduceCleanupTasks.remove(0);
- tip = reduces[taskid.getTaskID().getId()];
+ String taskTracker = tts.getTrackerName();
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
}
+ TaskAttemptID taskid = null;
+ TaskInProgress tip = null;
+ if (isMapSlot) {
+ if (!mapCleanupTasks.isEmpty()) {
+ taskid = mapCleanupTasks.remove(0);
+ tip = maps[taskid.getTaskID().getId()];
+ }
+ } else {
+ if (!reduceCleanupTasks.isEmpty()) {
+ taskid = reduceCleanupTasks.remove(0);
+ tip = reduces[taskid.getTaskID().getId()];
+ }
+ }
+ if (tip != null) {
+ return tip.addRunningTask(taskid, taskTracker, true);
+ }
+ return null;
}
- if (tip != null) {
- return tip.addRunningTask(taskid, taskTracker, true);
- }
- return null;
}
public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
@@ -1111,9 +1115,6 @@
* @return true/false
*/
private synchronized boolean canLaunchJobCleanupTask() {
- if (!tasksInited.get()) {
- return false;
- }
// check if the job is running
if (status.getRunState() != JobStatus.RUNNING &&
status.getRunState() != JobStatus.PREP) {
@@ -2444,6 +2445,7 @@
*/
synchronized void garbageCollect() {
// Let the JobTracker know that a job is complete
+ jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
try {
@@ -2467,8 +2469,7 @@
// Delete temp dfs dirs created if any, like in case of
// speculative exn of reduces.
Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
- FileSystem fs = tempDir.getFileSystem(conf);
- fs.delete(tempDir, true);
+ new CleanupQueue().addToQueue(conf,tempDir);
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
@@ -2479,6 +2480,7 @@
this.runningMapCache = null;
this.nonRunningReduces = null;
this.runningReduces = null;
+
}
/**
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Feb 23 07:36:56 2009
@@ -1823,9 +1823,6 @@
synchronized void finalizeJob(JobInProgress job) {
// Mark the 'non-running' tasks for pruning
markCompletedJob(job);
-
- //persists the job info in DFS
- completedJobStatusStore.store(job);
JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
@@ -2856,34 +2853,41 @@
setJobPriority(jobid, newPriority);
}
- public synchronized JobProfile getJobProfile(JobID jobid) {
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getProfile();
- } else {
- return completedJobStatusStore.readJobProfile(jobid);
+ void storeCompletedJob(JobInProgress job) {
+ //persists the job info in DFS
+ completedJobStatusStore.store(job);
+ }
+
+ public JobProfile getJobProfile(JobID jobid) {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getProfile();
+ }
}
+ return completedJobStatusStore.readJobProfile(jobid);
}
- public synchronized JobStatus getJobStatus(JobID jobid) {
+ public JobStatus getJobStatus(JobID jobid) {
if (null == jobid) {
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
return null;
}
-
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getStatus();
- } else {
- return completedJobStatusStore.readJobStatus(jobid);
- }
- }
- public synchronized Counters getJobCounters(JobID jobid) {
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getCounters();
- } else {
- return completedJobStatusStore.readCounters(jobid);
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getStatus();
+ }
+ }
+ return completedJobStatusStore.readJobStatus(jobid);
+ }
+ public Counters getJobCounters(JobID jobid) {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getCounters();
+ }
}
+ return completedJobStatusStore.readCounters(jobid);
}
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
@@ -2981,18 +2985,17 @@
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
JobID jobid, int fromEventId, int maxEvents) throws IOException{
- TaskCompletionEvent[] events = EMPTY_EVENTS;
-
- JobInProgress job = this.jobs.get(jobid);
- if (null != job) {
- if (job.inited()) {
- events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+ synchronized (this) {
+ JobInProgress job = this.jobs.get(jobid);
+ if (null != job) {
+ if (job.inited()) {
+ return job.getTaskCompletionEvents(fromEventId, maxEvents);
+ } else {
+ return EMPTY_EVENTS;
+ }
}
}
- else {
- events = completedJobStatusStore.readJobTaskCompletionEvents(jobid,
- fromEventId, maxEvents);
- }
- return events;
+ return completedJobStatusStore.readJobTaskCompletionEvents(jobid,
+ fromEventId, maxEvents);
}
/**
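The JobTracker changes above all follow the same lock-narrowing idiom: only the lookup in the in-memory jobs map runs while holding the JobTracker lock, and the fallback read from completedJobStatusStore (which may touch the job's output file system) runs after the lock is released. A minimal, stand-alone illustration of that idiom, using hypothetical stand-ins (liveJobs, completedStore) rather than Hadoop classes:

// Illustrative only: the lock-narrowing idiom used by getJobProfile,
// getJobStatus, getJobCounters and getTaskCompletionEvents above.
// liveJobs/completedStore are hypothetical stand-ins, not Hadoop APIs.
import java.util.HashMap;
import java.util.Map;

class LockNarrowingExample {
  private final Map<String, String> liveJobs = new HashMap<String, String>();
  private final Map<String, String> completedStore = new HashMap<String, String>();

  // Before: the whole method was synchronized, so a slow read from the
  // persistent store blocked every other caller of this object.
  public synchronized String getStatusHoldingLock(String jobId) {
    String live = liveJobs.get(jobId);
    if (live != null) {
      return live;
    }
    return completedStore.get(jobId); // potentially slow read under the lock
  }

  // After: only the in-memory lookup is synchronized; the potentially slow
  // fallback read happens with the lock already released.
  public String getStatus(String jobId) {
    synchronized (this) {
      String live = liveJobs.get(jobId);
      if (live != null) {
        return live;
      }
    }
    return completedStore.get(jobId); // slow read outside the lock
  }
}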
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Feb 23 07:36:56 2009
@@ -972,9 +972,7 @@
private void startCleanupThreads() throws IOException {
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
- directoryCleanupThread = new CleanupQueue(originalConf);
- directoryCleanupThread.setDaemon(true);
- directoryCleanupThread.start();
+ directoryCleanupThread = new CleanupQueue();
}
/**
@@ -1422,7 +1420,7 @@
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
- directoryCleanupThread.addToQueue(getLocalFiles(fConf,
+ directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf,
getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
@@ -2487,17 +2485,20 @@
//might be using the dir. The JVM running the tasks would clean
//the workdir per a task in the task process itself.
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir));
}
else {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir+"/job.xml"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir+"/work"));
}
}
@@ -3026,43 +3027,6 @@
return paths;
}
- // cleanup queue which deletes files/directories of the paths queued up.
- private static class CleanupQueue extends Thread {
- private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
- private JobConf conf;
-
- public CleanupQueue(JobConf conf) throws IOException{
- setName("Directory/File cleanup thread");
- setDaemon(true);
- this.conf = conf;
- }
-
- public void addToQueue(Path... paths) {
- for (Path p : paths) {
- try {
- queue.put(p);
- } catch (InterruptedException ie) {}
- }
- return;
- }
-
- public void run() {
- LOG.debug("cleanup thread started");
- Path path = null;
- while (true) {
- try {
- path = queue.take();
- // delete the path.
- FileSystem fs = path.getFileSystem(conf);
- fs.delete(path, true);
- } catch (IOException e) {
- LOG.info("Error deleting path" + path);
- } catch (InterruptedException t) {
- }
- }
- }
- }
-
int getMaxCurrentMapTasks() {
return maxCurrentMapTasks;
}