Author: cutting
Date: Fri Mar  2 12:03:35 2007
New Revision: 513924

URL: http://svn.apache.org/viewvc?view=rev&rev=513924
Log:
HADOOP-1032.  Permit one to specify jars that will be cached across multiple 
jobs.  Contributed by Gautam Kowshik.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Mar  2 12:03:35 2007
@@ -182,6 +182,9 @@
 55. HADOOP-1041.  Optimize mapred counter implementation.  Also group
     counters by their declaring Enum.  (David Bowen via cutting)
 
+56. HADOOP-1032.  Permit one to specify jars that will be cached
+    across multiple jobs.  (Gautam Kowshik via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  2 12:03:35 2007
@@ -521,7 +521,80 @@
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
         + uri.toString());
   }
-  
+
+  /**
+   * Add a file path to the current set of classpath entries. The file is
+   * added to the distributed cache as well (via {@link #addCacheFile}).
+   *
+   * <p>The entry recorded in {@code mapred.job.classpath.files} is the path
+   * component of the fully-qualified file URI. This is the same string that
+   * {@code getPath()} returns for the cache entry this method registers.
+   * Recording the caller's raw {@code file.toString()} instead has two
+   * problems:
+   * <ul>
+   *   <li>A relative or scheme-qualified path never equals the cached URI's
+   *       path when TaskRunner matches classpath entries against the cache
+   *       files.</li>
+   *   <li>A qualified path such as {@code hdfs://host:port/a.jar} contains
+   *       ':', which is the {@code path.separator} on Unix, so it would be
+   *       split apart when the entries are read back.</li>
+   * </ul>
+   *
+   * <p>Entries are joined with this JVM's {@code path.separator}.
+   * NOTE(review): the reader splits on its own JVM's separator, so the
+   * submitting and executing platforms must agree.
+   *
+   * @param file Path of the file to be added; may be relative, absolute or
+   *             fully qualified
+   * @param conf Configuration that contains the classpath setting
+   * @throws IOException if the FileSystem for {@code conf} cannot be obtained
+   */
+  public static void addFileToClassPath(Path file, Configuration conf)
+      throws IOException {
+    // Qualify before touching conf, so a failure here leaves conf unchanged.
+    FileSystem fs = FileSystem.get(conf);
+    URI uri = fs.makeQualified(file).toUri();
+
+    String entry = uri.getPath();
+    String classpath = conf.get("mapred.job.classpath.files");
+    conf.set("mapred.job.classpath.files", classpath == null ? entry
+        : classpath + System.getProperty("path.separator") + entry);
+
+    addCacheFile(uri, conf);
+  }
+
+  /**
+   * Get the file entries in the classpath as an array of Path.
+   *
+   * <p>Entries are read from {@code mapred.job.classpath.files} and split on
+   * this JVM's {@code path.separator}. Empty entries (consecutive
+   * separators) are skipped.
+   *
+   * @param conf Configuration that contains the classpath setting
+   * @return the classpath file entries, or {@code null} if the property is
+   *         not set
+   */
+  public static Path[] getFileClassPaths(Configuration conf) {
+    String classpath = conf.get("mapred.job.classpath.files");
+    if (classpath == null) {
+      return null;
+    }
+    StringTokenizer tokens =
+        new StringTokenizer(classpath, System.getProperty("path.separator"));
+    // Size the result directly instead of copying into a raw-typed ArrayList.
+    Path[] paths = new Path[tokens.countTokens()];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(tokens.nextToken());
+    }
+    return paths;
+  }
+
+  /**
+   * Add an archive path to the current set of classpath entries. The archive
+   * is added to the distributed cache as well (via {@link #addCacheArchive}).
+   *
+   * <p>The entry recorded in {@code mapred.job.classpath.archives} is the
+   * path component of the fully-qualified archive URI. This is the same
+   * string that {@code getPath()} returns for the cache entry this method
+   * registers. Recording the caller's raw {@code archive.toString()} instead
+   * has two problems:
+   * <ul>
+   *   <li>A relative or scheme-qualified path never equals the cached URI's
+   *       path when TaskRunner matches classpath entries against the cache
+   *       archives.</li>
+   *   <li>A qualified path such as {@code hdfs://host:port/a.jar} contains
+   *       ':', which is the {@code path.separator} on Unix, so it would be
+   *       split apart when the entries are read back.</li>
+   * </ul>
+   *
+   * <p>Entries are joined with this JVM's {@code path.separator}.
+   * NOTE(review): the reader splits on its own JVM's separator, so the
+   * submitting and executing platforms must agree.
+   *
+   * @param archive Path of the archive to be added; may be relative, absolute
+   *                or fully qualified
+   * @param conf Configuration that contains the classpath setting
+   * @throws IOException if the FileSystem for {@code conf} cannot be obtained
+   */
+  public static void addArchiveToClassPath(Path archive, Configuration conf)
+      throws IOException {
+    // Qualify before touching conf, so a failure here leaves conf unchanged.
+    FileSystem fs = FileSystem.get(conf);
+    URI uri = fs.makeQualified(archive).toUri();
+
+    String entry = uri.getPath();
+    String classpath = conf.get("mapred.job.classpath.archives");
+    conf.set("mapred.job.classpath.archives", classpath == null ? entry
+        : classpath + System.getProperty("path.separator") + entry);
+
+    addCacheArchive(uri, conf);
+  }
+
+  /**
+   * Get the archive entries in the classpath as an array of Path.
+   *
+   * <p>Entries are read from {@code mapred.job.classpath.archives} and split
+   * on this JVM's {@code path.separator}. Empty entries (consecutive
+   * separators) are skipped.
+   *
+   * @param conf Configuration that contains the classpath setting
+   * @return the classpath archive entries, or {@code null} if the property is
+   *         not set
+   */
+  public static Path[] getArchiveClassPaths(Configuration conf) {
+    String classpath = conf.get("mapred.job.classpath.archives");
+    if (classpath == null) {
+      return null;
+    }
+    StringTokenizer tokens =
+        new StringTokenizer(classpath, System.getProperty("path.separator"));
+    // Size the result directly instead of copying into a raw-typed ArrayList.
+    Path[] paths = new Path[tokens.countTokens()];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(tokens.nextToken());
+    }
+    return paths;
+  }
+
   /**
    * This method allows you to create symlinks in the current working directory
    * of the task to all the cache files/archives

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=513924&r1=513923&r2=513924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  2 12:03:35 2007
@@ -171,6 +171,45 @@
         classPath.append(jobCacheDir);
        
       }
+
+               // include the user specified classpath
+               
+               //archive paths
+               Path[] archiveClasspaths = 
DistributedCache.getArchiveClassPaths(conf);
+               if (archiveClasspaths != null && archives != null) {
+                       Path[] localArchives = DistributedCache
+                                       .getLocalCacheArchives(conf);
+                       if (localArchives != null){
+                               for (int i=0;i<archives.length;i++){
+                                       for(int 
j=0;j<archiveClasspaths.length;j++){
+                                               if(archives[i].getPath().equals(
+                                                               
archiveClasspaths[j].toString())){
+                                                       classPath.append(sep);
+                                                       
classPath.append(localArchives[i]
+                                                                       
.toString());
+                                               }
+                                       }
+                               }
+                       }
+               }
+               //file paths
+               Path[] fileClasspaths = 
DistributedCache.getFileClassPaths(conf);
+               if(fileClasspaths!=null && files != null) {
+                       Path[] localFiles = DistributedCache
+                                       .getLocalCacheFiles(conf);
+                       if (localFiles != null) {
+                               for (int i = 0; i < files.length; i++) {
+                                       for (int j = 0; j < 
fileClasspaths.length; j++) {
+                                               if (files[i].getPath().equals(
+                                                               
fileClasspaths[j].toString())) {
+                                                       classPath.append(sep);
+                                                       
classPath.append(localFiles[i].toString());
+                                               }
+                                       }
+                               }
+                       }
+               }
+
       classPath.append(sep);
       classPath.append(workDir);
       //  Build exec child jmv args.


Reply via email to