Author: yhemanth
Date: Thu Mar 12 11:25:32 2009
New Revision: 752834
URL: http://svn.apache.org/viewvc?rev=752834&view=rev
Log:
HADOOP-5327. Fixed job tracker to remove files from system directory on ACL
check failures and also check ACLs on restart. Contributed by Amar Kamat.
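For readers skimming the diff, the essence of the fix (a non-authoritative sketch distilled from the JobTracker.java hunks below; CleanupQueue and getSystemDirectoryForJob are the names this patch uses) is that a failed ACL check at submission time now also queues the job's files under the system directory for deletion, and the same ACL check is applied to jobs recovered on jobtracker restart:

    // Sketch only: submission path, mirroring the JobTracker.submitJob() hunk below.
    try {
      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
    } catch (IOException ioe) {
      // On an ACL failure, schedule the job's system-directory files for removal
      // before surfacing the error to the client.
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }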
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=752834&r1=752833&r2=752834&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 12 11:25:32 2009
@@ -975,6 +975,10 @@
HADOOP-5416. Correct the shell command "fs -test" forrest doc description.
(Ravi Phulari via szetszwo)
+ HADOOP-5327. Fixed job tracker to remove files from system directory on
+ ACL check failures and also check ACLs on restart.
+ (Amar Kamat via yhemanth)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=752834&r1=752833&r2=752834&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Mar 12 11:25:32 2009
@@ -235,9 +235,9 @@
+"/"+jobid + ".xml");
this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
+"/"+ jobid + ".jar");
- Path sysDir = new Path(this.jobtracker.getSystemDir());
- FileSystem fs = sysDir.getFileSystem(default_conf);
- jobFile = new Path(sysDir, jobid + "/job.xml");
+ Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+ FileSystem fs = jobDir.getFileSystem(default_conf);
+ jobFile = new Path(jobDir, "job.xml");
fs.copyToLocalFile(jobFile, localJobFile);
conf = new JobConf(localJobFile);
this.priority = conf.getJobPriority();
@@ -2474,7 +2474,7 @@
// so we remove that directory to cleanup
// Delete temp dfs dirs created if any, like in case of
// speculative exn of reduces.
- Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
+ Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
new CleanupQueue().addToQueue(conf,tempDir);
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752834&r1=752833&r2=752834&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 12 11:25:32 2009
@@ -1098,31 +1098,45 @@
try {
// 1. Create the job object
JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
- String logFileName;
- Path jobHistoryFilePath;
- // 2. Get the log file and the file path
- logFileName =
+ // 2. Check if the user has appropriate access
+ // Get the user group info for the job's owner
+ UserGroupInformation ugi =
+ UserGroupInformation.readFrom(job.getJobConf());
+ LOG.info("Submitting job " + id + " on behalf of user "
+ + ugi.getUserName() + " in groups : "
+ + StringUtils.arrayToString(ugi.getGroupNames()));
+
+ // check the access
+ try {
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
+ } catch (Throwable t) {
+ LOG.warn("Access denied for user " + ugi.getUserName()
+ + " in groups : ["
+ + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
+ throw t;
+ }
+
+ // 3. Get the log file and the file path
+ String logFileName =
JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
- jobHistoryFilePath =
+ Path jobHistoryFilePath =
JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
- // 3. Recover the history file. This involved
+ // 4. Recover the history file. This involved
// - deleting file.recover if file exists
// - renaming file.recover to file if file doesnt exist
// This makes sure that the (master) file exists
JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
jobHistoryFilePath);
- // 4. Cache the history file name as it costs one dfs access
+ // 5. Cache the history file name as it costs one dfs access
jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
- // 5. Sumbit the job to the jobtracker
+ // 6. Submit the job to the jobtracker
addJob(id, job);
} catch (Throwable t) {
- LOG.warn("Failed to recover job " + id + " history details."
- + " Ignoring.", t);
- // TODO : remove job details from the system directory
+ LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
idIter.remove();
continue;
}
@@ -1157,8 +1171,8 @@
JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
listener, fs);
} catch (Throwable t) {
- LOG.info("JobTracker failed to recover job " + pJob.getJobID()
- + " from history. Ignoring it.", t);
+ LOG.info("Error reading history file of job " + pJob.getJobID()
+ + ". Ignoring the error and continuing.", t);
}
// 3. Close the listener
@@ -1179,7 +1193,7 @@
}
} catch (Throwable t) {
LOG.warn("Failed to delete log file (" + logFileName + ") for job "
- + id + ". Ignoring it.", t);
+ + id + ". Continuing.", t);
}
if (pJob.isComplete()) {
@@ -2763,7 +2777,14 @@
JobInProgress job = new JobInProgress(jobId, this, this.conf);
// check for access
- checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+ try {
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+ } catch (IOException ioe) {
+ LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ + ". Ignoring job " + jobId, ioe);
+ new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+ throw ioe;
+ }
return addJob(jobId, job);
}
@@ -2794,15 +2815,18 @@
}
// Check whether the specified operation can be performed
- // related to the job. If ownerAllowed is true, then an owner
- // of the job can perform the operation irrespective of
- // access control.
+ // related to the job.
private void checkAccess(JobInProgress job,
QueueManager.QueueOperation oper)
throws IOException {
// get the user group info
UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+ checkAccess(job, oper, ugi);
+ }
+ // use the passed ugi for checking the access
+ private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
+ UserGroupInformation ugi) throws IOException {
// get the queue
String queue = job.getProfile().getQueueName();
if (!queueManager.hasAccess(queue, job, oper, ugi)) {
@@ -3148,6 +3172,11 @@
return jobs.get(jobid);
}
+ // Get the job directory in system directory
+ Path getSystemDirectoryForJob(JobID id) {
+ return new Path(getSystemDir(), id.toString());
+ }
+
/**
* Change the run-time priority of the given job.
* @param jobId job id
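The getSystemDirectoryForJob(JobID) helper added above centralizes the new Path(getSystemDir(), id.toString()) construction that callers previously assembled by hand. A hedged caller-side sketch (mirroring the JobInProgress.java change earlier in this diff; variable names are illustrative):

    // Resolve the per-job directory once via the jobtracker, then read job.xml from it.
    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
    FileSystem fs = jobDir.getFileSystem(conf);
    Path jobFile = new Path(jobDir, "job.xml");
    fs.copyToLocalFile(jobFile, localJobFile);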
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=752834&r1=752833&r2=752834&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Thu Mar 12 11:25:32 2009
@@ -31,6 +31,7 @@
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -217,6 +218,16 @@
assertTrue(ioe.getMessage().
contains("cannot perform operation " +
"SUBMIT_JOB on queue " + queue));
+ // check if the system directory gets cleaned up or not
+ JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
+ Path sysDir = new Path(jobtracker.getSystemDir());
+ FileSystem fs = sysDir.getFileSystem(conf);
+ int size = fs.listStatus(sysDir).length;
+ while (size > 0) {
+ System.out.println("Waiting for the job files in sys directory to be cleaned up");
+ UtilsForTests.waitFor(100);
+ size = fs.listStatus(sysDir).length;
+ }
}
} finally {
tearDownCluster();
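The added check in TestQueueManager above polls the system directory until CleanupQueue has removed the rejected job's files. A hedged variant with an explicit bound (sketch only; the committed test polls without a timeout) would fail the test rather than hang if cleanup never runs:

    // Sketch: same polling idea, but give up after roughly a minute.
    long deadline = System.currentTimeMillis() + 60 * 1000;
    while (fs.listStatus(sysDir).length > 0) {
      assertTrue("Job files in system directory were not cleaned up",
                 System.currentTimeMillis() < deadline);
      UtilsForTests.waitFor(100);
    }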
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=752834&r1=752833&r2=752834&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Thu Mar 12 11:25:32 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Test whether the {@link RecoveryManager} is able to tolerate job-recovery
@@ -203,6 +204,25 @@
UtilsForTests.waitFor(100);
}
+ // now submit job3 with inappropriate acls
+ JobConf job3 = mr.createJobConf();
+ job3.set("hadoop.job.ugi","abc,users");
+
+ UtilsForTests.configureWaitingJobConf(job3,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 1, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // submit the job
+ RunningJob rJob3 = (new JobClient(job3)).submitJob(job3);
+ LOG.info("Submitted job " + rJob3.getID() + " with different user");
+
+ jip = jobtracker.getJob(rJob3.getID());
+
+ while (!jip.inited()) {
+ LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+ UtilsForTests.waitFor(100);
+ }
+
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
@@ -212,9 +232,15 @@
true);
mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
+ mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
+ UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
+ mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job",
+ ugi.getUserName());
+
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
+ UtilsForTests.waitForJobTracker(jc);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -222,6 +248,17 @@
assertEquals("Recovery manager failed to tolerate job failures",
2, jobtracker.getAllJobs().length);
+ // check if the job#1 has failed
+ JobStatus status = jobtracker.getJobStatus(rJob1.getID());
+ assertEquals("Faulty job not failed",
+ JobStatus.FAILED, status.getRunState());
+
+ jip = jobtracker.getJob(rJob2.getID());
+ assertFalse("Job should be running", jip.isComplete());
+
+ status = jobtracker.getJobStatus(rJob3.getID());
+ assertNull("Job should be missing", status);
+
mr.shutdown();
}
}