Repository: oozie Updated Branches: refs/heads/master 4c898349c -> f32a0fc5f
OOZIE-2346 Add sub-workflow information like the super-parent id and workflow depth into the 'oozie.job.info' property Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f32a0fc5 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f32a0fc5 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f32a0fc5 Branch: refs/heads/master Commit: f32a0fc5f439fe471ef21691be7846640f40eae8 Parents: 4c89834 Author: Purshotam Shah <[email protected]> Authored: Fri Nov 6 09:06:59 2015 -0800 Committer: Purshotam Shah <[email protected]> Committed: Fri Nov 6 09:06:59 2015 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/client/WorkflowJob.java | 4 +-- .../java/org/apache/oozie/WorkflowJobBean.java | 10 +++--- .../oozie/action/hadoop/JavaActionExecutor.java | 3 +- .../oozie/action/hadoop/OozieJobInfo.java | 25 ++++++++++++-- .../action/oozie/SubWorkflowActionExecutor.java | 35 +++++++++++++++++++- .../command/coord/CoordActionStartXCommand.java | 1 + .../oozie/action/hadoop/TestOozieJobInfo.java | 9 +++-- release-log.txt | 1 + 8 files changed, 75 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/client/src/main/java/org/apache/oozie/client/WorkflowJob.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/WorkflowJob.java b/client/src/main/java/org/apache/oozie/client/WorkflowJob.java index e2ad6f2..d7b7daf 100644 --- a/client/src/main/java/org/apache/oozie/client/WorkflowJob.java +++ b/client/src/main/java/org/apache/oozie/client/WorkflowJob.java @@ -137,9 +137,9 @@ public interface WorkflowJob { String getConsoleUrl(); /** - * Return the coordinator action ID. + * Return the coordinator action ID or the parent workflow ID * - * @return the coordinator action ID. + * @return the coordinator action ID/ Parent Workflow ID. */ String getParentId(); http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/WorkflowJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java index ef1f452..55d79a5 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java @@ -575,18 +575,20 @@ public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean { } /** - * Return the corresponding Action ID, if any. + * For a sub-workflow, return the Parent Workflow ID and for a top level workflow + * return the Coordinator action id, if any. * - * @return the coordinator Action Id. + * @return the Parent Workflow Id/Coordinator Id if any. */ public String getParentId() { return parentId; } /** - * Set coordinator action id + * Set parent id for the workflow. + * For a top level workflow it is the coordinator action id if submitted through coordinator * - * @param parentId : coordinator action id + * @param parentId the Parent Action id */ public void setParentId(String parentId) { this.parentId = parentId; http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index e83b3b5..f92d18c 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -918,6 +918,8 @@ public class JavaActionExecutor extends ActionExecutor { // Inject Oozie job information if enabled. injectJobInfo(launcherJobConf, actionConf, context, action); + injectLauncherCallback(context, launcherJobConf); + String jobId = context.getWorkflow().getId(); String actionId = action.getId(); Path actionDir = context.getActionDir(); @@ -1084,7 +1086,6 @@ public class JavaActionExecutor extends ActionExecutor { JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); - injectLauncherCallback(context, launcherJobConf); LOG.debug("Creating Job Client for action " + action.getId()); jobClient = createJobClient(context, launcherJobConf); String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java index 4b13daa..581d3b3 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java @@ -26,8 +26,10 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.action.ActionExecutor.Context; +import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.command.wf.JobXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.Services; import org.apache.oozie.util.XConfiguration; @@ -43,6 +45,8 @@ public class OozieJobInfo { public static final String COORD_NOMINAL_TIME = "coord.nominal.time"; public static final String WORKFLOW_ID = "wf.id"; public static final String WORKFLOW_NAME = "wf.name"; + public static final String WORKFLOW_DEPTH = "wf.depth"; + public static final String WORKFLOW_SUPER_PARENT = "wf.superparent.id"; public static final String ACTION_TYPE = "action.type"; public static final String ACTION_NAME = "action.name"; public static final String JOB_INFO_KEY = "oozie.job.info"; @@ -105,14 +109,14 @@ public class OozieJobInfo { private void addCoordInfo(StringBuffer sb) throws IOException { addJobInfo(sb, COORD_NAME, contextConf.get(OozieJobInfo.COORD_NAME)); addJobInfo(sb, COORD_NOMINAL_TIME, contextConf.get(OozieJobInfo.COORD_NOMINAL_TIME)); - addJobInfo(sb, COORD_ID, context.getWorkflow().getParentId()); - + addJobInfo(sb, COORD_ID, contextConf.get(OozieJobInfo.COORD_ID)); } private void addWorkflowInfo(StringBuffer sb) { addJobInfo(sb, WORKFLOW_ID, context.getWorkflow().getId()); addJobInfo(sb, WORKFLOW_NAME, context.getWorkflow().getAppName()); - + addJobInfo(sb, WORKFLOW_DEPTH, contextConf.get(SubWorkflowActionExecutor.SUBWORKFLOW_DEPTH, "0")); + addJobInfo(sb, WORKFLOW_SUPER_PARENT, computeSuperParent()); } private void addActionInfo(StringBuffer sb) { @@ -140,4 +144,19 @@ public class OozieJobInfo { } } + + private String computeSuperParent() { + String superParentId = contextConf.get(SubWorkflowActionExecutor.SUPER_PARENT_ID); + if (superParentId == null) { + // Not a sub-workflow + if (context.getWorkflow().getParentId() != null) { + // return coord id as the super parent id + return context.getWorkflow().getParentId(); + } else { + // return the current workflow id as the super parent id. + return context.getWorkflow().getId(); + } + } + return superParentId; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java index 854d621..33efc60 100644 --- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.action.oozie; +import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; @@ -51,8 +52,9 @@ public class SubWorkflowActionExecutor extends ActionExecutor { public static final String ACTION_TYPE = "sub-workflow"; public static final String LOCAL = "local"; public static final String PARENT_ID = "oozie.wf.parent.id"; + public static final String SUPER_PARENT_ID = "oozie.wf.superparent.id"; public static final String SUBWORKFLOW_MAX_DEPTH = "oozie.action.subworkflow.max.depth"; - private static final String SUBWORKFLOW_DEPTH = "oozie.action.subworkflow.depth"; + public static final String SUBWORKFLOW_DEPTH = "oozie.action.subworkflow.depth"; public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun"; private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); @@ -125,6 +127,23 @@ public class SubWorkflowActionExecutor extends ActionExecutor { conf.set(PARENT_ID, parentId); } + protected void injectSuperParent(WorkflowJob parentWorkflow, Configuration parentConf, Configuration conf) { + String superParentId = parentConf.get(SUPER_PARENT_ID); + if (superParentId == null) { + // This is a sub-workflow at depth 1 + superParentId = parentWorkflow.getParentId(); + + // If the parent workflow is not submitted through a coordinator then the parentId will be the super parent id. + if (superParentId == null) { + superParentId = parentWorkflow.getId(); + } + conf.set(SUPER_PARENT_ID, superParentId); + } else { + // Sub-workflow at depth 2 or more. + conf.set(SUPER_PARENT_ID, superParentId); + } + } + protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, Configuration conf) throws ActionExecutorException { int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0); int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH); @@ -165,6 +184,19 @@ public class SubWorkflowActionExecutor extends ActionExecutor { XConfiguration.copy(parentConf, subWorkflowConf); } + // Propagate coordinator and bundle info to subworkflow + if (OozieJobInfo.isJobInfoEnabled()) { + if (parentConf.get(OozieJobInfo.COORD_ID) != null) { + subWorkflowConf.set(OozieJobInfo.COORD_ID, parentConf.get(OozieJobInfo.COORD_ID)); + subWorkflowConf.set(OozieJobInfo.COORD_NAME, parentConf.get(OozieJobInfo.COORD_NAME)); + subWorkflowConf.set(OozieJobInfo.COORD_NOMINAL_TIME, parentConf.get(OozieJobInfo.COORD_NOMINAL_TIME)); + } + if (parentConf.get(OozieJobInfo.BUNDLE_ID) != null) { + subWorkflowConf.set(OozieJobInfo.BUNDLE_ID, parentConf.get(OozieJobInfo.BUNDLE_ID)); + subWorkflowConf.set(OozieJobInfo.BUNDLE_NAME, parentConf.get(OozieJobInfo.BUNDLE_NAME)); + } + } + // the proto has the necessary credentials Configuration protoActionConf = context.getProtoActionConf(); XConfiguration.copy(protoActionConf, subWorkflowConf); @@ -177,6 +209,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor { injectCallback(context, subWorkflowConf); injectRecovery(extId, subWorkflowConf); injectParent(context.getWorkflow().getId(), subWorkflowConf); + injectSuperParent(context.getWorkflow(), parentConf, subWorkflowConf); verifyAndInjectSubworkflowDepth(parentConf, subWorkflowConf); //TODO: this has to be refactored later to be done in a single place for REST calls and this http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java index ada8a30..45abe5a 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java @@ -199,6 +199,7 @@ public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { insertList.add(slaEvent); } if (OozieJobInfo.isJobInfoEnabled()) { + conf.set(OozieJobInfo.COORD_ID, actionId); conf.set(OozieJobInfo.COORD_NAME, appName); conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java index ceaef8b..5bc7d00 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java @@ -53,6 +53,7 @@ import org.apache.oozie.command.bundle.BundleStartXCommand; import org.apache.oozie.command.bundle.BundleSubmitXCommand; import org.apache.oozie.command.wf.ActionXCommand; import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; +import org.apache.oozie.command.wf.JobXCommand; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; @@ -64,6 +65,7 @@ import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.UUIDService.ApplicationType; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.IOUtils; @@ -171,13 +173,16 @@ public class TestOozieJobInfo extends XDataTestCase { Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile()))); String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY); - // BUNDLE_ID;BUNDLE_NAME;COORDNITOR_NAME;COORDNITOR_NOMINAL_TIME;WORKFLOW_ID;WORKFLOW_NAME; + // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME; + // WORKFLOW_ID;WORKFLOW_NAME;WORKFLOW_DEPTH;WORKFLOW_SUPERPARENT; // ACTION_TYPE;ACTION_NAME,JOB_INFO,custom_info; - assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 11); + assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 13); assertTrue(jobInfo.contains(bundleID)); assertTrue(jobInfo.contains("bundle.name=test_bundle,")); assertTrue(jobInfo.contains(cordID)); assertTrue(jobInfo.contains("action.type=map-reduce")); + assertTrue(jobInfo.contains("wf.depth=0")); + assertTrue(jobInfo.contains("wf.superparent.id=" + cordID)); assertTrue(jobInfo.contains(",testing=test,")); assertTrue(jobInfo.contains(",coord.nominal.time=")); assertTrue(jobInfo.contains("launcher=true")); http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d1b14a9..e6da494 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2346 Add sub-workflow information like the super-parent id and workflow depth into the 'oozie.job.info' property (akshayrai09 via puru) OOZIE-2303 Typo in documentation (lars_francke via rohini) OOZIE-2328 Coordinator endtime change should check if the last action is in database (kailongs via puru) OOZIE-2367 fs delete should support skipTrash option (jaydeepvishwakarma via rohini)
