Repository: falcon Updated Branches: refs/heads/master 820572d8a -> 8e79ba81a
FALCON-1864 Retry event does not get removed from delay queue Author: Pallavi Rao <[email protected]> Reviewers: @sandeepSamudrala, @pavankumar526, @PraveenAdlakha Closes #76 from pallavi-rao/1864 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8e79ba81 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8e79ba81 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8e79ba81 Branch: refs/heads/master Commit: 8e79ba81ad5e228ab235477860134ec6891d291a Parents: 820572d Author: Pallavi Rao <[email protected]> Authored: Mon Mar 28 12:11:35 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Mar 28 12:11:35 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/rerun/handler/LateRerunConsumer.java | 2 +- .../org/apache/falcon/rerun/handler/RetryConsumer.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java index f224805..047fa0f 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java @@ -60,7 +60,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP") || jobStatus.equals("SUSPENDED")) { LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as " - + "job status is running: {}", message.getWfId()); + + "job status is {} for : {}", jobStatus, 
message.getWfId()); message.setMsgInsertTime(System.currentTimeMillis()); handler.offerToQueue(message); return; http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 836a172..4c763c2 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -43,14 +43,18 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> protected void handleRerun(String clusterName, String jobStatus, RetryEvent message, String entityType, String entityName) { try { - if (!jobStatus.equals("KILLED")) { - LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:" - + " {}", message.getWfId()); + // Can happen when user does a manual re-run in-between retries. + if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")) { + LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as " + + "job status is {} for : {}", jobStatus, message.getWfId()); message.setMsgInsertTime(System.currentTimeMillis()); handler.offerToQueue(message); return; + } else if (jobStatus.equals("SUSPENDED") || jobStatus.equals("SUCCEEDED")) { + LOG.debug("Not retrying workflow {} anymore as it is in {} state. ", message.getWfId(), jobStatus); + return; } - LOG.info("Retrying attempt: {} out of configured: {} attempt for instance: {}:{} And WorkflowId: {}" + LOG.info("Retrying attempt: {} out of configured: {} attempts for instance: {}:{} And WorkflowId: {}" + " At time: {}", (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(), message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
