Repository: oozie
Updated Branches:
  refs/heads/master 718943fe4 -> 90c068574


OOZIE-1954 Add a way for the MapReduce action to be configured by Java code 
(rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/90c06857
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/90c06857
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/90c06857

Branch: refs/heads/master
Commit: 90c0685746f6115db9accdc05d80a099e116e873
Parents: 718943f
Author: Robert Kanter <[email protected]>
Authored: Tue Sep 16 11:07:30 2014 -0700
Committer: Robert Kanter <[email protected]>
Committed: Tue Sep 16 11:07:30 2014 -0700

----------------------------------------------------------------------
 .../src/main/resources/oozie-workflow-0.5.xsd   |   1 +
 .../oozie/action/hadoop/JavaActionExecutor.java |   4 +
 .../action/hadoop/MapperReducerForTest.java     |  25 +++++
 .../src/site/twiki/WorkflowFunctionalSpec.twiki | 104 +++++++++++++++++--
 .../map-reduce/job-with-config-class.properties |  25 +++++
 .../src/main/apps/map-reduce/job.properties     |   2 +-
 .../map-reduce/workflow-with-config-class.xml   |  52 ++++++++++
 .../example/SampleOozieActionConfigurator.java  |  50 +++++++++
 release-log.txt                                 |   1 +
 .../oozie/action/hadoop/LauncherMain.java       |  25 +++++
 .../oozie/action/hadoop/LauncherMapper.java     |   5 +
 .../oozie/action/hadoop/MapReduceMain.java      |  19 ++--
 .../action/hadoop/OozieActionConfigurator.java  |  37 +++++++
 .../OozieActionConfiguratorException.java       |  41 ++++++++
 .../apache/oozie/action/hadoop/PipesMain.java   |  18 ++--
 .../hadoop/OozieActionConfiguratorForTest.java  |  34 ++++++
 .../hadoop/TestMapReduceActionExecutor.java     | 100 ++++++++++++++++++
 17 files changed, 516 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/client/src/main/resources/oozie-workflow-0.5.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-workflow-0.5.xsd 
b/client/src/main/resources/oozie-workflow-0.5.xsd
index 6620a4e..b01580c 100644
--- a/client/src/main/resources/oozie-workflow-0.5.xsd
+++ b/client/src/main/resources/oozie-workflow-0.5.xsd
@@ -173,6 +173,7 @@
             </xs:choice>
             <xs:element name="job-xml" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="configuration" type="workflow:CONFIGURATION" 
minOccurs="0" maxOccurs="1"/>
+            <xs:element name="config-class" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
             <xs:element name="file" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="archive" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
         </xs:sequence>

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 53f979c..201cfa3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -253,6 +253,10 @@ public class JavaActionExecutor extends ActionExecutor {
                 injectLauncherUseUberMode(launcherConf);
                 XConfiguration.copy(launcherConf, conf);
             }
+            e = actionXml.getChild("config-class", actionXml.getNamespace());
+            if (e != null) {
+                conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, 
e.getTextTrim());
+            }
             return conf;
         }
         catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
