OOZIE-2687 Create XML schema for launcher configurations (asasvari)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d135b88c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d135b88c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d135b88c Branch: refs/heads/master Commit: d135b88ce510178503a3683063783a58be0061f5 Parents: 8aca098 Author: Attila Sasvari <[email protected]> Authored: Mon Sep 18 13:40:38 2017 +0200 Committer: Attila Sasvari <[email protected]> Committed: Mon Sep 18 13:40:53 2017 +0200 ---------------------------------------------------------------------- client/src/main/resources/hive-action-1.0.xsd | 49 ++ client/src/main/resources/hive2-action-1.0.xsd | 48 ++ client/src/main/resources/oozie-common-1.0.xsd | 91 ++++ .../src/main/resources/oozie-workflow-1.0.xsd | 299 +++++++++++ client/src/main/resources/shell-action-1.0.xsd | 48 ++ client/src/main/resources/spark-action-1.0.xsd | 49 ++ client/src/main/resources/sqoop-action-1.0.xsd | 47 ++ .../oozie/action/hadoop/JavaActionExecutor.java | 325 ++++++++---- .../action/hadoop/SqoopActionExecutor.java | 2 +- .../oozie/action/ssh/SshActionExecutor.java | 10 +- .../org/apache/oozie/jms/MessageReceiver.java | 2 +- .../oozie/service/LiteWorkflowStoreService.java | 1 - .../org/apache/oozie/service/SchemaService.java | 9 +- .../org/apache/oozie/util/WritableUtils.java | 7 + .../org/apache/oozie/util/schema/Input.java | 132 +++++ .../oozie/util/schema/ResourceResolver.java | 40 ++ .../workflow/lite/LauncherConfigHandler.java | 67 +++ .../workflow/lite/LiteWorkflowAppParser.java | 30 +- .../oozie/workflow/lite/LiteWorkflowLib.java | 2 - core/src/main/resources/oozie-default.xml | 43 +- .../action/hadoop/ActionExecutorTestCase.java | 39 +- .../oozie/action/hadoop/LauncherMainTester.java | 9 + .../action/hadoop/TestJavaActionExecutor.java | 212 +++++++- .../apache/oozie/service/TestSchemaService.java | 497 ++++++++++++++++--- .../oozie/util/TestMetricsInstrumentation.java | 4 +- .../lite/TestLiteWorkflowAppParser.java | 51 ++ .../wf-schema-global-launcherconf-override.xml | 56 +++ .../resources/wf-schema-global-launcherconf.xml | 45 ++ pom.xml | 2 +- release-log.txt | 1 + .../apache/oozie/action/hadoop/LauncherAM.java | 8 + .../apache/oozie/action/hadoop/UDFTester.java | 2 +- 32 files changed, 2013 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/hive-action-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/hive-action-1.0.xsd b/client/src/main/resources/hive-action-1.0.xsd new file mode 100644 index 0000000..f486a3e --- /dev/null +++ b/client/src/main/resources/hive-action-1.0.xsd @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:hive="uri:oozie:hive-action:1.0" + elementFormDefault="qualified" + targetNamespace="uri:oozie:hive-action:1.0"> + + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + + <xs:element name="hive" type="hive:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="hive:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="hive:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="hive:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="query" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:choice> + <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/hive2-action-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/hive2-action-1.0.xsd b/client/src/main/resources/hive2-action-1.0.xsd new file mode 100644 index 0000000..685f4b9 --- /dev/null +++ b/client/src/main/resources/hive2-action-1.0.xsd @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:hive2="uri:oozie:hive2-action:1.0" elementFormDefault="qualified" + targetNamespace="uri:oozie:hive2-action:1.0"> + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + <xs:element name="hive2" type="hive2:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="hive2:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="hive2:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="hive2:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="jdbc-url" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="password" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="query" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:choice> + <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> +</xs:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/oozie-common-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/oozie-common-1.0.xsd b/client/src/main/resources/oozie-common-1.0.xsd new file mode 100644 index 0000000..ddae912 --- /dev/null +++ b/client/src/main/resources/oozie-common-1.0.xsd @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified"> + + <xs:complexType name="LAUNCHER"> + <xs:choice maxOccurs="unbounded"> + <!-- Oozie Launcher job memory in MB --> + <xs:element name="memory.mb" minOccurs="0" type="xs:unsignedInt"/> + <xs:element name="vcores" minOccurs="0" type="xs:unsignedInt"/> + <xs:element name="java-opts" minOccurs="0" type="xs:string"/> + <xs:element name="env" minOccurs="0" type="xs:string"/> + <xs:element name="queue" minOccurs="0" type="xs:string"/> + <xs:element name="sharelib" minOccurs="0" type="xs:string"/> + </xs:choice> + </xs:complexType> + + <xs:complexType name="CONFIGURATION"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="PREPARE"> + <xs:sequence> + <xs:element name="delete" type="DELETE" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="mkdir" type="MKDIR" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="DELETE"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="MKDIR"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="FLAG"/> + + + <xs:complexType name="MOVE"> + <xs:attribute name="source" type="xs:string" use="required"/> + <xs:attribute name="target" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="CHMOD"> + <xs:sequence> + <xs:element name="recursive" type="FLAG" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="path" type="xs:string" use="required"/> + <xs:attribute name="permissions" type="xs:string" use="required"/> + <xs:attribute name="dir-files" type="xs:string"/> + </xs:complexType> + + <xs:complexType name="TOUCHZ"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="CHGRP"> + <xs:sequence> + <xs:element name="recursive" type="FLAG" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="path" type="xs:string" use="required"/> + <xs:attribute name="group" type="xs:string" use="required"/> + <xs:attribute name="dir-files" type="xs:string"/> + </xs:complexType> +</xs:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/oozie-workflow-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/oozie-workflow-1.0.xsd b/client/src/main/resources/oozie-workflow-1.0.xsd new file mode 100644 index 0000000..51aee27 --- /dev/null +++ b/client/src/main/resources/oozie-workflow-1.0.xsd @@ -0,0 +1,299 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:1.0" + elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:1.0"> + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + + <xs:element name="workflow-app" type="workflow:WORKFLOW-APP"/> + <xs:simpleType name="IDENTIFIER"> + <xs:restriction base="xs:string"> + <xs:pattern value="([a-zA-Z_]([\-_a-zA-Z0-9])*){1,39}"/> + </xs:restriction> + </xs:simpleType> + + <xs:complexType name="WORKFLOW-APP"> + <xs:sequence> + <xs:element name="parameters" type="workflow:PARAMETERS" minOccurs="0" maxOccurs="1"/> + <xs:element name="global" type="workflow:GLOBAL" minOccurs="0" maxOccurs="1"/> + <xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/> + <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/> + <xs:element name="fork" type="workflow:FORK" minOccurs="1" maxOccurs="1"/> + <xs:element name="join" type="workflow:JOIN" minOccurs="1" maxOccurs="1"/> + <xs:element name="kill" type="workflow:KILL" minOccurs="1" maxOccurs="1"/> + <xs:element name="action" type="workflow:ACTION" minOccurs="1" maxOccurs="1"/> + </xs:choice> + <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/> + <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="PARAMETERS"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="GLOBAL"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="workflow:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="START"> + <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="END"> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="DECISION"> + <xs:sequence> + <xs:element name="switch" type="workflow:SWITCH" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:element name="switch" type="workflow:SWITCH"/> + + <xs:complexType name="SWITCH"> + <xs:sequence> + <xs:sequence> + <xs:element name="case" type="workflow:CASE" minOccurs="1" maxOccurs="unbounded"/> + <xs:element name="default" type="workflow:DEFAULT" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="CASE"> + <xs:simpleContent> + <xs:extension base="xs:string"> + <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/> + </xs:extension> + </xs:simpleContent> + </xs:complexType> + + <xs:complexType name="DEFAULT"> + <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="FORK_TRANSITION"> + <xs:attribute name="start" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="FORK"> + <xs:sequence> + <xs:element name="path" type="workflow:FORK_TRANSITION" minOccurs="2" maxOccurs="unbounded"/> + </xs:sequence> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="JOIN"> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:element name="kill" type="workflow:KILL"/> + + <xs:complexType name="KILL"> + <xs:sequence> + <xs:element name="message" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:complexType name="ACTION_TRANSITION"> + <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/> + </xs:complexType> + + <xs:element name="map-reduce" type="workflow:MAP-REDUCE"/> + <xs:element name="pig" type="workflow:PIG"/> + <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW"/> + <xs:element name="fs" type="workflow:FS"/> + <xs:element name="java" type="workflow:JAVA"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="map-reduce" type="workflow:MAP-REDUCE" minOccurs="1" maxOccurs="1"/> + <xs:element name="pig" type="workflow:PIG" minOccurs="1" maxOccurs="1"/> + <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW" minOccurs="1" maxOccurs="1"/> + <xs:element name="fs" type="workflow:FS" minOccurs="1" maxOccurs="1"/> + <xs:element name="java" type="workflow:JAVA" minOccurs="1" maxOccurs="1"/> + <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/> + </xs:choice> + <xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/> + <xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/> + <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/> + <xs:attribute name="cred" type="xs:string"/> + <xs:attribute name="retry-max" type="xs:string"/> + <xs:attribute name="retry-interval" type="xs:string"/> + <xs:attribute name="retry-policy" type="xs:string"/> + </xs:complexType> + + <xs:complexType name="MAP-REDUCE"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:choice minOccurs="0" maxOccurs="1"> + <xs:element name="streaming" type="workflow:STREAMING" minOccurs="0" maxOccurs="1"/> + <xs:element name="pipes" type="workflow:PIPES" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="launcher" type="workflow:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="config-class" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="PIG"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="workflow:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="SUB-WORKFLOW"> + <xs:sequence> + <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="propagate-configuration" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="FS"> + <xs:sequence> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="delete" type="workflow:DELETE"/> + <xs:element name="mkdir" type="workflow:MKDIR"/> + <xs:element name="move" type="workflow:MOVE"/> + <xs:element name="chmod" type="workflow:CHMOD"/> + <xs:element name="touchz" type="workflow:TOUCHZ"/> + <xs:element name="chgrp" type="workflow:CHGRP"/> + </xs:choice> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="JAVA"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="workflow:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="main-class" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:choice minOccurs="0" maxOccurs="1"> + <xs:element name="java-opts" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="java-opt" type="xs:string" minOccurs="1" maxOccurs="unbounded"/> + </xs:choice> + <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="capture-output" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="STREAMING"> + <xs:sequence> + <xs:element name="mapper" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="reducer" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="record-reader" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="record-reader-mapping" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="env" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="PIPES"> + <xs:sequence> + <xs:element name="map" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="reduce" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="inputformat" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="partitioner" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="writer" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="program" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="CREDENTIALS"> + <xs:sequence minOccurs="0" maxOccurs="unbounded"> + <xs:element name="credential" type="workflow:CREDENTIAL"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="CREDENTIAL"> + <xs:sequence minOccurs="0" maxOccurs="unbounded" > + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + <xs:attribute name="name" type="xs:string" use="required"/> + <xs:attribute name="type" type="xs:string" use="required"/> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/shell-action-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/shell-action-1.0.xsd b/client/src/main/resources/shell-action-1.0.xsd new file mode 100644 index 0000000..c6406c8 --- /dev/null +++ b/client/src/main/resources/shell-action-1.0.xsd @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:shell="uri:oozie:shell-action:1.0" + elementFormDefault="qualified" + targetNamespace="uri:oozie:shell-action:1.0"> + + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + + <xs:element name="shell" type="shell:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="shell:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="shell:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="shell:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="exec" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="env-var" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="capture-output" type="shell:FLAG" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/spark-action-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/spark-action-1.0.xsd b/client/src/main/resources/spark-action-1.0.xsd new file mode 100644 index 0000000..80a6a46 --- /dev/null +++ b/client/src/main/resources/spark-action-1.0.xsd @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:spark="uri:oozie:spark-action:1.0" elementFormDefault="qualified" + targetNamespace="uri:oozie:spark-action:1.0"> + + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + + <xs:element name="spark" type="spark:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="spark:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/client/src/main/resources/sqoop-action-1.0.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/sqoop-action-1.0.xsd b/client/src/main/resources/sqoop-action-1.0.xsd new file mode 100644 index 0000000..4463ac4 --- /dev/null +++ b/client/src/main/resources/sqoop-action-1.0.xsd @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:sqoop="uri:oozie:sqoop-action:1.0" + elementFormDefault="qualified" + targetNamespace="uri:oozie:sqoop-action:1.0"> + + <xs:include schemaLocation="oozie-common-1.0.xsd"/> + + <xs:element name="sqoop" type="sqoop:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:choice> + <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:choice> + <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="prepare" type="sqoop:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="launcher" type="sqoop:LAUNCHER" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="sqoop:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:choice> + <xs:element name="command" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="arg" type="xs:string" minOccurs="1" maxOccurs="unbounded"/> + </xs:choice> + <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 49fd4b8..9d1afb5 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -22,6 +22,28 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.io.Closeables; import com.google.common.primitives.Ints; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -41,6 +63,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -98,8 +121,6 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,12 +128,18 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Objects; +import java.util.Properties; import java.util.Properties; import java.util.Set; +import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closeables; public class JavaActionExecutor extends ActionExecutor { - public static final String RUNNING = "RUNNING"; public static final String SUCCEEDED = "SUCCEEDED"; public static final String KILLED = "KILLED"; @@ -122,6 +149,11 @@ public class JavaActionExecutor extends ActionExecutor { public static final String HADOOP_NAME_NODE = "fs.default.name"; public static final String OOZIE_COMMON_LIBDIR = "oozie"; + public static final String DEFAULT_LAUNCHER_VCORES = "oozie.launcher.default.vcores"; + public static final String DEFAULT_LAUNCHER_MEMORY_MB = "oozie.launcher.default.memory.mb"; + public static final String DEFAULT_LAUNCHER_PRIORITY = "oozie.launcher.default.priority"; + public static final String DEFAULT_LAUNCHER_QUEUE = "oozie.launcher.default.queue"; + public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; @@ -260,6 +292,12 @@ public class JavaActionExecutor extends ActionExecutor { conf.set(HADOOP_NAME_NODE, nameNode); conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); + // FIXME - think about this! + Element e = actionXml.getChild("config-class", ns); + if (e != null) { + conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } + return conf; } @@ -299,6 +337,11 @@ public class JavaActionExecutor extends ActionExecutor { throw convertException(ex); } XConfiguration.copy(launcherConf, conf); + // Inject config-class for launcher to use for action + Element e = actionXml.getChild("config-class", ns); + if (e != null) { + conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } checkForDisallowedProps(launcherConf, "launcher configuration"); return conf; } @@ -876,13 +919,20 @@ public class JavaActionExecutor extends ActionExecutor { if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) { opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS)); } + List<Element> javaopts = actionXml.getChildren("java-opt", ns); - for (Element opt: javaopts) { - opts.append(" ").append(opt.getTextTrim()); + + // Either one or more <java-opt> element or one <java-opts> can be present since oozie-workflow-0.4 + if (!javaopts.isEmpty()) { + for (Element opt : javaopts) { + opts.append(" ").append(opt.getTextTrim()); + } } - Element opt = actionXml.getChild("java-opts", ns); - if (opt != null) { - opts.append(" ").append(opt.getTextTrim()); + else { + Element opt = actionXml.getChild("java-opts", ns); + if (opt != null) { + opts.append(" ").append(opt.getTextTrim()); + } } launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim()); launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim()); @@ -942,6 +992,7 @@ public class JavaActionExecutor extends ActionExecutor { } Element actionXml = XmlUtils.parseXml(action.getConf()); + LOG.debug("ActionXML: {0}", action.getConf()); // action job configuration Configuration actionConf = loadHadoopDefaultResources(context, actionXml); @@ -950,6 +1001,14 @@ public class JavaActionExecutor extends ActionExecutor { LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); + String jobName = actionConf.get(HADOOP_JOB_NAME); + if (jobName == null || jobName.isEmpty()) { + jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", + getType(), context.getWorkflow().getAppName(), + action.getName(), context.getWorkflow().getId()); + actionConf.set(HADOOP_JOB_NAME, jobName); + } + injectActionCallback(context, actionConf); if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { @@ -1052,7 +1111,7 @@ public class JavaActionExecutor extends ActionExecutor { ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); ApplicationSubmissionContext appContext = createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(), - credentials); + credentials, actionXml); yarnClient.submitApplication(appContext); launcherId = appId.toString(); @@ -1138,22 +1197,19 @@ public class JavaActionExecutor extends ActionExecutor { } private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, - String user, Context context, Configuration actionConf, String actionName, - Credentials credentials) + String user, Context context, Configuration actionConf, String actionName, + Credentials credentials, Element actionXml) throws IOException, HadoopAccessorException, URISyntaxException { ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - String appName = getAppName(context); - + setResources(launcherJobConf, appContext); + setPriority(launcherJobConf, appContext); + setQueue(launcherJobConf, appContext); appContext.setApplicationId(appId); - appContext.setApplicationName(appName); + setApplicationName(context, actionName, appContext); appContext.setApplicationType("Oozie Launcher"); - Priority pri = Records.newRecord(Priority.class); - int priority = 0; // TODO: OYA: Add a constant or a config - pri.setPriority(priority); - appContext.setPriority(pri); - appContext.setQueue("default"); // TODO: will be possible to set in <launcher> + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // Set the resources to localize @@ -1170,40 +1226,13 @@ public class JavaActionExecutor extends ActionExecutor { localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR); amContainer.setLocalResources(localResources); - // Set the environment variables - Map<String, String> env = new HashMap<String, String>(); - // This adds the Hadoop jars to the classpath in the Launcher JVM - ClasspathUtils.setupClasspath(env, launcherJobConf); - - if (needToAddMapReduceToClassPath()) { - ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); - } - - addActionSpecificEnvVars(env); - amContainer.setEnvironment(Collections.unmodifiableMap(env)); - - // Set the command - List<String> vargs = new ArrayList<String>(6); - vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) - + "/bin/java"); - - vargs.add("-Dlog4j.configuration=container-log4j.properties"); - vargs.add("-Dlog4j.debug=true"); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0); - vargs.add("-Dhadoop.root.logger=INFO,CLA"); - vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); - vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); - - Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); - vargs.add("-Djava.io.tmpdir=" + amTmpDir); + setEnvironmentVariables(launcherJobConf, amContainer); + List<String> vargs = createCommand(context); + setJavaOpts(launcherJobConf, actionXml, vargs); vargs.add(LauncherAM.class.getCanonicalName()); - vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + - Path.SEPARATOR + ApplicationConstants.STDOUT); - vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + - Path.SEPARATOR + ApplicationConstants.STDERR); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { mergedCommand.append(str).append(" "); @@ -1221,51 +1250,181 @@ public class JavaActionExecutor extends ActionExecutor { amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } - // Set Resources - // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores) - Resource resource = Resource.newInstance(2048, 1); - appContext.setResource(resource); appContext.setCancelTokensWhenComplete(true); return appContext; } - Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context, - final WorkflowAction action, - final Configuration actionConf) throws Exception { - if (context == null || action == null) { - LOG.warn("context or action is null"); - return null; + private List<String> createCommand(Context context) { + List<String> vargs = new ArrayList<String>(6); + vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) + + "/bin/java"); + + vargs.add("-Dlog4j.configuration=container-log4j.properties"); + vargs.add("-Dlog4j.debug=true"); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024); + vargs.add("-Dhadoop.root.logger=INFO,CLA"); + vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); + vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); + return vargs; + } + + private void setJavaOpts(Configuration launcherJobConf, Element actionXml, List<String> vargs) { + // Note: for backward compatibility reasons, we have to support the <java-opts> tag inside the <java> action + // If both java/java-opt(s) and launcher/java-opts are defined, we pick java/java-opts + // We also display a warning to let users know that they should migrate their workflow + StringBuilder javaOpts = new StringBuilder(); + boolean oldJavaOpts = handleJavaOpts(actionXml, javaOpts); + if (oldJavaOpts) { + vargs.add(javaOpts.toString()); + } + + final String oozieLauncherJavaOpts = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY); + if (oozieLauncherJavaOpts != null) { + if (oldJavaOpts) { + LOG.warn("<java-opts> was defined inside the <launcher> tag -- ignored"); + } else { + vargs.add(oozieLauncherJavaOpts); + } + } + } + + private boolean handleJavaOpts(Element actionXml, StringBuilder javaOpts) { + Namespace ns = actionXml.getNamespace(); + boolean oldJavaOpts = false; + @SuppressWarnings("unchecked") + List<Element> javaopts = actionXml.getChildren("java-opt", ns); + for (Element opt: javaopts) { + javaOpts.append(" ").append(opt.getTextTrim()); + oldJavaOpts = true; + } + Element opt = actionXml.getChild("java-opts", ns); + if (opt != null) { + javaOpts.append(" ").append(opt.getTextTrim()); + oldJavaOpts = true; + } + + if (oldJavaOpts) { + LOG.warn("Note: <java-opts> inside the action is used in the workflow. Please move <java-opts> tag under" + + " the <launcher> element. See the documentation for details"); + } + return oldJavaOpts; + } + + private void setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) { + String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), + context.getWorkflow().getAppName(), actionName, + context.getWorkflow().getId()); + appContext.setApplicationName(jobName); + } + + private void setEnvironmentVariables(Configuration launcherJobConf, ContainerLaunchContext amContainer) throws IOException { + Map<String, String> env = new HashMap<>(); + + final String oozieLauncherEnvProperty = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY); + if (oozieLauncherEnvProperty != null) { + Map<String, String> environmentVars = extractEnvVarsFromOozieLauncherProps(oozieLauncherEnvProperty); + env.putAll(environmentVars); } - if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && !UserGroupInformation.isSecurityEnabled()) { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); - return null; + // This adds the Hadoop jars to the classpath in the Launcher JVM + ClasspathUtils.setupClasspath(env, launcherJobConf); + + if (needToAddMapReduceToClassPath()) { + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + } + + addActionSpecificEnvVars(env); + amContainer.setEnvironment(ImmutableMap.copyOf(env)); + } + + private void setQueue(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + String queue; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY) != null) { + queue = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY); + } else { + queue = Preconditions.checkNotNull(ConfigurationService.get(DEFAULT_LAUNCHER_QUEUE), "Default queue is undefined"); + } + appContext.setQueue(queue); + } + + private void setPriority(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + int priority; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY) != null) { + priority = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY, -1); + } else { + int defaultPriority = ConfigurationService.getInt(DEFAULT_LAUNCHER_PRIORITY); + priority = defaultPriority; + } + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + appContext.setPriority(pri); + } + + private void setResources(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + int memory; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY) != null) { + memory = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, -1); + Preconditions.checkArgument(memory > 0, "Launcher memory is 0 or negative"); + } else { + int defaultMemory = ConfigurationService.getInt(DEFAULT_LAUNCHER_MEMORY_MB, -1); + Preconditions.checkArgument(defaultMemory > 0, "Default launcher memory is 0 or negative"); + memory = defaultMemory; } - final XConfiguration wfJobConf = getWorkflowConf(context); - if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && - wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)) && - !UserGroupInformation.isSecurityEnabled()) { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); - return null; + int vcores; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY) != null) { + vcores = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, -1); + Preconditions.checkArgument(vcores > 0, "Launcher vcores is 0 or negative"); + } else { + int defaultVcores = ConfigurationService.getInt(DEFAULT_LAUNCHER_VCORES); + Preconditions.checkArgument(defaultVcores > 0, "Default launcher vcores is 0 or negative"); + vcores = defaultVcores; } + Resource resource = Resource.newInstance(memory, vcores); + appContext.setResource(resource); + } - final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context, action); - if (credPropertiesMap.isEmpty()) { - LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); - return credPropertiesMap; + private Map<String, String> extractEnvVarsFromOozieLauncherProps(String oozieLauncherEnvProperty) { + Map<String, String> envMap = new LinkedHashMap<>(); + for (String envVar : StringUtils.split(oozieLauncherEnvProperty, File.pathSeparatorChar)) { + String[] env = StringUtils.split(envVar, '='); + Preconditions.checkArgument(env.length == 2, "Invalid launcher setting for environment variables: \"%s\". " + + "<env> should contain a list of ENV_VAR_NAME=VALUE separated by the '%s' character. " + + "Example on Unix: A=foo1:B=foo2", oozieLauncherEnvProperty, File.pathSeparator); + envMap.put(env[0], env[1]); } + return envMap; + } - for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - if (entry.getValue() != null) { - final CredentialsProperties prop = entry.getValue(); - LOG.debug("Credential Properties set for action : " + action.getId()); - for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) { - final String key = propEntry.getKey(); - final String value = propEntry.getValue(); - actionConf.set(key, value); - LOG.debug("property : '" + key + "', value : '" + value + "'"); + protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, + WorkflowAction action, Configuration actionConf) throws Exception { + HashMap<String, CredentialsProperties> credPropertiesMap = null; + if (context != null && action != null) { + if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { + XConfiguration wfJobConf = getWorkflowConf(context); + if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || + !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { + credPropertiesMap = getActionCredentialsProperties(context, action); + if (!credPropertiesMap.isEmpty()) { + for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + CredentialsProperties prop = entry.getValue(); + LOG.debug("Credential Properties set for action : " + action.getId()); + for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { + String key = propEntry.getKey(); + String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); + } + } + } + } else { + LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); + } + } else { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java index 8fdc50c..a0dfd31 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java @@ -77,8 +77,8 @@ public class SqoopActionExecutor extends JavaActionExecutor { if (e != null) { String strConf = XmlUtils.prettyPrint(e).toString(); XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); - checkForDisallowedProps(inlineConf, "inline configuration"); XConfiguration.copy(inlineConf, actionConf); + checkForDisallowedProps(inlineConf, "inline configuration"); } } catch (IOException ex) { throw convertException(ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java index 7e33485..5890b8c 100644 --- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java @@ -329,8 +329,8 @@ public class SshActionExecutor extends ActionExecutor { * * @param command Command to execute as String. * @return exit status of the execution. - * @throws IOException if process exits with status nonzero. - * @throws InterruptedException if process does not run properly. + * @throws IOException if processSettings exits with status nonzero. + * @throws InterruptedException if processSettings does not run properly. */ public int executeCommand(String command) throws IOException, InterruptedException { Runtime runtime = Runtime.getRuntime(); @@ -396,7 +396,7 @@ public class SshActionExecutor extends ActionExecutor { * @param action action object. * @param recoveryId action id + run number to enable recovery in rerun * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments - * @return process id of the running command. + * @return processSettings id of the running command. * @throws IOException thrown if failed to run the command. * @throws InterruptedException thrown if any interruption happens. */ @@ -468,7 +468,7 @@ public class SshActionExecutor extends ActionExecutor { } /** - * Get the return value of a process. + * Get the return value of a processSettings. * * @param command command to be executed. * @return zero if execution is successful and any non zero value for failure. @@ -679,7 +679,7 @@ public class SshActionExecutor extends ActionExecutor { * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the * store content may exceed this length. - * @return the exit value of the process. + * @return the exit value of the processSettings. * @throws IOException */ private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java index 47bfd2b..87d0c5e 100644 --- a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java +++ b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java @@ -49,7 +49,7 @@ public class MessageReceiver implements MessageListener { } /** - * Get the MessageHandler that will process the message + * Get the MessageHandler that will processSettings the message * * @return message handler */ http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java index ffc29af..97a75ff 100644 --- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java +++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java @@ -18,7 +18,6 @@ package org.apache.oozie.service; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.oozie.action.control.EndActionExecutor; import org.apache.oozie.action.control.ForkActionExecutor; http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/service/SchemaService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SchemaService.java b/core/src/main/java/org/apache/oozie/service/SchemaService.java index 137e2c0..9d2a521 100644 --- a/core/src/main/java/org/apache/oozie/service/SchemaService.java +++ b/core/src/main/java/org/apache/oozie/service/SchemaService.java @@ -29,15 +29,17 @@ import javax.xml.transform.stream.StreamSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.schema.ResourceResolver; import org.xml.sax.SAXException; + /** * Service that loads Oozie workflow definition schema and registered extension * schemas. */ + public class SchemaService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService."; @@ -95,9 +97,12 @@ public class SchemaService implements Service { } List<StreamSource> sources = new ArrayList<StreamSource>(); for (String schemaName : schemaNames) { - sources.add(new StreamSource(IOUtils.getResourceAsStream(schemaName, -1))); + StreamSource s = new StreamSource(IOUtils.getResourceAsStream(schemaName, -1)); + s.setSystemId(schemaName); + sources.add(s); } SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + factory.setResourceResolver(new ResourceResolver()); return factory.newSchema(sources.toArray(new StreamSource[sources.size()])); } http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/util/WritableUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java b/core/src/main/java/org/apache/oozie/util/WritableUtils.java index aa027e3..5a4cb24 100644 --- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java +++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java @@ -151,6 +151,13 @@ public class WritableUtils { } } + /** + * Write string list. + * + * @param dataOutput the data output + * @param list the list + * @throws IOException Signals that an I/O exception has occurred. + */ public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { dataOutput.writeInt(list.size()); for (String str : list) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/util/schema/Input.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/schema/Input.java b/core/src/main/java/org/apache/oozie/util/schema/Input.java new file mode 100644 index 0000000..d5ac886 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/schema/Input.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util.schema; + +import org.w3c.dom.ls.LSInput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; + +/** + * Utility class to handle schema import and include statements + */ +class Input implements LSInput { + private static final String UTF_8_ENCODING = "UTF-8"; + private BufferedInputStream inputStream; + private String publicId; + private String systemId; + + public Input(String publicId, String sysId, InputStream input) { + this.publicId = publicId; + this.systemId = sysId; + this.inputStream = new BufferedInputStream(input); + } + + public String getPublicId() { + return publicId; + } + + public void setPublicId(String publicId) { + this.publicId = publicId; + } + + public String getBaseURI() { + return null; + } + + @Override + public void setBaseURI(String baseURI) { + + } + + @Override + public InputStream getByteStream() { + return null; + } + + @Override + public void setByteStream(InputStream byteStream) { + + } + + @Override + public boolean getCertifiedText() { + return false; + } + + @Override + public void setCertifiedText(boolean certifiedText) { + + } + + public Reader getCharacterStream() { + return null; + } + + @Override + public void setCharacterStream(Reader characterStream) { + + } + + @Override + public String getEncoding() { + return null; + } + + @Override + public void setEncoding(String encoding) { + + } + + @Override + public String getStringData() { + synchronized (inputStream) { + try { + byte[] input = new byte[inputStream.available()]; + inputStream.read(input); + String contents = new String(input, UTF_8_ENCODING); + return contents; + } catch (IOException e) { + e.printStackTrace(); + System.out.println("Exception " + e); + return null; + } + } + } + + @Override + public void setStringData(String stringData) { + + } + + @Override + public String getSystemId() { + return systemId; + } + + @Override + public void setSystemId(String systemId) { + this.systemId = systemId; + } + + public BufferedInputStream getInputStream() { + return inputStream; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/util/schema/ResourceResolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/schema/ResourceResolver.java b/core/src/main/java/org/apache/oozie/util/schema/ResourceResolver.java new file mode 100644 index 0000000..cbdc23c --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/schema/ResourceResolver.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.schema; + +import org.apache.commons.io.FilenameUtils; +import org.w3c.dom.ls.LSInput; +import org.w3c.dom.ls.LSResourceResolver; + +import java.io.InputStream; + +/** + * Utility class to handle schema import and include statements + */ +public class ResourceResolver implements LSResourceResolver { + + public LSInput resolveResource(String type, String namespaceURI, + String publicId, String systemId, String baseURI) { + if (systemId == null) { + systemId = FilenameUtils.getName(baseURI); + } + InputStream resourceAsStream = this.getClass().getClassLoader().getResourceAsStream(systemId); + return new Input(publicId, systemId, resourceAsStream); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/workflow/lite/LauncherConfigHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LauncherConfigHandler.java b/core/src/main/java/org/apache/oozie/workflow/lite/LauncherConfigHandler.java new file mode 100644 index 0000000..c367742 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LauncherConfigHandler.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.workflow.lite; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.action.hadoop.LauncherAM; +import org.jdom.Element; +import org.jdom.Namespace; + +class LauncherConfigHandler { + private static final String LAUNCHER_MEMORY_MB = "memory.mb"; + private static final String LAUNCHER_VCORES = "vcores"; + private static final String LAUNCHER_PRIORITY = "priority"; + private static final String LAUNCHER_JAVAOPTS = "java-opts"; + private static final String LAUNCHER_ENV = "env"; + private static final String LAUNCHER_SHARELIB = "sharelib"; + private static final String LAUNCHER_QUEUE = "queue"; + + private final Configuration entries; + private final Element xmlLauncherElement; + private final Namespace ns; + + public LauncherConfigHandler(Configuration entries, Element globalLauncherElement, Namespace ns) { + this.entries = entries; + this.xmlLauncherElement = globalLauncherElement; + this.ns = ns; + } + + private void setStringCfgSetting(String xmlTag, String configKey) { + Element launcherSetting = xmlLauncherElement.getChild(xmlTag, ns); + if (launcherSetting != null) { + entries.set(configKey, launcherSetting.getText()); + } + } + + private void setIntCfgSetting(String xmlTag, String configKey) { + Element launcherSetting = xmlLauncherElement.getChild(xmlTag, ns); + if (launcherSetting != null) { + entries.setInt(configKey, Integer.parseInt(launcherSetting.getText())); + } + } + + public void processSettings() { + setIntCfgSetting(LAUNCHER_MEMORY_MB, LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY); + setIntCfgSetting(LAUNCHER_VCORES, LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY); + setIntCfgSetting(LAUNCHER_PRIORITY, LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY); + setStringCfgSetting(LAUNCHER_JAVAOPTS, LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY); + setStringCfgSetting(LAUNCHER_ENV, LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY); + setStringCfgSetting(LAUNCHER_QUEUE, LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY); + setStringCfgSetting(LAUNCHER_SHARELIB, LauncherAM.OOZIE_LAUNCHER_SHARELIB_PROPERTY); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index a74e5c7..aa0e06b 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -68,6 +68,7 @@ import org.xml.sax.SAXException; */ public class LiteWorkflowAppParser { + private static final String LAUNCHER_E = "launcher"; private static final String DECISION_E = "decision"; private static final String ACTION_E = "action"; private static final String END_E = "end"; @@ -214,6 +215,7 @@ public class LiteWorkflowAppParser { private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf) throws WorkflowException { Namespace ns = root.getNamespace(); + LiteWorkflowApp def = null; GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ? null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); @@ -264,10 +266,10 @@ public class LiteWorkflowAppParser { } eActionConf = elem; if (SUBWORKFLOW_E.equals(elem.getName())) { - handleDefaultsAndGlobal(gData, null, elem); + handleDefaultsAndGlobal(gData, null, elem, ns); } else { - handleDefaultsAndGlobal(gData, configDefault, elem); + handleDefaultsAndGlobal(gData, configDefault, elem, ns); } } } @@ -300,9 +302,11 @@ public class LiteWorkflowAppParser { } else if (eNode.getName().equals(GLOBAL)) { if(jobConf.get(OOZIE_GLOBAL) != null) { gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); - handleDefaultsAndGlobal(gData, null, eNode); + handleDefaultsAndGlobal(gData, null, eNode, ns); } + gData = parseGlobalSection(ns, eNode); + } else if (eNode.getName().equals(PARAMETERS)) { // No operation is required } else { @@ -438,7 +442,7 @@ public class LiteWorkflowAppParser { } } - Configuration globalConf = null; + Configuration globalConf = new XConfiguration(); Element globalConfigurationElement = global.getChild(CONFIGURATION, ns); if (globalConfigurationElement != null) { try { @@ -447,12 +451,18 @@ public class LiteWorkflowAppParser { throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf"); } } + + Element globalLauncherElement = global.getChild(LAUNCHER_E, ns); + if (globalLauncherElement != null) { + LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(globalConf, globalLauncherElement, ns); + launcherConfigHandler.processSettings(); + } gData = new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf); } return gData; } - private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement) + private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement, Namespace ns) throws WorkflowException { ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName()); @@ -497,7 +507,7 @@ public class LiteWorkflowAppParser { // If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's // <configuration> and <job-xml> fields. We also merge this with those from the <global> section, if given. If none are // defined, empty values are placed. Exceptions are thrown if there's an error parsing, but not if they're not given. - if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { + if (GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { @SuppressWarnings("unchecked") List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs); if (gData != null && gData.jobXmls != null) { @@ -524,12 +534,20 @@ public class LiteWorkflowAppParser { if (gData != null && gData.conf != null) { XConfiguration.copy(gData.conf, actionConf); } + + Element launcherConfiguration = actionElement.getChild(LAUNCHER_E, actionNs); + if (launcherConfiguration != null) { + LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(actionConf, launcherConfiguration, actionNs); + launcherConfigHandler.processSettings(); + } + Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs); if (actionConfiguration != null) { //copy and override XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())), actionConf); } + int position = actionElement.indexOf(actionConfiguration); actionElement.removeContent(actionConfiguration); //replace with enhanced one Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false)); http://git-wip-us.apache.org/repos/asf/oozie/blob/d135b88c/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java index 23df086..2e09889 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java @@ -31,8 +31,6 @@ import org.apache.hadoop.conf.Configuration; import javax.xml.validation.Schema; import java.io.StringReader; -import java.util.Date; -import java.util.Map; //TODO javadoc public abstract class LiteWorkflowLib implements WorkflowLib {
