Updated Branches:
  refs/heads/master 073b0eb13 -> 2694f15ca

OOZIE-1658 Add bundle, coord, wf and action related information to launched M/R 
jobs (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2694f15c
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2694f15c
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2694f15c

Branch: refs/heads/master
Commit: 2694f15ca6a444964a6ff8e8069d03ea2b678827
Parents: 073b0eb
Author: Rohini Palaniswamy <[email protected]>
Authored: Fri Jan 10 15:37:43 2014 -0800
Committer: Rohini Palaniswamy <[email protected]>
Committed: Fri Jan 10 15:37:43 2014 -0800

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  18 +-
 .../oozie/action/hadoop/OozieJobInfo.java       | 142 ++++++++++
 .../command/bundle/BundleStartXCommand.java     |   4 +
 .../command/coord/CoordActionStartXCommand.java |   7 +-
 core/src/main/resources/oozie-default.xml       |  15 +-
 .../oozie/action/hadoop/TestOozieJobInfo.java   | 280 +++++++++++++++++++
 release-log.txt                                 |   1 +
 7 files changed, 463 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 253456f..59ccfb8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -836,6 +836,8 @@ public class JavaActionExecutor extends ActionExecutor {
             }
 
             JobConf launcherJobConf = createLauncherConf(actionFs, context, 
action, actionXml, actionConf);
+
+            injectJobInfo(launcherJobConf, actionConf, context, action);
             injectLauncherCallback(context, launcherJobConf);
             LOG.debug("Creating Job Client for action " + action.getId());
             jobClient = createJobClient(context, launcherJobConf);
@@ -907,7 +909,6 @@ public class JavaActionExecutor extends ActionExecutor {
             }
         }
     }
-
     private boolean needInjectCredentials() {
         boolean methodExists = true;
 
@@ -1368,4 +1369,19 @@ public class JavaActionExecutor extends ActionExecutor {
     protected void setActionCompletionData(Context context, FileSystem 
actionFs) throws IOException,
             HadoopAccessorException, URISyntaxException {
     }
+
+    private void injectJobInfo(JobConf launcherJobConf, Configuration 
actionConf, Context context, WorkflowAction action) {
+        if (OozieJobInfo.isJobInfoEnabled()) {
+            try {
+                OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, 
action);
+                String jobInfoStr = jobInfo.getJobInfo();
+                launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + 
"launcher=true");
+                actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + 
"launcher=false");
+            }
+            catch (Exception e) {
+                // Job info is purely informational; a failure to build it must not impact the execution.
+                LOG.error("Error while populating job info", e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
new file mode 100644
index 0000000..411b122
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.action.ActionExecutor.Context;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class OozieJobInfo {
+
+    public static final String BUNDLE_ID = "bundle.id";
+    public static final String BUNDLE_NAME = "bundle.name";
+    public static final String COORD_NAME = "coord.name";
+    public static final String COORD_ID = "coord.id";
+    public static final String COORD_NOMINAL_TIME = "coord.nominal.time";
+    public static final String WORKFLOW_ID = "wf.id";
+    public static final String WORKFLOW_NAME = "wf.name";
+    public static final String ACTION_TYPE = "action.type";
+    public static final String ACTION_NAME = "action.name";
+    public static final String JOB_INFO_KEY = "oozie.job.info";
+    public static final String CONF_JOB_INFO = "oozie.action.jobinfo.enable";
+    public final static String SEPARATOR = ",";
+
+    private Context context;
+    XConfiguration contextConf;
+    private WorkflowAction action;
+    private Configuration actionConf;
+    private static boolean jobInfo = 
Services.get().getConf().getBoolean(OozieJobInfo.CONF_JOB_INFO, false);
+
+    /**
+     * Instantiates a new oozie job info.
+     *
+     * @param actionConf the action conf
+     * @param context the context
+     * @param action the action
+     * @throws IOException if the workflow configuration cannot be read
+     */
+    public OozieJobInfo(Configuration actionConf, Context context, 
WorkflowAction action) throws IOException {
+        this.context = context;
+        contextConf = new XConfiguration(new 
StringReader(context.getWorkflow().getConf()));
+        this.action = action;
+        this.actionConf = actionConf;
+    }
+
+    public static boolean isJobInfoEnabled() {
+        return jobInfo;
+    }
+
+    @VisibleForTesting
+    public static void setJobInfo(boolean jobInfo) {
+        OozieJobInfo.jobInfo = jobInfo;
+    }
+
+    /**
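+     * Get the job info as a string of "key=value" entries, each one terminated by {@link #SEPARATOR}.
+     * Entries are added for bundle, coordinator, workflow and action information, followed by any
+     * action configuration properties prefixed with "oozie.job.info."; entries with a null value are skipped.
+     *
+     * @return job info
+     * @throws IOException Signals that an I/O exception has occurred.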
+     */
+    public String getJobInfo() throws IOException {
+        StringBuffer sb = new StringBuffer();
+        addBundleInfo(sb);
+        addCoordInfo(sb);
+        addWorkflowInfo(sb);
+        addActionInfo(sb);
+        addCustomInfo(sb);
+        return sb.toString();
+
+    }
+
+    private void addBundleInfo(StringBuffer sb) throws IOException {
+        addJobInfo(sb, BUNDLE_ID, contextConf.get(OozieClient.BUNDLE_ID));
+        addJobInfo(sb, BUNDLE_NAME, contextConf.get(OozieJobInfo.BUNDLE_NAME));
+
+    }
+
+    private void addCoordInfo(StringBuffer sb) throws IOException {
+        addJobInfo(sb, COORD_NAME, contextConf.get(OozieJobInfo.COORD_NAME));
+        addJobInfo(sb, COORD_NOMINAL_TIME, 
contextConf.get(OozieJobInfo.COORD_NOMINAL_TIME));
+        addJobInfo(sb, COORD_ID, context.getWorkflow().getParentId());
+
+    }
+
+    private void addWorkflowInfo(StringBuffer sb) {
+        addJobInfo(sb, WORKFLOW_ID, context.getWorkflow().getId());
+        addJobInfo(sb, WORKFLOW_NAME, context.getWorkflow().getAppName());
+
+    }
+
+    private void addActionInfo(StringBuffer sb) {
+        addJobInfo(sb, ACTION_NAME, action.getName());
+        addJobInfo(sb, ACTION_TYPE, action.getType());
+    }
+
+    private void addCustomInfo(StringBuffer sb) throws IOException {
+        addfromConf(actionConf, sb);
+    }
+
+    public void addfromConf(Configuration conf, StringBuffer sb) {
+        Iterator<Map.Entry<String, String>> it = conf.iterator();
+        while (it.hasNext()) {
+            Entry<String, String> entry = it.next();
+            if (entry.getKey().startsWith("oozie.job.info.")) {
+                addJobInfo(sb, 
entry.getKey().substring("oozie.job.info.".length()), entry.getValue());
+            }
+        }
+    }
+
+    private void addJobInfo(StringBuffer sb, String key, String value) {
+        if (value != null) {
+            
sb.append(key).append("=").append(value).append(OozieJobInfo.SEPARATOR);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
index ef290ee..74fbcab 100644
--- 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
@@ -30,6 +30,7 @@ import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
+import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.JsonBean;
@@ -242,6 +243,9 @@ public class BundleStartXCommand extends 
StartTransitionXCommand {
                     Attribute name = coordElem.getAttribute("name");
                     Configuration coordConf = mergeConfig(coordElem);
                     coordConf.set(OozieClient.BUNDLE_ID, jobId);
+                    if (OozieJobInfo.isJobInfoEnabled()) {
+                        coordConf.set(OozieJobInfo.BUNDLE_NAME, 
bundleJob.getAppName());
+                    }
 
                     queue(new CoordSubmitXCommand(coordConf, 
bundleJob.getId(), name.getValue()));
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
index bf52036..1ca055d 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
@@ -18,7 +18,7 @@
 package org.apache.oozie.command.coord;
 
 import org.apache.hadoop.conf.Configuration;
-
+import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.CoordinatorActionBean;
@@ -176,7 +176,10 @@ public class CoordActionStartXCommand extends 
CoordinatorXCommand<Void> {
                 if(slaEvent != null) {
                     insertList.add(slaEvent);
                 }
-
+                if (OozieJobInfo.isJobInfoEnabled()) {
+                    conf.set(OozieJobInfo.COORD_NAME, appName);
+                    conf.set(OozieJobInfo.COORD_NOMINAL_TIME, 
coordAction.getNominalTimestamp().toString());
+                }
                 // Normalize workflow appPath here;
                 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), 
conf.get(OozieClient.GROUP_NAME), conf);
                 String wfId = dagEngine.submitJobFromCoordinator(conf, 
actionId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 1009b85..26e3f09 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1987,5 +1987,18 @@
         </description>
     </property>
 
-
+    <property>
+        <name>oozie.action.jobinfo.enable</name>
+        <value>false</value>
+        <description>
+        JobInfo will contain information about the bundle, coordinator, workflow and actions. If enabled, the hadoop job
+        will have a property (oozie.job.info) whose value is a list of key/value pairs separated by ",". This information
+        can be used for analytics, like how many oozie jobs were submitted in a particular period, what the total number
+        of failed pig jobs is, etc., from mapreduce job history logs and configuration.
+        Users can also add custom workflow properties to jobinfo by adding properties prefixed with "oozie.job.info."
+        Eg.
+        oozie.job.info="bundle.id=,bundle.name=,coord.name=,coord.nominal.time=,coord.id=,wf.id=,
+        wf.name=,action.name=,action.type=,launcher=true"
+        </description>
+    </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
new file mode 100644
index 0000000..5aa9e6f
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.StringReader;
+import java.io.Writer;
+import java.util.Date;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
+import org.apache.oozie.action.hadoop.MapperReducerForTest;
+import org.apache.oozie.action.hadoop.OozieJobInfo;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.bundle.BundleStartXCommand;
+import org.apache.oozie.command.bundle.BundleSubmitXCommand;
+import org.apache.oozie.command.wf.ActionXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
+import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.service.UUIDService.ApplicationType;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+
+public class TestOozieJobInfo extends XDataTestCase {
+
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testInfoWithBundle() throws Exception {
+
+        Services.get().getConf().setBoolean(OozieJobInfo.CONF_JOB_INFO, true);
+        OozieJobInfo.setJobInfo(true);
+        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.PREP, 
false);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        Configuration jobConf = null;
+        try {
+            jobConf = new XConfiguration(new StringReader(job.getConf()));
+        }
+        catch (IOException ioe) {
+            log.warn("Configuration parse error. read from DB :" + 
job.getConf(), ioe);
+            throw new CommandException(ErrorCode.E1005, ioe);
+        }
+        setCoordConf(jobConf);
+        Path appPath = new Path(jobConf.get(OozieClient.BUNDLE_APP_PATH), 
"bundle.xml");
+        jobConf.set(OozieClient.BUNDLE_APP_PATH, appPath.toString());
+        BundleSubmitXCommand submitCmd = new BundleSubmitXCommand(jobConf);
+        submitCmd.call();
+        BundleJobGetJPAExecutor bundleJobGetExecutor = new 
BundleJobGetJPAExecutor(submitCmd.getJob().getId());
+        job = jpaService.execute(bundleJobGetExecutor);
+
+        assertEquals(job.getStatus(), Job.Status.PREP);
+        new BundleStartXCommand(job.getId()).call();
+        sleep(2000);
+        List<BundleActionBean> actions = 
BundleActionQueryExecutor.getInstance().getList(
+                BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId());
+        assertEquals(1, actions.size());
+        final String bundleID = job.getId();
+        waitFor(200000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                List<BundleActionBean> actions = 
BundleActionQueryExecutor.getInstance().getList(
+                        BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, 
bundleID);
+                return actions.get(0).getStatus().equals(Job.Status.RUNNING);
+            }
+        });
+
+        actions = 
BundleActionQueryExecutor.getInstance().getList(BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE,
+                job.getId());
+        final String cordID = actions.get(0).getCoordId();
+        waitFor(200000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordJobGetJPAExecutor coordGetCmd2 = new 
CoordJobGetJPAExecutor(cordID);
+                CoordinatorJobBean cc = jpaService.execute(coordGetCmd2);
+                return cc.getStatus().equals(Job.Status.RUNNING);
+
+            }
+        });
+
+        final String jobID = jpaService.execute(new 
WorkflowJobsGetFromCoordParentIdJPAExecutor(cordID, 1)).get(0);
+        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new 
WorkflowActionsGetForJobJPAExecutor(jobID);
+        waitFor(200000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                List<WorkflowActionBean> actions = 
jpaService.execute(actionsGetExecutor);
+                WorkflowActionBean action = null;
+                for (WorkflowActionBean bean : actions) {
+                    if (bean.getName().contains("hadoop")) {
+                        action = bean;
+                        break;
+                    }
+                }
+                return 
action.getStatus().toString().equalsIgnoreCase(Job.Status.RUNNING.toString());
+            }
+        });
+
+        final WorkflowJobGetJPAExecutor wfeExc = new 
WorkflowJobGetJPAExecutor(jobID);
+
+        WorkflowJobBean wfbean = jpaService.execute(wfeExc);
+
+        List<WorkflowActionBean> actionList = 
jpaService.execute(actionsGetExecutor);
+
+        ActionExecutorContext context = new 
ActionXCommand.ActionExecutorContext(wfbean, actionList.get(1), false,
+                false);
+        MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+        JobConf conf = actionExecutor.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionList.get(1).getConf()));
+        String user = conf.get("user.name");
+        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        String launcherId = actionList.get(1).getExternalId();
+
+        final RunningJob launcherJob = 
jobClient.getJob(JobID.forName(launcherId));
+        FileSystem fs = context.getAppFileSystem();
+        Configuration jobXmlConf = new XConfiguration(fs.open(new 
Path(launcherJob.getJobFile())));
+        String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY);
+
+        // 11 entries: BUNDLE_ID, BUNDLE_NAME, COORDINATOR_NAME, COORDINATOR_NOMINAL_TIME, COORDINATOR_ID,
+        // WORKFLOW_ID, WORKFLOW_NAME, ACTION_NAME, ACTION_TYPE, custom info (testing=test), launcher flag
+        assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 11);
+        assertTrue(jobInfo.contains(bundleID));
+        assertTrue(jobInfo.contains("bundle.name=test_bundle,"));
+        assertTrue(jobInfo.contains(cordID));
+        assertTrue(jobInfo.contains("action.type=map-reduce"));
+        assertTrue(jobInfo.contains(",testing=test,"));
+        assertTrue(jobInfo.contains(",coord.nominal.time="));
+        assertTrue(jobInfo.contains("launcher=true"));
+
+    }
+
+    protected void setCoordConf(Configuration jobConf) throws IOException {
+        Path wfAppPath = new Path(getFsTestCaseDir(), "app");
+        FileSystem fs = getFileSystem();
+        fs.mkdirs(new Path(wfAppPath, "lib"));
+        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), 
"test.jar", MapperReducerForTest.class);
+        InputStream is = new FileInputStream(jarFile);
+        OutputStream os = fs.create(new Path(wfAppPath, "lib/test.jar"));
+        IOUtils.copyStream(is, os);
+        Path input = new Path(wfAppPath, "input");
+        fs.mkdirs(input);
+        Writer writer = new OutputStreamWriter(fs.create(new Path(input, 
"test.txt")));
+        writer.write("hello");
+        writer.close();
+
+        final String APP1 = "<workflow-app xmlns='uri:oozie:workflow:0.1' 
name='app'>" + "<start to='hadoop'/>"
+                + "<action name=\"hadoop\">" + "<map-reduce>" + 
"<job-tracker>${jobTracker}</job-tracker>"
+                + "<name-node>${nameNode}</name-node>" + "<configuration>"
+                + 
"<property><name>mapred.map.tasks</name><value>1</value></property>"
+                + 
"<property><name>mapred.reduce.tasks</name><value>0</value></property>"
+                + 
"<property><name>oozie.job.info.testing</name><value>test</value></property>" + 
"</configuration>"
+                + "</map-reduce>" + "<ok to=\"end\"/>" + "<error to=\"k\"/>" + 
"</action>" + "<kill name=\"k\">"
+                + "<message>kill</message>" + "</kill><end name=\"end\"/>" + 
"</workflow-app>";
+        Writer writer2 = new OutputStreamWriter(fs.create(new Path(wfAppPath, 
"workflow.xml")));
+        writer2.write(APP1);
+        writer2.close();
+        jobConf.set(OozieClient.USER_NAME, getTestUser());
+        jobConf.set("myJobTracker", getJobTrackerUri());
+        jobConf.set("myNameNode", getNameNodeUri());
+        jobConf.set("wfAppPath", new Path(wfAppPath, 
"workflow.xml").toString());
+        jobConf.set("mrclass", MapperReducerForTest.class.getName());
+    }
+
+    protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean 
pending) throws Exception {
+        Path coordPath1 = new Path(getFsTestCaseDir(), "coord1");
+
+        String coord = "<coordinator-app name='COORD-TEST' 
frequency='${coord:days(1)}' "
+                + "start=\"${START_TIME}\" end=\"${END_TIME}\" 
timezone=\"UTC\" "
+                + "xmlns=\"uri:oozie:coordinator:0.2\">" + "<controls>" + 
"<concurrency>2</concurrency>"
+                + "<execution>LIFO</execution>" + "</controls>" + "<action>" + 
"<workflow>"
+                + "<app-path>${wfAppPath}</app-path>" + "<configuration>" + 
"<property>" + "<name>inputA</name>"
+                + "<value>aaaa</value>" + "</property>" + "</configuration>" + 
"</workflow> " + "</action>"
+                + "</coordinator-app>";
+
+        writeToFile(coord, coordPath1, "coordinator.xml");
+
+        Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle");
+        String bundleAppXml = "<bundle-app name='test_bundle' 
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' "
+                + "xmlns='uri:oozie:bundle:0.1'> "
+                + "<controls> <kick-off-time>2009-02-02T00:00Z</kick-off-time> 
</controls> "
+                + "<coordinator name='c12'> "
+                + "<app-path>#app_path1</app-path>"
+                + "<configuration> "
+                + "<property> <name>START_TIME</name> 
<value>2009-02-01T00:00Z</value> </property> </configuration> "
+                + "</coordinator></bundle-app>";
+
+        bundleAppXml = bundleAppXml.replaceAll("#app_path1",
+                Matcher.quoteReplacement(new Path(coordPath1.toString(), 
"coordinator.xml").toString()));
+
+        writeToFile(bundleAppXml, bundleAppPath, "bundle.xml");
+
+        Configuration conf = new XConfiguration();
+        conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("jobTracker", getJobTrackerUri());
+        conf.set("nameNode", getNameNodeUri());
+        conf.set("appName", "bundle-app-name");
+        conf.set("start", "2009-02-01T00:00Z");
+        conf.set("end", "2009-02-01T00:00Z");
+
+        conf.set("START_TIME", "2009-02-01T00:00Z");
+        conf.set("END_TIME", "2009-03-01T00:00Z");
+
+        setCoordConf(conf);
+
+        BundleJobBean bundle = new BundleJobBean();
+        
bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE));
+        bundle.setAppName("BUNDLE-TEST");
+        bundle.setAppPath(bundleAppPath.toString());
+        bundle.setConf(XmlUtils.prettyPrint(conf).toString());
+        bundle.setConsoleUrl("consoleUrl");
+        bundle.setCreatedTime(new Date());
+        bundle.setJobXml(bundleAppXml);
+        bundle.setLastModifiedTime(new Date());
+        bundle.setOrigJobXml(bundleAppXml);
+        if (pending) {
+            bundle.setPending();
+        }
+        else {
+            bundle.resetPending();
+        }
+        bundle.setStatus(jobStatus);
+        bundle.setUser(conf.get(OozieClient.USER_NAME));
+        return bundle;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 62572e9..a1f5d73 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1658 Add bundle, coord, wf and action related information to launched 
M/R jobs (puru via rohini)
 OOZIE-1664 PollablePriorityDelayQueue.poll() returns elements with +ve delay 
(shwethags via rohini)
 OOZIE-1661 Stream logs in oozie UI (puru via rohini)
 OOZIE-1610 UnitTests fail on Windows because of wrong paths (omaliuvanchuk via 
rohini)

Reply via email to