Repository: falcon
Updated Branches:
  refs/heads/master 820572d8a -> 8e79ba81a


FALCON-1864 Retry event does not get removed from delay queue

Author: Pallavi Rao <[email protected]>

Reviewers: @sandeepSamudrala, @pavankumar526, @PraveenAdlakha

Closes #76 from pallavi-rao/1864


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8e79ba81
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8e79ba81
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8e79ba81

Branch: refs/heads/master
Commit: 8e79ba81ad5e228ab235477860134ec6891d291a
Parents: 820572d
Author: Pallavi Rao <[email protected]>
Authored: Mon Mar 28 12:11:35 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Mar 28 12:11:35 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/rerun/handler/LateRerunConsumer.java  |  2 +-
 .../org/apache/falcon/rerun/handler/RetryConsumer.java  | 12 ++++++++----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index f224805..047fa0f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -60,7 +60,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
                     || jobStatus.equals("SUSPENDED")) {
                 LOG.debug("Re-enqueing message in LateRerunHandler for 
workflow with same delay as "
-                        + "job status is running: {}", message.getWfId());
+                        + "job status is {} for : {}", jobStatus, 
message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 836a172..4c763c2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -43,14 +43,18 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
     protected void handleRerun(String clusterName, String jobStatus,
                                RetryEvent message, String entityType, String 
entityName) {
         try {
-            if (!jobStatus.equals("KILLED")) {
-                LOG.debug("Re-enqueing message in RetryHandler for workflow 
with same delay as job status is running:"
-                        + " {}", message.getWfId());
+            // Can happen when user does a manual re-run in-between retries.
+            if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")) {
+                LOG.debug("Re-enqueing message in RetryHandler for workflow 
with same delay as "
+                        + "job status is {} for : {}", jobStatus, 
message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;
+            } else if (jobStatus.equals("SUSPENDED") || 
jobStatus.equals("SUCCEEDED")) {
+                LOG.debug("Not retrying workflow {} anymore as it is in {} 
state. ", message.getWfId(), jobStatus);
+                return;
             }
-            LOG.info("Retrying attempt: {} out of configured: {} attempt for 
instance: {}:{} And WorkflowId: {}"
+            LOG.info("Retrying attempt: {} out of configured: {} attempts for 
instance: {}:{} And WorkflowId: {}"
                             + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), 
message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new 
Date(System.currentTimeMillis())));

Reply via email to