Repository: oozie Updated Branches: refs/heads/master 6d4a9d0ea -> d5f1e3864
OOZIE-1685 Oozie doesn't process correctly workflows with a non-default name node (benjzh via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5f1e386 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5f1e386 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5f1e386 Branch: refs/heads/master Commit: d5f1e3864e96c23133304a12adc5a14aebba854b Parents: 6d4a9d0 Author: Rohini Palaniswamy <[email protected]> Authored: Tue Jun 17 13:58:39 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Tue Jun 17 13:58:39 2014 -0700 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 62 +++--- .../oozie/service/HadoopAccessorService.java | 9 +- .../apache/oozie/service/ShareLibService.java | 11 ++ .../java/org/apache/oozie/util/JobUtils.java | 9 +- .../action/hadoop/ActionExecutorTestCase.java | 4 + .../action/hadoop/TestJavaActionExecutor.java | 190 ++++++++++++++++--- .../java/org/apache/oozie/test/XFsTestCase.java | 54 ++++-- .../java/org/apache/oozie/test/XTestCase.java | 121 +++++++++--- .../src/site/twiki/WorkflowFunctionalSpec.twiki | 6 + release-log.txt | 1 + 10 files changed, 358 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 40add2c..7a0d0e3 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -349,11 +349,27 @@ public class JavaActionExecutor extends 
ActionExecutor { throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { Namespace ns = element.getNamespace(); Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); + HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>(); + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); while (it.hasNext()) { Element e = it.next(); String jobXml = e.getTextTrim(); - Path path = new Path(appPath, jobXml); - FileSystem fs = context.getAppFileSystem(); + Path pathSpecified = new Path(jobXml); + Path path = pathSpecified.isAbsolute() ? pathSpecified : new Path(appPath, jobXml); + FileSystem fs; + if (filesystemsMap.containsKey(path.toUri().getAuthority())) { + fs = filesystemsMap.get(path.toUri().getAuthority()); + } + else { + if (path.toUri().getAuthority() != null) { + fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(), + has.createJobConf(path.toUri().getAuthority())); + } + else { + fs = context.getAppFileSystem(); + } + filesystemsMap.put(path.toUri().getAuthority(), fs); + } Configuration jobXmlConf = new XConfiguration(fs.open(path)); try { String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString(); @@ -432,22 +448,11 @@ public class JavaActionExecutor extends ActionExecutor { else if (fileName.endsWith(".jar")) { // .jar files if (!fileName.contains("#")) { String user = conf.get("user.name"); - Path pathToAdd; - // if filePath and appPath belong to same cluster, add URI path component else add absolute URI - if (uri.getScheme() != null && uri.getHost() != null && - uri.getPort() > 0 && baseUri.getScheme() != null && - baseUri.getHost() != null && baseUri.getPort() > 0 && - uri.getScheme().equalsIgnoreCase(baseUri.getScheme()) && - uri.getHost().equalsIgnoreCase(baseUri.getHost()) && - uri.getPort() == baseUri.getPort()) { - pathToAdd = new Path(uri.getPath()); - } else { - pathToAdd = new Path(uri.normalize()); - } + Path pathToAdd = new 
Path(uri.normalize()); Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf); } else { - DistributedCache.addCacheFile(uri, conf); + DistributedCache.addCacheFile(uri.normalize(), conf); } } else { // regular files @@ -502,25 +507,14 @@ public class JavaActionExecutor extends ActionExecutor { } } - protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames) + protected void addShareLib(Configuration conf, String[] actionShareLibNames) throws ActionExecutorException { if (actionShareLibNames != null) { - String user = conf.get("user.name"); - FileSystem fs; try { - - Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); - if (systemLibPath.toUri().getScheme() != null && systemLibPath.toUri().getAuthority() != null) { - fs = Services.get().get(HadoopAccessorService.class) - .createFileSystem(user, systemLibPath.toUri(), conf); - } - else { - fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); - } - for (String actionShareLibName : actionShareLibNames) { - - if (systemLibPath != null) { - ShareLibService shareLibService = Services.get().get(ShareLibService.class); + ShareLibService shareLibService = Services.get().get(ShareLibService.class); + FileSystem fs = shareLibService.getFileSystem(); + if (fs != null) { + for (String actionShareLibName : actionShareLibNames) { List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName); if (listOfPaths != null && !listOfPaths.isEmpty()) { @@ -532,10 +526,6 @@ public class JavaActionExecutor extends ActionExecutor { } } } - catch (HadoopAccessorException ex) { - throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode() - .toString(), ex.getMessage()); - } catch (IOException ex) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", ex.getMessage()); @@ -661,7 +651,7 @@ public class 
JavaActionExecutor extends ActionExecutor { // Action sharelibs are only added if user has specified to use system libpath if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { // add action specific sharelibs - addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf)); + addShareLib(conf, getShareLibNames(context, actionXml, conf)); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index bb68b0e..e2db6a6 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -34,6 +34,7 @@ import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; +import org.apache.oozie.util.JobUtils; import java.io.File; import java.io.FileInputStream; @@ -506,13 +507,9 @@ public class HadoopAccessorService implements Service { try { UserGroupInformation ugi = getUGI(user); ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override public Void run() throws Exception { - Configuration defaultConf = new Configuration(); - XConfiguration.copy(conf, defaultConf); - //Doing this NOP add first to have the FS created and cached - DistributedCache.addFileToClassPath(file, defaultConf); - - DistributedCache.addFileToClassPath(file, conf); + JobUtils.addFileToClassPath(file, conf, null); return null; } }); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index 320af8b..3ef5e07 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -668,4 +668,15 @@ public class ShareLibService implements Service, Instrumentable { } }); } + + /** + * Returns file system for shared libraries. + * <p/> + * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed + * + * @return file system for shared libraries + */ + public FileSystem getFileSystem() { + return fs; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/main/java/org/apache/oozie/util/JobUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/JobUtils.java b/core/src/main/java/org/apache/oozie/util/JobUtils.java index 135b096..485db17 100644 --- a/core/src/main/java/org/apache/oozie/util/JobUtils.java +++ b/core/src/main/java/org/apache/oozie/util/JobUtils.java @@ -140,15 +140,18 @@ public class JobUtils { * TODO: Remove the workaround when we drop the support for hadoop 0.20. * @param file Path of the file to be added * @param conf Configuration that contains the classpath setting - * @param fs FileSystem with respect to which path should be interpreted + * @param fs FileSystem with respect to which path should be interpreted (may be null) * @throws IOException */ public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException { Configuration defaultConf = new Configuration(); XConfiguration.copy(conf, defaultConf); - DistributedCache.addFileToClassPath(file, defaultConf, fs); + if (fs == null) { + // it fails with conf, therefore we pass defaultConf instead + fs = file.getFileSystem(defaultConf); + } // Hadoop 0.20/1.x. 
- if (defaultConf.get("mapred.job.classpath.files") != null) { + if (defaultConf.get("yarn.resourcemanager.address") == null) { // Duplicate hadoop 1.x code to workaround MAPREDUCE-2361 in Hadoop 0.20 // Refer OOZIE-1806. String filepath = file.toUri().getPath(); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java index bc2c1b6..3e778bc 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java @@ -61,6 +61,7 @@ public abstract class ActionExecutorTestCase extends XFsTestCase { @Override protected void setUp() throws Exception { + beforeSetUp(); super.setUp(); setSystemProps(); new Services().init(); @@ -69,6 +70,9 @@ public abstract class ActionExecutorTestCase extends XFsTestCase { protected void setSystemProps() throws Exception { } + protected void beforeSetUp() throws Exception { + } + @Override protected void tearDown() throws Exception { if (Services.get() != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 390ad3f..72a137c 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -71,13 +71,19 @@ import org.junit.Test; public class TestJavaActionExecutor extends 
ActionExecutorTestCase { @Override + protected void beforeSetUp() throws Exception { + super.beforeSetUp(); + setSystemProperty("oozie.test.hadoop.minicluster2", "true"); + } + + @Override protected void setSystemProps() throws Exception { super.setSystemProps(); setSystemProperty("oozie.service.ActionService.executor.classes", JavaActionExecutor.class.getName()); setSystemProperty("oozie.service.HadoopAccessorService.action.configurations", "*=hadoop-conf," + getJobTrackerUri() + "=action-conf"); - setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, getFsTestCaseDir() + "/systemlib"); + setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, getFsTestCaseDir().toUri().getPath() + "/systemlib"); new File(getTestCaseConfDir(), "action-conf").mkdir(); InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("test-action-config.xml"); OutputStream os = new FileOutputStream(new File(getTestCaseConfDir() + "/action-conf", "java.xml")); @@ -1358,7 +1364,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { } }; String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" - + getNameNodeUri() + "</name-node>" + "<main-class>" + LauncherMainTester.class.getName() + + getNameNode2Uri() + "</name-node>" + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + "</java>"; Element eActionXml = XmlUtils.parseXml(actionXml); Context context = createContext(actionXml, null); @@ -1375,35 +1381,19 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { getFileSystem().mkdirs(javaShareLibPath); Services.get().setService(ShareLibService.class); - Path appPath = getAppPath(); JobConf conf = ae.createBaseHadoopConf(context, eActionXml); - // The next line should not throw an Exception because it will get the scheme and authority from the appPath, and not the - // sharelib path because it doesn't have a scheme or authority - ae.addShareLib(appPath, conf, new 
String[]{"java-action-executor"}); - - appPath = new Path("foo://bar:1234/blah"); - conf = ae.createBaseHadoopConf(context, eActionXml); - // The next line should throw an Exception because it will get the scheme and authority from the appPath, which is obviously - // invalid, and not the sharelib path because it doesn't have a scheme or authority - try { - ae.addShareLib(appPath, conf, new String[]{"java-action-executor"}); - fail(); - } - catch (ActionExecutorException aee) { - assertEquals("E0902", aee.getErrorCode()); - assertTrue(aee.getMessage().contains("[No FileSystem for scheme: foo]")); - } + // Despite systemLibPath is not fully qualified and the action refers to the + // second namenode the next line won't throw exception because default fs is used + ae.addShareLib(conf, new String[]{"java-action-executor"}); // Set sharelib to a full path (i.e. include scheme and authority) Services.get().destroy(); setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, getNameNodeUri() + "/user/" + getTestUser() + "/share/"); new Services().init(); Services.get().setService(ShareLibService.class); - appPath = new Path("foo://bar:1234/blah"); conf = ae.createBaseHadoopConf(context, eActionXml); - // The next line should not throw an Exception because it will get the scheme and authority from the sharelib path (and not - // from the obviously invalid appPath) - ae.addShareLib(appPath, conf, new String[]{"java-action-executor"}); + // The next line should not throw an Exception because it will get the scheme and authority from the sharelib path + ae.addShareLib(conf, new String[]{"java-action-executor"}); } public void testFilesystemScheme() throws Exception { @@ -1935,8 +1925,10 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf.clear(); conf.set(WorkflowAppService.HADOOP_USER, getTestUser()); ae.addToCache(conf, appPath, appJarFullPath.toString(), false); - // assert that mapred.cache.files contains jar URI path - Path jarPath = new 
Path(appJarFullPath.toUri().getPath()); + // assert that mapred.cache.files contains jar URI path (full on Hadoop-2) + Path jarPath = createJobConf().get("yarn.resourcemanager.address") == null ? + new Path(appJarFullPath.toUri().getPath()) : + new Path(appJarFullPath.toUri()); assertTrue(conf.get("mapred.cache.files").contains(jarPath.toString())); // assert that dist cache classpath contains jar URI path Path[] paths = DistributedCache.getFileClassPaths(conf); @@ -2025,4 +2017,152 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertTrue(conf.get("mapred.cache.files").contains(appUri.getPath() + "/lib/a.jar#a.jar")); assertTrue(DistributedCache.getSymlink(conf)); } + + public void testJobXmlAndNonDefaultNamenode() throws Exception { + // By default the job.xml file is taken from the workflow application + // namenode, regadless the namenode specified for the action. To specify + // a job.xml on another namenode use a fully qualified file path. + + Path appPath = new Path(getFsTestCaseDir(), "app"); + getFileSystem().mkdirs(appPath); + + Path jobXmlAbsolutePath = new Path(getFsTestCaseDir().toUri().getPath(), "jobxmlpath/job.xml"); + assertTrue(jobXmlAbsolutePath.isAbsolute() && jobXmlAbsolutePath.toUri().getAuthority() == null); + Path jobXmlAbsolutePath2 = new Path(getFsTestCaseDir().toUri().getPath(), "jobxmlpath/job3.xml"); + assertTrue(jobXmlAbsolutePath2.isAbsolute() && jobXmlAbsolutePath2.toUri().getAuthority() == null); + Path jobXmlQualifiedPath = new Path(getFs2TestCaseDir(), "jobxmlpath/job4.xml"); + assertTrue(jobXmlQualifiedPath.toUri().getAuthority() != null); + + // Use non-default name node (second filesystem) and three job-xml configurations: + // 1. Absolute (but not fully qualified) path located in the first filesystem + // 2. Without path (fist filesystem) + // 3. Absolute (but not fully qualified) path located in the both filesystems + // (first should be used) + // 4. 
Fully qualified path located in the second filesystem + String str = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNode2Uri() + "</name-node>" + + "<job-xml>" + jobXmlAbsolutePath.toString() + "</job-xml>" + + "<job-xml>job2.xml</job-xml>" + + "<job-xml>" + jobXmlAbsolutePath2.toString() + "</job-xml>" + + "<job-xml>" + jobXmlQualifiedPath.toString() + "</job-xml>" + + "<configuration>" + + "<property><name>p1</name><value>v1a</value></property>" + + "<property><name>p2</name><value>v2</value></property>" + + "</configuration>" + + "</java>"; + Element xml = XmlUtils.parseXml(str); + + XConfiguration jConf = new XConfiguration(); + jConf.set("p1", "v1b"); + jConf.set("p3", "v3a"); + OutputStream os = getFileSystem().create(jobXmlAbsolutePath); + jConf.writeXml(os); + os.close(); + + jConf = new XConfiguration(); + jConf.set("p4", "v4"); + jConf.set("p3", "v3b"); + os = getFileSystem().create(new Path(appPath, "job2.xml")); + jConf.writeXml(os); + os.close(); + + // This configuration is expected to be used + jConf = new XConfiguration(); + jConf.set("p5", "v5a"); + jConf.set("p6", "v6a"); + os = getFileSystem().create(jobXmlAbsolutePath2); + jConf.writeXml(os); + os.close(); + + // This configuration is expected to be ignored + jConf = new XConfiguration(); + jConf.set("p5", "v5b"); + jConf.set("p6", "v6b"); + os = getFileSystem2().create(new Path(jobXmlAbsolutePath2.toUri().getPath())); + jConf.writeXml(os); + os.close(); + + jConf = new XConfiguration(); + jConf.set("p7", "v7a"); + jConf.set("p8", "v8a"); + os = getFileSystem2().create(jobXmlQualifiedPath); + jConf.writeXml(os); + os.close(); + + Context context = createContext("<java/>", null); + Configuration conf = new JavaActionExecutor().createBaseHadoopConf(context, xml); + int confSize0 = conf.size(); + JavaActionExecutor.parseJobXmlAndConfiguration(context, xml, appPath, conf); + assertEquals(confSize0 + 8, conf.size()); + assertEquals("v1a", 
conf.get("p1")); + assertEquals("v2", conf.get("p2")); + assertEquals("v3b", conf.get("p3")); + assertEquals("v4", conf.get("p4")); + assertEquals("v5a", conf.get("p5")); + assertEquals("v6a", conf.get("p6")); + assertEquals("v7a", conf.get("p7")); + assertEquals("v8a", conf.get("p8")); + } + + public void testActionShareLibWithNonDefaultNamenode() throws Exception { + + WorkflowAppService wps = Services.get().get(WorkflowAppService.class); + + Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARED_LIB_PREFIX + + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()).toString()); + + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "sourcejar.jar", LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + Path javaShareLibPath = new Path(systemLibPath, "java"); + getFileSystem().mkdirs(javaShareLibPath); + Path jar1Path = new Path(javaShareLibPath, "jar1.jar"); + OutputStream os1 = getFileSystem().create(jar1Path); + IOUtils.copyStream(is, os1); + Path jar2Path = new Path(javaShareLibPath, "jar2.jar"); + OutputStream os2 = getFileSystem().create(jar2Path); + is = new FileInputStream(jarFile); // is not resetable + IOUtils.copyStream(is, os2); + Path launcherPath = new Path(systemLibPath, "oozie"); + getFileSystem().mkdirs(launcherPath); + Path jar3Path = new Path(launcherPath, "jar3.jar"); + OutputStream os3 = getFileSystem().create(jar3Path); + is = new FileInputStream(jarFile); + IOUtils.copyStream(is, os3); + + String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNode2Uri() + "</name-node>" + + "<job-xml>job.xml</job-xml>" + + "<main-class>"+ LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + + XConfiguration jConf = new XConfiguration(); + jConf.set("p", "v"); + OutputStream os = getFileSystem().create(new Path(getAppPath(), "job.xml")); + jConf.writeXml(os); + os.close(); + + Context context = createContext(actionXml, null); + 
+ Services.get().setService(ShareLibService.class); + + // Test oozie server action sharelib setting + WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); + XConfiguration wfConf = new XConfiguration(); + wfConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + wfConf.set(OozieClient.APP_PATH, new Path(getAppPath(), "workflow.xml").toString()); + wfConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true); + workflow.setConf(XmlUtils.prettyPrint(wfConf).toString()); + + Services.get().getConf().set("oozie.action.sharelib.for.java", "java"); + + final RunningJob runningJob = submitAction(context); + waitFor(60 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return runningJob.isComplete(); + } + }); + assertTrue(runningJob.isSuccessful()); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/test/XFsTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java index 18cb742..ac1e2df 100644 --- a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java @@ -47,7 +47,9 @@ import java.net.URI; public abstract class XFsTestCase extends XTestCase { private static HadoopAccessorService has; private FileSystem fileSystem; + private FileSystem fileSystem2; private Path fsTestDir; + private Path fsTestDir2; /** * Set up the testcase. 
@@ -72,18 +74,27 @@ public abstract class XFsTestCase extends XTestCase { JobConf jobConf = has.createJobConf(getNameNodeUri()); XConfiguration.copy(conf, jobConf); fileSystem = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), jobConf); - Path path = new Path(fileSystem.getWorkingDirectory(), java.util.UUID.randomUUID().toString()); - fsTestDir = fileSystem.makeQualified(path); - System.out.println(XLog.format("Setting FS testcase work dir[{0}]", fsTestDir)); - if (fileSystem.exists(fsTestDir)) { - setAllPermissions(fileSystem, fsTestDir); + fsTestDir = initFileSystem(fileSystem); + if (System.getProperty("oozie.test.hadoop.minicluster2", "false").equals("true")) { + fileSystem2 = has.createFileSystem(getTestUser(), new URI(getNameNode2Uri()), jobConf); + fsTestDir2 = initFileSystem(fileSystem2); } - fileSystem.delete(fsTestDir, true); - if (!fileSystem.mkdirs(path)) { - throw new IOException(XLog.format("Could not create FS testcase dir [{0}]", fsTestDir)); + } + + private Path initFileSystem(FileSystem fs) throws Exception { + Path path = new Path(fs.getWorkingDirectory(), java.util.UUID.randomUUID().toString()); + Path testDirInFs = fs.makeQualified(path); + System.out.println(XLog.format("Setting FS testcase work dir[{0}]", testDirInFs)); + if (fs.exists(testDirInFs)) { + setAllPermissions(fs, testDirInFs); } - fileSystem.setOwner(fsTestDir, getTestUser(), getTestGroup()); - fileSystem.setPermission(fsTestDir, FsPermission.valueOf("-rwxrwx--x")); + fs.delete(testDirInFs, true); + if (!fs.mkdirs(path)) { + throw new IOException(XLog.format("Could not create FS testcase dir [{0}]", testDirInFs)); + } + fs.setOwner(testDirInFs, getTestUser(), getTestGroup()); + fs.setPermission(testDirInFs, FsPermission.valueOf("-rwxrwx--x")); + return testDirInFs; } private void setAllPermissions(FileSystem fileSystem, Path path) throws IOException { @@ -112,15 +123,24 @@ public abstract class XFsTestCase extends XTestCase { } /** - * Return the file system used by 
the tescase. + * Return the file system used by the test case. * - * @return the file system used by the tescase. + * @return the file system used by the test case. */ protected FileSystem getFileSystem() { return fileSystem; } /** + * Return the file system of the second cluster. + * + * @return the second file system used by the test case. + */ + protected FileSystem getFileSystem2() { + return fileSystem2; + } + + /** * Return the FS test working directory. The directory name is the full class name of the test plus the test method * name. * @@ -131,6 +151,16 @@ public abstract class XFsTestCase extends XTestCase { } /** + * Return the FS test working directory of the second cluster. The directory name is + * the full class name of the test plus the test method name. + * + * @return the second FS test working directory path, it is always an full and absolute path. + */ + protected Path getFs2TestCaseDir() { + return fsTestDir2; + } + + /** * Return a JobClient to the test JobTracker. * * @return a JobClient to the test JobTracker. 
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 6bf0a8f..e739ec3 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import javax.persistence.EntityManager; import javax.persistence.Query; import junit.framework.TestCase; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -195,6 +197,12 @@ public abstract class XTestCase extends TestCase { public static final String OOZIE_TEST_NAME_NODE = "oozie.test.name.node"; /** + * System property to specify the second Hadoop Name Node to use for testing. </p> If this property is not set, the assumed + * value is 'locahost:9100'. + */ + public static final String OOZIE_TEST_NAME_NODE2 = "oozie.test.name.node2"; + + /** * System property to specify the Hadoop Version to use for testing. 
</p> If this property is not set, the assumed * value is "0.20.0" */ @@ -374,6 +382,10 @@ public abstract class XTestCase extends TestCase { } if (System.getProperty("oozie.test.hadoop.minicluster", "true").equals("true")) { setUpEmbeddedHadoop(getTestCaseDir()); + // Second cluster is not necessary without the first one + if (System.getProperty("oozie.test.hadoop.minicluster2", "false").equals("true")) { + setUpEmbeddedHadoop2(); + } } if (System.getProperty("oozie.test.db.host") == null) { @@ -735,6 +747,16 @@ public abstract class XTestCase extends TestCase { return System.getProperty(OOZIE_TEST_NAME_NODE, "hdfs://localhost:9000"); } + /** + * Return the second Hadoop Name Node to use for testing. </p> The value is taken from the Java sytem property {@link + * #OOZIE_TEST_NAME_NODE2}, if this property is not set, the assumed value is 'locahost:9100'. + * + * @return the second name node URI. + */ + protected String getNameNode2Uri() { + return System.getProperty(OOZIE_TEST_NAME_NODE2, "hdfs://localhost:9100"); + } + public String getKeytabFile() { String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath(); return System.getProperty("oozie.test.kerberos.keytab.file", defaultFile); @@ -866,6 +888,7 @@ public abstract class XTestCase extends TestCase { } private static MiniDFSCluster dfsCluster = null; + private static MiniDFSCluster dfsCluster2 = null; private static MiniMRCluster mrCluster = null; private static MiniHCatServer hcatServer = null; @@ -877,37 +900,12 @@ public abstract class XTestCase extends TestCase { int taskTrackers = 2; int dataNodes = 2; String oozieUser = getOozieUser(); - JobConf conf = new JobConf(); - conf.set("dfs.block.access.token.enable", "false"); - conf.set("dfs.permissions", "true"); - conf.set("hadoop.security.authentication", "simple"); - - //Doing this because Hadoop 1.x does not support '*' and - //Hadoop 0.23.x does not process wildcard if the value is - // '*,127.0.0.1' - StringBuilder sb 
= new StringBuilder(); - sb.append("127.0.0.1,localhost"); - for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { - sb.append(",").append(i.getCanonicalHostName()); - } - conf.set("hadoop.proxyuser." + oozieUser + ".hosts", sb.toString()); - - conf.set("hadoop.proxyuser." + oozieUser + ".groups", getTestGroup()); - conf.set("mapred.tasktracker.map.tasks.maximum", "4"); - conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); - + JobConf conf = createDFSConfig(); String[] userGroups = new String[] { getTestGroup(), getTestGroup2() }; UserGroupInformation.createUserForTesting(oozieUser, userGroups); UserGroupInformation.createUserForTesting(getTestUser(), userGroups); UserGroupInformation.createUserForTesting(getTestUser2(), userGroups); UserGroupInformation.createUserForTesting(getTestUser3(), new String[] { "users" } ); - conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster"); - - // Scheduler properties required for YARN CapacityScheduler to work - conf.set("yarn.scheduler.capacity.root.queues", "default"); - conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - // Required to prevent deadlocks with YARN CapacityScheduler - conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); try { dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null); @@ -945,6 +943,64 @@ public abstract class XTestCase extends TestCase { } } + private void setUpEmbeddedHadoop2() throws Exception { + if (dfsCluster != null && dfsCluster2 == null) { + // Trick dfs location for MiniDFSCluster since it doesn't accept location as input) + String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data"); + try { + System.setProperty("test.build.data", FilenameUtils.concat(testBuildDataSaved, "2")); + // Only DFS cluster is created based upon current need + dfsCluster2 = new MiniDFSCluster(createDFSConfig(), 2, true, null); + FileSystem fileSystem = dfsCluster2.getFileSystem(); + 
fileSystem.mkdirs(new Path("target/test-data")); + fileSystem.mkdirs(new Path("/user")); + fileSystem.mkdirs(new Path("/tmp")); + fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); + System.setProperty(OOZIE_TEST_NAME_NODE2, fileSystem.getConf().get("fs.default.name")); + } + catch (Exception ex) { + shutdownMiniCluster2(); + throw ex; + } + finally { + // Restore previus value + System.setProperty("test.build.data", testBuildDataSaved); + } + } + } + + private JobConf createDFSConfig() throws UnknownHostException { + JobConf conf = new JobConf(); + conf.set("dfs.block.access.token.enable", "false"); + conf.set("dfs.permissions", "true"); + conf.set("hadoop.security.authentication", "simple"); + + //Doing this because Hadoop 1.x does not support '*' and + //Hadoop 0.23.x does not process wildcard if the value is + // '*,127.0.0.1' + StringBuilder sb = new StringBuilder(); + sb.append("127.0.0.1,localhost"); + for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { + sb.append(",").append(i.getCanonicalHostName()); + } + conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString()); + + conf.set("hadoop.proxyuser." 
+ getOozieUser() + ".groups", getTestGroup()); + conf.set("mapred.tasktracker.map.tasks.maximum", "4"); + conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); + + conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster"); + + // Scheduler properties required for YARN CapacityScheduler to work + conf.set("yarn.scheduler.capacity.root.queues", "default"); + conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); + // Required to prevent deadlocks with YARN CapacityScheduler + conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); + return conf; + } + private void setupHCatalogServer() throws Exception { if (hcatServer == null) { hcatServer = new MiniHCatServer(RUNMODE.SERVER, createJobConf()); @@ -972,6 +1028,16 @@ public abstract class XTestCase extends TestCase { } } + private static void shutdownMiniCluster2() { + try { + if (dfsCluster2 != null) { + dfsCluster2.shutdown(); + } + } + catch (Exception ex) { + System.out.println(ex); + } + } private static final AtomicLong LAST_TESTCASE_FINISHED = new AtomicLong(); private static final AtomicInteger RUNNING_TESTCASES = new AtomicInteger(); @@ -998,6 +1064,7 @@ public abstract class XTestCase extends TestCase { } } shutdownMiniCluster(); + shutdownMiniCluster2(); } } @@ -1019,7 +1086,7 @@ public abstract class XTestCase extends TestCase { * Returns a jobconf preconfigured to talk with the test cluster/minicluster. * @return a jobconf preconfigured to talk with the test cluster/minicluster. 
 */ - protected JobConf createJobConf() { + protected JobConf createJobConf() throws IOException { JobConf jobConf; if (mrCluster != null) { jobConf = createJobConfFromMRCluster(); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/docs/src/site/twiki/WorkflowFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki index f7590d0..f0eb393 100644 --- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki +++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki @@ -14,6 +14,10 @@ Map/Reduce and Pig jobs. %TOC% ---++ Changelog + +---+++!! 2014MAY08 + + * #3.2.2.4 Added support for fully qualified job-xml path ---+++!! 2013JUL03 * #Appendix A, Added new workflow schema 0.5 and SLA schema 0.2 @@ -687,6 +691,8 @@ In case of a hcatalog URI, the hive-site.xml needs to be shipped using =file= ta need to be placed in workflow lib directory or specified using =archive= tag. The =job-xml= element, if present, must refer to a Hadoop JobConf =job.xml= file bundled in the workflow application. +By default the =job.xml= file is taken from the workflow application namenode, regardless of the namenode specified for the action. +To specify a =job.xml= on another namenode, use a fully qualified file path. The =job-xml= element is optional and as of schema 0.4, multiple =job-xml= elements are allowed in order to specify multiple Hadoop JobConf =job.xml= files. The =configuration= element, if present, contains JobConf properties for the Hadoop job. 
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5f1e386/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ba350e1..e8fa1a8 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1685 Oozie doesn't process correctly workflows with a non-default name node (benjzh via rohini) OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang) OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter) OOZIE-1659 oozie-site is missing email-action-0.2 schema (jagatsingh via rkanter)
