Repository: helix
Updated Branches:
  refs/heads/master 4a121ef26 -> c012c7b9d


[HELIX-743] Fix purgeExpiredJobs() so that jobs whose removal has failed do not get removed from DAG

Previously, even if a job's removal failed, Task Framework would still remove
the job from the DAG. Once the job was gone from the DAG, later purges no longer
saw it, so its leftover ZNodes were never cleaned up at the next purge.

Changelist:
1. Keep track of jobs whose removal failed and remove them from expiredJobs
before the DAG is updated. These jobs therefore stay in the DAG, are included in
expiredJobs again on the next call to purgeExpiredJobs(), and their removal is
retried.
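The purge cycle described above can be modeled in a few lines. The following is a minimal, self-contained Java sketch, not Helix code: the Workflow class, purge(), failOnce and leftoverAfterTwoPurges() are hypothetical stand-ins for the workflow DAG, TaskUtil.removeJob() and the job ZNodes. It assumes, as the changelist implies, that a purge only considers jobs still present in the DAG. Job "B" fails to be removed on the first purge; with the old behavior its data is left behind for good, while with the fix it is retried and cleaned up on the second purge.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical toy model of purgeExpiredJobs(); not the Helix API.
public class PurgeSketch {

  static final class Workflow {
    final Set<String> dag = new LinkedHashSet<>();      // jobs still in the workflow DAG
    final Set<String> jobData = new LinkedHashSet<>();  // stands in for per-job ZNodes
    final Set<String> failOnce = new HashSet<>();       // jobs whose next removal fails

    Workflow(String... jobs) {
      for (String job : jobs) {
        dag.add(job);
        jobData.add(job);
      }
    }

    // Assumption: expired jobs are discovered from the DAG (every job counts as
    // expired here), so a job dropped from the DAG is never seen again.
    Set<String> getExpiredJobs() {
      return new LinkedHashSet<>(dag);
    }

    // Stand-in for TaskUtil.removeJob(): false on a simulated transient failure.
    boolean removeJob(String job) {
      if (failOnce.remove(job)) {
        return false;
      }
      jobData.remove(job);
      return true;
    }
  }

  // One purge pass; retryFailed=false mimics the pre-fix behavior.
  static void purge(Workflow wf, boolean retryFailed) {
    Set<String> expiredJobs = wf.getExpiredJobs();
    Set<String> failedJobRemovals = new HashSet<>();
    for (String job : expiredJobs) {
      if (!wf.removeJob(job)) {
        failedJobRemovals.add(job);
      }
    }
    if (retryFailed) {
      // The fix: keep failed jobs in the DAG so the next purge retries them.
      expiredJobs.removeAll(failedJobRemovals);
    }
    wf.dag.removeAll(expiredJobs);
  }

  // Returns the job data still present after two purges when "B" fails once.
  static Set<String> leftoverAfterTwoPurges(boolean retryFailed) {
    Workflow wf = new Workflow("A", "B", "C");
    wf.failOnce.add("B");
    purge(wf, retryFailed);
    purge(wf, retryFailed);
    return wf.jobData;
  }

  public static void main(String[] args) {
    System.out.println("old behavior leftover: " + leftoverAfterTwoPurges(false));
    System.out.println("fixed behavior leftover: " + leftoverAfterTwoPurges(true));
  }
}
```

Running main() prints "old behavior leftover: [B]" followed by "fixed behavior leftover: []".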


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c012c7b9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c012c7b9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c012c7b9

Branch: refs/heads/master
Commit: c012c7b9dea35137935ac29c035a96aea570bf9c
Parents: 4a121ef
Author: Hunter Lee <[email protected]>
Authored: Tue Jul 24 12:20:00 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Tue Jul 24 12:20:00 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/task/WorkflowRebalancer.java  | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c012c7b9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 32a5370..165e61d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -538,18 +538,24 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
       Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(),
           _manager.getHelixPropertyStore(), workflowConfig, workflowContext);
-
       if (expiredJobs.isEmpty()) {
         LOG.info("No job to purge for the queue " + workflow);
       } else {
         LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
         for (String job : expiredJobs) {
           if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
               job)) {
+            failedJobRemovals.add(job);
             LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
           }
           _rebalanceScheduler.removeScheduledRebalance(job);
         }
+
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
+
         if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs,
             true)) {
           LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
