Repository: falcon
Updated Branches:
  refs/heads/master 0fe2762b7 -> f3b781dec


FALCON-1597 Falcon should not retry when an instance is killed manually by a user (Sandeep Samudrala)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fac91b24
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fac91b24
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fac91b24

Branch: refs/heads/master
Commit: fac91b24aa6bfba561e83e2e7bb55772b137e39e
Parents: db0604d
Author: Pallavi Rao <[email protected]>
Authored: Mon Nov 23 12:31:59 2015 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Nov 23 12:31:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../workflow/WorkflowExecutionContext.java      | 12 ++++++
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +
 .../workflow/engine/OozieWorkflowEngine.java    | 40 ++++++++++++++++++++
 .../falcon/rerun/handler/RetryHandler.java      |  4 +-
 .../workflow/engine/FalconWorkflowEngine.java   |  5 +++
 6 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f46450..8ce2e85 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1597 Falcon should not retry when an instance is killed manually by a user (Sandeep Samudrala via Pallavi Rao)
+
     FALCON-1603 FeedHelperTest::testGetDateFromPath fails in some 
environments(Balu Vellanki via Ajay Yadava)
 
     FALCON-1372 Retention does not work in corner cases(Balu Vellanki via Ajay 
Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 899165b..f206ff1 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -134,6 +134,18 @@ public class WorkflowExecutionContext {
         return 
Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
     }
 
+    /**
+     * Checks whether the workflow of this execution context was killed manually by a user.
+     *
+     * <p>This is a best-effort check that delegates to the workflow engine returned by
+     * {@code WorkflowEngineFactory.getWorkflowEngine()}. If the lookup throws (including an
+     * engine that does not support the query), the error is logged and {@code false} is
+     * returned, so callers fall back to their normal failure handling.
+     *
+     * @return {@code true} only if the workflow engine positively reports a user kill
+     */
+    public boolean isWorkflowKilledManually() {
+        try {
+            // Boolean.TRUE.equals(...) avoids an unboxing NullPointerException if an engine
+            // implementation returns null; null is treated as "not killed by user".
+            return Boolean.TRUE.equals(WorkflowEngineFactory.getWorkflowEngine()
+                    .isWorkflowKilledByUser(
+                            getValue(WorkflowExecutionArgs.CLUSTER_NAME),
+                            getValue(WorkflowExecutionArgs.WORKFLOW_ID)));
+        } catch (Exception e) {
+            // Pass the exception as the throwable argument so its stack trace is logged.
+            LOG.error("Unable to determine whether workflow "
+                    + getValue(WorkflowExecutionArgs.WORKFLOW_ID) + " was killed by user", e);
+        }
+        return false;
+    }
+
     public boolean hasWorkflowTimedOut() {
         return 
Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
 
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 8b3460a..7b36b11 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -107,4 +107,6 @@ public abstract class AbstractWorkflowEngine {
                                                       List<LifeCycle> 
lifeCycles) throws FalconException;
 
     public abstract boolean isNotificationEnabled(String cluster, String 
jobID) throws FalconException;
+
+    /**
+     * Checks whether the given workflow job was killed manually by a user, as opposed to
+     * failing on its own.
+     *
+     * <p>NOTE(review): the return type is the boxed {@code Boolean}; implementations should
+     * not return {@code null}.
+     *
+     * @param cluster name of the cluster the workflow job belongs to
+     * @param jobId   id of the workflow job to inspect
+     * @return {@code true} if the job is judged to have been killed by a user
+     * @throws FalconException if the workflow engine cannot be queried
+     */
+    public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId)
+        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7262964..2211c52 100644
--- 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1651,4 +1651,44 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
             throw new FalconException(e);
         }
     }
+
+    /**
+     * Decides whether an Oozie workflow job was killed by a user.
+     *
+     * <p>A failed Oozie action carries an error code. If no action in the workflow job, and no
+     * action in the sub-workflow launched by its "user-action" action, has an error code, the
+     * job is considered to have been killed by a user.
+     *
+     * @param cluster name of the cluster whose Oozie server runs the job
+     * @param jobId   Oozie workflow job id
+     * @return {@code true} if no action carries an error code
+     * @throws FalconException wrapping any failure while talking to Oozie
+     */
+    @Override
+    public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException {
+        try {
+            OozieClient oozieClient = OozieClientFactory.get(cluster);
+
+            // Check the error codes of all actions in the main workflow.
+            List<WorkflowAction> wfActions = oozieClient.getJobInfo(jobId).getActions();
+            if (hasActionWithErrorCode(wfActions)) {
+                return false;
+            }
+
+            // Check the error codes of all actions in the user workflow (sub-workflow).
+            // Assumption: the user workflow does not itself launch nested sub-workflows.
+            String subWfId = getUserWorkflowAction(wfActions);
+            if (StringUtils.isNotBlank(subWfId)
+                    && hasActionWithErrorCode(oozieClient.getJobInfo(subWfId).getActions())) {
+                return false;
+            }
+            return true;
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /** Returns true if any of the given Oozie actions reports a non-empty error code. */
+    private boolean hasActionWithErrorCode(List<WorkflowAction> actions) {
+        for (WorkflowAction action : actions) {
+            if (StringUtils.isNotEmpty(action.getErrorCode())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the external id (the sub-workflow job id) of the action named "user-action",
+     * or null if no such action exists.
+     */
+    private String getUserWorkflowAction(List<WorkflowAction> actionsList) {
+        for (WorkflowAction wfAction : actionsList) {
+            if (StringUtils.equals(wfAction.getName(), "user-action")) {
+                return wfAction.getExternalId();
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 7aa094a..84cd93f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -106,8 +106,8 @@ public class RetryHandler<M extends 
DelayedQueue<RetryEvent>> extends
 
     @Override
     public void onFailure(WorkflowExecutionContext context) throws 
FalconException {
-        // Re-run does not make sense on timeouts.
-        if (context.hasWorkflowTimedOut()) {
+        // Re-run does not make sense on timeouts or when killed by user.
+        if (context.hasWorkflowTimedOut() || 
context.isWorkflowKilledManually()) {
             return;
         }
         handleRerun(context.getClusterName(), context.getEntityType(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/fac91b24/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 8dcf3a5..3a5024a 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -362,5 +362,10 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine {
         result.setInstances(instances);
         return result;
     }
+
+    /**
+     * Detecting user kills is not implemented for this workflow engine yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException {
+        throw new UnsupportedOperationException("Not yet Implemented");
+    }
 }
 

Reply via email to