This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3501111a3b9cae27bb7b875fadbf49406a4b6cd0
Author: Neal Sun <[email protected]>
AuthorDate: Wed Jul 29 10:32:59 2020 -0700

    Recover Workflow GC Logic (#1181)

    Recover Workflow Garbage Collection Logic
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  95 +++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 ++++++++++++---------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 --
 .../helix/controller/stages/TestTaskStage.java     |  89 +++++++++--
 .../integration/task/TestJobQueueCleanUp.java      |  19 ++-
 .../task/TestWorkflowContextWithoutConfig.java     |  66 ++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 +++++++++++
 8 files changed, 438 insertions(+), 119 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 589988f..9a0bbb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,5 +40,9 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER
+  STATEFUL_REBALANCER,
+  // This attribute should only be used in TaskGarbageCollectionStage; misuse could cause race conditions.
+  TO_BE_PURGED_WORKFLOWS,
+  // This attribute should only be used in TaskGarbageCollectionStage; misuse could cause race conditions.
+  TO_BE_PURGED_JOBS_MAP
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 7eb0db9..915cba1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -19,7 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -29,9 +32,12 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -42,34 +48,89 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Use the main thread to compute which jobs need to be purged and which workflows need to be
+    // garbage collected, to avoid race conditions while the cache is being modified. Only after
+    // this computation does the async work happen.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBePurged = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
+        if (workflowContext == null) {
+          continue;
         }
+        long purgeInterval = workflowConfig.getJobPurgeInterval();
+        long currentTime = System.currentTimeMillis();
+        long nextPurgeTime = workflowContext.getLastJobPurgeTime() + purgeInterval;
+        if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
+          nextPurgeTime = currentTime + purgeInterval;
+          // Find jobs that are ready to be purged
+          Set<String> expiredJobs =
+              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+          if (!expiredJobs.isEmpty()) {
+            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+          }
+        }
+        scheduleNextJobPurge(workflowConfig.getWorkflowId(), nextPurgeTime, _rebalanceScheduler,
+            manager);
+      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
+          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+        // Find workflows that need to be purged
+        workflowsToBePurged.add(entry.getKey());
       }
     }
+    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
+        Collections.unmodifiableMap(expiredJobsMap));
+    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
+        Collections.unmodifiableSet(workflowsToBePurged));
+
+    super.process(event);
+  }
+
+  @Override
+  public void execute(ClusterEvent event) {
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      LOG.warn(
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
+          event.getEventId(), event.getEventType(), event.getClusterName());
+      return;
+    }
+
+    Map<String, Set<String>> expiredJobsMap =
+        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
+    Set<String> toBePurgedWorkflows =
+        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+
+    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
+      try {
+        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager, _rebalanceScheduler);
+      } catch (Exception e) {
+        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
+      }
+    }
+
+    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+  }
+
+  private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+    }
   }
 }
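For reference, the purge bookkeeping in process() above works as follows: a purge fires only once
WorkflowConfig.getJobPurgeInterval() has elapsed since WorkflowContext.getLastJobPurgeTime(), and
the next check is then booked one full interval after "now". A minimal standalone sketch of that
arithmetic (hypothetical values; not Helix code):

    import java.util.concurrent.TimeUnit;

    public class PurgeScheduleSketch {
      public static void main(String[] args) {
        long purgeInterval = TimeUnit.MINUTES.toMillis(1); // WorkflowConfig.getJobPurgeInterval()
        long lastJobPurgeTime = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
        long currentTime = System.currentTimeMillis();

        long nextPurgeTime = lastJobPurgeTime + purgeInterval;
        if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
          // A purge is due now; the following check is scheduled one full interval from now.
          nextPurgeTime = currentTime + purgeInterval;
        }
        System.out.println("next purge due in " + (nextPurgeTime - currentTime) + " ms");
      }
    }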
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index e11f49e..e6e792f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,20 +735,8 @@ public class TaskUtil {
     for (String job : workflowConfig.getJobDag().getAllNodes()) {
       JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
       JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-      if (jobConfig == null) {
-        LOG.error(String.format(
-            "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
-            job, workflowConfig.getWorkflowId()));
-        // Add the job name to expiredJobs so that purge operation will be tried again on this job
+      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
         expiredJobs.add(job);
-        continue;
-      }
-      long expiry = jobConfig.getExpiry();
-      if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
-        if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-            && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
-          expiredJobs.add(job);
-        }
       }
     }
   }
@@ -756,6 +744,52 @@ public class TaskUtil {
   }
 
   /**
+   * Based on a workflow's config or context, create a set of jobs that are either expired, which
+   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
+   * meaning that the job might have been deleted manually from a job queue, or was left in the
+   * DAG due to a failed clean-up attempt from the last purge. The difference between this function
+   * and getExpiredJobs() is that this function gets JobConfig and JobContext from a
+   * WorkflowControllerDataProvider instead of ZK.
+   * @param workflowControllerDataProvider
+   * @param workflowConfig
+   * @param workflowContext
+   * @return
+   */
+  public static Set<String> getExpiredJobsFromCache(
+      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    Set<String> expiredJobs = new HashSet<>();
+    Map<String, TaskState> jobStates = workflowContext.getJobStates();
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
+      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        expiredJobs.add(job);
+      }
+    }
+    return expiredJobs;
+  }
+
+  /*
+   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
+   * is null, which happens when a job has been deleted manually from a job queue, or was left in
+   * the DAG due to a failed clean-up attempt from the last purge.
+   */
+  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
+      TaskState jobState) {
+    if (jobConfig == null) {
+      LOG.warn(
+          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
+          jobName);
+      return true;
+    }
+    long expiry = jobConfig.getExpiry();
+    return jobContext != null && jobState == TaskState.COMPLETED
+        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+  }
+
+  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -977,72 +1011,71 @@ public class TaskUtil {
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If a job removal failed, make sure we do NOT prematurely delete it from the DAG, so that
+    // the removal will be tried again at the next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
+          workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // The workflow context update happens in the main pipeline, not here; updating it here
+      // would cause a concurrent write issue. It is possible that jobs got purged but there is no
+      // event to trigger the pipeline to clean up the context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
+            expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+  /**
+   * Removes the IdealStates and workflow contexts of the workflows that need to be deleted.
+   * @param toBePurgedWorkflows
+   * @param manager
+   */
+  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
+      final HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+    for (String workflowName : toBePurgedWorkflows) {
+      LOG.warn(
+          "WorkflowContext exists for workflow {}, but its WorkflowConfig is missing! Deleting the WorkflowContext and IdealState!",
+          workflowName);
+
+      // TODO: We don't need this in the future when TF is not relying on IS/EV anymore.
+      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+            workflowName);
+        continue;
+      }
+
+      if (!removeWorkflowContext(propertyStore, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
+      }
     }
   }
 }
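The expiry rule in isJobExpired() above has two legs: a job whose JobConfig is gone is always
treated as expired (so the purge retries it), while an intact job expires only once it is
COMPLETED and its expiry window has passed since its finish time. A self-contained sketch of the
same rule (UNFINISHED is a stand-in for WorkflowContext.UNFINISHED; values are hypothetical):

    public class JobExpirySketch {
      private static final long UNFINISHED = -1; // stand-in for WorkflowContext.UNFINISHED

      // Mirrors the decision in TaskUtil.isJobExpired.
      static boolean isExpired(boolean hasConfig, boolean completed, long finishTime, long expiry) {
        if (!hasConfig) {
          return true; // orphaned DAG node: treat as expired so the purge retries it
        }
        return completed && finishTime != UNFINISHED
            && System.currentTimeMillis() >= finishTime + expiry;
      }

      public static void main(String[] args) {
        long finished = System.currentTimeMillis() - 10_000;
        System.out.println(isExpired(true, true, finished, 5_000));  // true: expiry has passed
        System.out.println(isExpired(true, true, finished, 60_000)); // false: still within expiry
        System.out.println(isExpired(false, false, UNFINISHED, 0L)); // true: config missing
      }
    }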
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 53be558..4c9bd18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,20 +356,6 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Set the job configuration
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
-    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 99e227c..43fda00 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -19,9 +19,17 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -85,7 +93,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -144,15 +152,34 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() {
+  public void testPartialDataPurge() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
+    // Manually refresh because there's no controller to notify of the data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.execute(_event);
+    garbageCollectionStage.process(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -161,6 +188,41 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
+  @Test(dependsOnMethods = "testPartialDataPurge")
+  public void testWorkflowGarbageCollection() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
+    String zkPath =
+        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
+    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
+
+    // Manually refresh because there's no controller to notify of the data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
+    // Then garbage collect the workflow
+    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+    garbageCollectionStage.process(_event);
+
+    // Check that IS and contexts have been purged for the workflow
+    checkForIdealStateAndContextRemoval(_testWorkflow);
+
+    worker.shutdown();
+  }
+
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -169,16 +231,23 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
-    // IdealState
-    Assert.assertFalse(
-        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
-
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
     // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+
+    Assert.assertTrue(TestHelper.verify(
+        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
+            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
+            .exists(newPath, AccessOption.PERSISTENT), 120000));
+  }
+
+  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
+    Assert.assertTrue(TestHelper.verify(() ->
+        !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
+            && !_baseAccessor
+            .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
+        120000));
   }
 }
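Because the purge now runs on an async worker, the tests in this commit replace fixed sleeps and
immediate assertions with TestHelper.verify, which polls a condition until it holds or a timeout
elapses. A rough stand-in for that helper (a sketch of the polling pattern, not Helix's actual
implementation):

    import java.util.function.BooleanSupplier;

    public class VerifySketch {
      // Polls a condition until it holds or the timeout elapses; returns the final outcome.
      static boolean verify(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (condition.getAsBoolean()) {
            return true;
          }
          Thread.sleep(50);
        }
        return condition.getAsBoolean();
      }
    }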
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index daa9b4b..ba4fb44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -80,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
   }
 
   @Test
-  public void testJobQueueAutoCleanUp() throws InterruptedException {
+  public void testJobQueueAutoCleanUp() throws Exception {
     int capacity = 10;
     String queueName = TestHelper.getTestMethodName();
     JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
@@ -105,14 +105,19 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     }
     _driver.start(builder.build());
     _driver.pollForJobState(queueName,
         TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
-    Thread.sleep(2000);
-    WorkflowConfig config = _driver.getWorkflowConfig(queueName);
-    Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+      return config.getJobDag().getAllNodes().equals(remainJobs);
+    }, TestHelper.WAIT_DURATION));
 
-    WorkflowContext context = _driver.getWorkflowContext(queueName);
-    Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
-    Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext context = _driver.getWorkflowContext(queueName);
+      return context.getJobStates().keySet().equals(remainJobs) && remainJobs
+          .containsAll(context.getJobStartTimes().keySet());
+    }, TestHelper.WAIT_DURATION));
 
     for (String job : deletedJobs) {
       JobConfig cfg = _driver.getJobConfig(job);
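The queue test above exercises the two knobs that drive job purging: the workflow-level purge
interval and the per-job expiry. A hedged sketch of wiring both up (names and values are
illustrative; assumes the WorkflowConfig.Builder.setJobPurgeInterval and
JobConfig.Builder.setExpiry APIs used elsewhere in this diff):

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.JobQueue;
    import org.apache.helix.task.WorkflowConfig;

    public class QueuePurgeConfigSketch {
      public static JobQueue buildQueue() {
        WorkflowConfig.Builder cfg = new WorkflowConfig.Builder("myQueue")
            .setJobPurgeInterval(60_000L); // check for purgeable jobs every minute
        JobConfig.Builder job = new JobConfig.Builder()
            .setJobId("Job_0").setTargetResource("db").setCommand("Reindex")
            .setExpiry(5_000L); // job becomes purgeable 5s after it completes
        JobQueue.Builder queue = new JobQueue.Builder("myQueue");
        queue.setWorkflowConfig(cfg.build());
        queue.enqueueJob("Job_0", job);
        return queue.build();
      }
    }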
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 70dd33f..31010cf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,6 +103,72 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
+  @Test
+  public void testWorkflowContextGarbageCollection() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+    _driver.start(builder1.build());
+
+    // Wait until the workflow is created and in the IN_PROGRESS state.
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
+    // workflow
+    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
+
+    String workflowContextPath =
+        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
+
+    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+        .get(workflowContextPath, null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // Wait until the workflow is completed.
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after the workflow
+    // has expired.
+    boolean workflowExpired = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
+      return (wCtx == null && wCfg == null && idealState == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(workflowExpired);
+
+    _controller.syncStop();
+
+    // Write the workflow context back to ZooKeeper
+    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
+        AccessOption.PERSISTENT);
+
+    // Verify the context is written back to ZK.
+    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // Start the controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Create and start a new workflow just to make sure the controller is running and the new
+    // workflow is scheduled successfully.
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+    _driver.start(builder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    // Verify that the orphaned WorkflowContext gets deleted
+    boolean contextDeleted = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      return (wCtx == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(contextDeleted);
+  }
+
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
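The test above manufactures an "orphaned" context: a WorkflowContext ZNode whose WorkflowConfig
has been deleted, which is exactly what TaskUtil.workflowGarbageCollection() targets. A hedged
sketch of detecting one (an illustrative helper built from the accessor calls shown in the test,
not an existing Helix API):

    import org.apache.helix.AccessOption;
    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.zookeeper.datamodel.ZNRecord;

    public class OrphanedContextSketch {
      // True when the workflow's context ZNode still exists but its config is gone.
      static boolean isOrphaned(HelixManager manager, TaskDriver driver, String cluster,
          String workflow) {
        String contextPath =
            "/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow + "/Context";
        ZNRecord ctx = manager.getHelixDataAccessor().getBaseDataAccessor()
            .get(contextPath, null, AccessOption.PERSISTENT);
        return ctx != null && driver.getWorkflowConfig(workflow) == null;
      }
    }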
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..56e756d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskUtil extends TaskTestBase {
+
+  @Test
+  public void testGetExpiredJobsFromCache() {
+    String workflowName = "TEST_WORKFLOW";
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
+
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_3 =
+        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    Workflow jobQueue =
+        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    Map<String, TaskState> jobStates = new HashMap<>();
+    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
+    when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+    JobConfig jobConfig = mock(JobConfig.class);
+    WorkflowControllerDataProvider workflowControllerDataProvider =
+        mock(WorkflowControllerDataProvider.class);
+    // Job_0 has no JobConfig, so it is treated as expired and must be purged.
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_0")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
+        .thenReturn(jobConfig);
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
+
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+        .thenReturn(jobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+        .thenReturn(jobContext);
+
+    Set<String> expectedJobs = new HashSet<>();
+    expectedJobs.add(workflowName + "_Job_0");
+    expectedJobs.add(workflowName + "_Job_3");
+    Assert.assertEquals(TaskUtil
+        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
+            workflowContext), expectedJobs);
+  }
+}
