Author: ryota
Date: Wed Jun 5 16:39:50 2013
New Revision: 1489945
URL: http://svn.apache.org/r1489945
Log:
OOZIE-1381 Oozie does not support access to the distributed cache file under
different name node
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Wed Jun 5 16:39:50 2013
@@ -307,31 +307,31 @@ public class JavaActionExecutor extends
Configuration addToCache(Configuration conf, Path appPath, String
filePath, boolean archive)
throws ActionExecutorException {
- Path path = null;
+
+ URI uri = null;
try {
- if (filePath.startsWith("/")) {
- path = new Path(filePath);
- }
- else {
- path = new Path(appPath, filePath);
+ uri = new URI(filePath);
+ URI baseUri = appPath.toUri();
+ if (uri.getScheme() == null) {
+ String resolvedPath = uri.getPath();
+ if (!resolvedPath.startsWith("/")) {
+ resolvedPath = baseUri.getPath() + "/" + resolvedPath;
+ }
+ uri = new URI(baseUri.getScheme(), baseUri.getAuthority(),
resolvedPath, uri.getQuery(), uri.getFragment());
}
- URI uri = new URI(path.toUri().getPath());
if (archive) {
- DistributedCache.addCacheArchive(uri, conf);
+ DistributedCache.addCacheArchive(uri.normalize(), conf);
}
else {
String fileName = filePath.substring(filePath.lastIndexOf("/")
+ 1);
- if (fileName.endsWith(".so") || fileName.contains(".so.")) {
// .so files
- uri = new Path(path.toString() + "#" + fileName).toUri();
- uri = new URI(uri.getPath());
- DistributedCache.addCacheFile(uri, conf);
+ if (fileName.endsWith(".so") || fileName.contains(".so.")) {
// .so files
+ uri = new URI(uri.getScheme(), uri.getAuthority(),
uri.getPath(), uri.getQuery(), fileName);
+ DistributedCache.addCacheFile(uri.normalize(), conf);
}
else if (fileName.endsWith(".jar")) { // .jar files
if (!fileName.contains("#")) {
- path = new Path(uri.toString());
-
String user = conf.get("user.name");
-
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path,
conf);
+
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new
Path(uri.normalize()), conf);
}
else {
DistributedCache.addCacheFile(uri, conf);
@@ -339,10 +339,9 @@ public class JavaActionExecutor extends
}
else { // regular files
if (!fileName.contains("#")) {
- uri = new Path(path.toString() + "#" +
fileName).toUri();
- uri = new URI(uri.getPath());
+ uri = new URI(uri.getScheme(), uri.getAuthority(),
uri.getPath(), uri.getQuery(), fileName);
}
- DistributedCache.addCacheFile(uri, conf);
+ DistributedCache.addCacheFile(uri.normalize(), conf);
}
}
DistributedCache.createSymlink(conf);
@@ -350,7 +349,7 @@ public class JavaActionExecutor extends
}
catch (Exception ex) {
XLog.getLog(getClass()).debug(
- "Errors when add to DistributedCache. Path=" + path + ",
archive=" + archive + ", conf="
+ "Errors when add to DistributedCache. Path=" +
uri.toString() + ", archive=" + archive + ", conf="
+ XmlUtils.prettyPrint(conf).toString());
throw convertException(ex);
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
Wed Jun 5 16:39:50 2013
@@ -1490,4 +1490,115 @@ public class TestJavaActionExecutor exte
jae.injectLauncherUseUberMode(conf);
assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
}
+
+ public void testAddToCache() throws Exception {
+ JavaActionExecutor ae = new JavaActionExecutor();
+ Configuration conf = new XConfiguration();
+
+ Path appPath = new Path(getFsTestCaseDir(), "wf");
+ URI appUri = appPath.toUri();
+
+ // test archive without fragment
+ Path archivePath = new Path("test.jar");
+ Path archiveFullPath = new Path(appPath, archivePath);
+ ae.addToCache(conf, appPath, archiveFullPath.toString(), true);
+
assertTrue(conf.get("mapred.cache.archives").contains(archiveFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test archive with fragment
+ Path archiveFragmentPath = new Path("test.jar#a.jar");
+ Path archiveFragmentFullPath = new Path(appPath, archiveFragmentPath);
+ conf.clear();
+ ae.addToCache(conf, appPath, archiveFragmentFullPath.toString(), true);
+
assertTrue(conf.get("mapred.cache.archives").contains(archiveFragmentFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test .so without fragment
+ Path appSoPath = new Path("lib/a.so");
+ Path appSoFullPath = new Path(appPath, appSoPath);
+ conf.clear();
+ ae.addToCache(conf, appPath, appSoFullPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(appSoFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test .so with fragment
+ Path appSoFragmentPath = new Path("lib/a.so#a.so");
+ Path appSoFragmentFullPath = new Path(appPath, appSoFragmentPath);
+ conf.clear();
+ ae.addToCache(conf, appPath, appSoFragmentFullPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(appSoFragmentFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test .jar without fragment
+ Path appJarPath = new Path("lib/a.jar");
+ Path appJarFullPath = new Path(appPath, appJarPath);
+ conf.clear();
+ conf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+ ae.addToCache(conf, appPath, appJarFullPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(appJarFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test .jar with fragment
+ Path appJarFragmentPath = new Path("lib/a.jar#a.jar");
+ Path appJarFragmentFullPath = new Path(appPath, appJarFragmentPath);
+ conf.clear();
+ conf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+ ae.addToCache(conf, appPath, appJarFragmentFullPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(appJarFragmentFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test regular file without fragment
+ Path appFilePath = new Path("lib/a.txt");
+ Path appFileFullPath = new Path(appPath, appFilePath);
+ conf.clear();
+ ae.addToCache(conf, appPath, appFileFullPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(appFileFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test regular file with fragment
+ Path appFileFragmentPath = new Path("lib/a.txt#a.txt");
+ Path appFileFragmentFullPath = new Path(appPath, appFileFragmentPath);
+ conf.clear();
+ ae.addToCache(conf, appPath, appFileFragmentFullPath.toString(),
false);
+
assertTrue(conf.get("mapred.cache.files").contains(appFileFragmentFullPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test path starting with "/" for archive
+ Path testPath = new Path("/tmp/testpath/a.jar#a.jar");
+ conf.clear();
+ ae.addToCache(conf, appPath, testPath.toString(), true);
+
assertTrue(conf.get("mapred.cache.archives").contains(testPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test path starting with "/" for cache.file
+ conf.clear();
+ ae.addToCache(conf, appPath, testPath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(testPath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test absolute path for archive
+ Path testAbsolutePath = new
Path("hftp://namenode.test.com:8020/tmp/testpath/a.jar#a.jar");
+ conf.clear();
+ ae.addToCache(conf, appPath, testAbsolutePath.toString(), true);
+
assertTrue(conf.get("mapred.cache.archives").contains(testAbsolutePath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test absolute path for cache files
+ conf.clear();
+ ae.addToCache(conf, appPath, testAbsolutePath.toString(), false);
+
assertTrue(conf.get("mapred.cache.files").contains(testAbsolutePath.toString()));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test relative path for archive
+ conf.clear();
+ ae.addToCache(conf, appPath, "lib/a.jar#a.jar", true);
+ assertTrue(conf.get("mapred.cache.archives").contains(appUri.getPath()
+ "/lib/a.jar#a.jar"));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+
+ // test relative path for cache files
+ conf.clear();
+ ae.addToCache(conf, appPath, "lib/a.jar#a.jar", false);
+ assertTrue(conf.get("mapred.cache.files").contains(appUri.getPath() +
"/lib/a.jar#a.jar"));
+ assertTrue(conf.get("mapred.create.symlink").contains("yes"));
+ }
}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1489945&r1=1489944&r2=1489945&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Jun 5 16:39:50 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1381 Oozie does not support access to the distributed cache file under
different name node (ryota)
OOZIE-1298 TestPartitionDependencyManagerEhcache.testEvictionOnTimeToIdle is
flakey (rohini)
OOZIE-1397 failure in running test cases (aklochkov via rohini)
OOZIE-1294 SLA Email Notification (ryota via virag)