This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch task-improvement
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/task-improvement by this push:
new ddde2f0 Recover Workflow GC Logic (#1181)
ddde2f0 is described below
commit ddde2f084ed2f6c0a0fe89f2c412c0fa4e2a36c1
Author: Neal Sun <[email protected]>
AuthorDate: Wed Jul 29 10:32:59 2020 -0700
Recover Workflow GC Logic (#1181)
Recover Workflow Garbage Collection Logic
Recover Workflow Garbage Collection Logic
---
.../helix/controller/stages/AttributeName.java | 6 +-
.../stages/TaskGarbageCollectionStage.java | 95 +++++++++--
.../main/java/org/apache/helix/task/TaskUtil.java | 173 ++++++++++++---------
.../org/apache/helix/task/WorkflowDispatcher.java | 14 --
.../helix/controller/stages/TestTaskStage.java | 89 +++++++++--
.../integration/task/TestJobQueueCleanUp.java | 19 ++-
.../task/TestWorkflowContextWithoutConfig.java | 66 ++++++++
.../java/org/apache/helix/task/TestTaskUtil.java | 95 +++++++++++
8 files changed, 438 insertions(+), 119 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 0af0ee5..557bb20 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,5 +40,9 @@ public enum AttributeName {
PipelineType,
LastRebalanceFinishTimeStamp,
ControllerDataProvider,
- STATEFUL_REBALANCER
+ STATEFUL_REBALANCER,
+ // This attribute should only be used in TaskGarbageCollectionStage, misuse
could cause race conditions.
+ TO_BE_PURGED_WORKFLOWS,
+ // This attribute should only be used in TaskGarbageCollectionStage, misuse
could cause race conditions.
+ TO_BE_PURGED_JOBS_MAP
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 3f8e744..3cdf560 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,6 +1,9 @@
package org.apache.helix.controller.stages;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixManager;
@@ -10,9 +13,12 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
private static Logger LOG =
LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
private static RebalanceScheduler _rebalanceScheduler = new
RebalanceScheduler();
@@ -23,34 +29,89 @@ public class TaskGarbageCollectionStage extends
AbstractAsyncBaseStage {
}
@Override
- public void execute(ClusterEvent event) {
- WorkflowControllerDataProvider dataProvider =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
+ public void process(ClusterEvent event) throws Exception {
+ // Use main thread to compute what jobs need to be purged, and what
workflows need to be gc'ed.
+ // This is to avoid race conditions since the cache will be modified.
After this work, then the
+ // async work will happen.
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
-
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ "HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
- for (WorkflowConfig workflowConfig : existingWorkflows) {
- // clean up the expired jobs if it is a queue.
+ Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+ Set<String> workflowsToBePurged = new HashSet<>();
+ WorkflowControllerDataProvider dataProvider =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
+ for (Map.Entry<String, ZNRecord> entry :
dataProvider.getContexts().entrySet()) {
+ WorkflowConfig workflowConfig =
dataProvider.getWorkflowConfig(entry.getKey());
if (workflowConfig != null && (!workflowConfig.isTerminable() ||
workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(),
workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()),
manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ WorkflowContext workflowContext =
dataProvider.getWorkflowContext(entry.getKey());
+ if (workflowContext == null) {
+ continue;
}
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
+ long nextPurgeTime = workflowContext.getLastJobPurgeTime() +
purgeInterval;
+ if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
+ nextPurgeTime = currentTime + purgeInterval;
+ // Find jobs that are ready to be purged
+ Set<String> expiredJobs =
+ TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig,
workflowContext);
+ if (!expiredJobs.isEmpty()) {
+ expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+ }
+ }
+ scheduleNextJobPurge(workflowConfig.getWorkflowId(), nextPurgeTime,
_rebalanceScheduler,
+ manager);
+ } else if (workflowConfig == null && entry.getValue() != null &&
entry.getValue().getId()
+ .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+ // Find workflows that need to be purged
+ workflowsToBePurged.add(entry.getKey());
}
}
+ event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
+ Collections.unmodifiableMap(expiredJobsMap));
+ event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
+ Collections.unmodifiableSet(workflowsToBePurged));
+
+ super.process(event);
+ }
+ @Override
+ public void execute(ClusterEvent event) {
+ HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
+ if (manager == null) {
+ LOG.warn(
+ "HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage async execution.",
+ event.getEventId(), event.getEventType(), event.getClusterName());
+ return;
+ }
+
+ Map<String, Set<String>> expiredJobsMap =
+ event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
+ Set<String> toBePurgedWorkflows =
+ event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+
+ for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
+ try {
+ TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
_rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
+ }
+ }
+
+ TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+ }
+
+ private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
+ RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+ long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+ if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+ rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+ }
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 917a69b..ec8d216 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,20 +735,8 @@ public class TaskUtil {
for (String job : workflowConfig.getJobDag().getAllNodes()) {
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
- if (jobConfig == null) {
- LOG.error(String.format(
- "Job %s exists in JobDAG but JobConfig is missing! Job might
have been deleted manually from the JobQueue: %s, or left in the DAG due to a
failed clean-up attempt from last purge.",
- job, workflowConfig.getWorkflowId()));
- // Add the job name to expiredJobs so that purge operation will be
tried again on this job
+ if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
expiredJobs.add(job);
- continue;
- }
- long expiry = jobConfig.getExpiry();
- if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
- if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
- && System.currentTimeMillis() >= jobContext.getFinishTime() +
expiry) {
- expiredJobs.add(job);
- }
}
}
}
@@ -756,6 +744,52 @@ public class TaskUtil {
}
/**
+ * Based on a workflow's config or context, create a set of jobs that are
either expired, which
+ * means they are COMPLETED and have passed their expiration time, or don't
have JobConfigs,
+ * meaning that the job might have been deleted manually from the job
queue, or is left in the
+ * DAG due to a failed clean-up attempt from last purge. The difference
between this function and
+ * getExpiredJobs() is that this function gets JobConfig and JobContext from
a
+ * WorkflowControllerDataProvider instead of Zk.
+ * @param workflowControllerDataProvider
+ * @param workflowConfig
+ * @param workflowContext
+ * @return
+ */
+ public static Set<String> getExpiredJobsFromCache(
+ WorkflowControllerDataProvider workflowControllerDataProvider,
WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ Set<String> expiredJobs = new HashSet<>();
+ Map<String, TaskState> jobStates = workflowContext.getJobStates();
+ for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+ JobContext jobContext =
workflowControllerDataProvider.getJobContext(job);
+ if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+ expiredJobs.add(job);
+ }
+ }
+ return expiredJobs;
+ }
+
+ /*
+ * Checks if a job is expired and should be purged. This includes a special
case when jobConfig
+ * is null. That happens when a job might have been deleted manually from
the job queue, or is
+ * left in the DAG due to a failed clean-up attempt from last purge.
+ */
+ private static boolean isJobExpired(String jobName, JobConfig jobConfig,
JobContext jobContext,
+ TaskState jobState) {
+ if (jobConfig == null) {
+ LOG.warn(
+ "Job {} exists in JobDAG but JobConfig is missing! It's treated as
expired and will be purged.",
+ jobName);
+ return true;
+ }
+ long expiry = jobConfig.getExpiry();
+ return jobContext != null && jobState == TaskState.COMPLETED
+ && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+ && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+ }
+
+ /**
* Remove Job Config, IS/EV, and Context in order. Job name here must be a
namespaced job name.
* @param accessor
* @param propertyStore
@@ -977,72 +1011,71 @@ public class TaskUtil {
}
/**
- * Clean up all jobs that are COMPLETED and passes its expiry time.
- * @param workflowConfig
- * @param workflowContext
+ * Clean up all jobs that are marked as expired.
*/
- public static void purgeExpiredJobs(String workflow, WorkflowConfig
workflowConfig,
- WorkflowContext workflowContext, HelixManager manager,
- RebalanceScheduler rebalanceScheduler) {
- if (workflowContext == null) {
- LOG.warn(String.format("Workflow %s context does not exist!", workflow));
- return;
+ public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+ HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+ Set<String> failedJobRemovals = new HashSet<>();
+ for (String job : expiredJobs) {
+ if (!TaskUtil
+ .removeJob(manager.getHelixDataAccessor(),
manager.getHelixPropertyStore(), job)) {
+ failedJobRemovals.add(job);
+ LOG.warn("Failed to clean up expired and completed jobs from workflow
{}!", workflow);
+ }
+ rebalanceScheduler.removeScheduledRebalance(job);
}
- long purgeInterval = workflowConfig.getJobPurgeInterval();
- long currentTime = System.currentTimeMillis();
- final Set<String> expiredJobs = Sets.newHashSet();
- if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() +
purgeInterval <= currentTime) {
-
expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
- manager.getHelixPropertyStore(), workflowConfig, workflowContext));
- if (expiredJobs.isEmpty()) {
- LOG.info("No job to purge for the queue " + workflow);
- } else {
- LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
- Set<String> failedJobRemovals = new HashSet<>();
- for (String job : expiredJobs) {
- if (!TaskUtil.removeJob(manager.getHelixDataAccessor(),
manager.getHelixPropertyStore(),
- job)) {
- failedJobRemovals.add(job);
- LOG.warn("Failed to clean up expired and completed jobs from
workflow " + workflow);
- }
- rebalanceScheduler.removeScheduledRebalance(job);
- }
- // If the job removal failed, make sure we do NOT prematurely delete
it from DAG so that the
- // removal will be tried again at next purge
- expiredJobs.removeAll(failedJobRemovals);
+ // If the job removal failed, make sure we do NOT prematurely delete it
from DAG so that the
+ // removal will be tried again at next purge
+ expiredJobs.removeAll(failedJobRemovals);
- if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(),
workflow, expiredJobs,
- true)) {
- LOG.warn("Error occurred while trying to remove jobs + " +
expiredJobs
- + " from the workflow " + workflow);
- }
+ if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow,
expiredJobs, true)) {
+ LOG.warn("Error occurred while trying to remove jobs {} from the
workflow {}!", expiredJobs,
+ workflow);
+ }
- if (expiredJobs.size() > 0) {
- // Update workflow context will be in main pipeline not here.
Otherwise, it will cause
- // concurrent write issue. It is possible that jobs got purged but
there is no event to
- // trigger the pipeline to clean context.
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- List<String> resourceConfigs =
- accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
- if (resourceConfigs.size() > 0) {
- RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(),
0L);
- } else {
- LOG.warn(
- "No resource config to trigger rebalance for clean up contexts
for" + expiredJobs);
- }
- }
+ if (expiredJobs.size() > 0) {
+ // Update workflow context will be in main pipeline not here. Otherwise,
it will cause
+ // concurrent write issue. It is possible that jobs got purged but there
is no event to
+ // trigger the pipeline to clean context.
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ List<String> resourceConfigs =
+ accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+ if (resourceConfigs.size() > 0) {
+ RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+ } else {
+ LOG.warn("No resource config to trigger rebalance for clean up
contexts for {}!",
+ expiredJobs);
}
}
- setNextJobPurgeTime(workflow, currentTime, purgeInterval,
rebalanceScheduler, manager);
}
- private static void setNextJobPurgeTime(String workflow, long currentTime,
long purgeInterval,
- RebalanceScheduler rebalanceScheduler, HelixManager manager) {
- long nextPurgeTime = currentTime + purgeInterval;
- long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
- if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
- rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+ /**
+ * The function that removes IdealStates and workflow contexts of the
workflows that need to be
+ * deleted.
+ * @param toBePurgedWorkflows
+ * @param manager
+ */
+ public static void workflowGarbageCollection(final Set<String>
toBePurgedWorkflows,
+ final HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ HelixPropertyStore<ZNRecord> propertyStore =
manager.getHelixPropertyStore();
+
+ for (String workflowName : toBePurgedWorkflows) {
+ LOG.warn(
+ "WorkflowContext exists for workflow {}. However, Workflow Config is
missing! Deleting the WorkflowConfig and IdealState!!",
+ workflowName);
+
+ // TODO: We dont need this in the future when TF is not relying on IS/EV
anymore.
+ if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+ LOG.warn("Error occurred while trying to remove workflow
idealstate/externalview for {}.",
+ workflowName);
+ continue;
+ }
+
+ if (!removeWorkflowContext(propertyStore, workflowName)) {
+ LOG.warn("Error occurred while trying to remove workflow context for
{}.", workflowName);
+ }
}
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 44d1787..4df2fd8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -346,20 +346,6 @@ public class WorkflowDispatcher extends
AbstractTaskDispatcher {
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
TaskConstants.STATE_MODEL_NAME);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
- // Set the job configuration
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- HelixProperty resourceConfig = new HelixProperty(jobResource);
-
resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
- Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
- if (taskConfigMap != null) {
- for (TaskConfig taskConfig : taskConfigMap.values()) {
- resourceConfig.getRecord().setMapField(taskConfig.getId(),
taskConfig.getConfigMap());
- }
- }
- accessor.setProperty(keyBuilder.resourceConfig(jobResource),
resourceConfig);
-
// Push out new ideal state based on number of target partitions
IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index fefc737..db27355 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -1,8 +1,16 @@
package org.apache.helix.controller.stages;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.TaskDataCache;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -66,7 +74,7 @@ public class TestTaskStage extends TaskTestBase {
TaskConstants.STATE_MODEL_NAME);
// Create the context
- WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+ WorkflowContext wfCtx = new WorkflowContext(new
ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -125,15 +133,34 @@ public class TestTaskStage extends TaskTestBase {
* async job purge will try to delete it again.
*/
@Test(dependsOnMethods = "testPersistContextData")
- public void testPartialDataPurge() {
+ public void testPartialDataPurge() throws Exception {
+ DedupEventProcessor<String, Runnable> worker =
+ new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+ AsyncWorkerType.TaskJobPurgeWorker.name()) {
+ @Override
+ protected void handleEvent(Runnable event) {
+ event.run();
+ }
+ };
+ worker.start();
+ Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool =
new HashMap<>();
+ workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+ _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
// Manually delete JobConfig
deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
+ // Manually refresh because there's no controller notify data change
+ BaseControllerDataProvider dataProvider =
+ _event.getAttribute(AttributeName.ControllerDataProvider.name());
+ dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+ dataProvider.refresh(_manager.getHelixDataAccessor());
+
// Then purge jobs
TaskGarbageCollectionStage garbageCollectionStage = new
TaskGarbageCollectionStage();
- garbageCollectionStage.execute(_event);
+ garbageCollectionStage.process(_event);
// Check that IS and contexts have been purged for the 2 jobs in both old
and new ZNode paths
// IdealState check
@@ -142,6 +169,41 @@ public class TestTaskStage extends TaskTestBase {
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
}
+ @Test(dependsOnMethods = "testPartialDataPurge")
+ public void testWorkflowGarbageCollection() throws Exception {
+ DedupEventProcessor<String, Runnable> worker =
+ new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+ AsyncWorkerType.TaskJobPurgeWorker.name()) {
+ @Override
+ protected void handleEvent(Runnable event) {
+ event.run();
+ }
+ };
+ worker.start();
+ Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool =
new HashMap<>();
+ workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+ _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
+ String zkPath =
+
_manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
+ _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
+
+ // Manually refresh because there's no controller notify data change
+ BaseControllerDataProvider dataProvider =
+ _event.getAttribute(AttributeName.ControllerDataProvider.name());
+ dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+ dataProvider.refresh(_manager.getHelixDataAccessor());
+
+ // Then garbage collect workflow
+ TaskGarbageCollectionStage garbageCollectionStage = new
TaskGarbageCollectionStage();
+ garbageCollectionStage.process(_event);
+
+ // Check that IS and contexts have been purged for the workflow
+ checkForIdealStateAndContextRemoval(_testWorkflow);
+
+ worker.shutdown();
+ }
+
private void deleteJobConfigs(String workflowName, String jobName) {
String oldPath =
_manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -150,16 +212,23 @@ public class TestTaskStage extends TaskTestBase {
_baseAccessor.remove(newPath, AccessOption.PERSISTENT);
}
- private void checkForIdealStateAndContextRemoval(String workflow, String
job) {
- // IdealState
- Assert.assertFalse(
- _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(),
AccessOption.PERSISTENT));
-
+ private void checkForIdealStateAndContextRemoval(String workflow, String
job) throws Exception {
// JobContexts in old ZNode path
String oldPath =
String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context",
CLUSTER_NAME, job);
String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
- Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
- Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+
+ Assert.assertTrue(TestHelper.verify(
+ () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(),
AccessOption.PERSISTENT)
+ && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) &&
!_baseAccessor
+ .exists(newPath, AccessOption.PERSISTENT), 120000));
+ }
+
+ private void checkForIdealStateAndContextRemoval(String workflow) throws
Exception {
+ Assert.assertTrue(TestHelper.verify(() ->
+ !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(),
AccessOption.PERSISTENT)
+ && !_baseAccessor
+ .exists(_keyBuilder.workflowContextZNode(workflow).getPath(),
AccessOption.PERSISTENT),
+ 120000));
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index c7274bc..80d411b 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -80,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
}
@Test
- public void testJobQueueAutoCleanUp() throws InterruptedException {
+ public void testJobQueueAutoCleanUp() throws Exception {
int capacity = 10;
String queueName = TestHelper.getTestMethodName();
JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
@@ -105,14 +105,19 @@ public class TestJobQueueCleanUp extends TaskTestBase {
}
_driver.start(builder.build());
_driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)),
TaskState.FAILED);
- Thread.sleep(2000);
- WorkflowConfig config = _driver.getWorkflowConfig(queueName);
- Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+ Assert
+ .assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ System.out.println("|Current time: " + System.currentTimeMillis() +"
**TEST: " + config.getJobDag().getAllNodes());
+ return config.getJobDag().getAllNodes().equals(remainJobs);
+ }, TestHelper.WAIT_DURATION));
- WorkflowContext context = _driver.getWorkflowContext(queueName);
- Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
-
Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ return context.getJobStates().keySet().equals(remainJobs) && remainJobs
+ .containsAll(context.getJobStartTimes().keySet());
+ }, TestHelper.WAIT_DURATION));
for (String job : deletedJobs) {
JobConfig cfg = _driver.getJobConfig(job);
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 6c29f3a..84df546 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,6 +103,72 @@ public class TestWorkflowContextWithoutConfig extends
TaskTestBase {
Assert.assertTrue(workflowContextNotCreated);
}
+ @Test
+ public void testWorkflowContextGarbageCollection() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+ _driver.start(builder1.build());
+
+ // Wait until workflow is created and IN_PROGRESS state.
+ _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+ // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed
created for this
+ // workflow
+ Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+ Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+ Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME,
workflowName));
+
+ String workflowContextPath =
+ "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName +
"/Context";
+
+ ZNRecord record =
_manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+ null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(record);
+
+ // Wait until workflow is completed.
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed
after workflow got
+ // expired.
+ boolean workflowExpired = TestHelper.verify(() -> {
+ WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+ WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+ IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME,
workflowName);
+ return (wCtx == null && wCfg == null && idealState == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(workflowExpired);
+
+ _controller.syncStop();
+
+ // Write workflow context to ZooKeeper
+
_manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath,
record,
+ AccessOption.PERSISTENT);
+
+ // Verify context is written back to ZK.
+ record =
_manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+ null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(record);
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ // Create and start new workflow just to make sure controller is running
and new workflow is
+ // scheduled successfully.
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+ _driver.start(builder2.build());
+ _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+ // Verify that WorkflowContext will be deleted
+ boolean contextDeleted = TestHelper.verify(() -> {
+ WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+ return (wCtx == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(contextDeleted);
+ }
+
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..56e756d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskUtil extends TaskTestBase {
+
+ @Test
+ public void testGetExpiredJobsFromCache() {
+ String workflowName = "TEST_WORKFLOW";
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
+
+ JobConfig.Builder jobBuilder_0 =
+ new
JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_1 =
+ new
JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_2 =
+ new
JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_3 =
+ new
JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ Workflow jobQueue =
+ queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1",
jobBuilder_1)
+ .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3",
jobBuilder_3).build();
+
+ WorkflowContext workflowContext = mock(WorkflowContext.class);
+ Map<String, TaskState> jobStates = new HashMap<>();
+ jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
+ jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
+ jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
+ when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+ JobConfig jobConfig = mock(JobConfig.class);
+ WorkflowControllerDataProvider workflowControllerDataProvider =
+ mock(WorkflowControllerDataProvider.class);
+ when(workflowControllerDataProvider.getJobConfig(workflowName +
"_Job_1")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
+ .thenReturn(jobConfig);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
+ .thenReturn(jobConfig);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
+ .thenReturn(jobConfig);
+
+ JobContext jobContext = mock(JobContext.class);
+ when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
+
+ when(workflowControllerDataProvider.getJobContext(workflowName +
"_Job_1")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+ .thenReturn(jobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+ .thenReturn(jobContext);
+
+ Set<String> expectedJobs = new HashSet<>();
+ expectedJobs.add(workflowName + "_Job_0");
+ expectedJobs.add(workflowName + "_Job_3");
+ Assert.assertEquals(TaskUtil
+ .getExpiredJobsFromCache(workflowControllerDataProvider,
jobQueue.getWorkflowConfig(),
+ workflowContext), expectedJobs);
+ }
+}