Author: cutting
Date: Tue Dec 12 14:24:09 2006
New Revision: 486372

URL: http://svn.apache.org/viewvc?view=rev&rev=486372
Log:
HADOOP-673. Give each task its own working directory again. Contributed by Mahadev.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Dec 12 14:24:09 2006 @@ -71,6 +71,9 @@ 21. HADOOP-792. Fix 'dfs -mv' to return correct status. (Dhruba Borthakur via cutting) +22. HADOOP-673. Give each task its own working directory again. + (Mahadev Konar via cutting) + Release 0.9.1 - 2006-12-06 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Dec 12 14:24:09 2006 @@ -234,10 +234,12 @@ String[] argvSplit = splitArgs(argv); String prog = argvSplit[0]; String userdir = System.getProperty("user.dir"); + File currentDir = new File(".").getAbsoluteFile(); + File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work"); if (new File(prog).isAbsolute()) { // we don't own it. 
Hope it is executable } else { - new MustangFile(prog).setExecutable(true, true); + new MustangFile(new File(jobCacheDir, prog).toString()).setExecutable(true, true); } if (job_.getInputValueClass().equals(BytesWritable.class)) { @@ -282,7 +284,7 @@ // if (!new File(argvSplit[0]).isAbsolute()) { PathFinder finder = new PathFinder("PATH"); - finder.prependPathComponent("."); + finder.prependPathComponent(jobCacheDir.toString()); File f = finder.getAbsolutePath(argvSplit[0]); if (f != null) { argvSplit[0] = f.getAbsolutePath(); Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Dec 12 14:24:09 2006 @@ -620,8 +620,8 @@ boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs); if (!b) fail(LINK_URI); - DistributedCache.createSymlink(jobConf_); } + DistributedCache.createSymlink(jobConf_); // set the jobconf for the caching parameters if (cacheArchives != null) DistributedCache.setCacheArchives(archiveURIs, jobConf_); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Dec 12 14:24:09 2006 @@ -24,6 +24,7 @@ import 
org.apache.hadoop.conf.*; import org.apache.hadoop.util.*; import org.apache.hadoop.fs.*; + import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.net.URI; @@ -108,6 +109,8 @@ String cacheId = makeRelative(cache, conf); synchronized (cachedArchives) { CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); + if (lcacheStatus == null) + return; synchronized (lcacheStatus) { lcacheStatus.refcount--; } @@ -320,7 +323,29 @@ return digest; } - + + /** + * This method create symlinks for all files in a given dir in another directory + * @param conf the configuration + * @param jobCacheDir the target directory for creating symlinks + * @param workDir the directory in which the symlinks are created + * @throws IOException + */ + public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir) + throws IOException{ + if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){ + return; + } + boolean createSymlink = getSymlink(conf); + if (createSymlink){ + File[] list = jobCacheDir.listFiles(); + for (int i=0; i < list.length; i++){ + FileUtil.symLink(list[i].getAbsolutePath(), + new File(workDir, list[i].getName()).toString()); + } + } + } + private static String getFileSysName(URI url) { String fsname = url.getScheme(); if ("dfs".equals(fsname)) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Dec 12 14:24:09 2006 @@ -43,6 +43,16 @@ return false; } } else { + //try deleting the directory + // this might be a symlink + boolean b = false; + b = contents[i].delete(); + if (b){ + //this was indeed a symlink 
or an empty directory + continue; + } + // if not an empty directory or symlink let + // fullydelete handle it. if (! fullyDelete(contents[i])) { return false; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=486372&r1=486371&r2=486372 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Dec 12 14:24:09 2006 @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.filecache.*; +import org.apache.hadoop.util.*; import java.io.*; import java.util.Vector; import java.net.URI; @@ -82,7 +83,8 @@ //before preparing the job localize //all the archives - File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work"); + File workDir = new File(t.getJobFile()).getParentFile(); + File jobCacheDir = new File(workDir.getParent(), "work"); URI[] archives = DistributedCache.getCacheArchives(conf); URI[] files = DistributedCache.getCacheFiles(conf); if ((archives != null) || (files != null)) { @@ -104,8 +106,6 @@ } DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); } - - // sets the paths to local archives and paths Path localTaskFile = new Path(t.getJobFile()); FileSystem localFs = FileSystem.getNamed("local", conf); localFs.delete(localTaskFile); @@ -116,6 +116,16 @@ out.close(); } } + + // create symlinks for all the files in job cache dir in current + // workingdir for streaming + try{ + DistributedCache.createAllSymlink(conf, jobCacheDir, + workDir); + } catch(IOException ie){ + // Do not exit even if symlinks have not been created. + LOG.warn(StringUtils.stringifyException(ie)); + } if (! 
prepare()) { return; @@ -135,7 +145,7 @@ String jar = conf.getJar(); if (jar != null) { // if jar exists, it into workDir - File[] libs = new File(workDir, "lib").listFiles(); + File[] libs = new File(jobCacheDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.append(sep); // add libs from jar to classpath @@ -143,11 +153,13 @@ } } classPath.append(sep); - classPath.append(new File(workDir, "classes")); + classPath.append(new File(jobCacheDir, "classes")); classPath.append(sep); - classPath.append(workDir); + classPath.append(jobCacheDir); + } - + classPath.append(sep); + classPath.append(workDir); // Build exec child jmv args. Vector vargs = new Vector(8); File jvm = // use same jvm as parent