Updated Branches: refs/heads/master 073b0eb13 -> 2694f15ca
OOZIE-1658 Add bundle, coord, wf and action related information to launched M/R jobs (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2694f15c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2694f15c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2694f15c Branch: refs/heads/master Commit: 2694f15ca6a444964a6ff8e8069d03ea2b678827 Parents: 073b0eb Author: Rohini Palaniswamy <[email protected]> Authored: Fri Jan 10 15:37:43 2014 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Fri Jan 10 15:37:43 2014 -0800 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 18 +- .../oozie/action/hadoop/OozieJobInfo.java | 142 ++++++++++ .../command/bundle/BundleStartXCommand.java | 4 + .../command/coord/CoordActionStartXCommand.java | 7 +- core/src/main/resources/oozie-default.xml | 15 +- .../oozie/action/hadoop/TestOozieJobInfo.java | 280 +++++++++++++++++++ release-log.txt | 1 + 7 files changed, 463 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 253456f..59ccfb8 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -836,6 +836,8 @@ public class JavaActionExecutor extends ActionExecutor { } JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); + + injectJobInfo(launcherJobConf, actionConf, context, action); injectLauncherCallback(context, launcherJobConf); LOG.debug("Creating Job Client for action " + action.getId()); jobClient = createJobClient(context, launcherJobConf); @@ -907,7 +909,6 @@ public class JavaActionExecutor extends ActionExecutor { } } } - private boolean needInjectCredentials() { boolean methodExists = true; @@ -1368,4 +1369,19 @@ public class JavaActionExecutor extends ActionExecutor { protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException, URISyntaxException { } + + private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) { + if (OozieJobInfo.isJobInfoEnabled()) { + try { + OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action); + String jobInfoStr = jobInfo.getJobInfo(); + launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true"); + actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false"); + } + catch (Exception e) { + // Just job info, should not impact the execution. + LOG.error("Error while populating job info", e); + } + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java new file mode 100644 index 0000000..411b122 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.action.ActionExecutor.Context; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.Services; +import org.apache.oozie.util.XConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +public class OozieJobInfo { + + public static final String BUNDLE_ID = "bundle.id"; + public static final String BUNDLE_NAME = "bundle.name"; + public static final String COORD_NAME = "coord.name"; + public static final String COORD_ID = "coord.id"; + public static final String COORD_NOMINAL_TIME = "coord.nominal.time"; + public static final String WORKFLOW_ID = "wf.id"; + public static final String WORKFLOW_NAME = "wf.name"; + public static final String ACTION_TYPE = "action.type"; + public static final String ACTION_NAME = "action.name"; + public static final String JOB_INFO_KEY = "oozie.job.info"; + public static final String CONF_JOB_INFO = "oozie.action.jobinfo.enable"; + public final static String SEPARATOR = ","; + + private Context context; + XConfiguration contextConf; + private WorkflowAction action; + private Configuration actionConf; + private static boolean jobInfo = Services.get().getConf().getBoolean(OozieJobInfo.CONF_JOB_INFO, false); + + /** + * Instantiates a new oozie job info. + * + * @param jobconf the jobconf + * @param actionConf the action conf + * @param context the context + * @param action the action + * @throws IOException + */ + public OozieJobInfo(Configuration actionConf, Context context, WorkflowAction action) throws IOException { + this.context = context; + contextConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); + this.action = action; + this.actionConf = actionConf; + } + + public static boolean isJobInfoEnabled() { + return jobInfo; + } + + @VisibleForTesting + public static void setJobInfo(boolean jobInfo) { + OozieJobInfo.jobInfo = jobInfo; + } + + /** + * Get the job info. + * + * @return job info + * @throws IOException Signals that an I/O exception has occurred. + */ + public String getJobInfo() throws IOException { + StringBuffer sb = new StringBuffer(); + addBundleInfo(sb); + addCoordInfo(sb); + addWorkflowInfo(sb); + addActionInfo(sb); + addCustomInfo(sb); + return sb.toString(); + + } + + private void addBundleInfo(StringBuffer sb) throws IOException { + addJobInfo(sb, BUNDLE_ID, contextConf.get(OozieClient.BUNDLE_ID)); + addJobInfo(sb, BUNDLE_NAME, contextConf.get(OozieJobInfo.BUNDLE_NAME)); + + } + + private void addCoordInfo(StringBuffer sb) throws IOException { + addJobInfo(sb, COORD_NAME, contextConf.get(OozieJobInfo.COORD_NAME)); + addJobInfo(sb, COORD_NOMINAL_TIME, contextConf.get(OozieJobInfo.COORD_NOMINAL_TIME)); + addJobInfo(sb, COORD_ID, context.getWorkflow().getParentId()); + + } + + private void addWorkflowInfo(StringBuffer sb) { + addJobInfo(sb, WORKFLOW_ID, context.getWorkflow().getId()); + addJobInfo(sb, WORKFLOW_NAME, context.getWorkflow().getAppName()); + + } + + private void addActionInfo(StringBuffer sb) { + addJobInfo(sb, ACTION_NAME, action.getName()); + addJobInfo(sb, ACTION_TYPE, action.getType()); + } + + private void addCustomInfo(StringBuffer sb) throws IOException { + addfromConf(actionConf, sb); + } + + public void addfromConf(Configuration conf, StringBuffer sb) { + Iterator<Map.Entry<String, String>> it = conf.iterator(); + while (it.hasNext()) { + Entry<String, String> entry = it.next(); + if (entry.getKey().startsWith("oozie.job.info.")) { + addJobInfo(sb, entry.getKey().substring("oozie.job.info.".length()), entry.getValue()); + } + } + } + + private void addJobInfo(StringBuffer sb, String key, String value) { + if (value != null) { + sb.append(key).append("=").append(value).append(OozieJobInfo.SEPARATOR); + } + + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java index ef290ee..74fbcab 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java @@ -30,6 +30,7 @@ import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; +import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.JsonBean; @@ -242,6 +243,9 @@ public class BundleStartXCommand extends StartTransitionXCommand { Attribute name = coordElem.getAttribute("name"); Configuration coordConf = mergeConfig(coordElem); coordConf.set(OozieClient.BUNDLE_ID, jobId); + if (OozieJobInfo.isJobInfoEnabled()) { + coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName()); + } queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue())); http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java index bf52036..1ca055d 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java @@ -18,7 +18,7 @@ package org.apache.oozie.command.coord; import org.apache.hadoop.conf.Configuration; - +import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; import org.apache.oozie.CoordinatorActionBean; @@ -176,7 +176,10 @@ public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { if(slaEvent != null) { insertList.add(slaEvent); } - + if (OozieJobInfo.isJobInfoEnabled()) { + conf.set(OozieJobInfo.COORD_NAME, appName); + conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString()); + } // Normalize workflow appPath here; JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); String wfId = dagEngine.submitJobFromCoordinator(conf, actionId); http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 1009b85..26e3f09 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1987,5 +1987,18 @@ </description> </property> - + <property> + <name>oozie.action.jobinfo.enable</name> + <value>false</value> + <description> + JobInfo will contain information of bundle, coordinator, workflow and actions. If enabled, hadoop job will have + property(oozie.job.info) which value is multiple key/value pair separated by ",". This information can be used for + analytics like how many oozie jobs are submitted for a particular period, what is the total number of failed pig jobs, + etc from mapreduce job history logs and configuration. + User can also add custom workflow property to jobinfo by adding property which prefix with "oozie.job.info." + Eg. + oozie.job.info="bundle.id=,bundle.name=,coord.name=,coord.nominal.time=,coord.name=,wf.id=, + wf.name=,action.name=,action.type=,launcher=true" + </description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java new file mode 100644 index 0000000..5aa9e6f --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.StringReader; +import java.io.Writer; +import java.util.Date; +import java.util.List; +import java.util.regex.Matcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.BundleActionBean; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.hadoop.MapReduceActionExecutor; +import org.apache.oozie.action.hadoop.MapperReducerForTest; +import org.apache.oozie.action.hadoop.OozieJobInfo; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.bundle.BundleStartXCommand; +import org.apache.oozie.command.bundle.BundleSubmitXCommand; +import org.apache.oozie.command.wf.ActionXCommand; +import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.UUIDService; +import org.apache.oozie.service.UUIDService.ApplicationType; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; + +public class TestOozieJobInfo extends XDataTestCase { + + Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testInfoWithBundle() throws Exception { + + Services.get().getConf().setBoolean(OozieJobInfo.CONF_JOB_INFO, true); + OozieJobInfo.setJobInfo(true); + BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.PREP, false); + final JPAService jpaService = Services.get().get(JPAService.class); + Configuration jobConf = null; + try { + jobConf = new XConfiguration(new StringReader(job.getConf())); + } + catch (IOException ioe) { + log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe); + throw new CommandException(ErrorCode.E1005, ioe); + } + setCoordConf(jobConf); + Path appPath = new Path(jobConf.get(OozieClient.BUNDLE_APP_PATH), "bundle.xml"); + jobConf.set(OozieClient.BUNDLE_APP_PATH, appPath.toString()); + BundleSubmitXCommand submitCmd = new BundleSubmitXCommand(jobConf); + submitCmd.call(); + BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(submitCmd.getJob().getId()); + job = jpaService.execute(bundleJobGetExecutor); + + assertEquals(job.getStatus(), Job.Status.PREP); + new BundleStartXCommand(job.getId()).call(); + sleep(2000); + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId()); + assertEquals(1, actions.size()); + final String bundleID = job.getId(); + waitFor(200000, new Predicate() { + public boolean evaluate() throws Exception { + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, bundleID); + return actions.get(0).getStatus().equals(Job.Status.RUNNING); + } + }); + + actions = BundleActionQueryExecutor.getInstance().getList(BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, + job.getId()); + final String cordID = actions.get(0).getCoordId(); + waitFor(200000, new Predicate() { + public boolean evaluate() throws Exception { + CoordJobGetJPAExecutor coordGetCmd2 = new CoordJobGetJPAExecutor(cordID); + CoordinatorJobBean cc = jpaService.execute(coordGetCmd2); + return cc.getStatus().equals(Job.Status.RUNNING); + + } + }); + + final String jobID = jpaService.execute(new WorkflowJobsGetFromCoordParentIdJPAExecutor(cordID, 1)).get(0); + final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobID); + waitFor(200000, new Predicate() { + public boolean evaluate() throws Exception { + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getName().contains("hadoop")) { + action = bean; + break; + } + } + return action.getStatus().toString().equalsIgnoreCase(Job.Status.RUNNING.toString()); + } + }); + + final WorkflowJobGetJPAExecutor wfeExc = new WorkflowJobGetJPAExecutor(jobID); + + WorkflowJobBean wfbean = jpaService.execute(wfeExc); + + List<WorkflowActionBean> actionList = jpaService.execute(actionsGetExecutor); + + ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfbean, actionList.get(1), false, + false); + MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); + JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf())); + String user = conf.get("user.name"); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); + String launcherId = actionList.get(1).getExternalId(); + + final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); + FileSystem fs = context.getAppFileSystem(); + Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile()))); + String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY); + + // BUNDLE_ID;BUNDLE_NAME;COORDNITOR_NAME;COORDNITOR_NOMINAL_TIME;WORKFLOW_ID;WORKFLOW_NAME; + // ACTION_TYPE;ACTION_NAME,JOB_INFO,custom_info; + assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 11); + assertTrue(jobInfo.contains(bundleID)); + assertTrue(jobInfo.contains("bundle.name=test_bundle,")); + assertTrue(jobInfo.contains(cordID)); + assertTrue(jobInfo.contains("action.type=map-reduce")); + assertTrue(jobInfo.contains(",testing=test,")); + assertTrue(jobInfo.contains(",coord.nominal.time=")); + assertTrue(jobInfo.contains("launcher=true")); + + } + + protected void setCoordConf(Configuration jobConf) throws IOException { + Path wfAppPath = new Path(getFsTestCaseDir(), "app"); + FileSystem fs = getFileSystem(); + fs.mkdirs(new Path(wfAppPath, "lib")); + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class); + InputStream is = new FileInputStream(jarFile); + OutputStream os = fs.create(new Path(wfAppPath, "lib/test.jar")); + IOUtils.copyStream(is, os); + Path input = new Path(wfAppPath, "input"); + fs.mkdirs(input); + Writer writer = new OutputStreamWriter(fs.create(new Path(input, "test.txt"))); + writer.write("hello"); + writer.close(); + + final String APP1 = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app'>" + "<start to='hadoop'/>" + + "<action name=\"hadoop\">" + "<map-reduce>" + "<job-tracker>${jobTracker}</job-tracker>" + + "<name-node>${nameNode}</name-node>" + "<configuration>" + + "<property><name>mapred.map.tasks</name><value>1</value></property>" + + "<property><name>mapred.reduce.tasks</name><value>0</value></property>" + + "<property><name>oozie.job.info.testing</name><value>test</value></property>" + "</configuration>" + + "</map-reduce>" + "<ok to=\"end\"/>" + "<error to=\"k\"/>" + "</action>" + "<kill name=\"k\">" + + "<message>kill</message>" + "</kill><end name=\"end\"/>" + "</workflow-app>"; + Writer writer2 = new OutputStreamWriter(fs.create(new Path(wfAppPath, "workflow.xml"))); + writer2.write(APP1); + writer2.close(); + jobConf.set(OozieClient.USER_NAME, getTestUser()); + jobConf.set("myJobTracker", getJobTrackerUri()); + jobConf.set("myNameNode", getNameNodeUri()); + jobConf.set("wfAppPath", new Path(wfAppPath, "workflow.xml").toString()); + jobConf.set("mrclass", MapperReducerForTest.class.getName()); + } + + protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean pending) throws Exception { + Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); + + String coord = "<coordinator-app name='COORD-TEST' frequency='${coord:days(1)}' " + + "start=\"${START_TIME}\" end=\"${END_TIME}\" timezone=\"UTC\" " + + "xmlns=\"uri:oozie:coordinator:0.2\">" + "<controls>" + "<concurrency>2</concurrency>" + + "<execution>LIFO</execution>" + "</controls>" + "<action>" + "<workflow>" + + "<app-path>${wfAppPath}</app-path>" + "<configuration>" + "<property>" + "<name>inputA</name>" + + "<value>aaaa</value>" + "</property>" + "</configuration>" + "</workflow> " + "</action>" + + "</coordinator-app>"; + + writeToFile(coord, coordPath1, "coordinator.xml"); + + Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle"); + String bundleAppXml = "<bundle-app name='test_bundle' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + + "xmlns='uri:oozie:bundle:0.1'> " + + "<controls> <kick-off-time>2009-02-02T00:00Z</kick-off-time> </controls> " + + "<coordinator name='c12'> " + + "<app-path>#app_path1</app-path>" + + "<configuration> " + + "<property> <name>START_TIME</name> <value>2009-02-01T00:00Z</value> </property> </configuration> " + + "</coordinator></bundle-app>"; + + bundleAppXml = bundleAppXml.replaceAll("#app_path1", + Matcher.quoteReplacement(new Path(coordPath1.toString(), "coordinator.xml").toString())); + + writeToFile(bundleAppXml, bundleAppPath, "bundle.xml"); + + Configuration conf = new XConfiguration(); + conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("jobTracker", getJobTrackerUri()); + conf.set("nameNode", getNameNodeUri()); + conf.set("appName", "bundle-app-name"); + conf.set("start", "2009-02-01T00:00Z"); + conf.set("end", "2009-02-01T00:00Z"); + + conf.set("START_TIME", "2009-02-01T00:00Z"); + conf.set("END_TIME", "2009-03-01T00:00Z"); + + setCoordConf(conf); + + BundleJobBean bundle = new BundleJobBean(); + bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); + bundle.setAppName("BUNDLE-TEST"); + bundle.setAppPath(bundleAppPath.toString()); + bundle.setConf(XmlUtils.prettyPrint(conf).toString()); + bundle.setConsoleUrl("consoleUrl"); + bundle.setCreatedTime(new Date()); + bundle.setJobXml(bundleAppXml); + bundle.setLastModifiedTime(new Date()); + bundle.setOrigJobXml(bundleAppXml); + if (pending) { + bundle.setPending(); + } + else { + bundle.resetPending(); + } + bundle.setStatus(jobStatus); + bundle.setUser(conf.get(OozieClient.USER_NAME)); + return bundle; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/2694f15c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 62572e9..a1f5d73 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1658 Add bundle, coord, wf and action related information to launched M/R jobs (puru via rohini) OOZIE-1664 PollablePriorityDelayQueue.poll() returns elements with +ve delay (shwethags via rohini) OOZIE-1661 Stream logs in oozie UI (puru via rohini) OOZIE-1610 UnitTests fail on Windows because of wrong paths (omaliuvanchuk via rohini)
