Repository: incubator-falcon Updated Branches: refs/heads/master 86e0ccfa0 -> 8cbd38551
FALCON-813. Expose job id for running jobs in Falcon. Contributed by Suhas Vasu Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/8cbd3855 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/8cbd3855 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/8cbd3855 Branch: refs/heads/master Commit: 8cbd3855152edb809015d6c71d7dd3ece5d95582 Parents: 86e0ccf Author: Suhas V <suha...@inmobi.com> Authored: Thu Nov 6 14:59:04 2014 +0530 Committer: Suhas V <suha...@inmobi.com> Committed: Thu Nov 6 14:59:04 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/falcon/client/FalconClient.java | 7 +++ docs/src/site/twiki/FalconCLI.twiki | 3 +- .../src/site/twiki/restapi/InstanceStatus.twiki | 23 ++++++--- .../workflow/engine/OozieWorkflowEngine.java | 54 +++++++++++++++++--- 5 files changed, 71 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6cc4a35..1321734 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -37,6 +37,7 @@ Trunk (Unreleased) FALCON-263 API to get workflow parameters. 
(pavan kumar kolamuri via Shwetha GS) IMPROVEMENTS + FALCON-813 Expose job id for running jobs in Falcon (Suhas Vasu) FALCON-834 Propagate request id in the response to help trace and debug failures in merlin (Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 6aeb59b..7ac2981 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -961,6 +961,13 @@ public class FalconClient { sb.append("\n"); } + if (instance.actions != null) { + sb.append("actions:\n"); + for (InstancesResult.InstanceAction action : instance.actions) { + sb.append(" ").append(action.getAction()).append("\t"); + sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n"); + } + } } } sb.append("\nAdditional Information:\n"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 6470c0a..7bdac5d 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -149,7 +149,8 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -resume - ---+++Status Status option via CLI can be used to get the status of a single or multiple instances. If the instance is not yet materialized but is within the process validity range, WAITING is returned as the state. Along with the status of the instance time is also returned. 
Log location gives the oozie workflow url -If the instance is in WAITING state, missing dependencies are listed +If the instance is in WAITING state, missing dependencies are listed. +The job urls are populated for all actions of the user workflow and non-succeeded actions of the main-workflow. The user then need not go to the underlying scheduler to get the job urls when needed to debug an issue in the job. Example : Suppose a process has 3 instances: one has succeeded, one is in running state and the other one is waiting. The expected output is: http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/docs/src/site/twiki/restapi/InstanceStatus.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki index ece8c3f..cebc9c8 100644 --- a/docs/src/site/twiki/restapi/InstanceStatus.twiki +++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki @@ -27,12 +27,12 @@ Get status of a specific instance of an entity. ---++ Results -Status of the specified instance. +Status of the specified instance along with job urls for all actions of the user workflow and non-succeeded actions of the main-workflow. 
---++ Examples ---+++ Rest Call <verbatim> -GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z +GET https://localhost:15443/api/instance/status/process/WordCount?start=2014-11-04T16:00Z&colo=* </verbatim> ---+++ Result <verbatim> @@ -40,15 +40,22 @@ GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&star "instances": [ { "details": "", - "endTime": "2013-10-21T14:40:26-07:00", - "startTime": "2013-10-21T14:39:56-07:00", - "cluster": "primary-cluster", - "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W", + "endTime": "2014-11-05T16:08:10+05:30", + "startTime": "2014-11-05T16:07:29+05:30", + "cluster": "local", + "logFile": "http:\/\/localhost:11000\/oozie?job=0000011-141105155430303-oozie-oozi-W", "status": "SUCCEEDED", - "instance": "2012-04-03T07:00Z" + "instance": "2014-11-04T16:00Z", + "actions": [ + { + "action": "wordcount-mr", + "status": "SUCCEEDED", + "logFile": "http:\/\/localhost:50030\/jobdetails.jsp?jobid=job_201411051553_0005" + } + ] } ], - "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n", + "requestId": "default\/b9fc3cba-1b46-4d1f-8196-52c795ea3580\n", "message": "default\/STATUS\n", "status": "SUCCEEDED" } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8cbd3855/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 7032182..89bebe7 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -51,16 +51,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.oozie.client.BundleJob; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.*; import org.apache.oozie.client.CoordinatorJob.Timeunit; -import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.apache.oozie.client.ProxyOozieClient; -import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.RestConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +107,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; + private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList( + "pre-processing", + "should-record", + "succeeded-post-processing", + "failed-post-processing" + ); + private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[]{"parallel", "clusters.clusters[\\d+].validity.end", }; @@ -591,6 +591,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { if (action == JobAction.PARAMS) { instance.wfParams = getWFParams(jobInfo); } + if (action == JobAction.STATUS) { + populateInstanceActions(cluster, jobInfo, instance); + } } instance.details = coordinatorAction.getMissingDependencies(); instances.add(instance); @@ -679,6 +682,41 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return instancesSummaryResult; } + private void populateInstanceActions(String cluster, WorkflowJob wfJob, Instance instance) + throws FalconException { + + List<InstancesResult.InstanceAction> instanceActions = new ArrayList<InstancesResult.InstanceAction>(); + + List<WorkflowAction> wfActions 
= wfJob.getActions(); + for (WorkflowAction action : wfActions) { + if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId())) { + List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster, action.getExternalId()).getActions(); + for (WorkflowAction subWfAction : subWorkFlowActions) { + if (!subWfAction.getType().startsWith(":")) { + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(subWfAction.getName(), + subWfAction.getExternalStatus(), subWfAction.getConsoleUrl()); + instanceActions.add(instanceAction); + } + } + } else if (!action.getType().startsWith(":")) { + if (PARENT_WF_ACTION_NAMES.contains(action.getName()) + && !Status.SUCCEEDED.toString().equals(action.getExternalStatus())) { + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), + action.getConsoleUrl()); + instanceActions.add(instanceAction); + } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())) { + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), + action.getConsoleUrl()); + instanceActions.add(instanceAction); + } + } + } + instance.actions = instanceActions.toArray(new InstancesResult.InstanceAction[instanceActions.size()]); + } + private Map<String, String> getWFParams(WorkflowJob jobInfo) { Map<String, String> wfParams = new HashMap<String, String>(); Configuration conf = new Configuration(false);