Author: ryota
Date: Wed Jun  5 16:39:50 2013
New Revision: 1489945

URL: http://svn.apache.org/r1489945
Log:
OOZIE-1381 Oozie does not support access to the distributed cache file under 
different name node

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 Wed Jun  5 16:39:50 2013
@@ -307,31 +307,31 @@ public class JavaActionExecutor extends 
 
     Configuration addToCache(Configuration conf, Path appPath, String 
filePath, boolean archive)
             throws ActionExecutorException {
-        Path path = null;
+
+        URI uri = null;
         try {
-            if (filePath.startsWith("/")) {
-                path = new Path(filePath);
-            }
-            else {
-                path = new Path(appPath, filePath);
+            uri = new URI(filePath);
+            URI baseUri = appPath.toUri();
+            if (uri.getScheme() == null) {
+                String resolvedPath = uri.getPath();
+                if (!resolvedPath.startsWith("/")) {
+                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
+                }
+                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), 
resolvedPath, uri.getQuery(), uri.getFragment());
             }
-            URI uri = new URI(path.toUri().getPath());
             if (archive) {
-                DistributedCache.addCacheArchive(uri, conf);
+                DistributedCache.addCacheArchive(uri.normalize(), conf);
             }
             else {
                 String fileName = filePath.substring(filePath.lastIndexOf("/") 
+ 1);
-                if (fileName.endsWith(".so") || fileName.contains(".so.")) {  
// .so files
-                    uri = new Path(path.toString() + "#" + fileName).toUri();
-                    uri = new URI(uri.getPath());
-                    DistributedCache.addCacheFile(uri, conf);
+                if (fileName.endsWith(".so") || fileName.contains(".so.")) { 
// .so files
+                    uri = new URI(uri.getScheme(), uri.getAuthority(), 
uri.getPath(), uri.getQuery(), fileName);
+                    DistributedCache.addCacheFile(uri.normalize(), conf);
                 }
                 else if (fileName.endsWith(".jar")) { // .jar files
                     if (!fileName.contains("#")) {
-                        path = new Path(uri.toString());
-
                         String user = conf.get("user.name");
-                        
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, 
conf);
+                        
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new 
Path(uri.normalize()), conf);
                     }
                     else {
                         DistributedCache.addCacheFile(uri, conf);
@@ -339,10 +339,9 @@ public class JavaActionExecutor extends 
                 }
                 else { // regular files
                     if (!fileName.contains("#")) {
-                        uri = new Path(path.toString() + "#" + 
fileName).toUri();
-                        uri = new URI(uri.getPath());
+                        uri = new URI(uri.getScheme(), uri.getAuthority(), 
uri.getPath(), uri.getQuery(), fileName);
                     }
-                    DistributedCache.addCacheFile(uri, conf);
+                    DistributedCache.addCacheFile(uri.normalize(), conf);
                 }
             }
             DistributedCache.createSymlink(conf);
@@ -350,7 +349,7 @@ public class JavaActionExecutor extends 
         }
         catch (Exception ex) {
             XLog.getLog(getClass()).debug(
-                    "Errors when add to DistributedCache. Path=" + path + ", 
archive=" + archive + ", conf="
+                    "Errors when add to DistributedCache. Path=" + 
uri.toString() + ", archive=" + archive + ", conf="
                             + XmlUtils.prettyPrint(conf).toString());
             throw convertException(ex);
         }

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
 Wed Jun  5 16:39:50 2013
@@ -1490,4 +1490,115 @@ public class TestJavaActionExecutor exte
         jae.injectLauncherUseUberMode(conf);
         assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
     }
