Repository: oozie
Updated Branches:
  refs/heads/master 4c898349c -> f32a0fc5f


OOZIE-2346 Add sub-workflow information like the super-parent id and workflow 
depth into the 'oozie.job.info' property


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f32a0fc5
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f32a0fc5
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f32a0fc5

Branch: refs/heads/master
Commit: f32a0fc5f439fe471ef21691be7846640f40eae8
Parents: 4c89834
Author: Purshotam Shah <[email protected]>
Authored: Fri Nov 6 09:06:59 2015 -0800
Committer: Purshotam Shah <[email protected]>
Committed: Fri Nov 6 09:06:59 2015 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/client/WorkflowJob.java    |  4 +--
 .../java/org/apache/oozie/WorkflowJobBean.java  | 10 +++---
 .../oozie/action/hadoop/JavaActionExecutor.java |  3 +-
 .../oozie/action/hadoop/OozieJobInfo.java       | 25 ++++++++++++--
 .../action/oozie/SubWorkflowActionExecutor.java | 35 +++++++++++++++++++-
 .../command/coord/CoordActionStartXCommand.java |  1 +
 .../oozie/action/hadoop/TestOozieJobInfo.java   |  9 +++--
 release-log.txt                                 |  1 +
 8 files changed, 75 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/client/src/main/java/org/apache/oozie/client/WorkflowJob.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/WorkflowJob.java 
