This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch revert-1076-nealsun/recover-tf-garbage-collection
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 73136670815469898b901c5b69b1895666f136d8
Author: Junkai Xue <[email protected]>
AuthorDate: Fri Jul 17 12:00:31 2020 -0700

    Revert "Recover Workflow Garbage Collection Logic (#1076)"

    This reverts commit c7a97bdce66b21dab5522a4f100d08054ada99c2.
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  93 ++---------
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 +++++++++------------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 ++
 .../helix/controller/stages/TestTaskStage.java     |  89 ++---------
 .../task/TestWorkflowContextWithoutConfig.java     |  66 --------
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 -----------
 7 files changed, 112 insertions(+), 424 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 557bb20..0af0ee5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,9 +40,5 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_WORKFLOWS,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_JOBS_MAP
+  STATEFUL_REBALANCER
 }
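Note: pipeline stages hand data to one another through ClusterEvent attributes keyed by this enum, which is why the revert can drop TO_BE_PURGED_WORKFLOWS and TO_BE_PURGED_JOBS_MAP once nothing publishes or consumes them. A minimal usage sketch, limited to calls that appear in the stage diff below:

    // Read a shared attribute inside a stage; null if it was never published.
    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
    // Publish a value for later consumers of the same event.
    event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);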
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index f965a3e..3f8e744 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,9 +1,6 @@
 package org.apache.helix.controller.stages;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -13,12 +10,9 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -29,87 +23,34 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
-    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
-    // This is to avoid race conditions since the cache will be modified. After this work, then the
-    // async work will happen.
+  public void execute(ClusterEvent event) {
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
+
+    if (dataProvider == null || manager == null) {
       LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
          event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
-    Set<String> workflowsToBePurged = new HashSet<>();
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
-    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
-      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+    Set<WorkflowConfig> existingWorkflows =
+        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+    for (WorkflowConfig workflowConfig : existingWorkflows) {
+      // clean up the expired jobs if it is a queue.
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
-        long purgeInterval = workflowConfig.getJobPurgeInterval();
-        long currentTime = System.currentTimeMillis();
-        if (purgeInterval > 0
-            && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-          // Find jobs that are ready to be purged
-          Set<String> expiredJobs =
-              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
-          if (!expiredJobs.isEmpty()) {
-            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
-          }
-          scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,
-              _rebalanceScheduler, manager);
+        try {
+          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+              _rebalanceScheduler);
+        } catch (Exception e) {
+          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+              workflowConfig.getWorkflowId(), e.toString()));
         }
-      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
-          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
-        // Find workflows that need to be purged
-        workflowsToBePurged.add(entry.getKey());
-      }
-    }
-    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
-        Collections.unmodifiableMap(expiredJobsMap));
-    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
-        Collections.unmodifiableSet(workflowsToBePurged));
-
-    super.process(event);
-  }
-
-  @Override
-  public void execute(ClusterEvent event) {
-    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
-      LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
-          event.getEventId(), event.getEventType(), event.getClusterName());
-      return;
-    }
-
-    Map<String, Set<String>> expiredJobsMap =
-        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
-    Set<String> toBePurgedWorkflows =
-        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
-
-    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
-      try {
-        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
-            _rebalanceScheduler);
-      } catch (Exception e) {
-        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
-      }
     }
-    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
-  }
-
-  private static void scheduleNextJobPurge(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
-    }
   }
 }
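Note: the purge-scheduling rule is unchanged by this revert; only its home moves (scheduleNextJobPurge in the stage before, setNextJobPurgeTime in TaskUtil after). As a standalone sketch, using the RebalanceScheduler calls shown in the diff (the helper name here is illustrative):

    // Schedule the next purge only when nothing is scheduled for the workflow yet
    // (getRebalanceTime returns -1) or the existing schedule would fire too late.
    private static void scheduleNextPurge(RebalanceScheduler scheduler, HelixManager manager,
        String workflow, long currentTime, long purgeInterval) {
      long nextPurgeTime = currentTime + purgeInterval;
      long scheduledTime = scheduler.getRebalanceTime(workflow);
      if (scheduledTime == -1 || scheduledTime > nextPurgeTime) {
        scheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
      }
    }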
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ec8d216..917a69b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,8 +735,20 @@ public class TaskUtil {
     for (String job : workflowConfig.getJobDag().getAllNodes()) {
       JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
       JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+      if (jobConfig == null) {
+        LOG.error(String.format(
+            "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+            job, workflowConfig.getWorkflowId()));
+        // Add the job name to expiredJobs so that purge operation will be tried again on this job
         expiredJobs.add(job);
+        continue;
+      }
+      long expiry = jobConfig.getExpiry();
+      if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+        if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+            && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+          expiredJobs.add(job);
+        }
       }
     }
   }
@@ -744,52 +756,6 @@ public class TaskUtil {
   }
 
   /**
-   * Based on a workflow's config or context, create a set of jobs that are either expired, which
-   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
-   * meaning that the job might have been deleted manually from the a job queue, or is left in the
-   * DAG due to a failed clean-up attempt from last purge. The difference between this function and
-   * getExpiredJobs() is that this function gets JobConfig and JobContext from a
-   * WorkflowControllerDataProvider instead of Zk.
-   * @param workflowControllerDataProvider
-   * @param workflowConfig
-   * @param workflowContext
-   * @return
-   */
-  public static Set<String> getExpiredJobsFromCache(
-      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    Set<String> expiredJobs = new HashSet<>();
-    Map<String, TaskState> jobStates = workflowContext.getJobStates();
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
-      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
-      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
-        expiredJobs.add(job);
-      }
-    }
-    return expiredJobs;
-  }
-
-  /*
-   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
-   * is null. That happens when a job might have been deleted manually from the a job queue, or is
-   * left in the DAG due to a failed clean-up attempt from last purge.
-   */
-  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
-      TaskState jobState) {
-    if (jobConfig == null) {
-      LOG.warn(
-          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
-          jobName);
-      return true;
-    }
-    long expiry = jobConfig.getExpiry();
-    return jobContext != null && jobState == TaskState.COMPLETED
-        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
-  }
-
-  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -1011,71 +977,72 @@
   }
 
   /**
-   * Clean up all jobs that are marked as expired.
+   * Clean up all jobs that are COMPLETED and passes its expiry time.
+   * @param workflowConfig
+   * @param workflowContext
    */
-  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
-      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
-    Set<String> failedJobRemovals = new HashSet<>();
-    for (String job : expiredJobs) {
-      if (!TaskUtil
-          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
-        failedJobRemovals.add(job);
-        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
-      }
-      rebalanceScheduler.removeScheduledRebalance(job);
+  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, HelixManager manager,
+      RebalanceScheduler rebalanceScheduler) {
+    if (workflowContext == null) {
+      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+      return;
     }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
+    final Set<String> expiredJobs = Sets.newHashSet();
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
+          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
+        for (String job : expiredJobs) {
+          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+              job)) {
+            failedJobRemovals.add(job);
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          rebalanceScheduler.removeScheduledRebalance(job);
+        }
 
-    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-    // removal will be tried again at next purge
-    expiredJobs.removeAll(failedJobRemovals);
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
 
-    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
-      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
-          workflow);
-    }
+        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
+            true)) {
+          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
+              + " from the workflow " + workflow);
+        }
 
-    if (expiredJobs.size() > 0) {
-      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-      // concurrent write issue. It is possible that jobs got purged but there is no event to
-      // trigger the pipeline to clean context.
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      List<String> resourceConfigs =
-          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-      if (resourceConfigs.size() > 0) {
-        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-      } else {
-        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
-            expiredJobs);
+        if (expiredJobs.size() > 0) {
+          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+          // concurrent write issue. It is possible that jobs got purged but there is no event to
+          // trigger the pipeline to clean context.
+          HelixDataAccessor accessor = manager.getHelixDataAccessor();
+          List<String> resourceConfigs =
+              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+          if (resourceConfigs.size() > 0) {
+            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+          } else {
+            LOG.warn(
+                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+          }
+        }
       }
     }
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  /**
-   * The function that removes IdealStates and workflow contexts of the workflows that need to be
-   * deleted.
-   * @param toBePurgedWorkflows
-   * @param manager
-   */
-  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
-      final HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
-
-    for (String workflowName : toBePurgedWorkflows) {
-      LOG.warn(
-          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
-          workflowName);
-
-      // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
-      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
-            workflowName);
-        continue;
-      }
-
-      if (!removeWorkflowContext(propertyStore, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
-      }
+  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long nextPurgeTime = currentTime + purgeInterval;
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
 }
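Note: the expiry rule restored in getExpiredJobs above reduces to a single predicate: purge a COMPLETED job once its finish time plus its configured expiry has passed, and also purge any job whose JobConfig is missing so a previously failed cleanup gets retried. Condensed into a sketch (shouldPurge is an illustrative name, not a TaskUtil method):

    private static boolean shouldPurge(JobConfig jobConfig, JobContext jobContext,
        TaskState jobState) {
      if (jobConfig == null) {
        // Config deleted manually or left behind by a failed purge; retry the cleanup.
        return true;
      }
      return jobContext != null && jobState == TaskState.COMPLETED
          && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
          && System.currentTimeMillis() >= jobContext.getFinishTime() + jobConfig.getExpiry();
    }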
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 8c49e1e..89e6d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,6 +356,20 @@
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
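Note: the restored block writes each job's configuration as a resource config ZNode: the JobConfig's resource config map lands in the record's simple fields, and each TaskConfig becomes one map field keyed by its task ID. Reading it back follows the same shape (a usage sketch; taskId stands in for a real task ID):

    HelixProperty jobCfg = accessor.getProperty(keyBuilder.resourceConfig(jobResource));
    Map<String, String> taskCfg = jobCfg.getRecord().getMapField(taskId);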
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index db27355..fefc737 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -1,16 +1,8 @@
 package org.apache.helix.controller.stages;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.helix.AccessOption;
-import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.common.DedupEventProcessor;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-import org.apache.helix.controller.pipeline.AsyncWorkerType;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -74,7 +66,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -133,34 +125,15 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
+  public void testPartialDataPurge() {
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
+    garbageCollectionStage.execute(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -169,41 +142,6 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
-  @Test(dependsOnMethods = "testPartialDataPurge")
-  public void testWorkflowGarbageCollection() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
-    String zkPath =
-        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
-    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
-
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
-    // Then garbage collect workflow
-    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
-
-    // Check that IS and contexts have been purged for the workflow
-    checkForIdealStateAndContextRemoval(_testWorkflow);
-
-    worker.shutdown();
-  }
-
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -212,23 +150,16 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+    // IdealState
+    Assert.assertFalse(
+        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
+    // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-
-    Assert.assertTrue(TestHelper.verify(
-        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
-            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
-            .exists(newPath, AccessOption.PERSISTENT), 120000));
-  }
-
-  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
-    Assert.assertTrue(TestHelper.verify(() ->
-        !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
-            && !_baseAccessor
-            .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
-        120000));
+    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 84df546..6c29f3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,72 +103,6 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
-  @Test
-  public void testWorkflowContextGarbageCollection() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
-    _driver.start(builder1.build());
-
-    // Wait until workflow is created and IN_PROGRESS state.
-    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
-
-    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
-    // workflow
-    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
-    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
-    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
-
-    String workflowContextPath =
-        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
-
-    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
-        .get(workflowContextPath, null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // Wait until workflow is completed.
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-
-    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
-    // expired.
-    boolean workflowExpired = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
-      return (wCtx == null && wCfg == null && idealState == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(workflowExpired);
-
-    _controller.syncStop();
-
-    // Write workflow context to ZooKeeper
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
-        AccessOption.PERSISTENT);
-
-    // Verify context is written back to ZK.
-    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // Create and start new workflow just to make sure controller is running and new workflow is
-    // scheduled successfully.
-    String workflowName2 = TestHelper.getTestMethodName() + "_2";
-    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
-    _driver.start(builder2.build());
-    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
-
-    // Verify that WorkflowContext will be deleted
-    boolean contextDeleted = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      return (wCtx == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(contextDeleted);
-  }
-
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
deleted file mode 100644
index 56e756d..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTaskUtil extends TaskTestBase {
-
-  @Test
-  public void testGetExpiredJobsFromCache() {
-    String workflowName = "TEST_WORKFLOW";
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
-
-    JobConfig.Builder jobBuilder_0 =
-        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_1 =
-        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_2 =
-        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_3 =
-        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    Workflow jobQueue =
-        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
-            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
-
-    WorkflowContext workflowContext = mock(WorkflowContext.class);
-    Map<String, TaskState> jobStates = new HashMap<>();
-    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
-    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
-    when(workflowContext.getJobStates()).thenReturn(jobStates);
-
-    JobConfig jobConfig = mock(JobConfig.class);
-    WorkflowControllerDataProvider workflowControllerDataProvider =
-        mock(WorkflowControllerDataProvider.class);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
-        .thenReturn(jobConfig);
-
-    JobContext jobContext = mock(JobContext.class);
-    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
-
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
-        .thenReturn(jobContext);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
-        .thenReturn(jobContext);
-
-    Set<String> expectedJobs = new HashSet<>();
-    expectedJobs.add(workflowName + "_Job_0");
-    expectedJobs.add(workflowName + "_Job_3");
-    Assert.assertEquals(TaskUtil
-        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
-            workflowContext), expectedJobs);
-  }
-}
