Repository: falcon Updated Branches: refs/heads/master 0fe2762b7 -> f3b781dec
FALCON-1597 Falcon should not retry in case of an instance being manual kill from user (Sandeep Samudrala) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fac91b24 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fac91b24 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fac91b24 Branch: refs/heads/master Commit: fac91b24aa6bfba561e83e2e7bb55772b137e39e Parents: db0604d Author: Pallavi Rao <[email protected]> Authored: Mon Nov 23 12:31:59 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Nov 23 12:31:59 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../workflow/WorkflowExecutionContext.java | 12 ++++++ .../workflow/engine/AbstractWorkflowEngine.java | 2 + .../workflow/engine/OozieWorkflowEngine.java | 40 ++++++++++++++++++++ .../falcon/rerun/handler/RetryHandler.java | 4 +- .../workflow/engine/FalconWorkflowEngine.java | 5 +++ 6 files changed, 63 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f46450..8ce2e85 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -33,6 +33,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1597 Falcon should not retry in case of an instance being manual kill from user (Sandeep Samudrala via Pallavi Rao) + FALCON-1603 FeedHelperTest::testGetDateFromPath fails in some environments(Balu Vellanki via Ajay Yadava) FALCON-1372 Retention does not work in corner cases(Balu Vellanki via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 899165b..f206ff1 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -134,6 +134,18 @@ public class WorkflowExecutionContext { return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS)); } + public boolean isWorkflowKilledManually(){ + try { + return WorkflowEngineFactory.getWorkflowEngine(). + isWorkflowKilledByUser( + getValue(WorkflowExecutionArgs.CLUSTER_NAME), + getValue(WorkflowExecutionArgs.WORKFLOW_ID)); + } catch (Exception e) { + LOG.error("Got Error in getting error codes from actions: " + e); + } + return false; + } + public boolean hasWorkflowTimedOut() { return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS)); } http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 8b3460a..7b36b11 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -107,4 +107,6 @@ public abstract class AbstractWorkflowEngine { List<LifeCycle> lifeCycles) throws FalconException; public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException; + + public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 7262964..2211c52 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1651,4 +1651,44 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { throw new FalconException(e); } } + + @Override + public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException { + // In case of a failure, the Oozie action has an errorCode. + // In case of no errorCode in any of the actions would mean its killed by user + try { + // Check for error code in all the actions in main workflow + OozieClient oozieClient = OozieClientFactory.get(cluster); + List<WorkflowAction> wfActions = oozieClient.getJobInfo(jobId).getActions(); + for (WorkflowAction subWfAction : wfActions) { + if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) { + return false; + } + } + // Assumption taken, there are no sub workflows in user action. + String subWfId = getUserWorkflowAction(wfActions); + List<WorkflowAction> subWfActions; + // Check for error code in all the user-workflow(sub-workflow)'s actions. + if (StringUtils.isNotBlank(subWfId)) { + subWfActions = oozieClient.getJobInfo(subWfId).getActions(); + for (WorkflowAction subWfAction : subWfActions) { + if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) { + return false; + } + } + } + return true; + } catch (Exception e) { + throw new FalconException(e); + } + } + + private String getUserWorkflowAction(List<WorkflowAction> actionsList){ + for (WorkflowAction wfAction : actionsList) { + if (StringUtils.equals(wfAction.getName(), "user-action")) { + return wfAction.getExternalId(); + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index 7aa094a..84cd93f 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -106,8 +106,8 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends @Override public void onFailure(WorkflowExecutionContext context) throws FalconException { - // Re-run does not make sense on timeouts. - if (context.hasWorkflowTimedOut()) { + // Re-run does not make sense on timeouts or when killed by user. + if (context.hasWorkflowTimedOut() || context.isWorkflowKilledManually()) { return; } handleRerun(context.getClusterName(), context.getEntityType(), http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 8dcf3a5..3a5024a 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -362,5 +362,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { result.setInstances(instances); return result; } + + @Override + public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException { + throw new UnsupportedOperationException("Not yet Implemented"); + } }
