Repository: oozie Updated Branches: refs/heads/master c2fe52b83 -> 0f086d41b
OOZIE-2030 Configuration properties from global section is not getting set in Hadoop job conf when using sub-workflow action in Oozie workflow.xml (jaydeepvishwakarma via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0f086d41 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0f086d41 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0f086d41 Branch: refs/heads/master Commit: 0f086d41b8f274abe0959705f76fa3225ab5ff2e Parents: c2fe52b Author: Rohini Palaniswamy <[email protected]> Authored: Sat Jan 9 16:02:12 2016 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Sat Jan 9 16:02:12 2016 -0800 ---------------------------------------------------------------------- .../action/oozie/SubWorkflowActionExecutor.java | 4 + .../workflow/lite/LiteWorkflowAppParser.java | 135 +++++++++++++++++-- .../oozie/TestSubWorkflowActionExecutor.java | 113 +++++++++++++++- release-log.txt | 1 + 4 files changed, 238 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0f086d41/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java index 33efc60..6bf3598 100644 --- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java @@ -306,4 +306,8 @@ public class SubWorkflowActionExecutor extends ActionExecutor { public boolean isCompleted(String externalStatus) { return FINAL_STATUS.contains(externalStatus); } + + public boolean supportsConfigurationJobXML() { + return true; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f086d41/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index d3a6523..03c84f1 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -18,8 +18,10 @@ package org.apache.oozie.workflow.lite; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Writable; +import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.util.ELUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -27,7 +29,9 @@ import org.apache.oozie.util.XmlUtils; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.ParameterVerifier; import org.apache.oozie.util.ParameterVerifierException; +import org.apache.oozie.util.WritableUtils; import org.apache.oozie.ErrorCode; +import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.service.Services; import org.apache.oozie.service.ActionService; @@ -46,6 +50,12 @@ import java.io.IOException; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; @@ -54,6 +64,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.zip.*; /** * Class to parse and validate workflow xml @@ -95,6 +106,7 @@ public class LiteWorkflowAppParser { public static final String DEFAULT_NAME_NODE = "oozie.actions.default.name-node"; public static final String DEFAULT_JOB_TRACKER = "oozie.actions.default.job-tracker"; + public static final String OOZIE_GLOBAL = "oozie.wf.globalconf"; private static final String JOB_TRACKER = "job-tracker"; private static final String NAME_NODE = "name-node"; @@ -417,7 +429,9 @@ public class LiteWorkflowAppParser { throws WorkflowException { Namespace ns = root.getNamespace(); LiteWorkflowApp def = null; - GlobalSectionData gData = null; + GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ? + null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); + boolean serializedGlobalConf = false; for (Element eNode : (List<Element>) root.getChildren()) { if (eNode.getName().equals(START_E)) { def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, @@ -457,6 +471,11 @@ public class LiteWorkflowAppParser { } else if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { continue; } else { + if (!serializedGlobalConf && elem.getName().equals(SubWorkflowActionExecutor.ACTION_TYPE) && + elem.getChild(("propagate-configuration"), ns) != null) { + serializedGlobalConf = true; + jobConf.set(OOZIE_GLOBAL, getGlobalString(gData)); + } eActionConf = elem; handleDefaultsAndGlobal(gData, configDefault, elem); } @@ -484,6 +503,10 @@ public class LiteWorkflowAppParser { } else if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { // No operation is required } else if (eNode.getName().equals(GLOBAL)) { + if(jobConf.get(OOZIE_GLOBAL) != null) { + gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); + handleDefaultsAndGlobal(gData, null, eNode); + } gData = parseGlobalSection(ns, eNode); } else if (eNode.getName().equals(PARAMETERS)) { // No operation is required @@ -495,6 +518,47 @@ public class LiteWorkflowAppParser { } /** + * Read the GlobalSectionData from Base64 string. + * @param globalStr + * @return GlobalSectionData + * @throws WorkflowException + */ + private GlobalSectionData getGlobalFromString(String globalStr) throws WorkflowException { + GlobalSectionData globalSectionData = new GlobalSectionData(); + try { + byte[] data = Base64.decodeBase64(globalStr); + Inflater inflater = new Inflater(); + DataInputStream ois = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(data), inflater)); + globalSectionData.readFields(ois); + ois.close(); + } catch (Exception ex) { + throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf"); + } + return globalSectionData; + } + + + /** + * Write the GlobalSectionData to a Base64 string. + * @param globalSectionData + * @return String + * @throws WorkflowException + */ + private String getGlobalString(GlobalSectionData globalSectionData) throws WorkflowException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oos = null; + try { + Deflater def = new Deflater(); + oos = new DataOutputStream(new DeflaterOutputStream(baos, def)); + globalSectionData.write(oos); + oos.close(); + } catch (IOException e) { + throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf"); + } + return Base64.encodeBase64String(baos.toByteArray()); + } + + /** * Validate workflow xml * * @param app @@ -570,11 +634,14 @@ public class LiteWorkflowAppParser { parent.addContent(child); } - private class GlobalSectionData { - final String jobTracker; - final String nameNode; - final List<String> jobXmls; - final Configuration conf; + private class GlobalSectionData implements Writable { + String jobTracker; + String nameNode; + List<String> jobXmls; + Configuration conf; + + public GlobalSectionData() { + } public GlobalSectionData(String jobTracker, String nameNode, List<String> jobXmls, Configuration conf) { this.jobTracker = jobTracker; @@ -582,6 +649,43 @@ public class LiteWorkflowAppParser { this.jobXmls = jobXmls; this.conf = conf; } + + @Override + public void write(DataOutput dataOutput) throws IOException { + WritableUtils.writeStr(dataOutput, jobTracker); + WritableUtils.writeStr(dataOutput, nameNode); + + if(jobXmls != null && !jobXmls.isEmpty()) { + dataOutput.writeInt(jobXmls.size()); + for (String content : jobXmls) { + WritableUtils.writeStr(dataOutput, content); + } + } else { + dataOutput.writeInt(0); + } + if(conf != null) { + WritableUtils.writeStr(dataOutput, XmlUtils.prettyPrint(conf).toString()); + } else { + WritableUtils.writeStr(dataOutput, null); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + jobTracker = WritableUtils.readStr(dataInput); + nameNode = WritableUtils.readStr(dataInput); + int length = dataInput.readInt(); + if (length > 0) { + jobXmls = new ArrayList<String>(); + for (int i = 0; i < length; i++) { + jobXmls.add(WritableUtils.readStr(dataInput)); + } + } + String confString = WritableUtils.readStr(dataInput); + if(confString != null) { + conf = new XConfiguration(new StringReader(confString)); + } + } } private GlobalSectionData parseGlobalSection(Namespace ns, Element global) throws WorkflowException { @@ -625,20 +729,23 @@ public class LiteWorkflowAppParser { private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement) throws WorkflowException { + ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName()); - if (ae == null) { + if (ae == null && !GLOBAL.equals(actionElement.getName())) { throw new WorkflowException(ErrorCode.E0723, actionElement.getName(), ActionService.class.getName()); } Namespace actionNs = actionElement.getNamespace(); - if (ae.requiresNameNodeJobTracker()) { + if (SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) || + GLOBAL.equals(actionElement.getName()) || ae.requiresNameNodeJobTracker()) { if (actionElement.getChild(NAME_NODE, actionNs) == null) { if (gData != null && gData.nameNode != null) { addChildElement(actionElement, actionNs, NAME_NODE, gData.nameNode); } else if (defaultNameNode != null) { addChildElement(actionElement, actionNs, NAME_NODE, defaultNameNode); - } else { + } else if (!(SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) || + GLOBAL.equals(actionElement.getName()))) { throw new WorkflowException(ErrorCode.E0701, "No " + NAME_NODE + " defined"); } } @@ -647,13 +754,14 @@ public class LiteWorkflowAppParser { addChildElement(actionElement, actionNs, JOB_TRACKER, gData.jobTracker); } else if (defaultJobTracker != null) { addChildElement(actionElement, actionNs, JOB_TRACKER, defaultJobTracker); - } else { + } else if (!(SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) || + GLOBAL.equals(actionElement.getName()))) { throw new WorkflowException(ErrorCode.E0701, "No " + JOB_TRACKER + " defined"); } } } - if (ae.supportsConfigurationJobXML()) { + if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { @SuppressWarnings("unchecked") List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs); if (gData != null && gData.jobXmls != null) { @@ -706,5 +814,4 @@ public class LiteWorkflowAppParser { } } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/0f086d41/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java index 9ab897a..26e5031 100644 --- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.ActionExecutorTestCase; +import org.apache.oozie.action.hadoop.LauncherMainTester; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; @@ -33,6 +34,8 @@ import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.XLogService; import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; import java.io.*; import java.net.URI; @@ -544,7 +547,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } public String getLazyWorkflow() { - return "<workflow-app xmlns='uri:oozie:workflow:0.3' name='app'>" + + return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" + "<start to='java' />" + " <action name='java'>" + "<java>" + @@ -634,4 +637,112 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } } + + public void testParentGlobalConf() throws Exception { + try { + Path subWorkflowAppPath = getFsTestCaseDir(); + FileSystem fs = getFileSystem(); + Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); + Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath)); + writer.write(getWorkflow()); + writer.close(); + + String workflowUri = getTestCaseFileUri("workflow.xml"); + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + + "<global>" + + " <configuration>" + + " <property>" + + " <name>foo2</name>" + + " <value>foo2</value>" + + " </property>" + + " <property>" + + " <name>foo3</name>" + + " <value>foo3</value>" + + " </property>" + + " </configuration>" + + "</global>" + + "<start to=\"subwf\"/>" + + "<action name=\"subwf\">" + + " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" + + " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + + "<propagate-configuration/>" + + " <configuration>" + + " <property>" + + " <name>foo3</name>" + + " <value>actionconf</value>" + + " </property>" + + " </configuration>" + + " </sub-workflow>" + + " <ok to=\"end\"/>" + + " <error to=\"fail\"/>" + + "</action>" + + "<kill name=\"fail\">" + + " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + LocalOozie.start(); + final OozieClient wfClient = LocalOozie.getClient(); + Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, workflowUri); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("appName", "var-app-name"); + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + + waitFor(JOB_TIMEOUT, new Predicate() { + public boolean evaluate() throws Exception { + return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.SUCCEEDED) && + (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.OK); + } + }); + WorkflowJob subWorkflow = wfClient.getJobInfo(wfClient.getJobInfo(jobId). + getActions().get(1).getExternalId()); + Configuration subWorkflowConf = new XConfiguration(new StringReader(subWorkflow.getConf())); + Element eConf = XmlUtils.parseXml(subWorkflow.getActions().get(1).getConf()); + Element element = eConf.getChild("configuration", eConf.getNamespace()); + Configuration actionConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(element).toString())); + assertEquals(actionConf.get("foo1"), "foo1"); + assertEquals(actionConf.get("foo2"), "subconf"); + assertEquals(actionConf.get("foo3"), "foo3"); + // Checking the action conf configuration. + assertEquals(subWorkflowConf.get("foo3"), "actionconf"); + } finally { + LocalOozie.stop(); + } + } + + public String getWorkflow() { + return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" + + "<global>" + + " <configuration>" + + " <property>" + + " <name>foo1</name>" + + " <value>foo1</value>" + + " </property>" + + " <property>" + + " <name>foo2</name>" + + " <value>subconf</value>" + + " </property>" + + " </configuration>" + + "</global>" + + "<start to='java' />" + + "<action name='java'>" + + "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "<arg>exit0</arg>" + + "</java>" + + "<ok to='end' />" + + "<error to='fail' />" + + "</action>" + + "<kill name='fail'>" + + "<message>shell action fail, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name='end' />" + + "</workflow-app>"; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f086d41/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 275db88..bf8d35f 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2030 Configuration properties from global section is not getting set in Hadoop job conf when using sub-workflow action in Oozie workflow.xml (jaydeepvishwakarma via rohini) OOZIE-2380 Oozie Hive action failed with wrong tmp path (vaifer via rkanter) OOZIE-2222 Oozie UI parent job should be clickable (puru) OOZIE-2407 AbandonedService should not send mail if there is no abandoned coord (puru)
