Author: omalley
Date: Fri Mar 4 03:41:15 2011
New Revision: 1077107
URL: http://svn.apache.org/viewvc?rev=1077107&view=rev
Log:
commit 22f13bff8c3eb13e7f306a6e97c981b2fd4780e6
Author: Hemanth Yamijala <[email protected]>
Date: Tue Jan 12 22:14:37 2010 +0530
HADOOP-5737 from
https://issues.apache.org/jira/secure/attachment/12430029/HADOOP-5737-y20.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-5737. Fixes a problem in the way the JobTracker used to talk to
+ other daemons like the NameNode to get the job's files. Also adds APIs
+ in the JobTracker to get the FileSystem objects as per the JobTracker's
+ configuration. (Amar Kamat via ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Fri Mar 4 03:41:15 2011
@@ -39,7 +39,7 @@ class CleanupQueue {
* paths(directories/files) in a separate thread. This constructor creates a
* clean-up thread and also starts it as a daemon. Callers can instantiate
one
* CleanupQueue per JVM and can use it for deleting paths. Use
- * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
+ * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
* deletion.
*/
public CleanupQueue() {
@@ -50,22 +50,22 @@ class CleanupQueue {
}
}
- public void addToQueue(JobConf conf, Path...paths) {
- cleanupThread.addToQueue(conf,paths);
+ public void addToQueue(FileSystem fs, Path...paths) {
+ cleanupThread.addToQueue(fs, paths);
}
private static class PathCleanupThread extends Thread {
- static class PathAndConf {
- JobConf conf;
+ static class PathAndFS {
+ FileSystem fs;
Path path;
- PathAndConf(JobConf conf, Path path) {
- this.conf = conf;
+ PathAndFS(FileSystem fs, Path path) {
+ this.fs = fs;
this.path = path;
}
}
// cleanup queue which deletes files/directories of the paths queued up.
- private LinkedBlockingQueue<PathAndConf> queue = new
LinkedBlockingQueue<PathAndConf>();
+ private LinkedBlockingQueue<PathAndFS> queue = new
LinkedBlockingQueue<PathAndFS>();
public PathCleanupThread() {
setName("Directory/File cleanup thread");
@@ -73,28 +73,27 @@ class CleanupQueue {
start();
}
- public void addToQueue(JobConf conf,Path... paths) {
+ public void addToQueue(FileSystem fs, Path... paths) {
for (Path p : paths) {
try {
- queue.put(new PathAndConf(conf,p));
+ queue.put(new PathAndFS(fs, p));
} catch (InterruptedException ie) {}
}
}
public void run() {
LOG.debug(getName() + " started.");
- PathAndConf pathAndConf = null;
+ PathAndFS pathAndFS = null;
while (true) {
try {
- pathAndConf = queue.take();
+ pathAndFS = queue.take();
// delete the path.
- FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
- fs.delete(pathAndConf.path, true);
- LOG.debug("DELETED " + pathAndConf.path);
+ pathAndFS.fs.delete(pathAndFS.path, true);
+ LOG.debug("DELETED " + pathAndFS.path);
} catch (InterruptedException t) {
return;
} catch (Exception e) {
- LOG.warn("Error deleting path" + pathAndConf.path);
+ LOG.warn("Error deleting path" + pathAndFS.path);
}
}
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar 4 03:41:15 2011
@@ -193,6 +193,7 @@ class JobInProgress {
private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
private LocalFileSystem localFs;
+ private FileSystem fs;
private JobID jobId;
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
@@ -300,7 +301,7 @@ class JobInProgress {
this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
this.startTime = System.currentTimeMillis();
status.setStartTime(startTime);
- this.localFs = FileSystem.getLocal(default_conf);
+ this.localFs = jobtracker.getLocalFileSystem();
JobConf default_job_conf = new JobConf(default_conf);
this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
@@ -314,7 +315,7 @@ class JobInProgress {
LOG.info("User : " + this.user);
Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
- FileSystem fs = jobDir.getFileSystem(default_conf);
+ fs = jobtracker.getFileSystem(jobDir);
jobFile = new Path(jobDir, "job.xml");
fs.copyToLocalFile(jobFile, localJobFile);
conf = new JobConf(localJobFile);
@@ -499,8 +500,6 @@ class JobInProgress {
// log the job priority
setPriority(this.priority);
- Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
- FileSystem fs = jobDir.getFileSystem(conf);
//
// generate security keys needed by Tasks
//
@@ -2856,7 +2855,7 @@ class JobInProgress {
// Delete temp dfs dirs created if any, like in case of
// speculative exn of reduces.
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(conf,tempDir);
+ new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir),
tempDir);
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar 4 03:41:15 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -2194,6 +2195,29 @@ public class JobTracker implements MRCon
: 0;
}
+ /**
+ * Get JobTracker's FileSystem. This is the filesystem for mapred.system.dir.
+ */
+ FileSystem getFileSystem() {
+ return fs;
+ }
+
+ /**
+ * Get the FileSystem for the given path. This can be used to resolve
+ * filesystem for job history, local job files or mapred.system.dir path.
+ */
+ FileSystem getFileSystem(Path path) throws IOException {
+ return path.getFileSystem(conf);
+ }
+
+ /**
+ * Get JobTracker's LocalFileSystem handle. This is used by jobs for
+ * localizing job files to the local disk.
+ */
+ LocalFileSystem getLocalFileSystem() throws IOException {
+ return FileSystem.getLocal(conf);
+ }
+
public static Class<? extends JobTrackerInstrumentation>
getInstrumentationClass(Configuration conf) {
return conf.getClass("mapred.jobtracker.instrumentation",
JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
@@ -3544,7 +3568,7 @@ public class JobTracker implements MRCon
String queue = job.getProfile().getQueueName();
if(!(queueManager.getQueues().contains(queue))) {
- new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+ new CleanupQueue().addToQueue(fs,getSystemDirectoryForJob(jobId));
job.fail();
if (userFileForJob != null) {
userFileForJob.delete();
@@ -3562,7 +3586,7 @@ public class JobTracker implements MRCon
if (userFileForJob != null) {
userFileForJob.delete();
}
- new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+ new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
throw ioe;
}
@@ -3571,7 +3595,7 @@ public class JobTracker implements MRCon
try {
checkMemoryRequirements(job);
} catch (IOException ioe) {
- new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+ new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
throw ioe;
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Fri Mar 4 03:41:15 2011
@@ -179,7 +179,7 @@ public class TaskTracker
// The filesystem where job files are stored
FileSystem systemFS = null;
-
+ private FileSystem localFs = null;
private final HttpServer server;
volatile boolean shuttingDown = false;
@@ -500,6 +500,7 @@ public class TaskTracker
synchronized void initialize() throws IOException {
// use configured nameserver & interface to get local hostname
this.fConf = new JobConf(originalConf);
+ localFs = FileSystem.getLocal(fConf);
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
}
@@ -1511,7 +1512,7 @@ public class TaskTracker
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
- directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf,
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
@@ -2629,19 +2630,19 @@ public class TaskTracker
//might be using the dir. The JVM running the tasks would clean
//the workdir per a task in the task process itself.
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(defaultJobConf,
+ directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
taskDir));
}
else {
- directoryCleanupThread.addToQueue(defaultJobConf,
+ directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
taskDir+"/job.xml"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(defaultJobConf,
+ directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
taskDir+"/work"));
}