Author: tomwhite
Date: Tue Jun 16 01:17:51 2009
New Revision: 785044
URL: http://svn.apache.org/viewvc?rev=785044&view=rev
Log:
HADOOP-4041. IsolationRunner does not work as documented. Contributed by Philip Zeyliger.
Added:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalDirAllocator.java
hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
hadoop/core/trunk/src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml
hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/core/org/apache/hadoop/util/TestStringUtils.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun 16 01:17:51 2009
@@ -823,6 +823,9 @@
LD_LIBRARY_PATH and other environment variables.
(Sreekanth Ramakrishnan via yhemanth)
+ HADOOP-4041. IsolationRunner does not work as documented.
+ (Philip Zeyliger via tomwhite)
+
Release 0.20.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalDirAllocator.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalDirAllocator.java Tue Jun 16 01:17:51 2009
@@ -33,7 +33,7 @@
* files. The way it works is that it is kept track what disk was last
* allocated for a file write. For the current request, the next disk from
* the set of disks would be allocated if the free space on the disk is
- * sufficient enough to accomodate the file that is being considered for
+ * sufficient enough to accommodate the file that is being considered for
* creation. If the space requirements cannot be met, the next disk in order
* would be tried and so on till a disk is found with sufficient capacity.
* Once a disk with sufficient space is identified, a check is done to make
@@ -69,6 +69,9 @@
new TreeMap<String, AllocatorPerContext>();
private String contextCfgItemName;
+ /** Used when size of file to be allocated is unknown. */
+ public static final int SIZE_UNKNOWN = -1;
+
/**Create an allocator object
* @param contextCfgItemName
*/
@@ -105,10 +108,11 @@
*/
public Path getLocalPathForWrite(String pathStr,
Configuration conf) throws IOException {
- return getLocalPathForWrite(pathStr, -1, conf);
+ return getLocalPathForWrite(pathStr, SIZE_UNKNOWN, conf);
}
- /** Get a path from the local FS. Pass size as -1 if not known apriori. We
+ /** Get a path from the local FS. Pass size as
+ * SIZE_UNKNOWN if not known apriori. We
* round-robin over the set of disks (via the configured dirs) and return
* the first complete path which has enough space
* @param pathStr the requested path (this will be created on the first
@@ -274,7 +278,7 @@
*/
public synchronized Path getLocalPathForWrite(String path,
Configuration conf) throws IOException {
- return getLocalPathForWrite(path, -1, conf);
+ return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
}
/** Get a path from the local FS. If size is known, we go
@@ -296,7 +300,7 @@
}
Path returnPath = null;
- if(size == -1) { //do roulette selection: pick dir with probability
+ if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size
long[] availableOnDisk = new long[dirDF.length];
long totalAvailable = 0;
@@ -344,7 +348,8 @@
"directory for " + pathStr);
}
- /** Creates a file on the local FS. Pass size as -1 if not known apriori. We
+ /** Creates a file on the local FS. Pass size as
+ * {@link LocalDirAllocator.SIZE_UNKNOWN} if not known apriori. We
* round-robin over the set of disks (via the configured dirs) and return
* a file on the first path which has enough space. The file is guaranteed
* to go away when the JVM exits.
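For orientation (not part of the patch), a minimal sketch of how a caller might use the new constant; the allocator context name matches the one used by the MapReduce code below, while the relative paths are purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class SizeUnknownSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // assumes mapred.local.dir is configured
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");

        // Size known up front: disks are tried round-robin until one has enough space.
        Path sized = alloc.getLocalPathForWrite("jobcache/job_x/output.tmp", 1024, conf);

        // Size unknown: pass the named constant instead of a bare -1; a directory is
        // then picked with probability proportional to its available space.
        Path unsized = alloc.getLocalPathForWrite("jobcache/job_x/scratch.tmp",
            LocalDirAllocator.SIZE_UNKNOWN, conf);

        System.out.println(sized + " " + unsized);
      }
    }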
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java Tue Jun 16 01:17:51 2009
@@ -677,4 +677,24 @@
public static synchronized String limitDecimalTo2(double d) {
return decimalFormat.format(d);
}
+
+ /**
+ * Concatenates strings, using a separator.
+ *
+ * @param separator Separator to join with.
+ * @param strings Strings to join.
+ */
+ public static String join(CharSequence separator, Iterable<String> strings) {
+ StringBuffer sb = new StringBuffer();
+ boolean first = true;
+ for (String s : strings) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(separator);
+ }
+ sb.append(s);
+ }
+ return sb.toString();
+ }
}
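As a quick illustration (assumed usage, not part of the patch), the new helper joins any Iterable<String> with a separator; TaskRunner below uses it with the system path separator to assemble the child JVM classpath:

    import java.util.Arrays;
    import org.apache.hadoop.util.StringUtils;

    public class JoinSketch {
      public static void main(String[] args) {
        // Hypothetical classpath entries; the separator mirrors SYSTEM_PATH_SEPARATOR in TaskRunner.
        String sep = System.getProperty("path.separator");
        String classPath = StringUtils.join(sep,
            Arrays.asList("job/jars/classes", "job/jars/lib/dep.jar", "job/work"));
        System.out.println(classPath);  // job/jars/classes:job/jars/lib/dep.jar:job/work on Unix
      }
    }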
Modified: hadoop/core/trunk/src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml Tue Jun 16 01:17:51 2009
@@ -1337,8 +1337,8 @@
IsolationRunner</a> is a utility to help debug Map/Reduce programs.</p>
<p>To use <code>IsolationRunner</code>, first set the
- <code>keep.failed.tasks.files</code> property to <code>true</code>
- (also see <code>keep.tasks.files.pattern</code>).</p>
+ <code>keep.failed.task.files</code> property to <code>true</code>
+ (also see <code>keep.task.files.pattern</code>).</p>
<p>
Then, log in to the node on which the failed task ran and go into
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Jun 16 01:17:51 2009
@@ -1894,8 +1894,8 @@
IsolationRunner</a> is a utility to help debug Map/Reduce
programs.</p>
<p>To use the <code>IsolationRunner</code>, first set
- <code>keep.failed.tasks.files</code> to <code>true</code>
- (also see <code>keep.tasks.files.pattern</code>).</p>
+ <code>keep.failed.task.files</code> to <code>true</code>
+ (also see <code>keep.task.files.pattern</code>).</p>
<p>
Next, go to the node on which the failed task ran and go to the
@@ -1909,6 +1909,8 @@
<p><code>IsolationRunner</code> will run the failed task in a single
jvm, which can be in the debugger, over precisely the same input.</p>
+
+ <p>Note that currently IsolationRunner will only re-run map tasks.</p>
</section>
<section>
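To make the documented workflow concrete, a hedged sketch of the job-side configuration; the JobConf setters shown correspond to the two properties named above, and the pattern value is illustrative:

    import org.apache.hadoop.mapred.JobConf;

    public class KeepTaskFilesSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Equivalent to setting keep.failed.task.files=true for the job.
        conf.setKeepFailedTaskFiles(true);
        // Optionally also keep files of successful tasks whose id matches a pattern
        // (keep.task.files.pattern); TestIsolationRunner below uses ".*".
        conf.setKeepTaskFilesPattern(".*");
      }
    }

With the task files kept, the retained job.xml of the failed attempt is then passed to IsolationRunner (usage: IsolationRunner <path>/job.xml, per the code below).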
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Jun 16 01:17:51 2009
@@ -36,7 +36,17 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+/**
+ * IsolationRunner is intended to facilitate debugging by re-running a specific
+ * task, given left-over task files for a (typically failed) past job.
+ * Currently, it is limited to re-running map tasks.
+ *
+ * Users may coerce MapReduce to keep task files around by setting
+ * keep.failed.task.files. See mapred_tutorial.xml for more documentation.
+ */
public class IsolationRunner {
private static final Log LOG =
LogFactory.getLog(IsolationRunner.class.getName());
@@ -109,82 +119,57 @@
}
}
- private static ClassLoader makeClassLoader(JobConf conf,
+ private ClassLoader makeClassLoader(JobConf conf,
File workDir) throws IOException {
- List<URL> cp = new ArrayList<URL>();
-
+ List<String> classPaths = new ArrayList();
+ // Add job jar class files (includes lib/* and classes/*)
String jar = conf.getJar();
- if (jar != null) { // if jar exists, it into workDir
- File[] libs = new File(workDir, "lib").listFiles();
- if (libs != null) {
- for (int i = 0; i < libs.length; i++) {
- cp.add(new URL("file:" + libs[i].toString()));
- }
- }
- cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
- cp.add(new URL("file:" + workDir.toString() + "/"));
+ if (jar != null) {
+ TaskRunner.appendJobJarClasspaths(conf.getJar(), classPaths);
}
- return new URLClassLoader(cp.toArray(new URL[cp.size()]));
- }
-
- /**
- * Create empty sequence files for any of the map outputs that we don't have.
- * @param fs the filesystem to create the files in
- * @param dir the directory name to create the files in
- * @param conf the jobconf
- * @throws IOException if something goes wrong writing
- */
- private static void fillInMissingMapOutputs(FileSystem fs,
- TaskAttemptID taskId,
- int numMaps,
- JobConf conf) throws IOException {
- Class<? extends WritableComparable> keyClass
- = conf.getMapOutputKeyClass().asSubclass(WritableComparable.class);
- Class<? extends Writable> valueClass
- = conf.getMapOutputValueClass().asSubclass(Writable.class);
- MapOutputFile namer = new MapOutputFile(taskId.getJobID());
- namer.setConf(conf);
- for(int i=0; i<numMaps; i++) {
- Path f = namer.getInputFile(i, taskId);
- if (!fs.exists(f)) {
- LOG.info("Create missing input: " + f);
- SequenceFile.Writer out =
- SequenceFile.createWriter(fs, conf, f, keyClass, valueClass);
- out.close();
- }
- }
+ // Add the workdir, too.
+ classPaths.add(workDir.toString());
+ // Note: TaskRunner.run() does more, including DistributedCache files.
+
+ // Convert to URLs
+ URL[] urls = new URL[classPaths.size()];
+ for (int i = 0; i < classPaths.size(); ++i) {
+ urls[i] = new File(classPaths.get(i)).toURL();
+ }
+ return new URLClassLoader(urls);
}
/**
- * Run a single task
- * @param args the first argument is the task directory
+ * Main method.
*/
- public static void main(String[] args
- ) throws ClassNotFoundException, IOException,
- InterruptedException {
+ boolean run(String[] args)
+ throws ClassNotFoundException, IOException, InterruptedException {
if (args.length != 1) {
System.out.println("Usage: IsolationRunner <path>/job.xml");
- System.exit(1);
+ return false;
}
File jobFilename = new File(args[0]);
if (!jobFilename.exists() || !jobFilename.isFile()) {
System.out.println(jobFilename + " is not a valid job file.");
- System.exit(1);
+ return false;
}
JobConf conf = new JobConf(new Path(jobFilename.toString()));
TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
+ if (taskId == null) {
+ System.out.println("mapred.task.id not found in configuration;" +
+ " job.xml is not a task config");
+ }
boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+ if (!isMap) {
+ System.out.println("Only map tasks are supported.");
+ return false;
+ }
int partition = conf.getInt("mapred.task.partition", 0);
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- File workDirName = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- taskId.getJobID().toString(),
- taskId.toString())
- + Path.SEPARATOR + "work",
- conf). toString());
+ File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
local.setWorkingDirectory(new Path(workDirName.toString()));
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
@@ -193,23 +178,29 @@
Thread.currentThread().setContextClassLoader(classLoader);
conf.setClassLoader(classLoader);
- Task task;
- if (isMap) {
- Path localSplit = new Path(new Path(jobFilename.toString()).getParent(),
- "split.dta");
- DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
- String splitClass = Text.readString(splitFile);
- BytesWritable split = new BytesWritable();
- split.readFields(splitFile);
- splitFile.close();
- task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
- } else {
- int numMaps = conf.getNumMapTasks();
- fillInMissingMapOutputs(local, taskId, numMaps, conf);
- task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
- }
+ Path localSplit = new Path(new Path(jobFilename.toString()).getParent(),
+ "split.dta");
+ DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
+ String splitClass = Text.readString(splitFile);
+ BytesWritable split = new BytesWritable();
+ split.readFields(splitFile);
+ splitFile.close();
+ Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
task.setConf(conf);
task.run(conf, new FakeUmbilical());
+ return true;
}
+
+ /**
+ * Run a single task.
+ *
+ * @param args the first argument is the task directory
+ */
+ public static void main(String[] args)
+ throws ClassNotFoundException, IOException, InterruptedException {
+ if (!new IsolationRunner().run(args)) {
+ System.exit(1);
+ }
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue Jun 16 01:17:51 2009
@@ -17,21 +17,30 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.filecache.*;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
-import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
/** Base class that runs a task in a separate process. Tasks are run in a
* separate process in order to isolate the map/reduce system code from bugs in
@@ -49,6 +58,9 @@
private int exitCode = -1;
private boolean exitCodeSet = false;
+ private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+
+
private TaskTracker tracker;
protected JobConf conf;
@@ -108,163 +120,40 @@
//all the archives
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- File jobCacheDir = null;
- if (conf.getJar() != null) {
- jobCacheDir = new File(
- new Path(conf.getJar()).getParent().toString());
- }
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- t.getJobID().toString(),
- t.getTaskID().toString(),
- t.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- conf). toString());
-
+ File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
+
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
- FileStatus fileStatus;
- FileSystem fileSystem;
- Path localPath;
- String baseDir;
-
- if ((archives != null) || (files != null)) {
- if (archives != null) {
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- Path[] p = new Path[archives.length];
- for (int i = 0; i < archives.length;i++){
- fileSystem = FileSystem.get(archives[i], conf);
- fileStatus = fileSystem.getFileStatus(
- new Path(archives[i].getPath()));
- String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
- p[i] = DistributedCache.getLocalCache(archives[i], conf,
- new Path(baseDir),
- fileStatus,
- true, Long.parseLong(
- archivesTimestamps[i]),
- new Path(workDir.
- getAbsolutePath()),
- false);
-
- }
- DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
- }
- if ((files != null)) {
- String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
- Path[] p = new Path[files.length];
- for (int i = 0; i < files.length;i++){
- fileSystem = FileSystem.get(files[i], conf);
- fileStatus = fileSystem.getFileStatus(
- new Path(files[i].getPath()));
- String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
- p[i] = DistributedCache.getLocalCache(files[i], conf,
- new Path(baseDir),
- fileStatus,
- false, Long.parseLong(
- fileTimestamps[i]),
- new Path(workDir.
- getAbsolutePath()),
- false);
- }
- DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
- }
- Path localTaskFile = new Path(t.getJobFile());
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
+ setupDistributedCache(lDirAlloc, workDir, archives, files);
if (!prepare()) {
return;
}
- String sep = System.getProperty("path.separator");
- StringBuffer classPath = new StringBuffer();
+ // Accumulates class paths for child.
+ List<String> classPaths = new ArrayList<String>();
// start with same classpath as parent process
- classPath.append(System.getProperty("java.class.path"));
- classPath.append(sep);
+ appendSystemClasspaths(classPaths);
+
if (!workDir.mkdirs()) {
if (!workDir.isDirectory()) {
LOG.fatal("Mkdirs failed to create " + workDir.toString());
}
}
-
- String jar = conf.getJar();
- if (jar != null) {
- // if jar exists, it into workDir
- File[] libs = new File(jobCacheDir, "lib").listFiles();
- if (libs != null) {
- for (int i = 0; i < libs.length; i++) {
- classPath.append(sep); // add libs from jar to classpath
- classPath.append(libs[i]);
- }
- }
- classPath.append(sep);
- classPath.append(new File(jobCacheDir, "classes"));
- classPath.append(sep);
- classPath.append(jobCacheDir);
-
- }
// include the user specified classpath
+ appendJobJarClasspaths(conf.getJar(), classPaths);
- //archive paths
- Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
- if (archiveClasspaths != null && archives != null) {
- Path[] localArchives = DistributedCache
- .getLocalCacheArchives(conf);
- if (localArchives != null){
- for (int i=0;i<archives.length;i++){
- for(int j=0;j<archiveClasspaths.length;j++){
- if (archives[i].getPath().equals(
- archiveClasspaths[j].toString())){
- classPath.append(sep);
- classPath.append(localArchives[i]
- .toString());
- }
- }
- }
- }
- }
- //file paths
- Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
- if (fileClasspaths!=null && files != null) {
- Path[] localFiles = DistributedCache
- .getLocalCacheFiles(conf);
- if (localFiles != null) {
- for (int i = 0; i < files.length; i++) {
- for (int j = 0; j < fileClasspaths.length; j++) {
- if (files[i].getPath().equals(
- fileClasspaths[j].toString())) {
- classPath.append(sep);
- classPath.append(localFiles[i].toString());
- }
- }
- }
- }
- }
-
- classPath.append(sep);
- classPath.append(workDir);
- // Build exec child jmv args.
+ // Distributed cache paths
+ appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+
+ // Include the working dir too
+ classPaths.add(workDir.toString());
+
+ // Build classpath
+
+
+ // Build exec child JVM args.
Vector<String> vargs = new Vector<String>(8);
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -308,12 +197,12 @@
if (libraryPath == null) {
libraryPath = workDir.getAbsolutePath();
} else {
- libraryPath += sep + workDir;
+ libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
}
boolean hasUserLDPath = false;
for(int i=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
- javaOptsSplit[i] += sep + libraryPath;
+ javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
hasUserLDPath = true;
break;
}
@@ -342,7 +231,8 @@
// Add classpath.
vargs.add("-classpath");
- vargs.add(classPath.toString());
+ String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+ vargs.add(classPath);
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
@@ -396,7 +286,7 @@
String oldLdLibraryPath = null;
oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
if (oldLdLibraryPath != null) {
- ldLibraryPath.append(sep);
+ ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
@@ -494,6 +384,156 @@
tip.reportTaskFinished();
}
}
+
+ /** Creates the working directory pathname for a task attempt. */
+ static File formWorkDir(LocalDirAllocator lDirAlloc,
+ TaskAttemptID task, boolean isCleanup, JobConf conf)
+ throws IOException {
+ File workDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+ task.toString(), isCleanup)
+ + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
+ return workDir;
+ }
+
+ private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
+ URI[] archives, URI[] files) throws IOException {
+ FileStatus fileStatus;
+ FileSystem fileSystem;
+ Path localPath;
+ String baseDir;
+ if ((archives != null) || (files != null)) {
+ if (archives != null) {
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
+ Path[] p = new Path[archives.length];
+ for (int i = 0; i < archives.length;i++){
+ fileSystem = FileSystem.get(archives[i], conf);
+ fileStatus = fileSystem.getFileStatus(
+ new Path(archives[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(archives[i],conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ baseDir = localPath.toString().replace(cacheId, "");
+ p[i] = DistributedCache.getLocalCache(archives[i], conf,
+ new Path(baseDir),
+ fileStatus,
+ true, Long.parseLong(
+ archivesTimestamps[i]),
+ new Path(workDir.
+ getAbsolutePath()),
+ false);
+
+ }
+ DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
+ }
+ if ((files != null)) {
+ String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
+ Path[] p = new Path[files.length];
+ for (int i = 0; i < files.length;i++){
+ fileSystem = FileSystem.get(files[i], conf);
+ fileStatus = fileSystem.getFileStatus(
+ new Path(files[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(files[i], conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ baseDir = localPath.toString().replace(cacheId, "");
+ p[i] = DistributedCache.getLocalCache(files[i], conf,
+ new Path(baseDir),
+ fileStatus,
+ false, Long.parseLong(
+ fileTimestamps[i]),
+ new Path(workDir.
+ getAbsolutePath()),
+ false);
+ }
+ DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
+ }
+ Path localTaskFile = new Path(t.getJobFile());
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(localTaskFile, true);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+ }
+
+ private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives,
+ URI[] files, List<String> classPaths) throws IOException {
+ // Archive paths
+ Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+ if (archiveClasspaths != null && archives != null) {
+ Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+ if (localArchives != null){
+ for (int i=0;i<archives.length;i++){
+ for(int j=0;j<archiveClasspaths.length;j++){
+ if (archives[i].getPath().equals(
+ archiveClasspaths[j].toString())){
+ classPaths.add(localArchives[i].toString());
+ }
+ }
+ }
+ }
+ }
+
+ //file paths
+ Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+ if (fileClasspaths!=null && files != null) {
+ Path[] localFiles = DistributedCache
+ .getLocalCacheFiles(conf);
+ if (localFiles != null) {
+ for (int i = 0; i < files.length; i++) {
+ for (int j = 0; j < fileClasspaths.length; j++) {
+ if (files[i].getPath().equals(
+ fileClasspaths[j].toString())) {
+ classPaths.add(localFiles[i].toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void appendSystemClasspaths(List<String> classPaths) {
+ for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+ classPaths.add(c);
+ }
+ }
+
+ /**
+ * Given a "jobJar" (typically retrieved via {@link Configuration.getJar()}),
+ * appends classpath entries for it, as well as its lib/ and classes/
+ * subdirectories.
+ *
+ * @param jobJar Job jar from configuration
+ * @param classPaths Accumulator for class paths
+ */
+ static void appendJobJarClasspaths(String jobJar, List<String> classPaths) {
+ if (jobJar == null) {
+ return;
+
+ }
+ File jobCacheDir = new File(new Path(jobJar).getParent().toString());
+
+ // if the job jar exists, add its lib/ jars, classes/ dir, and parent dir
+ File[] libs = new File(jobCacheDir, "lib").listFiles();
+ if (libs != null) {
+ for (File l : libs) {
+ classPaths.add(l.toString());
+ }
+ }
+ classPaths.add(new File(jobCacheDir, "classes").toString());
+ classPaths.add(jobCacheDir.toString());
+ }
//Mostly for setting up the symlinks. Note that when we setup the distributed
//cache, we didn't create the symlinks. This is done on a per task basis
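For illustration only (not part of the patch), how the new package-private helper is expected to be driven; the job jar and work-dir paths are hypothetical, and the sketch sits in org.apache.hadoop.mapred so it can see the helper:

    package org.apache.hadoop.mapred;

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.util.StringUtils;

    public class JobJarClasspathSketch {
      public static void main(String[] args) {
        // Hypothetical unpacked job jar and task work directory.
        String jobJar = "/local/taskTracker/jobcache/job_x/jars/job.jar";
        File workDir = new File("/local/taskTracker/jobcache/job_x/attempt_x/work");

        List<String> classPaths = new ArrayList<String>();
        TaskRunner.appendJobJarClasspaths(jobJar, classPaths);  // adds lib/*.jar, classes/, jar dir
        classPaths.add(workDir.toString());                     // callers also add the work dir
        System.out.println(StringUtils.join(File.pathSeparator, classPaths));
      }
    }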
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 16 01:17:51 2009
@@ -778,7 +778,7 @@
// job-specific shared directory for use as scratch space
Path workDir = lDirAlloc.getLocalPathForWrite(
(getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "work"), fConf);
+ + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
Modified: hadoop/core/trunk/src/test/core/org/apache/hadoop/util/TestStringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/util/TestStringUtils.java?rev=785044&r1=785043&r2=785044&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/util/TestStringUtils.java (original)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/util/TestStringUtils.java Tue Jun 16 01:17:51 2009
@@ -18,6 +18,9 @@
package org.apache.hadoop.util;
+import java.util.ArrayList;
+import java.util.List;
+
import junit.framework.TestCase;
public class TestStringUtils extends TestCase {
@@ -118,4 +121,15 @@
assertEquals(-1259520L,
StringUtils.TraditionalBinaryPrefix.string2long("-1230k"));
assertEquals(956703965184L,
StringUtils.TraditionalBinaryPrefix.string2long("891g"));
}
+
+ public void testJoin() {
+ List<String> s = new ArrayList<String>();
+ s.add("a");
+ s.add("b");
+ s.add("c");
+ assertEquals("", StringUtils.join(":", s.subList(0, 0)));
+ assertEquals("a", StringUtils.join(":", s.subList(0, 1)));
+ assertEquals("a:b", StringUtils.join(":", s.subList(0, 2)));
+ assertEquals("a:b:c", StringUtils.join(":", s.subList(0, 3)));
+ }
}
Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=785044&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java Tue Jun 16 01:17:51 2009
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Re-runs a map task using the IsolationRunner.
+ *
+ * The task included here is an identity mapper that touches
+ * a file in a side-effect directory. This is used
+ * to verify that the task in fact ran.
+ */
+public class TestIsolationRunner extends TestCase {
+
+ private static final String SIDE_EFFECT_DIR_PROPERTY =
+ "test.isolationrunner.sideeffectdir";
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+ /** Identity mapper that also creates a side effect file. */
+ static class SideEffectMapper<K, V> extends IdentityMapper<K, V> {
+ private JobConf conf;
+ @Override
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+ @Override
+ public void close() throws IOException {
+ writeSideEffectFile(conf, "map");
+ }
+ }
+
+ static class SideEffectReducer<K, V> extends IdentityReducer<K, V> {
+ private JobConf conf;
+ @Override
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+ @Override
+ public void close() throws IOException {
+ writeSideEffectFile(conf, "reduce");
+ }
+ }
+
+ private static void deleteSideEffectFiles(JobConf conf) throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), true);
+ assertEquals(0, countSideEffectFiles(conf, ""));
+ }
+
+ private static void writeSideEffectFile(JobConf conf, String prefix)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path sideEffectFile = new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY),
+ prefix + "-" + UUID.randomUUID().toString());
+ localFs.create(sideEffectFile).close();
+ }
+
+ private static int countSideEffectFiles(JobConf conf, final String prefix)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FileStatus[] files = localFs.listStatus(
+ new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return path.getName().startsWith(prefix + "-");
+ }
+ });
+ return files.length;
+ }
+
+ private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
+ throws IOException {
+ String[] localDirs = conf.getLocalDirs();
+ assertEquals(1, localDirs.length);
+ Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
+ "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);
+ Path attemptDir = new Path(jobCacheDir,
+ new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString());
+ return new Path(attemptDir, "job.xml");
+ }
+
+ public void testIsolationRunOfMapTask() throws
+ IOException, InterruptedException, ClassNotFoundException {
+ MiniMRCluster mr = null;
+ try {
+ mr = new MiniMRCluster(1, "file:///", 1);
+
+ // Run a job successfully; keep task files.
+ JobConf conf = mr.createJobConf();
+ conf.setKeepTaskFilesPattern(".*");
+ conf.set(SIDE_EFFECT_DIR_PROPERTY, TEST_ROOT_DIR +
+ "/isolationrunnerjob/sideeffect");
+ // Delete previous runs' data.
+ deleteSideEffectFiles(conf);
+ JobID jobId = runJobNormally(conf);
+ assertEquals(1, countSideEffectFiles(conf, "map"));
+ assertEquals(1, countSideEffectFiles(conf, "reduce"));
+
+ deleteSideEffectFiles(conf);
+
+ // Retrieve successful job's configuration and
+ // run IsolationRunner against the map task.
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path mapJobXml = getAttemptJobXml(conf, jobId,
+ TaskType.MAP).makeQualified(localFs);
+ assertTrue(localFs.exists(mapJobXml));
+
+ new IsolationRunner().run(new String[] {
+ new File(mapJobXml.toUri()).getCanonicalPath() });
+
+ assertEquals(1, countSideEffectFiles(conf, "map"));
+ assertEquals(0, countSideEffectFiles(conf, "reduce"));
+
+ // Clean up
+ deleteSideEffectFiles(conf);
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ static JobID runJobNormally(JobConf conf) throws IOException {
+ final Path inDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/input");
+ final Path outDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/output");
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(outDir, true);
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+ String input = "The quick brown fox jumps over lazy dog\n";
+ DataOutputStream file = fs.create(new Path(inDir, "file"));
+ file.writeBytes(input);
+ file.close();
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setMapperClass(SideEffectMapper.class);
+ conf.setReducerClass(SideEffectReducer.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+
+ JobClient jobClient = new JobClient(conf);
+ RunningJob job = jobClient.submitJob(conf);
+ job.waitForCompletion();
+ return job.getID();
+ }
+}