+
+    public void testAddToCache() throws Exception {
+        JavaActionExecutor ae = new JavaActionExecutor();
+        Configuration conf = new XConfiguration();
+
+        Path appPath = new Path(getFsTestCaseDir(), "wf");
+        URI appUri = appPath.toUri();
+
+        // test archive without fragment
+        Path archivePath = new Path("test.jar");
+        Path archiveFullPath = new Path(appPath, archivePath);
+        ae.addToCache(conf, appPath, archiveFullPath.toString(), true);
+        
assertTrue(conf.get("mapred.cache.archives").contains(archiveFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test archive with fragment
+        Path archiveFragmentPath = new Path("test.jar#a.jar");
+        Path archiveFragmentFullPath = new Path(appPath, archiveFragmentPath);
+        conf.clear();
+        ae.addToCache(conf, appPath, archiveFragmentFullPath.toString(), true);
+        
assertTrue(conf.get("mapred.cache.archives").contains(archiveFragmentFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test .so without fragment
+        Path appSoPath = new Path("lib/a.so");
+        Path appSoFullPath = new Path(appPath, appSoPath);
+        conf.clear();
+        ae.addToCache(conf, appPath, appSoFullPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appSoFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test .so with fragment
+        Path appSoFragmentPath = new Path("lib/a.so#a.so");
+        Path appSoFragmentFullPath = new Path(appPath, appSoFragmentPath);
+        conf.clear();
+        ae.addToCache(conf, appPath, appSoFragmentFullPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appSoFragmentFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test .jar without fragment
+        Path appJarPath = new Path("lib/a.jar");
+        Path appJarFullPath = new Path(appPath, appJarPath);
+        conf.clear();
+        conf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+        ae.addToCache(conf, appPath, appJarFullPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appJarFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test .jar with fragment
+        Path appJarFragmentPath = new Path("lib/a.jar#a.jar");
+        Path appJarFragmentFullPath = new Path(appPath, appJarFragmentPath);
+        conf.clear();
+        conf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+        ae.addToCache(conf, appPath, appJarFragmentFullPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appJarFragmentFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test regular file without fragment
+        Path appFilePath = new Path("lib/a.txt");
+        Path appFileFullPath = new Path(appPath, appFilePath);
+        conf.clear();
+        ae.addToCache(conf, appPath, appFileFullPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appFileFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test regular file with fragment
+        Path appFileFragmentPath = new Path("lib/a.txt#a.txt");
+        Path appFileFragmentFullPath = new Path(appPath, appFileFragmentPath);
+        conf.clear();
+        ae.addToCache(conf, appPath, appFileFragmentFullPath.toString(), 
false);
+        
assertTrue(conf.get("mapred.cache.files").contains(appFileFragmentFullPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test path starting with "/" for archive
+        Path testPath = new Path("/tmp/testpath/a.jar#a.jar");
+        conf.clear();
+        ae.addToCache(conf, appPath, testPath.toString(), true);
+        
assertTrue(conf.get("mapred.cache.archives").contains(testPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test path starting with "/" for cache.file
+        conf.clear();
+        ae.addToCache(conf, appPath, testPath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(testPath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test absolute path for archive
+        Path testAbsolutePath = new 
Path("hftp://namenode.test.com:8020/tmp/testpath/a.jar#a.jar");
+        conf.clear();
+        ae.addToCache(conf, appPath, testAbsolutePath.toString(), true);
+        
assertTrue(conf.get("mapred.cache.archives").contains(testAbsolutePath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test absolute path for cache files
+        conf.clear();
+        ae.addToCache(conf, appPath, testAbsolutePath.toString(), false);
+        
assertTrue(conf.get("mapred.cache.files").contains(testAbsolutePath.toString()));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test relative path for archive
+        conf.clear();
+        ae.addToCache(conf, appPath, "lib/a.jar#a.jar", true);
+        assertTrue(conf.get("mapred.cache.archives").contains(appUri.getPath() 
+ "/lib/a.jar#a.jar"));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+        // test relative path for cache files
+        conf.clear();
+        ae.addToCache(conf, appPath, "lib/a.jar#a.jar", false);
+        assertTrue(conf.get("mapred.cache.files").contains(appUri.getPath() + 
"/lib/a.jar#a.jar"));
+        assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+    }
 }

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Jun  5 16:39:50 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1381 Oozie does not support access to the distributed cache file under 
different name node (ryota)
 OOZIE-1298 TestPartitionDependencyManagerEhcache.testEvictionOnTimeToIdle is 
flakey (rohini)
 OOZIE-1397 failure in running test cases (aklochkov via rohini)
 OOZIE-1294 SLA Email Notification (ryota via virag)


Reply via email to