Author: ddas
Date: Fri Jun 6 03:47:24 2008
New Revision: 663886
URL: http://svn.apache.org/viewvc?rev=663886&view=rev
Log:
HADOOP-2393. Moves the handling of dir deletions in the tasktracker to a
separate thread. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663886&r1=663885&r2=663886&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 03:47:24 2008
@@ -242,6 +242,9 @@
from shuffle threads when it has scheduled enough, before scheduling more.
(ddas)
+ HADOOP-2393. Moves the handling of dir deletions in the tasktracker to
+ a separate thread. (Amareshwari Sriramadasu via ddas)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=663886&r1=663885&r2=663886&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 6 03:47:24 2008
@@ -123,7 +123,7 @@
StatusHttpServer server = null;
- boolean shuttingDown = false;
+ volatile boolean shuttingDown = false;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID,
TaskInProgress>();
/**
@@ -159,6 +159,7 @@
private int finishedCount[] = new int[1];
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
+ private CleanupQueue directoryCleanupThread;
/**
* the minimum interval between jobtracker polls
*/
@@ -296,6 +297,9 @@
public void run() {
while (true) {
try {
+ if (tasksToCleanup.isEmpty() && !isRunning()) {
+ break;
+ }
TaskTrackerAction action = tasksToCleanup.take();
if (action instanceof KillJobAction) {
purgeJob((KillJobAction) action);
@@ -394,9 +398,12 @@
fConf.get("mapred.tasktracker.dns.nameserver","default"));
}
+ directoryCleanupThread = new CleanupQueue(fConf);
+ directoryCleanupThread.start();
+
//check local disk
checkLocalDirs(this.fConf.getLocalDirs());
- fConf.deleteLocalFiles(SUBDIR);
+ directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
// Clear out state tables
this.tasks.clear();
@@ -793,6 +800,28 @@
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
+ // shutdown cleanup threads.
+ if (this.taskCleanupThread != null
+ && this.taskCleanupThread.isAlive()) {
+ LOG.info("Stopping task cleanup thread");
+ this.taskCleanupThread.interrupt();
+ try {
+ this.taskCleanupThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+ if (this.directoryCleanupThread != null
+ && this.directoryCleanupThread.isAlive()) {
+ LOG.info("Stopping directory cleanup thread");
+ this.directoryCleanupThread.interrupt();
+ try {
+ this.directoryCleanupThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+
// shutdown RPC connections
RPC.stopProxy(jobClient);
}
@@ -1149,8 +1178,9 @@
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
- fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE +
- Path.SEPARATOR + rjob.getJobID());
+ directoryCleanupThread.addToQueue(getLocalFiles(fConf,
+ SUBDIR + Path.SEPARATOR + JOBCACHE +
+ Path.SEPARATOR + rjob.getJobID()));
}
// Remove this job
rjob.tasks.clear();
@@ -1925,10 +1955,11 @@
if (runner != null) {
runner.close();
}
- defaultJobConf.deleteLocalFiles(taskDir);
+ directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ taskDir));
} else {
- defaultJobConf.deleteLocalFiles(taskDir + Path.SEPARATOR +
- MRConstants.WORKDIR);
+ directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ taskDir + Path.SEPARATOR + MRConstants.WORKDIR));
}
} catch (Throwable ie) {
LOG.info("Error cleaning up task runner: " +
@@ -2313,7 +2344,15 @@
System.exit(-1);
}
}
-
+
+  /**
+   * Reports whether this task tracker is still in service.
+   *
+   * @return {@code false} once {@code shuttingDown} has been set,
+   *         {@code true} otherwise
+   */
+  public boolean isRunning() {
+    final boolean stopping = this.shuttingDown;
+    return !stopping;
+  }
+
/**
* This class is used in TaskTracker's Jetty to serve the map outputs
* to other nodes.
@@ -2456,4 +2495,56 @@
shuffleMetrics.successOutput();
}
}
+
+  /**
+   * Builds the fully-qualified path of {@code subdir} inside every local
+   * directory listed by {@code conf.getLocalDirs()}.
+   *
+   * @param conf   job configuration supplying the local directories
+   * @param subdir relative sub-path to resolve under each local directory
+   * @return one path per local directory, in the same order, each qualified
+   *         against the local file system; existence is not checked
+   * @throws IOException propagated from the configuration / local
+   *         file-system lookups
+   */
+  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException {
+    final String[] roots = conf.getLocalDirs();
+    final FileSystem local = FileSystem.getLocal(conf);
+    final Path[] result = new Path[roots.length];
+    int next = 0;
+    for (String root : roots) {
+      result[next++] = new Path(root, subdir).makeQualified(local);
+    }
+    return result;
+  }
+
+  /**
+   * Background thread that recursively deletes queued files/directories, so
+   * callers such as job purge and task cleanup do not block on slow deletes.
+   *
+   * <p>Stop protocol: the owner stops the thread by interrupting it. On
+   * interrupt the thread deletes whatever is still queued and then exits, so
+   * {@code join()} returns without losing queued requests. It also exits on
+   * its own once {@link #isRunning()} is false and the queue is empty.
+   */
+  private class CleanupQueue extends Thread {
+    private final LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
+    private final JobConf conf;
+
+    /**
+     * @param conf configuration used to resolve the file system of each
+     *             queued path
+     * @throws IOException kept for source compatibility; nothing in this
+     *         constructor throws it
+     */
+    public CleanupQueue(JobConf conf) throws IOException {
+      setName("Directory/File cleanup thread");
+      setDaemon(true);
+      this.conf = conf;
+    }
+
+    /**
+     * Queues paths for recursive deletion. Never blocks.
+     *
+     * @param paths paths to delete
+     */
+    public void addToQueue(Path... paths) {
+      for (Path p : paths) {
+        // The queue is unbounded, so add() always succeeds. put() would throw
+        // InterruptedException whenever the caller's interrupt flag is set;
+        // swallowing that dropped the path and cleared the caller's interrupt.
+        queue.add(p);
+      }
+    }
+
+    @Override
+    public void run() {
+      LOG.debug("cleanup thread started");
+      try {
+        // Normal exit: the tracker has stopped and nothing is pending.
+        while (isRunning() || !queue.isEmpty()) {
+          deletePath(queue.take());
+        }
+      } catch (InterruptedException interrupted) {
+        // close() interrupts this thread to stop it. Re-entering take() while
+        // isRunning() is still true would block forever and close()'s join()
+        // would never return, so treat the interrupt as a stop request.
+      }
+      // Finish outstanding requests before exiting so none are lost.
+      Path path;
+      while ((path = queue.poll()) != null) {
+        deletePath(path);
+      }
+    }
+
+    /** Recursively deletes one path, logging (not propagating) I/O failures. */
+    private void deletePath(Path path) {
+      try {
+        FileSystem fs = path.getFileSystem(conf);
+        fs.delete(path, true);
+      } catch (IOException e) {
+        LOG.warn("Error deleting path " + path, e);
+      }
+    }
+  }
}