Author: cutting Date: Fri Mar 2 12:03:35 2007 New Revision: 513924 URL: http://svn.apache.org/viewvc?view=rev&rev=513924 Log: HADOOP-1032. Permit one to specify jars that will be cached across multiple jobs. Contributed by Gautam Kowshik.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513924&r1=513923&r2=513924 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 2 12:03:35 2007 @@ -182,6 +182,9 @@ 55. HADOOP-1041. Optimize mapred counter implementation. Also group counters by their declaring Enum. (David Bowen via cutting) +56. HADOOP-1032. Permit one to specify jars that will be cached + across multiple jobs. (Gautam Kowshik via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=513924&r1=513923&r2=513924 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 2 12:03:35 2007 @@ -521,7 +521,80 @@ conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," + uri.toString()); } - + + /** + * Add an file path to the current set of classpath entries It adds the file + * to cache as well. + * + * @param file Path of the file to be added + * @param conf Configuration that contains the classpath setting + */ + public static void addFileToClassPath(Path file, Configuration conf) + throws IOException { + String classpath = conf.get("mapred.job.classpath.files"); + conf.set("mapred.job.classpath.files", classpath == null ? file.toString() + : classpath + System.getProperty("path.separator") + file.toString()); + FileSystem fs = FileSystem.get(conf); + URI uri = fs.makeQualified(file).toUri(); + + addCacheFile(uri, conf); + } + + /** + * Get the file entries in classpath as an array of Path + * + * @param conf Configuration that contains the classpath setting + */ + public static Path[] getFileClassPaths(Configuration conf) { + String classpath = conf.get("mapred.job.classpath.files"); + if (classpath == null) + return null; + ArrayList list = Collections.list(new StringTokenizer(classpath, System + .getProperty("path.separator"))); + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path((String) list.get(i)); + } + return paths; + } + + /** + * Add an archive path to the current set of classpath entries. It adds the + * archive to cache as well. + * + * @param archive Path of the archive to be added + * @param conf Configuration that contains the classpath setting + */ + public static void addArchiveToClassPath(Path archive, Configuration conf) + throws IOException { + String classpath = conf.get("mapred.job.classpath.archives"); + conf.set("mapred.job.classpath.archives", classpath == null ? archive + .toString() : classpath + System.getProperty("path.separator") + + archive.toString()); + FileSystem fs = FileSystem.get(conf); + URI uri = fs.makeQualified(archive).toUri(); + + addCacheArchive(uri, conf); + } + + /** + * Get the archive entries in classpath as an array of Path + * + * @param conf Configuration that contains the classpath setting + */ + public static Path[] getArchiveClassPaths(Configuration conf) { + String classpath = conf.get("mapred.job.classpath.archives"); + if (classpath == null) + return null; + ArrayList list = Collections.list(new StringTokenizer(classpath, System + .getProperty("path.separator"))); + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path((String) list.get(i)); + } + return paths; + } + /** * This method allows you to create symlinks in the current working directory * of the task to all the cache files/archives Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=513924&r1=513923&r2=513924 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 2 12:03:35 2007 @@ -171,6 +171,45 @@ classPath.append(jobCacheDir); } + + // include the user specified classpath + + //archive paths + Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf); + if (archiveClasspaths != null && archives != null) { + Path[] localArchives = DistributedCache + .getLocalCacheArchives(conf); + if (localArchives != null){ + for (int i=0;i<archives.length;i++){ + for(int j=0;j<archiveClasspaths.length;j++){ + if(archives[i].getPath().equals( + archiveClasspaths[j].toString())){ + classPath.append(sep); + classPath.append(localArchives[i] + .toString()); + } + } + } + } + } + //file paths + Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf); + if(fileClasspaths!=null && files != null) { + Path[] localFiles = DistributedCache + .getLocalCacheFiles(conf); + if (localFiles != null) { + for (int i = 0; i < files.length; i++) { + for (int j = 0; j < fileClasspaths.length; j++) { + if (files[i].getPath().equals( + fileClasspaths[j].toString())) { + classPath.append(sep); + classPath.append(localFiles[i].toString()); + } + } + } + } + } + classPath.append(sep); classPath.append(workDir); // Build exec child jmv args.