Repository: falcon Updated Branches: refs/heads/master fc27ebb84 -> 470f34313
FALCON-2113 Falcon retry happens in a few cases in spite of a manual kill from the user. Move it to a different state (ignore) and check for that state before retrying, to stop it from retrying. Author: sandeep <[email protected]> Reviewers: @pallavi-rao, @PraveenAdlakha Closes #265 from sandeepSamudrala/FALCON-2113 and squashes the following commits: 2a00bef [sandeep] fixing failures 285c796 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2113 4f585a1 [sandeep] Incorporated review comments. Removed parent ID from the parameters of the method that checks for a manual kill 2d0d9a3 [sandeep] FALCON-2113. Falcon retry happens in a few cases in spite of a manual kill from the user. Move it to a different state (ignore) and check for that state before retrying, to stop it from retrying. 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. 
Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/470f3431 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/470f3431 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/470f3431 Branch: refs/heads/master Commit: 470f34313b0bf8210f95f6969712b12e55a9e169 Parents: fc27ebb Author: sandeep <[email protected]> Authored: Wed Aug 24 13:08:04 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Aug 24 13:08:04 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/resource/InstancesResult.java | 2 +- .../workflow/engine/AbstractWorkflowEngine.java | 3 ++ .../workflow/engine/OozieWorkflowEngine.java | 37 ++++++++++++++++++-- .../resource/AbstractInstanceManager.java | 4 ++- .../workflow/engine/FalconWorkflowEngine.java | 6 ++++ 5 files changed, 47 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/client/src/main/java/org/apache/falcon/resource/InstancesResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java index f8de645..3dc74c4 100644 --- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java +++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java @@ -34,7 +34,7 @@ public class InstancesResult extends APIResult { * Workflow status as being set in result object. 
*/ public static enum WorkflowStatus { - WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED, READY + WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED, READY, KILLED_OR_IGNORED } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 0db7e9b..16a1753 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -80,6 +80,9 @@ public abstract class AbstractWorkflowEngine { public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException; + public abstract InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props, + List<LifeCycle> lifeCycles) throws FalconException; + public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 06d0142..c371d69 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -588,6 
+588,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override + public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props, + List<LifeCycle> lifeCycles) throws FalconException { + return doJobAction(JobAction.IGNORE, entity, start, end, props, lifeCycles); + } + + @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { @@ -649,7 +655,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private static enum JobAction { - KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS + KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS, IGNORE } private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException { @@ -953,6 +959,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { status = Status.KILLED.name(); break; + case IGNORE: + if (!status.equals(Status.IGNORED.name())) { + ignore(cluster, coordinatorAction.getJobId(), coordinatorAction.getActionNumber()); + } + status = mapActionStatus(Status.IGNORED.name()); + break; + case SUSPEND: if (jobInfo == null || !WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) { break; @@ -1100,8 +1113,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } else if (CoordinatorAction.Status.WAITING.toString().equals(status) || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) { return InstancesResult.WorkflowStatus.WAITING.name(); - } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) { + } else if (CoordinatorAction.Status.KILLED.toString().equals(status)) { return InstancesResult.WorkflowStatus.KILLED.name(); + } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) { + return InstancesResult.WorkflowStatus.KILLED_OR_IGNORED.name(); } else if (CoordinatorAction.Status.TIMEDOUT.toString().equals(status)) { return 
InstancesResult.WorkflowStatus.FAILED.name(); } else if (WorkflowJob.Status.PREP.toString().equals(status)) { @@ -1678,6 +1693,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private void ignore(String cluster, String jobId, int instanceNumber) throws FalconException { + try { + OozieClientFactory.get(cluster).ignore(jobId, String.valueOf(instanceNumber)); + assertStatus(cluster, jobId + "@" + instanceNumber, + Status.IGNORED, Status.FAILED, Status.SUCCEEDED, Status.DONEWITHERROR); + LOG.info("Ignored job {} on cluster {}", jobId, cluster); + } catch (OozieClientException e) { + throw new FalconException(e); + } + } + private void kill(String cluster, String jobId, String rangeType, String scope) throws FalconException { try { OozieClientFactory.get(cluster).kill(jobId, rangeType, scope); @@ -1791,11 +1817,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException { + // In case of a kill being issued from falcon api, the state will be moved to IGNORE // In case of a failure, the Oozie action has an errorCode. 
// In case of no errorCode in any of the actions would mean its killed by user try { - // Check for error code in all the actions in main workflow OozieClient oozieClient = OozieClientFactory.get(cluster); + String parentId = oozieClient.getJobInfo(jobId).getParentId(); + if (oozieClient.getCoordActionInfo(parentId).getStatus().equals(CoordinatorAction.Status.IGNORED)) { + return true; + } + // Check for error code in all the actions in main workflow List<WorkflowAction> wfActions = oozieClient.getJobInfo(jobId).getActions(); for (WorkflowAction subWfAction : wfActions) { if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) { http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 276e316..f86f097 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -580,7 +580,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { entityObject, startStr, endStr); AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); - return wfEngine.killInstances(entityObject, + wfEngine.killInstances(entityObject, + startAndEndDate.first, startAndEndDate.second, props, lifeCycles); + return wfEngine.ignoreInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); } catch (Throwable e) { LOG.error("Failed to kill instances", e); http://git-wip-us.apache.org/repos/asf/falcon/blob/470f3431/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index fe16443..9ba62a1 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -392,6 +392,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { } @Override + public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props, + List<LifeCycle> lifeCycles) throws FalconException{ + throw new UnsupportedOperationException("Not yet Implemented"); + } + + @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { if (isForced == null) {
