Repository: helix Updated Branches: refs/heads/master 4a121ef26 -> c012c7b9d
[HELIX-743] Fix purgeExpiredJobs() so that jobs whose removal has failed do not get removed from the DAG. Previously, even if the job removal had failed, Task Framework would go ahead and remove the job from the DAG. This would cause some ZNodes to be left over and never be cleaned up at the next purge time. Changelist: 1. Keep track of jobs whose removal failed and exclude them from the expiredJobs set passed to removeJobsFromDag(), so that on the next call to purgeExpiredJobs() those jobs are included in expiredJobs again and their removal is retried. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c012c7b9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c012c7b9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c012c7b9 Branch: refs/heads/master Commit: c012c7b9dea35137935ac29c035a96aea570bf9c Parents: 4a121ef Author: Hunter Lee <[email protected]> Authored: Tue Jul 24 12:20:00 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Tue Jul 24 12:20:00 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/task/WorkflowRebalancer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c012c7b9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 32a5370..165e61d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -538,18 +538,24 @@ public class WorkflowRebalancer extends TaskRebalancer { if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= 
currentTime) { Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), workflowConfig, workflowContext); - if (expiredJobs.isEmpty()) { LOG.info("No job to purge for the queue " + workflow); } else { LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow); + Set<String> failedJobRemovals = new HashSet<>(); for (String job : expiredJobs) { if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) { + failedJobRemovals.add(job); LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow); } _rebalanceScheduler.removeScheduledRebalance(job); } + + // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the + // removal will be tried again at next purge + expiredJobs.removeAll(failedJobRemovals); + if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) { LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
