Repository: oozie
Updated Branches:
  refs/heads/master 6d4a9d0ea -> d5f1e3864


OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name 
node (benjzh via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5f1e386
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5f1e386
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5f1e386

Branch: refs/heads/master
Commit: d5f1e3864e96c23133304a12adc5a14aebba854b
Parents: 6d4a9d0
Author: Rohini Palaniswamy <[email protected]>
Authored: Tue Jun 17 13:58:39 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Tue Jun 17 13:58:39 2014 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  62 +++---
 .../oozie/service/HadoopAccessorService.java    |   9 +-
 .../apache/oozie/service/ShareLibService.java   |  11 ++
 .../java/org/apache/oozie/util/JobUtils.java    |   9 +-
 .../action/hadoop/ActionExecutorTestCase.java   |   4 +
 .../action/hadoop/TestJavaActionExecutor.java   | 190 ++++++++++++++++---
 .../java/org/apache/oozie/test/XFsTestCase.java |  54 ++++--
 .../java/org/apache/oozie/test/XTestCase.java   | 121 +++++++++---
 .../src/site/twiki/WorkflowFunctionalSpec.twiki |   6 +
 release-log.txt                                 |   1 +
 10 files changed, 358 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 40add2c..7a0d0e3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -349,11 +349,27 @@ public class JavaActionExecutor extends ActionExecutor {
             throws IOException, ActionExecutorException, 
HadoopAccessorException, URISyntaxException {
         Namespace ns = element.getNamespace();
         Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
+        HashMap<String, FileSystem> filesystemsMap = new HashMap<String, 
FileSystem>();
+        HadoopAccessorService has = 
Services.get().get(HadoopAccessorService.class);
         while (it.hasNext()) {
             Element e = it.next();
             String jobXml = e.getTextTrim();
-            Path path = new Path(appPath, jobXml);
-            FileSystem fs = context.getAppFileSystem();
+            Path pathSpecified = new Path(jobXml);
+            Path path = pathSpecified.isAbsolute() ? pathSpecified : new 
Path(appPath, jobXml);
+            FileSystem fs;
+            if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
+              fs = filesystemsMap.get(path.toUri().getAuthority());
+            }
+            else {
+              if (path.toUri().getAuthority() != null) {
+                fs = has.createFileSystem(context.getWorkflow().getUser(), 
path.toUri(),
+                        has.createJobConf(path.toUri().getAuthority()));
+              }
+              else {
+                fs = context.getAppFileSystem();
+              }
+              filesystemsMap.put(path.toUri().getAuthority(), fs);
+            }
             Configuration jobXmlConf = new XConfiguration(fs.open(path));
             try {
                 String jobXmlConfString = 
XmlUtils.prettyPrint(jobXmlConf).toString();
@@ -432,22 +448,11 @@ public class JavaActionExecutor extends ActionExecutor {
                 else if (fileName.endsWith(".jar")) { // .jar files
                     if (!fileName.contains("#")) {
                         String user = conf.get("user.name");
-                        Path pathToAdd;
-                        // if filePath and appPath belong to same cluster, add 
URI path component else add absolute URI
-                        if (uri.getScheme() != null && uri.getHost() != null &&
-                            uri.getPort() > 0 && baseUri.getScheme() != null &&
-                            baseUri.getHost() != null && baseUri.getPort() > 0 
&&
-                            
uri.getScheme().equalsIgnoreCase(baseUri.getScheme()) &&
-                            uri.getHost().equalsIgnoreCase(baseUri.getHost()) 
&&
-                            uri.getPort() == baseUri.getPort()) {
-                          pathToAdd = new Path(uri.getPath());
-                        } else {
-                          pathToAdd = new Path(uri.normalize());
-                        }
+                        Path pathToAdd = new Path(uri.normalize());
                         
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, 
pathToAdd, conf);
                     }
                     else {
-                        DistributedCache.addCacheFile(uri, conf);
+                        DistributedCache.addCacheFile(uri.normalize(), conf);
                     }
                 }
                 else { // regular files
@@ -502,25 +507,14 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    protected void addShareLib(Path appPath, Configuration conf, String[] 
actionShareLibNames)
+    protected void addShareLib(Configuration conf, String[] 
actionShareLibNames)
             throws ActionExecutorException {
         if (actionShareLibNames != null) {
-            String user = conf.get("user.name");
-            FileSystem fs;
             try {
-
-                Path systemLibPath = 
Services.get().get(WorkflowAppService.class).getSystemLibPath();
-                if (systemLibPath.toUri().getScheme() != null && 
systemLibPath.toUri().getAuthority() != null) {
-                    fs = Services.get().get(HadoopAccessorService.class)
-                            .createFileSystem(user, systemLibPath.toUri(), 
conf);
-                }
-                else {
-                    fs = 
Services.get().get(HadoopAccessorService.class).createFileSystem(user, 
appPath.toUri(), conf);
-                }
-                for (String actionShareLibName : actionShareLibNames) {
-
-                    if (systemLibPath != null) {
-                        ShareLibService shareLibService = 
Services.get().get(ShareLibService.class);
+                ShareLibService shareLibService = 
Services.get().get(ShareLibService.class);
+                FileSystem fs = shareLibService.getFileSystem();
+                if (fs != null) {
+                  for (String actionShareLibName : actionShareLibNames) {
                         List<Path> listOfPaths = 
shareLibService.getShareLibJars(actionShareLibName);
                         if (listOfPaths != null && !listOfPaths.isEmpty()) {
 
@@ -532,10 +526,6 @@ public class JavaActionExecutor extends ActionExecutor {
                     }
                 }
             }
-            catch (HadoopAccessorException ex) {
-                throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 
ex.getErrorCode()
-                        .toString(), ex.getMessage());
-            }
             catch (IOException ex) {
                 throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should 
never happen",
                         ex.getMessage());
@@ -661,7 +651,7 @@ public class JavaActionExecutor extends ActionExecutor {
         // Action sharelibs are only added if user has specified to use system 
libpath
         if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
             // add action specific sharelibs
-            addShareLib(appPath, conf, getShareLibNames(context, actionXml, 
conf));
+            addShareLib(conf, getShareLibNames(context, actionXml, conf));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java 
b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index bb68b0e..e2db6a6 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -34,6 +34,7 @@ import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.JobUtils;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -506,13 +507,9 @@ public class HadoopAccessorService implements Service {
         try {
             UserGroupInformation ugi = getUGI(user);
             ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
                 public Void run() throws Exception {
-                    Configuration defaultConf = new Configuration();
-                    XConfiguration.copy(conf, defaultConf);
-                    //Doing this NOP add first to have the FS created and 
cached
-                    DistributedCache.addFileToClassPath(file, defaultConf);
-
-                    DistributedCache.addFileToClassPath(file, conf);
+                    JobUtils.addFileToClassPath(file, conf, null);
                     return null;
                 }
             });

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/service/ShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java 
b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
index 320af8b..3ef5e07 100644
--- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java
+++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
@@ -668,4 +668,15 @@ public class ShareLibService implements Service, 
Instrumentable {
             }
         });
     }
+
+    /**
+     * Returns file system for shared libraries.
+     * <p/>
+     * If WorkflowAppService#getSystemLibPath doesn't have authority then a 
default one is assumed
+     *
+     * @return file system for shared libraries
+     */
+    public FileSystem getFileSystem() {
+        return fs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/util/JobUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/JobUtils.java 
b/core/src/main/java/org/apache/oozie/util/JobUtils.java
index 135b096..485db17 100644
--- a/core/src/main/java/org/apache/oozie/util/JobUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/JobUtils.java
@@ -140,15 +140,18 @@ public class JobUtils {
      * TODO: Remove the workaround when we drop the support for hadoop 0.20.
      * @param file Path of the file to be added
      * @param conf Configuration that contains the classpath setting
-     * @param fs FileSystem with respect to which path should be interpreted
+     * @param fs FileSystem with respect to which path should be interpreted 
(may be null)
      * @throws IOException
      */
     public static void addFileToClassPath(Path file, Configuration conf, 
FileSystem fs) throws IOException {
       Configuration defaultConf = new Configuration();
       XConfiguration.copy(conf, defaultConf);
-      DistributedCache.addFileToClassPath(file, defaultConf, fs);
+      if (fs == null) {
+        // it fails with conf, therefore we pass defaultConf instead
+        fs = file.getFileSystem(defaultConf);
+      }
       // Hadoop 0.20/1.x.
-      if (defaultConf.get("mapred.job.classpath.files") != null) {
+      if (defaultConf.get("yarn.resourcemanager.address") == null) {
           // Duplicate hadoop 1.x code to workaround MAPREDUCE-2361 in Hadoop 
0.20
           // Refer OOZIE-1806.
           String filepath = file.toUri().getPath();

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
index bc2c1b6..3e778bc 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
@@ -61,6 +61,7 @@ public abstract class ActionExecutorTestCase extends 
XFsTestCase {
 
     @Override
     protected void setUp() throws Exception {
+        beforeSetUp();
         super.setUp();
         setSystemProps();
         new Services().init();
@@ -69,6 +70,9 @@ public abstract class ActionExecutorTestCase extends 
XFsTestCase {
     protected void setSystemProps() throws Exception {
     }
 
+    protected void beforeSetUp() throws Exception {
+    }
+
     @Override
     protected void tearDown() throws Exception {
         if (Services.get() != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 390ad3f..72a137c 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -71,13 +71,19 @@ import org.junit.Test;
 public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
     @Override
+    protected void beforeSetUp() throws Exception {
+        super.beforeSetUp();
+        setSystemProperty("oozie.test.hadoop.minicluster2", "true");
+    }
+
+    @Override
     protected void setSystemProps() throws Exception {
         super.setSystemProps();
 
         setSystemProperty("oozie.service.ActionService.executor.classes", 
JavaActionExecutor.class.getName());
         
setSystemProperty("oozie.service.HadoopAccessorService.action.configurations",
                           "*=hadoop-conf," + getJobTrackerUri() + 
"=action-conf");
-        setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, 
getFsTestCaseDir() + "/systemlib");
+        setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, 
getFsTestCaseDir().toUri().getPath() + "/systemlib");
         new File(getTestCaseConfDir(), "action-conf").mkdir();
         InputStream is = 
Thread.currentThread().getContextClassLoader().getResourceAsStream("test-action-config.xml");
         OutputStream os = new FileOutputStream(new File(getTestCaseConfDir() + 
"/action-conf", "java.xml"));
@@ -1358,7 +1364,7 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
             }
         };
         String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + 
"</job-tracker>" + "<name-node>"
-                + getNameNodeUri() + "</name-node>" + "<main-class>" + 
LauncherMainTester.class.getName()
+                + getNameNode2Uri() + "</name-node>" + "<main-class>" + 
LauncherMainTester.class.getName()
                 + "</main-class>" + "</java>";
         Element eActionXml = XmlUtils.parseXml(actionXml);
         Context context = createContext(actionXml, null);
@@ -1375,35 +1381,19 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         getFileSystem().mkdirs(javaShareLibPath);
         Services.get().setService(ShareLibService.class);
 
-        Path appPath = getAppPath();
         JobConf conf = ae.createBaseHadoopConf(context, eActionXml);
-        // The next line should not throw an Exception because it will get the 
scheme and authority from the appPath, and not the
-        // sharelib path because it doesn't have a scheme or authority
-        ae.addShareLib(appPath, conf, new String[]{"java-action-executor"});
-
-        appPath = new Path("foo://bar:1234/blah");
-        conf = ae.createBaseHadoopConf(context, eActionXml);
-        // The next line should throw an Exception because it will get the 
scheme and authority from the appPath, which is obviously
-        // invalid, and not the sharelib path because it doesn't have a scheme 
or authority
-        try {
-            ae.addShareLib(appPath, conf, new 
String[]{"java-action-executor"});
-            fail();
-        }
-        catch (ActionExecutorException aee) {
-            assertEquals("E0902", aee.getErrorCode());
-            assertTrue(aee.getMessage().contains("[No FileSystem for scheme: 
foo]"));
-        }
+        // Although systemLibPath is not fully qualified and the action refers 
to the
+        // second namenode the next line won't throw exception because default 
fs is used
+        ae.addShareLib(conf, new String[]{"java-action-executor"});
 
         // Set sharelib to a full path (i.e. include scheme and authority)
         Services.get().destroy();
         setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, getNameNodeUri() 
+ "/user/" + getTestUser() + "/share/");
         new Services().init();
         Services.get().setService(ShareLibService.class);
-        appPath = new Path("foo://bar:1234/blah");
         conf = ae.createBaseHadoopConf(context, eActionXml);
-        // The next line should not throw an Exception because it will get the 
scheme and authority from the sharelib path (and not
-        // from the obviously invalid appPath)
-        ae.addShareLib(appPath, conf, new String[]{"java-action-executor"});
+        // The next line should not throw an Exception because it will get the 
scheme and authority from the sharelib path
+        ae.addShareLib(conf, new String[]{"java-action-executor"});
     }
 
     public void testFilesystemScheme() throws Exception {
@@ -1935,8 +1925,10 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         conf.clear();
         conf.set(WorkflowAppService.HADOOP_USER, getTestUser());
         ae.addToCache(conf, appPath, appJarFullPath.toString(), false);
-        // assert that mapred.cache.files contains jar URI path
-        Path jarPath = new Path(appJarFullPath.toUri().getPath());
+        // assert that mapred.cache.files contains jar URI path (full on 
Hadoop-2)
+        Path jarPath = createJobConf().get("yarn.resourcemanager.address") == 
null ?
+                new Path(appJarFullPath.toUri().getPath()) :
+                new Path(appJarFullPath.toUri());
         
assertTrue(conf.get("mapred.cache.files").contains(jarPath.toString()));
         // assert that dist cache classpath contains jar URI path
         Path[] paths = DistributedCache.getFileClassPaths(conf);
@@ -2025,4 +2017,152 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(conf.get("mapred.cache.files").contains(appUri.getPath() + 
"/lib/a.jar#a.jar"));
         assertTrue(DistributedCache.getSymlink(conf));
     }
+
+    public void testJobXmlAndNonDefaultNamenode() throws Exception {
+        // By default the job.xml file is taken from the workflow application
+        // namenode, regardless of the namenode specified for the action. To 
specify
+        // a job.xml on another namenode use a fully qualified file path.
+
+        Path appPath = new Path(getFsTestCaseDir(), "app");
+        getFileSystem().mkdirs(appPath);
+
+        Path jobXmlAbsolutePath = new 
Path(getFsTestCaseDir().toUri().getPath(), "jobxmlpath/job.xml");
+        assertTrue(jobXmlAbsolutePath.isAbsolute() && 
jobXmlAbsolutePath.toUri().getAuthority() == null);
+        Path jobXmlAbsolutePath2 = new 
Path(getFsTestCaseDir().toUri().getPath(), "jobxmlpath/job3.xml");
+        assertTrue(jobXmlAbsolutePath2.isAbsolute() && 
jobXmlAbsolutePath2.toUri().getAuthority() == null);
+        Path jobXmlQualifiedPath = new Path(getFs2TestCaseDir(), 
"jobxmlpath/job4.xml");
+        assertTrue(jobXmlQualifiedPath.toUri().getAuthority() != null);
+
+        // Use non-default name node (second filesystem) and three job-xml 
configurations:
+        // 1. Absolute (but not fully qualified) path located in the first 
filesystem
+        // 2. Without path (first filesystem)
+        // 3. Absolute (but not fully qualified) path located in the both 
filesystems
+        //   (first should be used)
+        // 4. Fully qualified path located in the second filesystem
+        String str = "<java>"
+                + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+                + "<name-node>" + getNameNode2Uri() + "</name-node>"
+                + "<job-xml>" + jobXmlAbsolutePath.toString() + "</job-xml>"
+                + "<job-xml>job2.xml</job-xml>"
+                + "<job-xml>" + jobXmlAbsolutePath2.toString() + "</job-xml>"
+                + "<job-xml>" + jobXmlQualifiedPath.toString() + "</job-xml>"
+                + "<configuration>"
+                + "<property><name>p1</name><value>v1a</value></property>"
+                + "<property><name>p2</name><value>v2</value></property>"
+                + "</configuration>"
+                + "</java>";
+        Element xml = XmlUtils.parseXml(str);
+
+        XConfiguration jConf = new XConfiguration();
+        jConf.set("p1", "v1b");
+        jConf.set("p3", "v3a");
+        OutputStream os = getFileSystem().create(jobXmlAbsolutePath);
+        jConf.writeXml(os);
+        os.close();
+
+        jConf = new XConfiguration();
+        jConf.set("p4", "v4");
+        jConf.set("p3", "v3b");
+        os = getFileSystem().create(new Path(appPath, "job2.xml"));
+        jConf.writeXml(os);
+        os.close();
+
+        // This configuration is expected to be used
+        jConf = new XConfiguration();
+        jConf.set("p5", "v5a");
+        jConf.set("p6", "v6a");
+        os = getFileSystem().create(jobXmlAbsolutePath2);
+        jConf.writeXml(os);
+        os.close();
+
+        // This configuration is expected to be ignored
+        jConf = new XConfiguration();
+        jConf.set("p5", "v5b");
+        jConf.set("p6", "v6b");
+        os = getFileSystem2().create(new 
Path(jobXmlAbsolutePath2.toUri().getPath()));
+        jConf.writeXml(os);
+        os.close();
+
+        jConf = new XConfiguration();
+        jConf.set("p7", "v7a");
+        jConf.set("p8", "v8a");
+        os = getFileSystem2().create(jobXmlQualifiedPath);
+        jConf.writeXml(os);
+        os.close();
+
+        Context context = createContext("<java/>", null);
+        Configuration conf = new 
JavaActionExecutor().createBaseHadoopConf(context, xml);
+        int confSize0 = conf.size();
+        JavaActionExecutor.parseJobXmlAndConfiguration(context, xml, appPath, 
conf);
+        assertEquals(confSize0 + 8, conf.size());
+        assertEquals("v1a", conf.get("p1"));
+        assertEquals("v2", conf.get("p2"));
+        assertEquals("v3b", conf.get("p3"));
+        assertEquals("v4", conf.get("p4"));
+        assertEquals("v5a", conf.get("p5"));
+        assertEquals("v6a", conf.get("p6"));
+        assertEquals("v7a", conf.get("p7"));
+        assertEquals("v8a", conf.get("p8"));
+    }
+
+    public void testActionShareLibWithNonDefaultNamenode() throws Exception {
+
+        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
+
+        Path systemLibPath = new Path(wps.getSystemLibPath(), 
ShareLibService.SHARED_LIB_PREFIX
+                + new SimpleDateFormat("yyyyMMddHHmmss").format(new 
Date()).toString());
+
+        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), 
"sourcejar.jar", LauncherMainTester.class);
+        InputStream is = new FileInputStream(jarFile);
+        Path javaShareLibPath = new Path(systemLibPath, "java");
+        getFileSystem().mkdirs(javaShareLibPath);
+        Path jar1Path = new Path(javaShareLibPath, "jar1.jar");
+        OutputStream os1 = getFileSystem().create(jar1Path);
+        IOUtils.copyStream(is, os1);
+        Path jar2Path = new Path(javaShareLibPath, "jar2.jar");
+        OutputStream os2 = getFileSystem().create(jar2Path);
+        is = new FileInputStream(jarFile); // is not resettable
+        IOUtils.copyStream(is, os2);
+        Path launcherPath = new Path(systemLibPath, "oozie");
+        getFileSystem().mkdirs(launcherPath);
+        Path jar3Path = new Path(launcherPath, "jar3.jar");
+        OutputStream os3 = getFileSystem().create(jar3Path);
+        is = new FileInputStream(jarFile);
+        IOUtils.copyStream(is, os3);
+
+        String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + 
"</job-tracker>" +
+                "<name-node>" + getNameNode2Uri() + "</name-node>" +
+                "<job-xml>job.xml</job-xml>" +
+                "<main-class>"+ LauncherMainTester.class.getName() + 
"</main-class>" +
+                "</java>";
+
+        XConfiguration jConf = new XConfiguration();
+        jConf.set("p", "v");
+        OutputStream os = getFileSystem().create(new Path(getAppPath(), 
"job.xml"));
+        jConf.writeXml(os);
+        os.close();
+
+        Context context = createContext(actionXml, null);
+
+        Services.get().setService(ShareLibService.class);
+
+        // Test oozie server action sharelib setting
+        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
+        XConfiguration wfConf = new XConfiguration();
+        wfConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+        wfConf.set(OozieClient.APP_PATH, new Path(getAppPath(), 
"workflow.xml").toString());
+        wfConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
+        workflow.setConf(XmlUtils.prettyPrint(wfConf).toString());
+
+        Services.get().getConf().set("oozie.action.sharelib.for.java", "java");
+
+        final RunningJob runningJob = submitAction(context);
+        waitFor(60 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return runningJob.isComplete();
+            }
+        });
+        assertTrue(runningJob.isSuccessful());
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
index 18cb742..ac1e2df 100644
--- a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
@@ -47,7 +47,9 @@ import java.net.URI;
 public abstract class XFsTestCase extends XTestCase {
     private static HadoopAccessorService has;
     private FileSystem fileSystem;
+    private FileSystem fileSystem2;
     private Path fsTestDir;
+    private Path fsTestDir2;
 
     /**
      * Set up the testcase.
@@ -72,18 +74,27 @@ public abstract class XFsTestCase extends XTestCase {
         JobConf jobConf = has.createJobConf(getNameNodeUri());
         XConfiguration.copy(conf, jobConf);
         fileSystem = has.createFileSystem(getTestUser(), new 
URI(getNameNodeUri()), jobConf);
-        Path path = new Path(fileSystem.getWorkingDirectory(), 
java.util.UUID.randomUUID().toString());
-        fsTestDir = fileSystem.makeQualified(path);
-        System.out.println(XLog.format("Setting FS testcase work dir[{0}]", 
fsTestDir));
-        if (fileSystem.exists(fsTestDir)) {
-            setAllPermissions(fileSystem, fsTestDir);
+        fsTestDir = initFileSystem(fileSystem);
+        if (System.getProperty("oozie.test.hadoop.minicluster2", 
"false").equals("true")) {
+            fileSystem2 = has.createFileSystem(getTestUser(), new 
URI(getNameNode2Uri()), jobConf);
+            fsTestDir2 = initFileSystem(fileSystem2);
         }
-        fileSystem.delete(fsTestDir, true);
-        if (!fileSystem.mkdirs(path)) {
-            throw new IOException(XLog.format("Could not create FS testcase 
dir [{0}]", fsTestDir));
+    }
+
+    private Path initFileSystem(FileSystem fs) throws Exception {
+        Path path = new Path(fs.getWorkingDirectory(), 
java.util.UUID.randomUUID().toString());
+        Path testDirInFs = fs.makeQualified(path);
+        System.out.println(XLog.format("Setting FS testcase work dir[{0}]", 
testDirInFs));
+        if (fs.exists(testDirInFs)) {
+            setAllPermissions(fs, testDirInFs);
         }
-        fileSystem.setOwner(fsTestDir, getTestUser(), getTestGroup());
-        fileSystem.setPermission(fsTestDir, 
FsPermission.valueOf("-rwxrwx--x"));
+        fs.delete(testDirInFs, true);
+        if (!fs.mkdirs(path)) {
+            throw new IOException(XLog.format("Could not create FS testcase 
dir [{0}]", testDirInFs));
+        }
+        fs.setOwner(testDirInFs, getTestUser(), getTestGroup());
+        fs.setPermission(testDirInFs, FsPermission.valueOf("-rwxrwx--x"));
+        return testDirInFs;
     }
 
     private void setAllPermissions(FileSystem fileSystem, Path path) throws 
IOException {
@@ -112,15 +123,24 @@ public abstract class XFsTestCase extends XTestCase {
     }
 
     /**
-     * Return the file system used by the tescase.
+     * Return the file system used by the test case.
      *
-     * @return the file system used by the tescase.
+     * @return the file system used by the test case.
      */
     protected FileSystem getFileSystem() {
         return fileSystem;
     }
 
     /**
+     * Return the file system of the second cluster.
+     *
+     * @return the second file system used by the test case.
+     */
+    protected FileSystem getFileSystem2() {
+        return fileSystem2;
+    }
+
+    /**
      * Return the FS test working directory. The directory name is the full 
class name of the test plus the test method
      * name.
      *
@@ -131,6 +151,16 @@ public abstract class XFsTestCase extends XTestCase {
     }
 
     /**
+     * Return the FS test working directory of the second cluster. The 
directory name is
+     * the full class name of the test plus the test method name.
+     *
+     * @return the second FS test working directory path, it is always a full 
and absolute path.
+     */
+    protected Path getFs2TestCaseDir() {
+        return fsTestDir2;
+    }
+
+    /**
      * Return a JobClient to the test JobTracker.
      *
      * @return a JobClient to the test JobTracker.

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 6bf0a8f..e739ec3 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
 import junit.framework.TestCase;
+import org.apache.commons.io.FilenameUtils;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -195,6 +197,12 @@ public abstract class XTestCase extends TestCase {
     public static final String OOZIE_TEST_NAME_NODE = "oozie.test.name.node";
 
     /**
+     * System property to specify the second Hadoop Name Node to use for 
testing. </p> If this property is not set, the assumed
+     * value is 'localhost:9100'.
+     */
+    public static final String OOZIE_TEST_NAME_NODE2 = "oozie.test.name.node2";
+
+    /**
      * System property to specify the Hadoop Version to use for testing. </p> 
If this property is not set, the assumed
      * value is "0.20.0"
      */
@@ -374,6 +382,10 @@ public abstract class XTestCase extends TestCase {
         }
         if (System.getProperty("oozie.test.hadoop.minicluster", 
"true").equals("true")) {
             setUpEmbeddedHadoop(getTestCaseDir());
+            // Second cluster is not necessary without the first one
+            if (System.getProperty("oozie.test.hadoop.minicluster2", 
"false").equals("true")) {
+                setUpEmbeddedHadoop2();
+            }
         }
 
         if (System.getProperty("oozie.test.db.host") == null) {
@@ -735,6 +747,16 @@ public abstract class XTestCase extends TestCase {
         return System.getProperty(OOZIE_TEST_NAME_NODE, 
"hdfs://localhost:9000");
     }
 
+    /**
+     * Return the second Hadoop Name Node to use for testing. </p> The value 
is taken from the Java system property {@link
+     * #OOZIE_TEST_NAME_NODE2}, if this property is not set, the assumed value 
is 'localhost:9100'.
+     *
+     * @return the second name node URI.
+     */
+    protected String getNameNode2Uri() {
+        return System.getProperty(OOZIE_TEST_NAME_NODE2, 
"hdfs://localhost:9100");
+    }
+
     public String getKeytabFile() {
         String defaultFile = new File(System.getProperty("user.home"), 
"oozie.keytab").getAbsolutePath();
         return System.getProperty("oozie.test.kerberos.keytab.file", 
defaultFile);
@@ -866,6 +888,7 @@ public abstract class XTestCase extends TestCase {
     }
 
     private static MiniDFSCluster dfsCluster = null;
+    private static MiniDFSCluster dfsCluster2 = null;
     private static MiniMRCluster mrCluster = null;
     private static MiniHCatServer hcatServer = null;
 
@@ -877,37 +900,12 @@ public abstract class XTestCase extends TestCase {
             int taskTrackers = 2;
             int dataNodes = 2;
             String oozieUser = getOozieUser();
-            JobConf conf = new JobConf();
-            conf.set("dfs.block.access.token.enable", "false");
-            conf.set("dfs.permissions", "true");
-            conf.set("hadoop.security.authentication", "simple");
-
-            //Doing this because Hadoop 1.x does not support '*' and
-            //Hadoop 0.23.x does not process wildcard if the value is
-            // '*,127.0.0.1'
-            StringBuilder sb = new StringBuilder();
-            sb.append("127.0.0.1,localhost");
-            for (InetAddress i : 
InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
-                sb.append(",").append(i.getCanonicalHostName());
-            }
-            conf.set("hadoop.proxyuser." + oozieUser + ".hosts", 
sb.toString());
-
-            conf.set("hadoop.proxyuser." + oozieUser + ".groups", 
getTestGroup());
-            conf.set("mapred.tasktracker.map.tasks.maximum", "4");
-            conf.set("mapred.tasktracker.reduce.tasks.maximum", "4");
-
+            JobConf conf = createDFSConfig();
             String[] userGroups = new String[] { getTestGroup(), 
getTestGroup2() };
             UserGroupInformation.createUserForTesting(oozieUser, userGroups);
             UserGroupInformation.createUserForTesting(getTestUser(), 
userGroups);
             UserGroupInformation.createUserForTesting(getTestUser2(), 
userGroups);
             UserGroupInformation.createUserForTesting(getTestUser3(), new 
String[] { "users" } );
-            conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster");
-
-            // Scheduler properties required for YARN CapacityScheduler to work
-            conf.set("yarn.scheduler.capacity.root.queues", "default");
-            conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
-            // Required to prevent deadlocks with YARN CapacityScheduler
-            conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", 
"0.5");
 
             try {
                 dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
@@ -945,6 +943,64 @@ public abstract class XTestCase extends TestCase {
         }
     }
 
+    private void setUpEmbeddedHadoop2() throws Exception {
+        if (dfsCluster != null && dfsCluster2 == null) {
+            // Trick dfs location for MiniDFSCluster since it doesn't accept 
location as input
+            String testBuildDataSaved = System.getProperty("test.build.data", 
"build/test/data");
+            try {
+                System.setProperty("test.build.data", 
FilenameUtils.concat(testBuildDataSaved, "2"));
+                // Only DFS cluster is created based upon current need
+                dfsCluster2 = new MiniDFSCluster(createDFSConfig(), 2, true, 
null);
+                FileSystem fileSystem = dfsCluster2.getFileSystem();
+                fileSystem.mkdirs(new Path("target/test-data"));
+                fileSystem.mkdirs(new Path("/user"));
+                fileSystem.mkdirs(new Path("/tmp"));
+                fileSystem.setPermission(new Path("target/test-data"), 
FsPermission.valueOf("-rwxrwxrwx"));
+                fileSystem.setPermission(new Path("/user"), 
FsPermission.valueOf("-rwxrwxrwx"));
+                fileSystem.setPermission(new Path("/tmp"), 
FsPermission.valueOf("-rwxrwxrwx"));
+                System.setProperty(OOZIE_TEST_NAME_NODE2, 
fileSystem.getConf().get("fs.default.name"));
+            }
+            catch (Exception ex) {
+                shutdownMiniCluster2();
+                throw ex;
+            }
+            finally {
+                // Restore previous value
+                System.setProperty("test.build.data", testBuildDataSaved);
+            }
+        }
+    }
+
+    private JobConf createDFSConfig() throws UnknownHostException {
+      JobConf conf = new JobConf();
+      conf.set("dfs.block.access.token.enable", "false");
+      conf.set("dfs.permissions", "true");
+      conf.set("hadoop.security.authentication", "simple");
+
+      //Doing this because Hadoop 1.x does not support '*' and
+      //Hadoop 0.23.x does not process wildcard if the value is
+      // '*,127.0.0.1'
+      StringBuilder sb = new StringBuilder();
+      sb.append("127.0.0.1,localhost");
+      for (InetAddress i : 
InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
+          sb.append(",").append(i.getCanonicalHostName());
+      }
+      conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString());
+
+      conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", 
getTestGroup());
+      conf.set("mapred.tasktracker.map.tasks.maximum", "4");
+      conf.set("mapred.tasktracker.reduce.tasks.maximum", "4");
+
+      conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster");
+
+      // Scheduler properties required for YARN CapacityScheduler to work
+      conf.set("yarn.scheduler.capacity.root.queues", "default");
+      conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+      // Required to prevent deadlocks with YARN CapacityScheduler
+      conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5");
+      return conf;
+    }
+
     private void setupHCatalogServer() throws Exception {
         if (hcatServer == null) {
             hcatServer = new MiniHCatServer(RUNMODE.SERVER, createJobConf());
@@ -972,6 +1028,16 @@ public abstract class XTestCase extends TestCase {
         }
     }
 
+    private static void shutdownMiniCluster2() {
+        try {
+            if (dfsCluster2 != null) {
+                dfsCluster2.shutdown();
+            }
+        }
+        catch (Exception ex) {
+            System.out.println(ex);
+        }
+    }
     private static final AtomicLong LAST_TESTCASE_FINISHED = new AtomicLong();
     private static final AtomicInteger RUNNING_TESTCASES = new AtomicInteger();
 
@@ -998,6 +1064,7 @@ public abstract class XTestCase extends TestCase {
                 }
             }
             shutdownMiniCluster();
+            shutdownMiniCluster2();
         }
     }
 
@@ -1019,7 +1086,7 @@ public abstract class XTestCase extends TestCase {
      * Returns a jobconf preconfigured to talk with the test 
cluster/minicluster.
      * @return a jobconf preconfigured to talk with the test 
cluster/minicluster.
      */
-    protected JobConf createJobConf() {
+    protected JobConf createJobConf() throws IOException {
         JobConf jobConf;
         if (mrCluster != null) {
             jobConf = createJobConfFromMRCluster();

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki 
b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
index f7590d0..f0eb393 100644
--- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
+++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -14,6 +14,10 @@ Map/Reduce and Pig jobs.
 %TOC%
 
 ---++ Changelog
+
+---+++!! 2014MAY08
+
+   * #3.2.2.4 Added support for fully qualified job-xml path
 ---+++!! 2013JUL03
 
    * #Appendix A, Added new workflow schema 0.5 and SLA schema 0.2
@@ -687,6 +691,8 @@ In case of a hcatalog URI, the hive-site.xml needs to be 
shipped using =file= ta
 need to be placed in workflow lib directory or specified using =archive= tag.
 
 The =job-xml= element, if present, must refer to a Hadoop JobConf =job.xml= 
file bundled in the workflow application.
+By default the =job.xml= file is taken from the workflow application namenode, 
regardless of the namenode specified for the action.
+To specify a =job.xml= on another namenode use a fully qualified file path.
 The =job-xml= element is optional and as of schema 0.4, multiple =job-xml= 
elements are allowed in order to specify multiple Hadoop JobConf =job.xml= 
files.
 
 The =configuration= element, if present, contains JobConf properties for the 
Hadoop job.

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ba350e1..e8fa1a8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name 
node (benjzh via rohini)
 OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)
 OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes 
(rkanter)
 OOZIE-1659 oozie-site is missing email-action-0.2 schema (jagatsingh via 
rkanter)

Reply via email to