index d89990c..8f08ddd 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.oozie.action.hadoop;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -30,12 +33,34 @@ import java.util.Iterator;
 public class MapperReducerForTest implements Mapper, Reducer {
     public static final String GROUP = "g";
     public static final String NAME = "c";
+    /**
+     * If specified in the job conf, the mapper will write out the job.xml 
file here.
+     */
+    public static final String JOB_XML_OUTPUT_LOCATION = 
"oozie.job.xml.output.location";
 
     public static void main(String[] args) {
         System.out.println("hello!");
     }
 
+    @Override
     public void configure(JobConf jobConf) {
+        try {
+            String loc = jobConf.get(JOB_XML_OUTPUT_LOCATION);
+            if (loc != null) {
+                Path p = new Path(loc);
+                FileSystem fs = p.getFileSystem(jobConf);
+                if (!fs.exists(p)) {
+                    FSDataOutputStream out = fs.create(p);
+                    try {
+                        jobConf.writeXml(out);
+                    } finally {
+                        out.close();
+                    }
+                }
+            }
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
     }
 
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki 
b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
index 3319bcc..43fddbb 100644
--- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
+++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -568,10 +568,11 @@ Hadoop JobConf properties can be specified as part of
    * the =config-default.xml= or
    * JobConf XML file bundled with the workflow application or
    * <global> tag in workflow definition or
-   * Inline =map-reduce= action configuration. 
+   * Inline =map-reduce= action configuration or
+   * An implementation of OozieActionConfigurator specified by the 
<config-class> tag in workflow definition.
 
-The configuration properties are loaded in the following above order i.e. 
=streaming=, =job-xml= and =configuration=, and 
-the precedence order is later values override earlier values.
+The configuration properties are loaded in the following above order i.e. 
=streaming=, =job-xml=, =configuration=,
+and =config-class=, and the precedence order is later values override earlier 
values.
 
 Streaming and inline property values can be parameterized (templatized) using 
EL expressions.
 
@@ -579,7 +580,7 @@ The Hadoop =mapred.job.tracker= and =fs.default.name= 
properties must not be pre
 configuration.
 
 #FilesAchives
----++++ 3.2.2.1 Adding Files and Archives for the Job
+---+++++ 3.2.2.1 Adding Files and Archives for the Job
 
 The =file=, =archive= elements make available, to map-reduce jobs, files and 
archives. If the specified path is
 relative, it is assumed the file or archiver are within the application 
directory, in the corresponding sub-path.
@@ -595,8 +596,88 @@ To force a symlink for a file on the task running 
directory, use a '#' followed
 
 Refer to Hadoop distributed cache documentation for details more details on 
files and archives.
 
+---+++++ 3.2.2.2 Configuring the MapReduce action with Java code
+
+Java code can be used to further configure the MapReduce action.  This can be 
useful if you already have "driver" code for your
+MapReduce action, if you're more familiar with MapReduce's Java API, if 
there's some configuration that requires logic, or some
+configuration that's difficult to do in straight XML (e.g. Avro).
+
+Create a class that implements the 
org.apache.oozie.action.hadoop.OozieActionConfigurator interface from the 
"oozie-sharelib-oozie"
+artifact.  It contains a single method that recieves a =JobConf= as an 
argument.  Any configuration properties set on this =JobConf=
+will be used by the MapReduce action.
+
+The OozieActionConfigurator has this signature:
+<verbatim>
+public interface OozieActionConfigurator {
+    public void configure(JobConf actionConf) throws 
OozieActionConfiguratorException;
+}
+</verbatim>
+where =actionConf= is the =JobConf= you can update.  If you need to throw an 
Exception, you can wrap it in
+an =OozieActionConfiguratorException=, also in the "oozie-sharelib-oozie" 
artifact.
+
+For example:
+<verbatim>
+package com.example;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.action.hadoop.OozieActionConfigurator;
+import org.apache.oozie.action.hadoop.OozieActionConfiguratorException;
+import org.apache.oozie.example.SampleMapper;
+import org.apache.oozie.example.SampleReducer;
+
+public class MyConfigClass implements OozieActionConfigurator {
+
+    @Override
+    public void configure(JobConf actionConf) throws 
OozieActionConfiguratorException {
+        if (actionConf.getUser() == null) {
+            throw new OozieActionConfiguratorException("No user set");
+        }
+        actionConf.setMapperClass(SampleMapper.class);
+        actionConf.setReducerClass(SampleReducer.class);
+        FileInputFormat.setInputPaths(actionConf, new Path("/user/" + 
actionConf.getUser() + "/input-data"));
+        FileOutputFormat.setOutputPath(actionConf, new Path("/user/" + 
actionConf.getUser() + "/output"));
+        ...
+    }
+}
+</verbatim>
+
+To use your config class in your MapReduce action, simply compile it into a 
jar, make the jar available to your action, and specify
+the class name in the =config-class= element (this requires at least schema 
0.5):
+<verbatim>
+<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
+    ...
+    <action name="[NODE-NAME]">
+        <map-reduce>
+            ...
+            <job-xml>[JOB-XML-FILE]</job-xml>
+            <configuration>
+                <property>
+                    <name>[PROPERTY-NAME]</name>
+                    <value>[PROPERTY-VALUE]</value>
+                </property>
+                ...
+            </configuration>
+            <config-class>com.example.MyConfigClass</config-class>
+            ...
+        </map-reduce>
+        <ok to="[NODE-NAME]"/>
+        <error to="[NODE-NAME]"/>
+    </action>
+    ...
+</workflow-app>
+</verbatim>
+
+Another example of this can be found in the "map-reduce" example that comes 
with Oozie.
+
+A useful tip: The initial =JobConf= passed to the =configure= method includes 
all of the properties listed in the =configuration=
+section of the MR action in a workflow.  If you need to pass any information 
to your OozieActionConfigurator, you can simply put
+them here.
+
 #StreamingMapReduceAction
----+++++ 3.2.2.2 Streaming
+---+++++ 3.2.2.3 Streaming
 
 Streaming information can be specified in the =streaming= element.
 
@@ -613,7 +694,7 @@ The Mapper/Reducer can be overridden by a 
=mapred.mapper.class= or =mapred.reduc
 file or =configuration= elements.
 
 #PipesMapReduceAction
----+++++ 3.2.2.3 Pipes
+---+++++ 3.2.2.4 Pipes
 
 Pipes information can be specified in the =pipes= element.
 
@@ -629,10 +710,10 @@ the =file= and =archive= elements described in the 
previous section.
 
 Pipe properties can be overridden by specifying them in the =job-xml= file or 
=configuration= element.
 
----+++++ 3.2.2.4 Syntax
+---+++++ 3.2.2.5 Syntax
 
 <verbatim>
-<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
+<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
     ...
     <action name="[NODE-NAME]">
         <map-reduce>
@@ -670,6 +751,7 @@ Pipe properties can be overridden by specifying them in the 
=job-xml= file or =c
                 </property>
                 ...
             </configuration>
+            <config-class>com.example.MyConfigClass</config-class>
             <file>[FILE-PATH]</file>
             ...
             <archive>[FILE-PATH]</archive>
@@ -700,6 +782,11 @@ The =configuration= element, if present, contains JobConf 
properties for the Had
 Properties specified in the =configuration= element override properties 
specified in the file specified in the
  =job-xml= element.
 
+As of schema 0.5, the =config-class= element, if present, contains a class 
that implements OozieActionConfigurator that can be used
+to further configure the MapReduce job.
+
+Properties specified in the =config-class= class override properties specified 
in =configuration= element.
+
 External Stats can be turned on/off by specifying the property 
_oozie.action.external.stats.write_ as _true_ or _false_ in the configuration 
element of workflow.xml. The default value for this property is _false_.
 
 The =file= element, if present, must specify the target sybolic link for 
binaries by separating the original file and target with a # 
(file#target-sym-link). This is not required for libraries.
@@ -2639,6 +2726,7 @@ to be executed.
             </xs:choice>
             <xs:element name="job-xml" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="configuration" type="workflow:CONFIGURATION" 
minOccurs="0" maxOccurs="1"/>
+            <xs:element name="config-class" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
             <xs:element name="file" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="archive" type="xs:string" minOccurs="0" 
maxOccurs="unbounded"/>
         </xs:sequence>

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/examples/src/main/apps/map-reduce/job-with-config-class.properties
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/map-reduce/job-with-config-class.properties 
b/examples/src/main/apps/map-reduce/job-with-config-class.properties
new file mode 100644
index 0000000..0b14cb7
--- /dev/null
+++ b/examples/src/main/apps/map-reduce/job-with-config-class.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+nameNode=hdfs://localhost:8020
+jobTracker=localhost:8021
+queueName=default
+examplesRoot=examples
+
+oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce/workflow-with-config-class.xml
+outputDir=map-reduce

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/examples/src/main/apps/map-reduce/job.properties
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/map-reduce/job.properties 
b/examples/src/main/apps/map-reduce/job.properties
index 7b7a24c..7115229 100644
--- a/examples/src/main/apps/map-reduce/job.properties
+++ b/examples/src/main/apps/map-reduce/job.properties
@@ -21,5 +21,5 @@ jobTracker=localhost:8021
 queueName=default
 examplesRoot=examples
 
-oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
+oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce/workflow.xml
 outputDir=map-reduce

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/examples/src/main/apps/map-reduce/workflow-with-config-class.xml
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/map-reduce/workflow-with-config-class.xml 
b/examples/src/main/apps/map-reduce/workflow-with-config-class.xml
new file mode 100644
index 0000000..0deab66
--- /dev/null
+++ b/examples/src/main/apps/map-reduce/workflow-with-config-class.xml
@@ -0,0 +1,52 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
+    <start to="mr-node"/>
+    <action name="mr-node">
+        <map-reduce>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <prepare>
+                <delete 
path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
+            </prepare>
+            <!-- most of the <configuration> properties are being set by 
SampleOozieActionConfigurator -->
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <!-- These two are not Hadoop properties, but 
SampleOozieActionConfigurator can use them -->
+                <property>
+                    <name>examples.root</name>
+                    <value>${examplesRoot}</value>
+                </property>
+                <property>
+                    <name>output.dir.name</name>
+                    <value>${outputDir}</value>
+                </property>
+            </configuration>
+            
<config-class>org.apache.oozie.example.SampleOozieActionConfigurator</config-class>
+        </map-reduce>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>Map/Reduce failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/examples/src/main/java/org/apache/oozie/example/SampleOozieActionConfigurator.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/oozie/example/SampleOozieActionConfigurator.java
 
b/examples/src/main/java/org/apache/oozie/example/SampleOozieActionConfigurator.java
new file mode 100644
index 0000000..ff38a54
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/oozie/example/SampleOozieActionConfigurator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.example;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.action.hadoop.OozieActionConfigurator;
+import org.apache.oozie.action.hadoop.OozieActionConfiguratorException;
+
+public class SampleOozieActionConfigurator implements OozieActionConfigurator {
+
+    @Override
+    public void configure(JobConf actionConf) throws 
OozieActionConfiguratorException {
+        if (actionConf.getUser() == null) {
+            throw new OozieActionConfiguratorException("No user set");
+        }
+        if (actionConf.get("examples.root") == null) {
+            throw new OozieActionConfiguratorException("examples.root not 
set");
+        }
+        if (actionConf.get("output.dir.name") == null) {
+            throw new OozieActionConfiguratorException("output.dir.name not 
set");
+        }
+
+        actionConf.setMapperClass(SampleMapper.class);
+        actionConf.setReducerClass(SampleReducer.class);
+        actionConf.setNumMapTasks(1);
+        FileInputFormat.setInputPaths(actionConf,
+                new Path("/user/" + actionConf.getUser() + "/" + 
actionConf.get("examples.root") + "/input-data/text"));
+        FileOutputFormat.setOutputPath(actionConf,
+                new Path("/user/" + actionConf.getUser() + "/" + 
actionConf.get("examples.root") + "/output-data/"
+                        + actionConf.get("output.dir.name")));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index aad3764..a4bd20b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1954 Add a way for the MapReduce action to be configured by Java code 
(rkanter)
 OOZIE-2003 Checkstyle issues (rkanter via shwethags)
 OOZIE-1457 Create a Hive Server 2 action (rkanter)
 OOZIE-1950 Coordinator job info should support timestamp (nominal time) 
(shwethags)

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index ed5b88c..8cfefbb 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -30,6 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.hadoop.util.Shell;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapred.JobConf;
 
 public abstract class LauncherMain {
 
@@ -121,6 +122,30 @@ public abstract class LauncherMain {
         }
         return path;
     }
+
+    /**
+     * Will run the user specified OozieActionConfigurator subclass (if one is 
provided) to update the action configuration.
+     *
+     * @param actionConf The action configuration to update
+     * @throws OozieActionConfiguratorException
+     */
+    protected static void runConfigClass(JobConf actionConf) throws 
OozieActionConfiguratorException {
+        String configClass = 
System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
+        if (configClass != null) {
+            try {
+                Class<?> klass = Class.forName(configClass);
+                Class<? extends OozieActionConfigurator> 
actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class);
+                OozieActionConfigurator actionConfigurator = 
actionConfiguratorKlass.newInstance();
+                actionConfigurator.configure(actionConf);
+            } catch (ClassNotFoundException e) {
+                throw new OozieActionConfiguratorException("An Exception 
occured while instantiating the action config class", e);
+            } catch (InstantiationException e) {
+                throw new OozieActionConfiguratorException("An Exception 
occured while instantiating the action config class", e);
+            } catch (IllegalAccessException e) {
+                throw new OozieActionConfiguratorException("An Exception 
occured while instantiating the action config class", e);
+            }
+        }
+    }
 }
 
 class LauncherMainException extends Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index ada0706..4923fe3 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -58,6 +58,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements 
Mapper<K1, V1, K2, V2>, R
     static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = ACTION_PREFIX + 
"main.arg.count";
     static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + 
"main.arg.";
     static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = 
"oozie.external.stats.max.size";
+    static final String OOZIE_ACTION_CONFIG_CLASS = ACTION_PREFIX + 
"config.class";
     static final String CONF_OOZIE_ACTION_FS_GLOB_MAX = 
"oozie.action.fs.glob.max";
     static final int GLOB_MAX_DEFAULT = 1000;
 
@@ -434,6 +435,10 @@ public class LauncherMapper<K1, V1, K2, V2> implements 
Mapper<K1, V1, K2, V2>, R
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new 
File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new 
File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
         System.setProperty("oozie.job.launch.time", 
getJobConf().get("oozie.job.launch.time"));
+        String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
+        if (actionConfigClass != null) {
+            System.setProperty(OOZIE_ACTION_CONFIG_CLASS, actionConfigClass);
+        }
     }
 
     // Method to execute the prepare actions

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index 1a9a194..61cec7e 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -47,9 +47,15 @@ public class MapReduceMain extends LauncherMain {
         Configuration actionConf = new Configuration(false);
         actionConf.addResource(new Path("file:///", 
System.getProperty("oozie.action.conf.xml")));
 
-        logMasking("Map-Reduce job configuration:", new HashSet<String>(), 
actionConf);
+        JobConf jobConf = new JobConf();
+        addActionConf(jobConf, actionConf);
+
+        // Run a config class if given to update the job conf
+        runConfigClass(jobConf);
+
+        logMasking("Map-Reduce job configuration:", new HashSet<String>(), 
jobConf);
 
-        String jobId = 
LauncherMainHadoopUtils.getYarnJobForMapReduceAction(actionConf);
+        String jobId = 
LauncherMainHadoopUtils.getYarnJobForMapReduceAction(jobConf);
         File idFile = new File(System.getProperty(LauncherMapper.ACTION_PREFIX 
+ LauncherMapper.ACTION_DATA_NEW_ID));
         if (jobId != null) {
             if (!idFile.exists()) {
@@ -64,7 +70,7 @@ public class MapReduceMain extends LauncherMain {
             System.out.println("Submitting Oozie action Map-Reduce job");
             System.out.println();
             // submitting job
-            RunningJob runningJob = submitJob(actionConf);
+            RunningJob runningJob = submitJob(jobConf);
 
             jobId = runningJob.getID().toString();
             writeJobIdFile(idFile, jobId);
@@ -87,12 +93,9 @@ public class MapReduceMain extends LauncherMain {
         }
     }
 
-    protected RunningJob submitJob(Configuration actionConf) throws Exception {
-        JobConf jobConf = new JobConf();
-        addActionConf(jobConf, actionConf);
-
+    protected RunningJob submitJob(JobConf jobConf) throws Exception {
         // Set for uber jar
-        String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+        String uberJar = jobConf.get(OOZIE_MAPREDUCE_UBER_JAR);
         if (uberJar != null && uberJar.trim().length() > 0) {
             jobConf.setJar(uberJar);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfigurator.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfigurator.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfigurator.java
new file mode 100644
index 0000000..46ae700
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfigurator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Users can implement this interface to provide a class for Oozie to 
configure the MapReduce action with.  Make sure that the jar
+ * with this class is available with the action and then simply specify the 
class name in the "config-class" field in the MapReduce
+ * action XML.
+ */
+public interface OozieActionConfigurator {
+    /**
+     * This method should update the passed in configuration with additional 
changes; it will be used by the action.  If any
+     * Exceptions need to be thrown, they should be wrapped in an 
OozieActionConfiguratorException
+     *
+     * @param actionConf The action configuration
+     * @throws OozieActionConfiguratorException
+     */
+    public void configure(JobConf actionConf) throws 
OozieActionConfiguratorException;
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorException.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorException.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorException.java
new file mode 100644
index 0000000..075b5e4
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+/**
+ * Thrown by implementations of the OozieActionConfigurator class.
+ */
+@SuppressWarnings("serial")
+public class OozieActionConfiguratorException extends Exception {
+
+    private OozieActionConfiguratorException() {
+    }
+
+    public OozieActionConfiguratorException(String message) {
+        super(message);
+    }
+
+    public OozieActionConfiguratorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OozieActionConfiguratorException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
index ca32b5f..0d38040 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
@@ -32,34 +32,32 @@ public class PipesMain extends MapReduceMain {
     }
 
     @Override
-    protected RunningJob submitJob(Configuration actionConf) throws Exception {
-        JobConf jobConf = new JobConf();
-
-        String value = actionConf.get("oozie.pipes.map");
+    protected RunningJob submitJob(JobConf jobConf) throws Exception {
+        String value = jobConf.get("oozie.pipes.map");
         if (value != null) {
             jobConf.setBoolean("hadoop.pipes.java.mapper", true);
             jobConf.set("mapred.mapper.class", value);
         }
-        value = actionConf.get("oozie.pipes.reduce");
+        value = jobConf.get("oozie.pipes.reduce");
         if (value != null) {
             jobConf.setBoolean("hadoop.pipes.java.reducer", true);
             jobConf.set("mapred.reducer.class", value);
         }
-        value = actionConf.get("oozie.pipes.inputformat");
+        value = jobConf.get("oozie.pipes.inputformat");
         if (value != null) {
             jobConf.setBoolean("hadoop.pipes.java.recordreader", true);
             jobConf.set("mapred.input.format.class", value);
         }
-        value = actionConf.get("oozie.pipes.partitioner");
+        value = jobConf.get("oozie.pipes.partitioner");
         if (value != null) {
             jobConf.set("mapred.partitioner.class", value);
         }
-        value = actionConf.get("oozie.pipes.writer");
+        value = jobConf.get("oozie.pipes.writer");
         if (value != null) {
             jobConf.setBoolean("hadoop.pipes.java.recordwriter", true);
             jobConf.set("mapred.output.format.class", value);
         }
-        value = actionConf.get("oozie.pipes.program");
+        value = jobConf.get("oozie.pipes.program");
         if (value != null) {
             jobConf.set("hadoop.pipes.executable", value);
             if (value.contains("#")) {
@@ -67,7 +65,7 @@ public class PipesMain extends MapReduceMain {
             }
         }
 
-        addActionConf(jobConf, actionConf);
+        addActionConf(jobConf, jobConf);
 
         //propagate delegation related props from launcher job to MR job
         if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorForTest.java
----------------------------------------------------------------------
diff --git 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorForTest.java
 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorForTest.java
new file mode 100644
index 0000000..ba23d95
--- /dev/null
+++ 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/OozieActionConfiguratorForTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.JobConf;
+
+public class OozieActionConfiguratorForTest implements OozieActionConfigurator 
{
+
+    @Override
+    public void configure(JobConf actionConf) throws 
OozieActionConfiguratorException {
+        if (actionConf.getBoolean("oozie.test.throw.exception", false)) {
+            throw new OozieActionConfiguratorException("doh");
+        }
+
+        actionConf.set("A", "a");
+        actionConf.set("B", "c");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/90c06857/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index d43ddca..50927ce 100644
--- 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -56,6 +56,7 @@ import java.io.StringReader;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Scanner;
 import java.util.jar.JarOutputStream;
 import java.util.regex.Pattern;
@@ -63,6 +64,7 @@ import java.util.zip.ZipEntry;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.util.PropertiesUtils;
 
 public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
 
@@ -546,6 +548,104 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         _testSubmit("map-reduce", actionXml);
     }
 
+    public void testMapReduceWithConfigClass() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        Path jobXml = new Path(getFsTestCaseDir(), "job.xml");
+        XConfiguration conf = getMapReduceConfig(inputDir.toString(), 
outputDir.toString());
+        conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, 
jobXml.toUri().toString());
+        conf.set("B", "b");
+        String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + conf.toXmlString(false)
+                + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
+
+        _testSubmit("map-reduce", actionXml);
+        Configuration conf2 = new Configuration(false);
+        conf2.addResource(fs.open(jobXml));
+        assertEquals("a", conf2.get("A"));
+        assertEquals("c", conf2.get("B"));
+    }
+
+    public void testMapReduceWithConfigClassNotFound() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false)
+                + 
"<config-class>org.apache.oozie.does.not.exist</config-class>" + 
"</map-reduce>";
+
+        Context context = createContext("map-reduce", actionXml);
+        final RunningJob launcherJob = submitAction(context);
+        waitFor(120 * 2000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+
+        final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
+                context.getProtoActionConf());
+        Properties errorProps = 
PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
+        assertEquals("An Exception occured while instantiating the action 
config class",
+                errorProps.getProperty("exception.message"));
+        
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
+    }
+
+    public void testMapReduceWithConfigClassThrowException() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        XConfiguration conf = getMapReduceConfig(inputDir.toString(), 
outputDir.toString());
+        conf.setBoolean("oozie.test.throw.exception", true);        // causes 
OozieActionConfiguratorForTest to throw an exception
+        String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + conf.toXmlString(false)
+                + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
+
+        Context context = createContext("map-reduce", actionXml);
+        final RunningJob launcherJob = submitAction(context);
+        waitFor(120 * 2000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+
+        final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
+                context.getProtoActionConf());
+        Properties errorProps = 
PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
+        assertEquals("doh", errorProps.getProperty("exception.message"));
+        
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
+    }
+
     public void testMapReduceWithCredentials() throws Exception {
         FileSystem fs = getFileSystem();
 

Reply via email to