Repository: incubator-falcon
Updated Branches:
  refs/heads/master 86e0ccfa0 -> 8cbd38551


FALCON-813. Expose job id for running jobs in Falcon. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/8cbd3855
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/8cbd3855
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/8cbd3855

Branch: refs/heads/master
Commit: 8cbd3855152edb809015d6c71d7dd3ece5d95582
Parents: 86e0ccf
Author: Suhas V <suha...@inmobi.com>
Authored: Thu Nov 6 14:59:04 2014 +0530
Committer: Suhas V <suha...@inmobi.com>
Committed: Thu Nov 6 14:59:04 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/falcon/client/FalconClient.java  |  7 +++
 docs/src/site/twiki/FalconCLI.twiki             |  3 +-
 .../src/site/twiki/restapi/InstanceStatus.twiki | 23 ++++++---
 .../workflow/engine/OozieWorkflowEngine.java    | 54 +++++++++++++++++---
 5 files changed, 71 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cc4a35..1321734 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -37,6 +37,7 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via 
Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-813 Expose job id for running jobs in Falcon (Suhas Vasu)
    FALCON-834 Propagate request id in the response to help trace and debug
    failures in merlin (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java 
b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 6aeb59b..7ac2981 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -961,6 +961,13 @@ public class FalconClient {
                     sb.append("\n");
                 }
 
+                if (instance.actions != null) {
+                    sb.append("actions:\n");
+                    for (InstancesResult.InstanceAction action : 
instance.actions) {
+                        sb.append(" ").append(action.getAction()).append("\t");
+                        
sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n");
+                    }
+                }
             }
         }
         sb.append("\nAdditional Information:\n");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki 
b/docs/src/site/twiki/FalconCLI.twiki
index 6470c0a..7bdac5d 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -149,7 +149,8 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> 
-name <<name>> -resume -
 ---+++Status
 
 Status option via CLI can be used to get the status of a single or multiple 
instances.  If the instance is not yet materialized but is within the process 
validity range, WAITING is returned as the state. Along with the status, the 
instance time is also returned. Log location gives the oozie workflow url.
-If the instance is in WAITING state, missing dependencies are listed
+If the instance is in WAITING state, missing dependencies are listed.
+The job urls are populated for all actions of the user workflow and for 
non-succeeded actions of the main-workflow, so the user need not go to the 
underlying scheduler to get the job urls when debugging an issue in the job.
 
 Example : Suppose a process has 3 instances: one has succeeded, one is in 
running state and the other one is waiting. The expected output is:
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki 
b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index ece8c3f..cebc9c8 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -27,12 +27,12 @@ Get status of a specific instance of an entity.
 
    
 ---++ Results
-Status of the specified instance.
+Status of the specified instance along with job urls for all actions of user 
workflow and non-succeeded actions of the main-workflow.
 
 ---++ Examples
 ---+++ Rest Call
 <verbatim>
-GET 
http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
+GET 
https://localhost:15443/api/instance/status/process/WordCount?start=2014-11-04T16:00Z&colo=*
 </verbatim>
 ---+++ Result
 <verbatim>
@@ -40,15 +40,22 @@ GET 
http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&star
     "instances": [
         {
             "details": "",
-            "endTime": "2013-10-21T14:40:26-07:00",
-            "startTime": "2013-10-21T14:39:56-07:00",
-            "cluster": "primary-cluster",
-            "logFile": 
"http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+            "endTime": "2014-11-05T16:08:10+05:30",
+            "startTime": "2014-11-05T16:07:29+05:30",
+            "cluster": "local",
+            "logFile": 
"http:\/\/localhost:11000\/oozie?job=0000011-141105155430303-oozie-oozi-W",
             "status": "SUCCEEDED",
-            "instance": "2012-04-03T07:00Z"
+            "instance": "2014-11-04T16:00Z",
+            "actions": [
+                {
+                    "action": "wordcount-mr",
+                    "status": "SUCCEEDED",
+                    "logFile": 
"http:\/\/localhost:50030\/jobdetails.jsp?jobid=job_201411051553_0005"
+                }
+            ]
         }
     ],
-    "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+    "requestId": "default\/b9fc3cba-1b46-4d1f-8196-52c795ea3580\n",
     "message": "default\/STATUS\n",
     "status": "SUCCEEDED"
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7032182..89bebe7 100644
--- 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -51,16 +51,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.*;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.client.ProxyOozieClient;
-import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +107,13 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = 
"falcon.instance.action.clusters";
     private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = 
"falcon.instance.source.clusters";
 
+    private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList(
+            "pre-processing",
+            "should-record",
+            "succeeded-post-processing",
+            "failed-post-processing"
+    );
+
     private static final String[] BUNDLE_UPDATEABLE_PROPS =
         new String[]{"parallel", "clusters.clusters[\\d+].validity.end", };
 
@@ -591,6 +591,9 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
                     if (action == JobAction.PARAMS) {
                         instance.wfParams = getWFParams(jobInfo);
                     }
+                    if (action == JobAction.STATUS) {
+                        populateInstanceActions(cluster, jobInfo, instance);
+                    }
                 }
                 instance.details = coordinatorAction.getMissingDependencies();
                 instances.add(instance);
@@ -679,6 +682,41 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
         return instancesSummaryResult;
     }
 
+    private void populateInstanceActions(String cluster, WorkflowJob wfJob, 
Instance instance)
+        throws FalconException {
+
+        List<InstancesResult.InstanceAction> instanceActions = new 
ArrayList<InstancesResult.InstanceAction>();
+
+        List<WorkflowAction> wfActions = wfJob.getActions();
+        for (WorkflowAction action : wfActions) {
+            if (action.getType().equalsIgnoreCase("sub-workflow") && 
StringUtils.isNotEmpty(action.getExternalId())) {
+                List<WorkflowAction> subWorkFlowActions = 
getWorkflowInfo(cluster, action.getExternalId()).getActions();
+                for (WorkflowAction subWfAction : subWorkFlowActions) {
+                    if (!subWfAction.getType().startsWith(":")) {
+                        InstancesResult.InstanceAction instanceAction =
+                                new 
InstancesResult.InstanceAction(subWfAction.getName(),
+                                        subWfAction.getExternalStatus(), 
subWfAction.getConsoleUrl());
+                        instanceActions.add(instanceAction);
+                    }
+                }
+            } else if (!action.getType().startsWith(":")) {
+                if (PARENT_WF_ACTION_NAMES.contains(action.getName())
+                        && 
!Status.SUCCEEDED.toString().equals(action.getExternalStatus())) {
+                    InstancesResult.InstanceAction instanceAction =
+                            new 
InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                    action.getConsoleUrl());
+                    instanceActions.add(instanceAction);
+                } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())) 
{
+                    InstancesResult.InstanceAction instanceAction =
+                            new 
InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                    action.getConsoleUrl());
+                    instanceActions.add(instanceAction);
+                }
+            }
+        }
+        instance.actions = instanceActions.toArray(new 
InstancesResult.InstanceAction[instanceActions.size()]);
+    }
+
     private Map<String, String> getWFParams(WorkflowJob jobInfo) {
         Map<String, String> wfParams = new HashMap<String, String>();
         Configuration conf = new Configuration(false);

Reply via email to