Author: tomwhite
Date: Wed May 27 08:55:34 2009
New Revision: 779066

URL: http://svn.apache.org/viewvc?rev=779066&view=rev
Log:
HADOOP-5635. Change distributed cache to work with other distributed file 
systems. Contributed by Andrew Hitchcock.
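
For context, a minimal sketch (not part of this patch) of how a job might add a
cache file that lives on a non-HDFS filesystem once this change is applied. The
s3n bucket and link name are assumptions made up for illustration; any scheme
with a registered FileSystem implementation should behave the same way:

    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheOnOtherFileSystem {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // The #stopwords fragment becomes the symlink name in the task's working directory.
        DistributedCache.addCacheFile(
            new URI("s3n://example-bucket/shared/stopwords.txt#stopwords"), job);
        DistributedCache.createSymlink(job);
      }
    }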

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 27 08:55:34 2009
@@ -689,6 +689,9 @@
     CombineFileInputFormat is used as job InputFormat.
     (Amareshwari Sriramadasu via dhruba)
 
+    HADOOP-5635. Change distributed cache to work with other distributed file
+    systems. (Andrew Hitchcock via tomwhite)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed May 27 08:55:34 2009
@@ -1131,7 +1131,7 @@
 
   protected RunningJob running_;
   protected JobID jobId_;
-  protected static final String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+  protected static final String LINK_URI = "You need to specify the uris as scheme://path#linkname," +
     "Please specify a different link name for all of your caching URIs";
 
 }
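
The reworded message reflects that a caching URI may now use any filesystem
scheme, with the URI fragment serving as the (unique) link name. A small
sketch of two URIs that satisfy the rule; the host, bucket, and paths are
made up for illustration:

    import java.net.URI;

    public class CacheUriFormat {
      public static void main(String[] args) throws Exception {
        // scheme://path#linkname -- each URI must carry a distinct fragment.
        URI dictionary = new URI("hdfs://namenode:8020/shared/dict.txt#dict");
        URI stopwords  = new URI("s3n://example-bucket/shared/stopwords.txt#stopwords");
        System.out.println(dictionary.getFragment()); // dict
        System.out.println(stopwords.getFragment());  // stopwords
      }
    }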

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Wed May 27 08:55:34 2009
@@ -35,10 +35,10 @@
  * </p>
  * 
  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
- * via the {@link org.apache.hadoop.mapred.JobConf}.
- * The <code>DistributedCache</code> assumes that the
- * files specified via hdfs:// urls are already present on the 
- * {@link FileSystem} at the path specified by the url.</p>
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
  * 
  * <p>The framework will copy the necessary files on to the slave node before 
  * any tasks for the job are executed on that node. Its efficiency stems from 
@@ -129,9 +129,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -162,9 +160,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -231,9 +227,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param isArchive if the cache is an archive or a file. In case it is an 
@@ -350,7 +344,7 @@
     if(cache.getFragment() == null) {
        doSymlink = false;
     }
-    FileSystem fs = getFileSystem(cache, conf);
+    FileSystem fs = FileSystem.get(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
     if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -533,14 +527,6 @@
     }  
   }
   
-  private static FileSystem getFileSystem(URI cache, Configuration conf)
-    throws IOException {
-    if ("hdfs".equals(cache.getScheme()))
-      return FileSystem.get(cache, conf);
-    else
-      return FileSystem.get(conf);
-  }
-
   /**
    * Set the configuration with the given set of archives
    * @param archives The list of archives that need to be localized
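
A minimal sketch (not from this patch) of why dropping the hdfs-only helper in
favour of FileSystem.get(cache, conf) generalizes the lookup: the filesystem is
chosen from the URI scheme via the fs.<scheme>.impl configuration key, and a
URI without a scheme still falls back to the default filesystem. The paths
below are assumptions for illustration:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class SchemeResolution {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolved by scheme: fs.file.impl -> LocalFileSystem.
        FileSystem local = FileSystem.get(new URI("file:///tmp"), conf);
        // No scheme: falls back to the default filesystem (fs.default.name).
        FileSystem dflt = FileSystem.get(new URI("/tmp"), conf);
        System.out.println(local.getUri() + " vs " + dflt.getUri());
      }
    }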

Modified: hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java (original)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java Wed May 27 08:55:34 2009
@@ -55,6 +55,15 @@
     assertTrue("DistributedCache failed deleting old cache when the cache 
store is full.",
         dirStatuses.length > 1);
   }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+    Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+  }
 
   private void createTempFile(FileSystem fs, Path p) throws IOException {
     FSDataOutputStream out = fs.create(p);
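
The new test works by aliasing an otherwise unknown scheme onto an existing
FileSystem implementation. A sketch of the same trick outside the test: the
"fakefile" scheme is arbitrary, and it assumes fs.file.impl is defined in
core-default.xml, as it is on this branch:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FakeSchemeAlias {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Back the fake scheme with the local filesystem implementation.
        conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
        FileSystem fs = FileSystem.get(new URI("fakefile:///"), conf);
        System.out.println(fs.getClass().getName()); // org.apache.hadoop.fs.LocalFileSystem
      }
    }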