b/client/src/main/java/org/apache/oozie/client/WorkflowJob.java
index e2ad6f2..d7b7daf 100644
--- a/client/src/main/java/org/apache/oozie/client/WorkflowJob.java
+++ b/client/src/main/java/org/apache/oozie/client/WorkflowJob.java
@@ -137,9 +137,9 @@ public interface WorkflowJob {
     String getConsoleUrl();
     
     /**
-     * Return the coordinator action ID.
+     * Return the coordinator action ID or the parent workflow ID
      *
-     * @return the coordinator action ID.
+     * @return the coordinator action ID/ Parent Workflow ID.
      */
     String getParentId();
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index ef1f452..55d79a5 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -575,18 +575,20 @@ public class WorkflowJobBean implements Writable, 
WorkflowJob, JsonBean {
     }
 
     /**
-     * Return the corresponding Action ID, if any.
+     * For a sub-workflow, return the Parent Workflow ID and for a top level 
workflow
+     * return the Coordinator action id, if any.
      *
-     * @return the coordinator Action Id.
+     * @return the Parent Workflow Id/Coordinator Id if any.
      */
     public String getParentId() {
         return parentId;
     }
 
     /**
-     * Set coordinator action id
+     * Set parent id for the workflow.
+     * For a top level workflow it is the coordinator action id if submitted 
through coordinator
      *
-     * @param parentId : coordinator action id
+     * @param parentId the Parent Action id
      */
     public void setParentId(String parentId) {
         this.parentId = parentId;

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index e83b3b5..f92d18c 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -918,6 +918,8 @@ public class JavaActionExecutor extends ActionExecutor {
             // Inject Oozie job information if enabled.
             injectJobInfo(launcherJobConf, actionConf, context, action);
 
+            injectLauncherCallback(context, launcherJobConf);
+
             String jobId = context.getWorkflow().getId();
             String actionId = action.getId();
             Path actionDir = context.getActionDir();
@@ -1084,7 +1086,6 @@ public class JavaActionExecutor extends ActionExecutor {
 
             JobConf launcherJobConf = createLauncherConf(actionFs, context, 
action, actionXml, actionConf);
 
-            injectLauncherCallback(context, launcherJobConf);
             LOG.debug("Creating Job Client for action " + action.getId());
             jobClient = createJobClient(context, launcherJobConf);
             String launcherId = 
LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), 
context

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
index 4b13daa..581d3b3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
@@ -26,8 +26,10 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.action.ActionExecutor.Context;
+import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.wf.JobXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XConfiguration;
@@ -43,6 +45,8 @@ public class OozieJobInfo {
     public static final String COORD_NOMINAL_TIME = "coord.nominal.time";
     public static final String WORKFLOW_ID = "wf.id";
     public static final String WORKFLOW_NAME = "wf.name";
+    public static final String WORKFLOW_DEPTH = "wf.depth";
+    public static final String WORKFLOW_SUPER_PARENT = "wf.superparent.id";
     public static final String ACTION_TYPE = "action.type";
     public static final String ACTION_NAME = "action.name";
     public static final String JOB_INFO_KEY = "oozie.job.info";
@@ -105,14 +109,14 @@ public class OozieJobInfo {
     private void addCoordInfo(StringBuffer sb) throws IOException {
         addJobInfo(sb, COORD_NAME, contextConf.get(OozieJobInfo.COORD_NAME));
         addJobInfo(sb, COORD_NOMINAL_TIME, 
contextConf.get(OozieJobInfo.COORD_NOMINAL_TIME));
-        addJobInfo(sb, COORD_ID, context.getWorkflow().getParentId());
-
+        addJobInfo(sb, COORD_ID, contextConf.get(OozieJobInfo.COORD_ID));
     }
 
     private void addWorkflowInfo(StringBuffer sb) {
         addJobInfo(sb, WORKFLOW_ID, context.getWorkflow().getId());
         addJobInfo(sb, WORKFLOW_NAME, context.getWorkflow().getAppName());
-
+        addJobInfo(sb, WORKFLOW_DEPTH, 
contextConf.get(SubWorkflowActionExecutor.SUBWORKFLOW_DEPTH, "0"));
+        addJobInfo(sb, WORKFLOW_SUPER_PARENT, computeSuperParent());
     }
 
     private void addActionInfo(StringBuffer sb) {
@@ -140,4 +144,19 @@ public class OozieJobInfo {
         }
 
     }
+
+    private String computeSuperParent() {
+        String superParentId = 
contextConf.get(SubWorkflowActionExecutor.SUPER_PARENT_ID);
+        if (superParentId == null) {
+            // Not a sub-workflow
+            if (context.getWorkflow().getParentId() != null) {
+                // return coord id as the super parent id
+                return context.getWorkflow().getParentId();
+            } else {
+                // return the current workflow id as the super parent id.
+                return context.getWorkflow().getId();
+            }
+        }
+        return superParentId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index 854d621..33efc60 100644
--- 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action.oozie;
 
+import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
@@ -51,8 +52,9 @@ public class SubWorkflowActionExecutor extends ActionExecutor 
{
     public static final String ACTION_TYPE = "sub-workflow";
     public static final String LOCAL = "local";
     public static final String PARENT_ID = "oozie.wf.parent.id";
+    public static final String SUPER_PARENT_ID = "oozie.wf.superparent.id";
     public static final String SUBWORKFLOW_MAX_DEPTH = 
"oozie.action.subworkflow.max.depth";
-    private static final String SUBWORKFLOW_DEPTH = 
"oozie.action.subworkflow.depth";
+    public static final String SUBWORKFLOW_DEPTH = 
"oozie.action.subworkflow.depth";
     public static final String SUBWORKFLOW_RERUN = 
"oozie.action.subworkflow.rerun";
 
     private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new 
HashSet<String>();
@@ -125,6 +127,23 @@ public class SubWorkflowActionExecutor extends 
ActionExecutor {
         conf.set(PARENT_ID, parentId);
     }
 
+    protected void injectSuperParent(WorkflowJob parentWorkflow, Configuration 
parentConf, Configuration conf) {
+        String superParentId = parentConf.get(SUPER_PARENT_ID);
+        if (superParentId == null) {
+            // This is a sub-workflow at depth 1
+            superParentId = parentWorkflow.getParentId();
+
+            // If the parent workflow is not submitted through a coordinator 
then the parentId will be the super parent id.
+            if (superParentId == null) {
+                superParentId = parentWorkflow.getId();
+            }
+            conf.set(SUPER_PARENT_ID, superParentId);
+        } else {
+            // Sub-workflow at depth 2 or more.
+            conf.set(SUPER_PARENT_ID, superParentId);
+        }
+    }
+
     protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, 
Configuration conf) throws ActionExecutorException {
         int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0);
         int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH);
@@ -165,6 +184,19 @@ public class SubWorkflowActionExecutor extends 
ActionExecutor {
                     XConfiguration.copy(parentConf, subWorkflowConf);
                 }
 
+                // Propagate coordinator and bundle info to subworkflow
+                if (OozieJobInfo.isJobInfoEnabled()) {
+                  if (parentConf.get(OozieJobInfo.COORD_ID) != null) {
+                    subWorkflowConf.set(OozieJobInfo.COORD_ID, 
parentConf.get(OozieJobInfo.COORD_ID));
+                    subWorkflowConf.set(OozieJobInfo.COORD_NAME, 
parentConf.get(OozieJobInfo.COORD_NAME));
+                    subWorkflowConf.set(OozieJobInfo.COORD_NOMINAL_TIME, 
parentConf.get(OozieJobInfo.COORD_NOMINAL_TIME));
+                  }
+                  if (parentConf.get(OozieJobInfo.BUNDLE_ID) != null) {
+                    subWorkflowConf.set(OozieJobInfo.BUNDLE_ID, 
parentConf.get(OozieJobInfo.BUNDLE_ID));
+                    subWorkflowConf.set(OozieJobInfo.BUNDLE_NAME, 
parentConf.get(OozieJobInfo.BUNDLE_NAME));
+                  }
+                }
+
                 // the proto has the necessary credentials
                 Configuration protoActionConf = context.getProtoActionConf();
                 XConfiguration.copy(protoActionConf, subWorkflowConf);
@@ -177,6 +209,7 @@ public class SubWorkflowActionExecutor extends 
ActionExecutor {
                 injectCallback(context, subWorkflowConf);
                 injectRecovery(extId, subWorkflowConf);
                 injectParent(context.getWorkflow().getId(), subWorkflowConf);
+                injectSuperParent(context.getWorkflow(), parentConf, 
subWorkflowConf);
                 verifyAndInjectSubworkflowDepth(parentConf, subWorkflowConf);
 
                 //TODO: this has to be refactored later to be done in a single 
place for REST calls and this

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
index ada8a30..45abe5a 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
@@ -199,6 +199,7 @@ public class CoordActionStartXCommand extends 
CoordinatorXCommand<Void> {
                     insertList.add(slaEvent);
                 }
                 if (OozieJobInfo.isJobInfoEnabled()) {
+                    conf.set(OozieJobInfo.COORD_ID, actionId);
                     conf.set(OozieJobInfo.COORD_NAME, appName);
                     conf.set(OozieJobInfo.COORD_NOMINAL_TIME, 
coordAction.getNominalTimestamp().toString());
                 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
index ceaef8b..5bc7d00 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
@@ -53,6 +53,7 @@ import org.apache.oozie.command.bundle.BundleStartXCommand;
 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.command.wf.JobXCommand;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
@@ -64,6 +65,7 @@ import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
@@ -171,13 +173,16 @@ public class TestOozieJobInfo extends XDataTestCase {
         Configuration jobXmlConf = new XConfiguration(fs.open(new 
Path(launcherJob.getJobFile())));
         String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY);
 
-        // 
BUNDLE_ID;BUNDLE_NAME;COORDNITOR_NAME;COORDNITOR_NOMINAL_TIME;WORKFLOW_ID;WORKFLOW_NAME;
+        // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME;
+        // WORKFLOW_ID;WORKFLOW_NAME;WORKFLOW_DEPTH;WORKFLOW_SUPERPARENT;
         // ACTION_TYPE;ACTION_NAME,JOB_INFO,custom_info;
-        assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 11);
+        assertEquals(jobInfo.split(OozieJobInfo.SEPARATOR).length, 13);
         assertTrue(jobInfo.contains(bundleID));
         assertTrue(jobInfo.contains("bundle.name=test_bundle,"));
         assertTrue(jobInfo.contains(cordID));
         assertTrue(jobInfo.contains("action.type=map-reduce"));
+        assertTrue(jobInfo.contains("wf.depth=0"));
+        assertTrue(jobInfo.contains("wf.superparent.id=" + cordID));
         assertTrue(jobInfo.contains(",testing=test,"));
         assertTrue(jobInfo.contains(",coord.nominal.time="));
         assertTrue(jobInfo.contains("launcher=true"));

http://git-wip-us.apache.org/repos/asf/oozie/blob/f32a0fc5/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d1b14a9..e6da494 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2346 Add sub-workflow information like the super-parent id and workflow 
depth into the 'oozie.job.info' property (akshayrai09 via puru)
 OOZIE-2303 Typo in documentation (lars_francke via rohini)
 OOZIE-2328 Coordinator endtime change should check if the last action is in 
database (kailongs via puru) 
 OOZIE-2367 fs delete should support skipTrash option (jaydeepvishwakarma via 
rohini)

Reply via email to