Author: omalley
Date: Fri Mar 4 03:40:38 2011
New Revision: 1077101
URL: http://svn.apache.org/viewvc?rev=1077101&view=rev
Log:
commit 9dd71b62d66a21544c9554c5659123a044621005
Author: Hemanth Yamijala <[email protected]>
Date: Mon Jan 11 20:18:53 2010 +0530
Reverting patch
https://issues.apache.org/jira/secure/attachment/12427328/y896.v2.1.patch for
MAPREDUCE-896
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 03:40:38 2011
@@ -23,7 +23,6 @@ int main(int argc, char **argv) {
int next_option = 0;
const char * job_id = NULL;
const char * task_id = NULL;
- const char * dir_to_be_deleted = NULL;
const char * tt_root = NULL;
int exit_code = 0;
const char * task_pid = NULL;
@@ -32,7 +31,6 @@ int main(int argc, char **argv) {
NULL, 0 } };
const char* log_file = NULL;
- char * base_path = NULL;
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
@@ -110,13 +108,6 @@ int main(int argc, char **argv) {
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
- case ENABLE_TASK_FOR_CLEANUP:
- base_path = argv[optind++];
- job_id = argv[optind++];
- dir_to_be_deleted = argv[optind++];
- exit_code = enable_task_for_cleanup(base_path, user_detail->pw_name, job_id,
- dir_to_be_deleted);
- break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 03:40:38 2011
@@ -104,17 +104,12 @@ int check_tt_root(const char *tt_root) {
* path resolve to one and same.
*/
-int check_path_for_relative_components(char *path) {
+int check_path(char *path) {
char * resolved_path = (char *) canonicalize_file_name(path);
if(resolved_path == NULL) {
- fprintf(LOGFILE, "Error resolving the path: %s. Passed path: %s\n",
- strerror(errno), path);
return ERROR_RESOLVING_FILE_PATH;
}
if(strcmp(resolved_path, path) !=0) {
- fprintf(LOGFILE,
- "Relative path components in the path: %s. Resolved path: %s\n",
- path, resolved_path);
free(resolved_path);
return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
}
@@ -169,34 +164,6 @@ void get_task_file_path(const char * job
free(mapred_local_dir);
}
-/*
- * Builds the full path of the dir(localTaskDir or localWorkDir)
- * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work)
- *
- * Check TT_LOCAL_TASK_DIR_PATTERN for pattern
- */
-void get_task_dir_path(const char * tt_root, const char * jobid,
- const char * dir_to_be_deleted, char **task_dir_path) {
- *task_dir_path = NULL;
- int str_len = strlen(TT_LOCAL_TASK_DIR_PATTERN) + strlen(jobid) + strlen(
- dir_to_be_deleted) + strlen(tt_root);
-
- *task_dir_path = (char *) malloc(sizeof(char) * (str_len + 1));
- if (*task_dir_path == NULL) {
- fprintf(LOGFILE, "Unable to allocate memory for task_dir_path \n");
- return;
- }
-
- memset(*task_dir_path,'\0',str_len+1);
- snprintf(*task_dir_path, str_len, TT_LOCAL_TASK_DIR_PATTERN, tt_root,
- jobid, dir_to_be_deleted);
-#ifdef DEBUG
- fprintf(LOGFILE, "get_task_dir_path : task dir path = %s\n", *task_dir_path);
- fflush(LOGFILE);
-#endif
-}
-
//end of private functions
void display_usage(FILE *stream) {
fprintf(stream,
@@ -216,200 +183,6 @@ int get_user_details(const char *user) {
return 0;
}
-/**
- * Compare ownership of a file with the given ids.
- */
-int compare_ownership(uid_t uid, gid_t gid, char *path) {
- struct stat filestat;
- if (stat(path, &filestat) != 0) {
- return UNABLE_TO_STAT_FILE;
- }
- if (uid == filestat.st_uid && gid == filestat.st_gid) {
- return 0;
- }
- return 1;
-}
-
-/*
- * Function to check if the TaskTracker actually owns the file.
- */
-int check_ownership(char *path) {
- struct stat filestat;
- if (stat(path, &filestat) != 0) {
- return UNABLE_TO_STAT_FILE;
- }
- // check user/group. User should be TaskTracker user, group can either be
- // TaskTracker's primary group or the special group to which binary's
- // permissions are set.
- if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
- != filestat.st_gid)) {
- return FILE_NOT_OWNED_BY_TASKTRACKER;
- }
- return 0;
-}
-
-/**
- * Function to change the owner/group of a given path.
- */
-static int change_owner(const char *path, uid_t uid, gid_t gid) {
- int exit_code = chown(path, uid, gid);
- if (exit_code != 0) {
- fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path,
- strerror(errno));
- }
- return exit_code;
-}
-
-/**
- * Function to change the mode of a given path.
- */
-static int change_mode(const char *path, mode_t mode) {
- int exit_code = chmod(path, mode);
- if (exit_code != 0) {
- fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
- strerror(errno));
- }
- return exit_code;
-}
-
-/**
- * Function to change permissions of the given path. It does the following
- * recursively:
- * 1) changes the owner/group of the paths to the passed owner/group
- * 2) changes the file permission to the passed file_mode and directory
- * permission to the passed dir_mode
- *
- * should_check_ownership : boolean to enable checking of ownership of each path
- */
-static int secure_path(const char *path, uid_t uid, gid_t gid,
- mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
- FTS *tree = NULL; // the file hierarchy
- FTSENT *entry = NULL; // a file in the hierarchy
- char *paths[] = { (char *) path };
- int process_path = 0;
- int dir = 0;
- int error_code = 0;
- int done = 0;
-
- // Get physical locations and don't resolve the symlinks.
- // Don't change directory while walking the directory.
- int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
-
- tree = fts_open(paths, ftsoptions, NULL);
- if (tree == NULL) {
- fprintf(LOGFILE,
- "Cannot open file traversal structure for the path %s:%s.\n", path,
- strerror(errno));
- return -1;
- }
-
- while (((entry = fts_read(tree)) != NULL) && !done) {
- dir = 0;
- switch (entry->fts_info) {
- case FTS_D:
- // A directory being visited in pre-order.
- // We change ownership of directories in post-order.
- // so ignore the pre-order visit.
- process_path = 0;
- break;
- case FTS_DC:
- // A directory that causes a cycle in the tree
- // We don't expect cycles, ignore.
- process_path = 0;
- break;
- case FTS_DNR:
- // A directory which cannot be read
- // Ignore and set error code.
- process_path = 0;
- error_code = -1;
- break;
- case FTS_DOT:
- // "." or ".."
- process_path = 0;
- break;
- case FTS_F:
- // A regular file
- process_path = 1;
- break;
- case FTS_DP:
- // A directory being visited in post-order
- if (entry->fts_level == 0) {
- // root directory. Done with traversing.
- done = 1;
- }
- process_path = 1;
- dir = 1;
- break;
- case FTS_SL:
- // A symbolic link
- process_path = 1;
- break;
- case FTS_SLNONE:
- // A symbolic link with a nonexistent target
- process_path = 1;
- break;
- case FTS_NS:
- // A file for which no stat(2) information was available
- // Ignore and set error code
- process_path = 0;
- error_code = -1;
- break;
- case FTS_ERR:
- // An error return. Ignore and set error code.
- process_path = 0;
- error_code = -1;
- break;
- case FTS_DEFAULT:
- // File that doesn't belong to any of the above type. Ignore.
- process_path = 0;
- break;
- default:
- // None of the above. Ignore and set error code
- process_path = 0;
- error_code = -1;
- }
-
- if (error_code != 0) {
- break;
- }
- if (!process_path) {
- continue;
- }
- if (should_check_ownership &&
- (compare_ownership(uid, gid, entry->fts_path) == 0)) {
- // already set proper permissions.
- // This might happen with distributed cache.
-#ifdef DEBUG
- fprintf(
- LOGFILE,
- "already has private permissions. Not trying to change again for %s",
- entry->fts_path);
-#endif
- continue;
- }
-
- if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
- fprintf(LOGFILE,
- "Invalid file path. %s not user/group owned by the tasktracker.\n",
- entry->fts_path);
- error_code = -1;
- } else if (change_owner(entry->fts_path, uid, gid) != 0) {
- fprintf(LOGFILE, "couldn't change the ownership of %s\n",
- entry->fts_path);
- error_code = -3;
- } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
- fprintf(LOGFILE, "couldn't change the permissions of %s\n",
- entry->fts_path);
- error_code = -3;
- }
- }
- if (fts_close(tree) != 0) {
- fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
- strerror(errno));
- }
- return error_code;
-}
-
/*
*Function used to launch a task as the provided user.
* First the function checks if the tt_root passed is found in
@@ -458,7 +231,7 @@ int run_task_as_user(const char * user,
return INVALID_TASK_SCRIPT_PATH;
}
errno = 0;
- exit_code = check_path_for_relative_components(task_script_path);
+ exit_code = check_path(task_script_path);
if(exit_code != 0) {
goto cleanup;
}
@@ -528,59 +301,3 @@ int kill_user_task(const char *user, con
return 0;
}
-/**
- * Enables the path for deletion by changing the owner, group and permissions
- * of the specified path and all the files/directories in the path recursively.
- * * sudo chown user:mapred -R full_path
- * * sudo chmod 2777 -R full_path
- * Before changing permissions, makes sure that the given path doesn't contain
- * any relative components.
- * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
- */
-int enable_task_for_cleanup(char *tt_root, const char *user,
- const char *jobid, const char *dir_to_be_deleted) {
- int exit_code = 0;
- gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
- char * full_path = NULL;
- if (check_tt_root(tt_root) < 0) {
- fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
- cleanup();
- return INVALID_TT_ROOT;
- }
-
- get_task_dir_path(tt_root, jobid, dir_to_be_deleted, &full_path);
- if (full_path == NULL) {
- fprintf(LOGFILE,
- "Could not build the full path. Not deleting the dir %s\n",
- dir_to_be_deleted);
- exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
- }
- // Make sure that the path given is not having any relative components
- else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
- fprintf(LOGFILE,
- "Not changing permissions as the path contains relative
components.\n",
- full_path);
- }
- else if (get_user_details(user) < 0) {
- fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
- exit_code = INVALID_USER_NAME;
- }
- else if (exit_code = secure_path(full_path, user_detail->pw_uid,
- tasktracker_gid, S_IRWXU | S_IRWXG | S_IRWXO,
- S_ISGID | S_IRWXU | S_IRWXG | S_IRWXO, 0) != 0) {
- // No setgid on files and setgid on dirs, 777.
- // set 777 permissions for user, TTgroup for all files/directories in
- // 'full_path' recursively sothat deletion of path by TaskTracker succeeds.
-
- fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
- }
-
- if (full_path != NULL) {
- free(full_path);
- }
- // free configurations
- cleanup();
- return exit_code;
-}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 03:40:38 2011
@@ -29,16 +29,13 @@
#include <sys/signal.h>
#include <getopt.h>
#include<grp.h>
-#include <fts.h>
-
#include "configuration.h"
//command definitions
enum command {
LAUNCH_TASK_JVM,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP
+ KILL_TASK_JVM
};
enum errorcodes {
@@ -56,15 +53,12 @@ enum errorcodes {
ERROR_RESOLVING_FILE_PATH, //12
RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
UNABLE_TO_STAT_FILE, //14
- FILE_NOT_OWNED_BY_TASKTRACKER, //15
- UNABLE_TO_BUILD_PATH //16
+ FILE_NOT_OWNED_BY_TASKTRACKER //15
};
#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
-#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/jobcache/%s/%s"
-
#define TT_SYS_DIR_KEY "mapred.local.dir"
#define MAX_ITEMS 10
@@ -84,7 +78,4 @@ int run_task_as_user(const char * user,
int kill_user_task(const char *user, const char *task_pid, int sig);
-int enable_task_for_cleanup(char * base_path, const char *user,
- const char *jobid, const char *dir_to_be_deleted);
-
int get_user_details(const char *user);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar 4 03:40:38 2011
@@ -39,7 +39,7 @@ class CleanupQueue {
* paths(directories/files) in a separate thread. This constructor creates a
* clean-up thread and also starts it as a daemon. Callers can instantiate one
* CleanupQueue per JVM and can use it for deleting paths. Use
- * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
+ * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
* deletion.
*/
public CleanupQueue() {
@@ -49,53 +49,23 @@ class CleanupQueue {
}
}
}
-
- /**
- * Contains info related to the path of the file/dir to be deleted
- */
- static class PathDeletionContext {
- String fullPath;// full path of file or dir
- FileSystem fs;
-
- public PathDeletionContext(FileSystem fs, String fullPath) {
- this.fs = fs;
- this.fullPath = fullPath;
- }
-
- protected String getPathForCleanup() {
- return fullPath;
- }
-
- /**
- * Makes the path(and its subdirectories recursively) fully deletable
- */
- protected void enablePathForCleanup() throws IOException {
- // do nothing
- }
- }
-
- /**
- * Adds the paths to the queue of paths to be deleted by cleanupThread.
- */
- void addToQueue(PathDeletionContext... contexts) {
- cleanupThread.addToQueue(contexts);
- }
-
- protected static boolean deletePath(PathDeletionContext context)
- throws IOException {
- context.enablePathForCleanup();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to delete " + context.fullPath);
- }
- return context.fs.delete(new Path(context.fullPath), true);
+
+ public void addToQueue(JobConf conf, Path...paths) {
+ cleanupThread.addToQueue(conf,paths);
}
private static class PathCleanupThread extends Thread {
+ static class PathAndConf {
+ JobConf conf;
+ Path path;
+ PathAndConf(JobConf conf, Path path) {
+ this.conf = conf;
+ this.path = path;
+ }
+ }
// cleanup queue which deletes files/directories of the paths queued up.
- private LinkedBlockingQueue<PathDeletionContext> queue =
- new LinkedBlockingQueue<PathDeletionContext>();
+ private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
public PathCleanupThread() {
setName("Directory/File cleanup thread");
@@ -103,34 +73,28 @@ class CleanupQueue {
start();
}
- void addToQueue(PathDeletionContext[] contexts) {
- for (PathDeletionContext context : contexts) {
+ public void addToQueue(JobConf conf,Path... paths) {
+ for (Path p : paths) {
try {
- queue.put(context);
- } catch(InterruptedException ie) {}
+ queue.put(new PathAndConf(conf,p));
+ } catch (InterruptedException ie) {}
}
}
public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + " started.");
- }
- PathDeletionContext context = null;
+ LOG.debug(getName() + " started.");
+ PathAndConf pathAndConf = null;
while (true) {
try {
- context = queue.take();
+ pathAndConf = queue.take();
// delete the path.
- if (!deletePath(context)) {
- LOG.warn("CleanupThread:Unable to delete path " +
context.fullPath);
- }
- else if (LOG.isDebugEnabled()) {
- LOG.debug("DELETED " + context.fullPath);
- }
+ FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
+ fs.delete(pathAndConf.path, true);
+ LOG.debug("DELETED " + pathAndConf.path);
} catch (InterruptedException t) {
- LOG.warn("Interrupted deletion of " + context.fullPath);
return;
} catch (Exception e) {
- LOG.warn("Error deleting path " + context.fullPath + ": " + e);
+ LOG.warn("Error deleting path" + pathAndConf.path);
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:40:38 2011
@@ -21,9 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.fs.FileUtil;
+
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -135,18 +134,4 @@ class DefaultTaskController extends Task
}
}
- /**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
- @Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- try {
- FileUtil.chmod(context.fullPath, "a+rwx", true);
- } catch(InterruptedException e) {
- LOG.warn("Interrupted while setting permissions for " + context.fullPath
+
- " for deletion.");
- }
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:40:38 2011
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.metrics.MetricsContext;
@@ -2857,8 +2856,7 @@ class JobInProgress {
// Delete temp dfs dirs created if any, like in case of
// speculative exn of reduces.
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(new PathDeletionContext(
- FileSystem.get(conf), tempDir.toUri().getPath()));
+ new CleanupQueue().addToQueue(conf,tempDir);
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:40:38 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.mapred.JobHistory.Keys;
@@ -75,7 +76,6 @@ import org.apache.hadoop.mapred.JobHisto
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
@@ -3544,9 +3544,7 @@ public class JobTracker implements MRCon
String queue = job.getProfile().getQueueName();
if(!(queueManager.getQueues().contains(queue))) {
- new CleanupQueue().addToQueue(new PathDeletionContext(
- FileSystem.get(conf),
- getSystemDirectoryForJob(jobId).toUri().getPath()));
+ new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
job.fail();
if (userFileForJob != null) {
userFileForJob.delete();
@@ -3564,9 +3562,7 @@ public class JobTracker implements MRCon
if (userFileForJob != null) {
userFileForJob.delete();
}
- new CleanupQueue().addToQueue(new PathDeletionContext(
- FileSystem.get(conf),
- getSystemDirectoryForJob(jobId).toUri().getPath()));
+ new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
throw ioe;
}
@@ -3575,9 +3571,7 @@ public class JobTracker implements MRCon
try {
checkMemoryRequirements(job);
} catch (IOException ioe) {
- new CleanupQueue().addToQueue(new PathDeletionContext(
- FileSystem.get(conf),
- getSystemDirectoryForJob(jobId).toUri().getPath()));
+ new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
throw ioe;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:40:38 2011
@@ -428,12 +428,7 @@ class JvmManager {
//task at the beginning of each task in the task JVM.
//For the last task, we do it here.
if (env.conf.getNumTasksToExecutePerJvm() != 1) {
- tracker.directoryCleanupThread.addToQueue(
- TaskTracker.buildTaskControllerPathDeletionContexts(
- tracker.getLocalFileSystem(), tracker.getLocalDirs(),
- initalContext.task,
- true /* workDir */,
- tracker.getTaskController()));
+ FileUtil.fullyDelete(env.workDir);
}
} catch (IOException ie){}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:40:38 2011
@@ -24,16 +24,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -113,8 +110,7 @@ class LinuxTaskController extends TaskCo
enum TaskCommands {
LAUNCH_TASK_JVM,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP
+ KILL_TASK_JVM
}
/**
@@ -154,7 +150,7 @@ class LinuxTaskController extends TaskCo
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
- launchTaskJVMArgs, env.workDir, env.env);
+ launchTaskJVMArgs, env);
context.shExec = shExec;
try {
shExec.execute();
@@ -171,40 +167,6 @@ class LinuxTaskController extends TaskCo
}
/**
- * Helper method that runs a LinuxTaskController command
- *
- * @param taskCommand
- * @param user
- * @param cmdArgs
- * @param env
- * @throws IOException
- */
- private void runCommand(TaskCommands taskCommand, String user,
- List<String> cmdArgs, File workDir, Map<String, String> env)
- throws IOException {
-
- ShellCommandExecutor shExec =
- buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
- try {
- shExec.execute();
- } catch (Exception e) {
- LOG.warn("Exit code from " + taskCommand.toString() + " is : "
- + shExec.getExitCode());
- LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
- + StringUtils.stringifyException(e));
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
- + " follows:");
- logOutput(shExec.getOutput());
- throw new IOException(e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
- + " follows:");
- logOutput(shExec.getOutput());
- }
- }
-
- /**
* Returns list of arguments to be passed while launching task VM.
* See {@code buildTaskControllerExecutor(TaskCommands,
* String, List<String>, JvmEnv)} documentation.
@@ -229,67 +191,6 @@ class LinuxTaskController extends TaskCo
return commandArgs;
}
- private List<String> buildTaskCleanupArgs(
- TaskControllerPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.task.getJobID().toString());
-
- String workDir = "";
- if (context.isWorkDir) {
- workDir = "/work";
- }
- if (context.task.isTaskCleanupTask()) {
- commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
- + workDir);
- } else {
- commandArgs.add(context.task.getTaskID() + workDir);
- }
-
- return commandArgs;
- }
-
- /**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
- @Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " +
TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
- + " for " + context.fullPath);
- }
-
- if (context instanceof TaskControllerPathDeletionContext) {
- TaskControllerPathDeletionContext tContext =
- (TaskControllerPathDeletionContext) context;
-
- if (tContext.task.getUser() != null && tContext.fs instanceof LocalFileSystem) {
- runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
- tContext.task.getUser(),
- buildTaskCleanupArgs(tContext), null, null);
- }
- else {
- throw new IllegalArgumentException("Either user is null or the " +
- "file system is not local file system.");
- }
- }
- else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerPathDeletionContext.");
- }
- }
-
- private void logOutput(String output) {
- String shExecOutput = output;
- if (shExecOutput != null) {
- for (String str : shExecOutput.split("\n")) {
- LOG.info(str);
- }
- }
- }
-
// get the Job ID from the information in the TaskControllerContext
private String getJobId(TaskControllerContext context) {
String taskId = context.task.getTaskID().toString();
@@ -398,10 +299,10 @@ class LinuxTaskController extends TaskCo
* @return {@link ShellCommandExecutor}
* @throws IOException
*/
- private ShellCommandExecutor buildTaskControllerExecutor(
- TaskCommands command, String userName, List<String> cmdArgs,
- File workDir, Map<String, String> env)
- throws IOException {
+ private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
+ String userName,
+ List<String> cmdArgs, JvmEnv env)
+ throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
taskControllerCmd[0] = taskControllerExe;
taskControllerCmd[1] = userName;
@@ -416,9 +317,9 @@ class LinuxTaskController extends TaskCo
}
}
ShellCommandExecutor shExec = null;
- if(workDir != null && workDir.exists()) {
+ if(env.workDir != null && env.workDir.exists()) {
shExec = new ShellCommandExecutor(taskControllerCmd,
- workDir, env);
+ env.workDir, env.env);
} else {
shExec = new ShellCommandExecutor(taskControllerCmd);
}
@@ -566,8 +467,7 @@ class LinuxTaskController extends TaskCo
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env.workDir,
- context.env.env);
+ buildKillTaskCommandArgs(context), context.env);
try {
shExec.execute();
} catch (Exception e) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:40:38 2011
@@ -23,9 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -127,63 +124,6 @@ abstract class TaskController implements
}
/**
- * Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the file/dir
- */
- static class TaskControllerPathDeletionContext extends PathDeletionContext {
- Task task;
- boolean isWorkDir;
- TaskController taskController;
-
- /**
- * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
- * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
- * is built using mapredLocalDir, jobId, taskId, etc.
- */
- Path mapredLocalDir;
-
- public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
- Task task, boolean isWorkDir, TaskController taskController) {
- super(fs, null);
- this.task = task;
- this.isWorkDir = isWorkDir;
- this.taskController = taskController;
- this.mapredLocalDir = mapredLocalDir;
- }
-
- @Override
- protected String getPathForCleanup() {
- if (fullPath == null) {
- fullPath = buildPathForDeletion();
- }
- return fullPath;
- }
-
- /**
- * Builds the path of taskAttemptDir OR taskWorkDir based on
- * mapredLocalDir, jobId, taskId, etc
- */
- String buildPathForDeletion() {
- String subDir = TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask());
- if (isWorkDir) {
- subDir = subDir + Path.SEPARATOR + "work";
- }
- return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
- }
-
- /**
- * Makes the path(and its subdirectories recursively) fully deletable by
- * setting proper permissions(777) by task-controller
- */
- @Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath
- taskController.enableTaskForCleanup(this);
- }
- }
-
- /**
* Method which is called after the job is localized so that task controllers
* can implement their own job localization logic.
*
@@ -206,12 +146,4 @@ abstract class TaskController implements
*/
abstract void killTask(TaskControllerContext context);
-
- /**
- * Enable the task for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableTaskForCleanup(PathDeletionContext context)
- throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:40:38 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
@@ -556,53 +557,12 @@ abstract class TaskRunner extends Thread
}
}
- /**
- * Sets permissions recursively and then deletes the contents of dir.
- * Makes dir empty directory(does not delete dir itself).
- */
- static void deleteDirContents(JobConf conf, File dir) throws IOException {
- FileSystem fs = FileSystem.getLocal(conf);
- if (fs.exists(new Path(dir.getAbsolutePath()))) {
- File contents[] = dir.listFiles();
- if (contents != null) {
- for (int i = 0; i < contents.length; i++) {
- try {
- int ret = 0;
- if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
- "a+rwx", true)) != 0) {
- LOG.warn("Unable to chmod for " + contents[i] +
- "; chmod exit status = " + ret);
- }
- } catch(InterruptedException e) {
- LOG.warn("Interrupted while setting permissions for contents of " +
- "workDir. Not deleting the remaining contents of workDir.");
- return;
- }
- if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
- LOG.warn("Unable to delete "+ contents[i]);
- }
- }
- }
- }
- else {
- LOG.warn(dir + " does not exist.");
- }
- }
-
//Mostly for setting up the symlinks. Note that when we setup the distributed
//cache, we didn't create the symlinks. This is done on a per task basis
//by the currently executing task.
public static void setupWorkDir(JobConf conf) throws IOException {
File workDir = new File(".").getAbsoluteFile();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Fully deleting contents of " + workDir);
- }
-
- /** delete only the contents of workDir leaving the directory empty. We
- * can't delete the workDir as it is the current working directory.
- */
- deleteDirContents(conf, workDir);
-
+ FileUtil.fullyDelete(workDir);
if (DistributedCache.getSymlink(conf)) {
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077101&r1=1077100&r2=1077101&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:40:38 2011
@@ -69,8 +69,6 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
@@ -230,8 +228,6 @@ public class TaskTracker
private int maxReduceSlots;
private int failures;
- private FileSystem localFs;
-
// Performance-related config knob to send an out-of-band heartbeat
// on task completion
static final String TT_OUTOFBAND_HEARBEAT =
@@ -243,7 +239,7 @@ public class TaskTracker
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
- CleanupQueue directoryCleanupThread;
+ private CleanupQueue directoryCleanupThread;
volatile JvmManager jvmManager;
private TaskMemoryManagerThread taskMemoryManager;
@@ -391,7 +387,7 @@ public class TaskTracker
TaskController getTaskController() {
return taskController;
}
-
+
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
synchronized (runningJobs) {
@@ -504,7 +500,6 @@ public class TaskTracker
synchronized void initialize() throws IOException {
// use configured nameserver & interface to get local hostname
this.fConf = new JobConf(originalConf);
- localFs = FileSystem.getLocal(fConf);
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
}
@@ -1488,32 +1483,6 @@ public class TaskTracker
}
}
- private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
- Path[] paths) {
- int i = 0;
- PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
- }
- return contexts;
- }
-
- static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
- FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
- isWorkDir, taskController);
- }
- return contexts;
- }
-
/**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
@@ -1542,9 +1511,8 @@ public class TaskTracker
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
- PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
- getLocalFiles(fConf, getLocalJobDir(rjob.getJobID().toString())));
- directoryCleanupThread.addToQueue(contexts);
+ directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf,
+ getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
rjob.tasks.clear();
@@ -2650,7 +2618,6 @@ public class TaskTracker
}
String taskDir = getLocalTaskDir(task.getJobID().toString(),
taskId.toString(), task.isTaskCleanupTask());
-
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2662,23 +2629,21 @@ public class TaskTracker
//might be using the dir. The JVM running the tasks would clean
//the workdir per a task in the task process itself.
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- PathDeletionContext[] contexts =
- buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
- task, false/* not workDir */, taskController);
- directoryCleanupThread.addToQueue(contexts);
- }
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
+ taskDir));
+ }
+
else {
- PathDeletionContext[] contexts = buildPathDeletionContexts(
- localFs, getLocalFiles(defaultJobConf, taskDir+"/job.xml"));
- directoryCleanupThread.addToQueue(contexts);
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
+ taskDir+"/job.xml"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- PathDeletionContext[] contexts =
- buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
- task, true /* workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
+ taskDir+"/work"));
}
}
} catch (Throwable ie) {
@@ -3265,7 +3230,7 @@ public class TaskTracker
// get the full paths of the directory in all the local disks.
- Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+ private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
String[] localDirs = conf.getLocalDirs();
Path[] paths = new Path[localDirs.length];
FileSystem localFs = FileSystem.getLocal(conf);
@@ -3276,22 +3241,6 @@ public class TaskTracker
return paths;
}
- // get the paths in all the local disks.
- Path[] getLocalDirs() throws IOException{
- String[] localDirs = fConf.getLocalDirs();
- Path[] paths = new Path[localDirs.length];
- FileSystem localFs = FileSystem.getLocal(fConf);
- for (int i = 0; i < localDirs.length; i++) {
- paths[i] = new Path(localDirs[i]);
- paths[i] = paths[i].makeQualified(localFs);
- }
- return paths;
- }
-
- FileSystem getLocalFileSystem(){
- return localFs;
- }
-
int getMaxCurrentMapTasks() {
return maxMapSlots;
}