Author: rkanter
Date: Mon Jan 28 18:10:56 2013
New Revision: 1439545
URL: http://svn.apache.org/viewvc?rev=1439545&view=rev
Log:
OOZIE-87 GH-47: Feature to supply a comma separated list of jars in an 'archive
tag' of workflow (jaoki via rkanter)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/trunk/release-log.txt
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1439545&r1=1439544&r2=1439545&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Mon Jan 28 18:10:56 2013
@@ -478,14 +478,14 @@ public class JavaActionExecutor extends
// files and archives defined in the action
for (Element eProp : (List<Element>) actionXml.getChildren()) {
if (eProp.getName().equals("file")) {
- String[] pathes = eProp.getTextTrim().split(",");
- for (String path : pathes) {
+ String[] filePaths = eProp.getTextTrim().split(",");
+ for (String path : filePaths) {
addToCache(conf, appPath, path.trim(), false);
}
}
else if (eProp.getName().equals("archive")) {
- String[] pathes = eProp.getTextTrim().split(",");
- for (String path : pathes){
+ String[] archivePaths = eProp.getTextTrim().split(",");
+ for (String path : archivePaths){
addToCache(conf, appPath, path.trim(), true);
}
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java?rev=1439545&r1=1439544&r2=1439545&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
Mon Jan 28 18:10:56 2013
@@ -18,6 +18,7 @@
package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
@@ -45,6 +46,7 @@ import java.io.FileOutputStream;
import java.io.Writer;
import java.io.OutputStreamWriter;
import java.io.StringReader;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
@@ -893,4 +895,115 @@ public class TestMapReduceActionExecutor
e.addContent(new Element("streaming"));
assertEquals("mapreduce-streaming", ae.getDefaultShareLibName(e));
}
+
+ /**
+ * https://issues.apache.org/jira/browse/OOZIE-87
+ * This test covers map-reduce action
+ * @throws Exception
+ */
+ public void testCommaSeparatedFilesAndArchives() throws Exception {
+ Path root = new Path(getFsTestCaseDir(), "root");
+
+ Path jar = new Path("jar.jar");
+ getFileSystem().create(new Path(getAppPath(), jar)).close();
+ Path rootJar = new Path(root, "rootJar.jar");
+ getFileSystem().create(rootJar).close();
+
+ Path file = new Path("file");
+ getFileSystem().create(new Path(getAppPath(), file)).close();
+ Path rootFile = new Path(root, "rootFile");
+ getFileSystem().create(rootFile).close();
+
+ Path so = new Path("soFile.so");
+ getFileSystem().create(new Path(getAppPath(), so)).close();
+ Path rootSo = new Path(root, "rootSoFile.so");
+ getFileSystem().create(rootSo).close();
+
+ Path so1 = new Path("soFile.so.1");
+ getFileSystem().create(new Path(getAppPath(), so1)).close();
+ Path rootSo1 = new Path(root, "rootSoFile.so.1");
+ getFileSystem().create(rootSo1).close();
+
+ Path archive = new Path("archive.tar");
+ getFileSystem().create(new Path(getAppPath(), archive)).close();
+ Path rootArchive = new Path(root, "rootArchive.tar");
+ getFileSystem().create(rootArchive).close();
+
+ String actionXml = "<map-reduce>" +
+ " <job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ " <name-node>" + getNameNodeUri() + "</name-node>" +
+ " <main-class>CLASS</main-class>" +
+ " <file>" + jar.toString() +
+ "," + rootJar.toString() +
+ "," + file.toString() +
+ ", " + rootFile.toString() + // with leading and trailing spaces
+ " ," + so.toString() +
+ "," + rootSo.toString() +
+ "," + so1.toString() +
+ "," + rootSo1.toString() + "</file>\n" +
+ " <archive>" + archive.toString() + ", "
+ + rootArchive.toString() + " </archive>\n" + // with leading and trailing spaces
+ "</map-reduce>";
+
+ Element eActionXml = XmlUtils.parseXml(actionXml);
+
+ Context context = createContext("map-reduce", actionXml);
+
+ Path appPath = getAppPath();
+
+ MapReduceActionExecutor ae = new MapReduceActionExecutor();
+
+ Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
+ ae.setupActionConf(jobConf, context, eActionXml, appPath);
+ ae.setLibFilesArchives(context, eActionXml, appPath, jobConf);
+
+
+ assertTrue(DistributedCache.getSymlink(jobConf));
+
+ Path[] filesInClasspath = DistributedCache.getFileClassPaths(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar}) {
+ boolean found = false;
+ for (Path c : filesInClasspath) {
+ if (!found && p.toUri().getPath().equals(c.toUri().getPath()))
{
+ found = true;
+ }
+ }
+ assertTrue("file " + p.toUri().getPath() + " not found in classpath", found);
+ }
+ for (Path p : new Path[]{new Path(getAppPath(), file), rootFile, new
Path(getAppPath(), so), rootSo,
+ new Path(getAppPath(), so1), rootSo1}) {
+ boolean found = false;
+ for (Path c : filesInClasspath) {
+ if (!found && p.toUri().getPath().equals(c.toUri().getPath()))
{
+ found = true;
+ }
+ }
+ assertFalse("file " + p.toUri().getPath() + " found in classpath",
found);
+ }
+
+ URI[] filesInCache = DistributedCache.getCacheFiles(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar, new
Path(getAppPath(), file), rootFile,
+ new Path(getAppPath(), so), rootSo, new
Path(getAppPath(), so1), rootSo1}) {
+ boolean found = false;
+ for (URI c : filesInCache) {
+ if (!found && p.toUri().getPath().equals(c.getPath())) {
+ found = true;
+ }
+ }
+ assertTrue("file " + p.toUri().getPath() + " not found in cache",
found);
+ }
+
+ URI[] archivesInCache = DistributedCache.getCacheArchives(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), archive),
rootArchive}) {
+ boolean found = false;
+ for (URI c : archivesInCache) {
+ if (!found && p.toUri().getPath().equals(c.getPath())) {
+ found = true;
+ }
+ }
+ assertTrue("archive " + p.toUri().getPath() + " not found in cache", found);
+ }
+ }
+
+
}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1439545&r1=1439544&r2=1439545&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Mon Jan 28 18:10:56 2013
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-87 GH-47: Feature to supply a comma separated list of jars in an 'archive tag' of workflow (jaoki via rkanter)
OOZIE-1188 Typo in documentation for using login server example (rkanter)
OOZIE-1160 Oozie web-console to display all job URLs spawned by Pig (mona)
OOZIE-1177 HostnameFilter should only catch UnknownHostException, not Exception (rkanter)
Modified:
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java?rev=1439545&r1=1439544&r2=1439545&view=diff
==============================================================================
---
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
(original)
+++
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
Mon Jan 28 18:10:56 2013
@@ -18,6 +18,7 @@
package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
@@ -26,6 +27,7 @@ import org.apache.hadoop.mapred.RunningJ
import org.apache.hadoop.mapred.JobID;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.Services;
@@ -44,6 +46,7 @@ import java.io.FileInputStream;
import java.io.Writer;
import java.io.OutputStreamWriter;
import java.io.StringReader;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -413,4 +416,112 @@ public class TestPigActionExecutor exten
_testSubmit(actionXml, true);
}
+ /**
+ * https://issues.apache.org/jira/browse/OOZIE-87
+ * This test covers pig action
+ * @throws Exception
+ */
+ public void testCommaSeparatedFilesAndArchives() throws Exception {
+ Path root = new Path(getFsTestCaseDir(), "root");
+
+ Path jar = new Path("jar.jar");
+ getFileSystem().create(new Path(getAppPath(), jar)).close();
+ Path rootJar = new Path(root, "rootJar.jar");
+ getFileSystem().create(rootJar).close();
+
+ Path file = new Path("file");
+ getFileSystem().create(new Path(getAppPath(), file)).close();
+ Path rootFile = new Path(root, "rootFile");
+ getFileSystem().create(rootFile).close();
+
+ Path so = new Path("soFile.so");
+ getFileSystem().create(new Path(getAppPath(), so)).close();
+ Path rootSo = new Path(root, "rootSoFile.so");
+ getFileSystem().create(rootSo).close();
+
+ Path so1 = new Path("soFile.so.1");
+ getFileSystem().create(new Path(getAppPath(), so1)).close();
+ Path rootSo1 = new Path(root, "rootSoFile.so.1");
+ getFileSystem().create(rootSo1).close();
+
+ Path archive = new Path("archive.tar");
+ getFileSystem().create(new Path(getAppPath(), archive)).close();
+ Path rootArchive = new Path(root, "rootArchive.tar");
+ getFileSystem().create(rootArchive).close();
+
+ String actionXml = "<pig>" +
+ " <job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ " <name-node>" + getNameNodeUri() + "</name-node>" +
+ " <script>id.pig</script>" +
+ " <file>" + jar.toString() +
+ "," + rootJar.toString() +
+ "," + file.toString() +
+ ", " + rootFile.toString() + // with leading and trailing spaces
+ " ," + so.toString() +
+ "," + rootSo.toString() +
+ "," + so1.toString() +
+ "," + rootSo1.toString() + "</file>\n" +
+ " <archive>" + archive.toString() + ", "
+ + rootArchive.toString() + " </archive>\n" + // with leading and trailing spaces
+ "</pig>";
+
+ Element eActionXml = XmlUtils.parseXml(actionXml);
+
+ Context context = createContext(actionXml);
+
+ Path appPath = getAppPath();
+
+ PigActionExecutor ae = new PigActionExecutor();
+
+ Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
+ ae.setupActionConf(jobConf, context, eActionXml, appPath);
+ ae.setLibFilesArchives(context, eActionXml, appPath, jobConf);
+
+
+ assertTrue(DistributedCache.getSymlink(jobConf));
+
+ Path[] filesInClasspath = DistributedCache.getFileClassPaths(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar}) {
+ boolean found = false;
+ for (Path c : filesInClasspath) {
+ if (!found && p.toUri().getPath().equals(c.toUri().getPath()))
{
+ found = true;
+ }
+ }
+ assertTrue("file " + p.toUri().getPath() + " not found in classpath", found);
+ }
+ for (Path p : new Path[]{new Path(getAppPath(), file), rootFile, new
Path(getAppPath(), so), rootSo,
+ new Path(getAppPath(), so1), rootSo1}) {
+ boolean found = false;
+ for (Path c : filesInClasspath) {
+ if (!found && p.toUri().getPath().equals(c.toUri().getPath()))
{
+ found = true;
+ }
+ }
+ assertFalse("file " + p.toUri().getPath() + " found in classpath",
found);
+ }
+
+ URI[] filesInCache = DistributedCache.getCacheFiles(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar, new
Path(getAppPath(), file), rootFile,
+ new Path(getAppPath(), so), rootSo, new
Path(getAppPath(), so1), rootSo1}) {
+ boolean found = false;
+ for (URI c : filesInCache) {
+ if (!found && p.toUri().getPath().equals(c.getPath())) {
+ found = true;
+ }
+ }
+ assertTrue("file " + p.toUri().getPath() + " not found in cache",
found);
+ }
+
+ URI[] archivesInCache = DistributedCache.getCacheArchives(jobConf);
+ for (Path p : new Path[]{new Path(getAppPath(), archive),
rootArchive}) {
+ boolean found = false;
+ for (URI c : archivesInCache) {
+ if (!found && p.toUri().getPath().equals(c.getPath())) {
+ found = true;
+ }
+ }
+ assertTrue("archive " + p.toUri().getPath() + " not found in cache", found);
+ }
+ }
}