Author: omalley
Date: Fri Mar 4 03:46:35 2011
New Revision: 1077153
URL: http://svn.apache.org/viewvc?rev=1077153&view=rev
Log:
commit dd4956feca64b86eb358b512cfdbac7bfc0a7942
Author: Devaraj Das <[email protected]>
Date: Sun Feb 7 00:22:29 2010 -0800
MAPREDUCE-1457 from
https://issues.apache.org/jira/secure/attachment/12435115/MAPREDUCE-1457-BPY20.patch.1
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within getStagingAreaDir
+ within a privileged block. Fixes Child.java to use the appropriate UGIs while getting
+ the TaskUmbilicalProtocol proxy and while executing the task.
+ Contributed by Jakob Homan. (ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 03:46:35 2011
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,10 +59,10 @@ class Child {
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
- JobConf defaultConf = new JobConf();
+ final JobConf defaultConf = new JobConf();
String host = args[0];
int port = Integer.parseInt(args[1]);
- InetSocketAddress address = new InetSocketAddress(host, port);
+ final InetSocketAddress address = new InetSocketAddress(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
final int SLEEP_LONGER_COUNT = 5;
int jvmIdInt = Integer.parseInt(args[3]);
@@ -81,11 +82,21 @@ class Child {
UserGroupInformation current = UserGroupInformation.getCurrentUser();
current.addToken(jt);
- TaskUmbilicalProtocol umbilical =
- (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
- TaskUmbilicalProtocol.versionID,
- address,
- defaultConf);
+ UserGroupInformation taskOwner
+ =
UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+ taskOwner.addToken(jt);
+
+ final TaskUmbilicalProtocol umbilical =
+ taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+ @Override
+ public TaskUmbilicalProtocol run() throws Exception {
+ return
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID,
+ address,
+ defaultConf);
+ }
+ });
+
int numTasksToExecute = -1; //-1 signifies "no limit"
int numTasksExecuted = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -127,6 +138,9 @@ class Child {
JvmContext context = new JvmContext(jvmId, pid);
int idleLoopCount = 0;
Task task = null;
+
+ UserGroupInformation childUGI = null;
+
try {
while (true) {
taskid = null;
@@ -156,7 +170,7 @@ class Child {
//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- JobConf job = new JobConf(task.getJobFile());
+ final JobConf job = new JobConf(task.getJobFile());
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.
@@ -181,11 +195,27 @@ class Child {
JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
// use job-specified working directory
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
- try {
- task.run(job, umbilical); // run the task
- } finally {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- }
+ LOG.debug("Creating remote user to execute task: " +
job.get("user.name"));
+ childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+ // Add tokens to new user so that it may execute its task correctly.
+ for(Token<?> token :
UserGroupInformation.getCurrentUser().getTokens()) {
+ childUGI.addToken(token);
+ }
+
+ // Create a final reference to the task for the doAs block
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ try {
+ taskFinal.run(job, umbilical); // run the task
+ } finally {
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ }
+
+ return null;
+ }
+ });
if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
break;
}
@@ -198,7 +228,18 @@ class Child {
try {
if (task != null) {
// do cleanup for the task
- task.taskCleanup(umbilical);
+ if(childUGI == null) {
+ task.taskCleanup(umbilical);
+ } else {
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ taskFinal.taskCleanup(umbilical);
+ return null;
+ }
+ });
+ }
}
} catch (Exception e) {
LOG.info("Error cleaning up", e);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:46:35 2011
@@ -271,6 +271,8 @@ class JobInProgress {
private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces =
new HashMap<TaskTracker, FallowSlotInfo>();
private Path jobSubmitDir = null;
+
+ final private UserGroupInformation userUGI;
/**
* Create an almost empty JobInProgress, which can be used only for tests
@@ -284,6 +286,11 @@ class JobInProgress {
this.anyCacheLevel = this.maxLevel+1;
this.jobtracker = tracker;
this.restartCount = 0;
+ try {
+ this.userUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie){
+ throw new RuntimeException(ie);
+ }
}
/**
@@ -320,14 +327,14 @@ class JobInProgress {
// use the user supplied token to add user credentials to the conf
jobSubmitDir = jobInfo.getJobSubmitDir();
user = jobInfo.getUser().toString();
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ userUGI = UserGroupInformation.createRemoteUser(user);
if (ts != null) {
for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
- ugi.addToken(token);
+ userUGI.addToken(token);
}
}
- fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return jobSubmitDir.getFileSystem(default_conf);
}});
@@ -525,10 +532,21 @@ class JobInProgress {
}
LOG.info("Initializing " + jobId);
-
- // log job info
- JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
- this.startTime, hasRestarted());
+ final long startTimeFinal = this.startTime;
+ // log job info as the user running the job
+ try {
+ userUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
+ startTimeFinal, hasRestarted());
+ return null;
+ }
+ });
+ } catch(InterruptedException ie) {
+ throw new IOException(ie);
+ }
+
// log the job priority
setPriority(this.priority);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:46:35 2011
@@ -2031,8 +2031,15 @@ public class JobTracker implements MRCon
tmpInfoPort == 0, conf);
infoServer.setAttribute("job.tracker", this);
// initialize history parameters.
- boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
- this.startTime);
+ final JobTracker jtFinal = this;
+ boolean historyInitialized =
+ mrOwner.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws Exception {
+ return JobHistory.init(jtFinal, conf,jtFinal.localMachine,
+ jtFinal.startTime);
+ }
+ });
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
@@ -2147,16 +2154,16 @@ public class JobTracker implements MRCon
// Initialize history DONE folder
if (historyInitialized) {
- JobHistory.initDone(conf, fs);
- final String historyLogDir =
- JobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
FileSystem historyFS = mrOwner.doAs(new
PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
+ JobHistory.initDone(conf, fs);
+ final String historyLogDir =
+ JobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
+
return new Path(historyLogDir).getFileSystem(conf);
}
});
- infoServer.setAttribute("historyLogDir", historyLogDir);
infoServer.setAttribute("fileSys", historyFS);
}
@@ -3570,13 +3577,22 @@ public class JobTracker implements MRCon
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
- Path stagingRootDir =
- new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
- "/tmp/hadoop/mapred/staging"));
- FileSystem fs = stagingRootDir.getFileSystem(conf);
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- return fs.makeQualified(new Path(stagingRootDir,
- user+"/.staging")).toString();
+ try{
+ final String user =
UserGroupInformation.getCurrentUser().getShortUserName();
+ return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ Path stagingRootDir =
+ new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
+ FileSystem fs = stagingRootDir.getFileSystem(conf);
+ return fs.makeQualified(new Path(stagingRootDir,
+ user+"/.staging")).toString();
+ }
+ });
+ } catch(InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**