Repository: helix Updated Branches: refs/heads/master a6bdb3c22 -> 317c300c8
[HELIX-696] fix workflow state flip-flop issue Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/317c300c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/317c300c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/317c300c Branch: refs/heads/master Commit: 317c300c8f951c7e8308cb0b24e48f97a1ef32ef Parents: a6bdb3c Author: Harry Zhang <[email protected]> Authored: Thu Apr 19 16:13:13 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Fri Apr 20 09:56:34 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/JobRebalancer.java | 1 + .../org/apache/helix/task/TaskRebalancer.java | 36 ++- .../apache/helix/task/WorkflowRebalancer.java | 66 +++-- .../integration/task/TestDeleteWorkflow.java | 2 +- .../task/TestWorkflowTermination.java | 294 +++++++++++++++++++ .../helix/task/TestJobStateOnCreation.java | 2 +- 6 files changed, 366 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 1620238..fd80229 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -678,6 +678,7 @@ public class JobRebalancer extends TaskRebalancer { jobContext.setFinishTime(currentTime); if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) { workflowContext.setFinishTime(currentTime); + updateWorkflowMonitor(workflowContext, workflowConfig); } scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); } 
http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index ece1935..d8b8ea9 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -65,24 +65,31 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { * * @param ctx Workflow context containing job states * @param cfg Workflow config containing set of jobs - * @return returns true if the workflow either completed (all tasks are {@link TaskState#COMPLETED}) - * or failed (any task is {@link TaskState#FAILED}, false otherwise. + * @return returns true if the workflow + * 1. completed (all tasks are {@link TaskState#COMPLETED}) + * 2. failed (any task is {@link TaskState#FAILED}) + * 3. workflow is {@link TaskState#TIMED_OUT} + * returns false otherwise.
*/ protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg, Map<String, JobConfig> jobConfigMap) { boolean incomplete = false; - int failedJobs = 0; + TaskState workflowState = ctx.getWorkflowState(); + if (TaskState.TIMED_OUT.equals(workflowState)) { + // We don't update job state here as JobRebalancer will do it + return true; + } + + // Check if failed job count is beyond threshold and if so, fail the workflow + // and abort in-progress jobs + int failedJobs = 0; for (String job : cfg.getJobDag().getAllNodes()) { TaskState jobState = ctx.getJobState(job); if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) { failedJobs++; - if (TaskState.TIMED_OUT.equals(workflowState) || (!cfg.isJobQueue() && failedJobs > cfg - .getFailureThreshold())) { + if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) { ctx.setWorkflowState(TaskState.FAILED); - if (_clusterStatusMonitor != null) { - _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED); - } for (String jobToFail : cfg.getJobDag().getAllNodes()) { if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) { ctx.setJobState(jobToFail, TaskState.ABORTED); @@ -104,10 +111,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (!incomplete && cfg.isTerminable()) { ctx.setWorkflowState(TaskState.COMPLETED); - if (_clusterStatusMonitor != null) { - _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, - ctx.getFinishTime() - ctx.getStartTime()); - } return true; } @@ -254,6 +257,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) { workflowContext.setFinishTime(currentTime); + updateWorkflowMonitor(workflowContext, workflowConfig); } scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); } @@ -270,6 +274,14 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } 
+ protected void updateWorkflowMonitor(WorkflowContext context, + WorkflowConfig config) { + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(), + context.getFinishTime() - context.getStartTime()); + } + } + /** * Check if a workflow is ready to schedule. * http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 709ba65..9233f9b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -54,6 +54,9 @@ import com.google.common.collect.Lists; */ public class WorkflowRebalancer extends TaskRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WorkflowRebalancer.class); + private static final Set<TaskState> finalStates = new HashSet<>( + Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT) + ); @Override public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, @@ -68,40 +71,41 @@ public class WorkflowRebalancer extends TaskRebalancer { return buildEmptyAssignment(workflow, currStateOutput); } - WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflow); - // Initialize workflow context if needed - if (workflowCtx == null) { - workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW)); - workflowCtx.setStartTime(System.currentTimeMillis()); - workflowCtx.setName(workflow); + WorkflowContext workflowCtx = getOrInitializeWorkflowContext(clusterData, workflow); + + // Step 1: Check for deletion - if so, we don't need to go through further steps + // Clean up if workflow marked for 
deletion + TargetState targetState = workflowCfg.getTargetState(); + if (targetState == TargetState.DELETE) { + LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); + cleanupWorkflow(workflow, workflowCfg); + return buildEmptyAssignment(workflow, currStateOutput); } - Set<TaskState> finalStates = new HashSet<>(Arrays.asList( - new TaskState[] { TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, - TaskState.FAILED, TaskState.TIMED_OUT - })); + // Step 2: handle timeout, which should have higher priority than STOP // Only generic workflow get timeouted and schedule rebalance for timeout. Will skip the set if // the workflow already got timeouted. Job Queue will ignore the setup. if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) { // If timeout point has already been passed, it will not be scheduled scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout()); - if (isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { + if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout( + workflowCtx.getStartTime(), workflowCfg.getTimeout())) { workflowCtx.setWorkflowState(TaskState.TIMED_OUT); clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor()); - return buildEmptyAssignment(workflow, currStateOutput); } - } - // Clean up if workflow marked for deletion - TargetState targetState = workflowCfg.getTargetState(); - if (targetState == TargetState.DELETE) { - LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); - cleanupWorkflow(workflow, workflowCfg); - return buildEmptyAssignment(workflow, currStateOutput); + // We should not return after setting timeout, as in case the workflow is stopped already + // marking it timeout will not trigger rebalance pipeline as we are not listening on + // PropertyStore change, nor will we schedule rebalance for timeout as 
at this point, + // workflow is already timed-out. We should let the code proceed and wait to schedule + // future cleanup work } - if (targetState == TargetState.STOP) { + // Step 3: handle workflow that should STOP + // For workflows that already reached final states, STOP should not take effect + if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP + .equals(targetState)) { LOG.info("Workflow " + workflow + "is marked as stopped."); if (isWorkflowStopped(workflowCtx, workflowCfg)) { workflowCtx.setWorkflowState(TaskState.STOPPED); @@ -111,13 +115,21 @@ public class WorkflowRebalancer extends TaskRebalancer { } long currentTime = System.currentTimeMillis(); - // Check if workflow has been finished and mark it if it is. + + // Step 4: Check and process finished workflow context (confusing, + // but it's done inside isWorkflowFinished()) + // Check if workflow has been finished and mark it if it is. Also update cluster status + // monitor if provided + // Note that COMPLETED and FAILED will be marked in markJobComplete / markJobFailed + // This is to handle TIMED_OUT only if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx, workflowCfg, clusterData.getJobConfigMap())) { workflowCtx.setFinishTime(currentTime); + updateWorkflowMonitor(workflowCtx, workflowCfg); clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor()); } + // Step 5: Handle finished workflows if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { LOG.info("Workflow " + workflow + " is finished."); long expiryTime = workflowCfg.getExpiry(); @@ -160,6 +172,18 @@ public class WorkflowRebalancer extends TaskRebalancer { return buildEmptyAssignment(workflow, currStateOutput); } + private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterData, String workflowName) { + WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName); + if (workflowCtx == null) { +
WorkflowConfig config = clusterData.getWorkflowConfig(workflowName); + workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW)); + workflowCtx.setStartTime(System.currentTimeMillis()); + workflowCtx.setName(workflowName); + LOG.debug("Workflow context is created for " + workflowName); + } + return workflowCtx; + } + /** * Figure out whether the jobs in the workflow should be run, * and if it's ready, then just schedule it http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java index a151827..381bdeb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java @@ -18,7 +18,7 @@ import org.testng.annotations.Test; public class TestDeleteWorkflow extends TaskTestBase { - private static final int DELETE_DELAY = 1000; + private static final int DELETE_DELAY = 2000; private HelixAdmin admin; http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java new file mode 100644 index 0000000..71d494b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java @@ -0,0 +1,294 @@ +package org.apache.helix.integration.task; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; 
+import java.lang.management.ManagementFactory; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.apache.helix.TestHelper; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * This test contains cases in which a workflow finishes + */ +public class TestWorkflowTermination extends TaskTestBase { + private final static String JOB_NAME = "TestJob"; + private final static String WORKFLOW_TYPE = "TestWorkflow"; + private static final MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); + + @BeforeClass + public void beforeClass() throws Exception { + _numDbs = 1; + _numNodes = 3; + _numParitions = 5; + _numReplicas = 3; + super.beforeClass(); + } + + private JobConfig.Builder createJobConfigBuilder(String workflow, boolean shouldJobFail, long timeoutMs) { + String taskState = shouldJobFail ?
TaskState.FAILED.name() : TaskState.COMPLETED.name(); + return new JobConfig.Builder() + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) + .setWorkflow(workflow) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap( + ImmutableMap.of( + MockTask.TIMEOUT_CONFIG, Long.toString(timeoutMs), + MockTask.TASK_RESULT_STATUS, taskState + ) + ); + } + + @Test + public void testWorkflowSucceed() throws Exception { + String workflowName = TestHelper.getTestMethodName(); + long workflowExpiry = 2000; + long timeout = 2000; + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50); + jobBuilder.setWorkflow(workflowName); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName) + .setWorkflowConfig( + new WorkflowConfig.Builder(workflowName) + .setTimeout(timeout) + .setWorkFlowType(WORKFLOW_TYPE) + .build() + ) + .addJob(JOB_NAME, jobBuilder) + .setExpiry(workflowExpiry); + _driver.start(workflowBuilder.build()); + + // Timeout is longer than job finish so workflow status should be COMPLETED + _driver.pollForWorkflowState(workflowName, 5000L, TaskState.COMPLETED); + WorkflowContext context = _driver.getWorkflowContext(workflowName); + Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout); + + // Workflow should be cleaned up after expiry + Thread.sleep(workflowExpiry + 200); + verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME)); + + ObjectName objectName = getWorkflowMBeanObjectName(workflowName); + Assert.assertEquals((long) beanServer.getAttribute(objectName, "SuccessfulWorkflowCount"), 1); + Assert.assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0); + Assert.assertTrue((long) beanServer.getAttribute(objectName, "WorkflowLatencyCount") > 0); + + } + + @Test + public void testWorkflowRunningTimeout() throws InterruptedException { + String workflowName = 
TestHelper.getTestMethodName(); + String notStartedJobName = JOB_NAME + "-NotStarted"; + long workflowExpiry = 2000; // 2sec expiry time + long timeout = 50; + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 5000); + jobBuilder.setWorkflow(workflowName); + + // Create a workflow where job2 depends on job1. Workflow would timeout before job1 finishes + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName) + .setWorkflowConfig( + new WorkflowConfig.Builder(workflowName) + .setTimeout(timeout) + .setWorkFlowType(WORKFLOW_TYPE) + .build() + ) + .addJob(JOB_NAME, jobBuilder) + .addJob(notStartedJobName, jobBuilder) + .addParentChildDependency(JOB_NAME, notStartedJobName) + .setExpiry(workflowExpiry); + + _driver.start(workflowBuilder.build()); + + _driver.pollForWorkflowState(workflowName, 10000L, TaskState.TIMED_OUT); + + // Running job should be marked as timeout + // and job not started should be marked as NOT_STARTED + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L, TaskState.TIMED_OUT); + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, notStartedJobName), 10000L, + TaskState.NOT_STARTED); + + WorkflowContext context = _driver.getWorkflowContext(workflowName); + Assert.assertTrue(context.getFinishTime() - context.getStartTime() >= timeout); + + Thread.sleep(workflowExpiry + 200); + + verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME), + getJobNameToPoll(workflowName, notStartedJobName)); + } + + @Test + public void testWorkflowPausedTimeout() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + long workflowExpiry = 2000; // 2sec expiry time + long timeout = 2000; + String notStartedJobName = JOB_NAME + "-NotStarted"; + + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 100); + jobBuilder.setWorkflow(workflowName); + Workflow.Builder workflowBuilder = new 
Workflow.Builder(workflowName) + .setWorkflowConfig( + new WorkflowConfig.Builder(workflowName) + .setTimeout(timeout) + .setWorkFlowType(WORKFLOW_TYPE) + .build() + ) + .addJob(JOB_NAME, jobBuilder) + .addJob(notStartedJobName, jobBuilder) + .addParentChildDependency(JOB_NAME, notStartedJobName) + .setExpiry(workflowExpiry); + + _driver.start(workflowBuilder.build()); + + // Wait a bit for the job to get scheduled. Job runs for 100ms, so this will very likely + // stop the job while it is still running + Thread.sleep(40); + + // Stop the workflow + _driver.waitToStop(workflowName, 10000L); + + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L, + TaskState.STOPPED); + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, notStartedJobName), 10000L, + TaskState.NOT_STARTED); + + _driver.pollForWorkflowState(workflowName, 10000L, TaskState.TIMED_OUT); + + WorkflowContext context = _driver.getWorkflowContext(workflowName); + Assert.assertTrue(context.getFinishTime() - context.getStartTime() >= timeout); + + Thread.sleep(workflowExpiry + 200); + verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME), + getJobNameToPoll(workflowName, notStartedJobName)); + + } + + @Test + public void testJobQueueNotApplyTimeout() throws InterruptedException { + String queueName = TestHelper.getTestMethodName(); + long timeout = 1000; + // Make jobs succeed + JobConfig.Builder jobBuilder = createJobConfigBuilder(queueName, false, 10); + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(queueName); + jobQueue.setWorkflowConfig( + new WorkflowConfig.Builder(queueName) + .setTimeout(timeout) + .setWorkFlowType(WORKFLOW_TYPE) + .build() + ) + .enqueueJob(JOB_NAME, jobBuilder) + .enqueueJob(JOB_NAME + 1, jobBuilder); + + _driver.start(jobQueue.build()); + + _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, JOB_NAME), + TaskState.COMPLETED); + _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, JOB_NAME + 1), + TaskState.COMPLETED); + + Thread.sleep(timeout); + + // Verify that job queue is still in progress + _driver.pollForWorkflowState(queueName, 10000L, TaskState.IN_PROGRESS); + } + + @Test + public void testWorkflowJobFail() throws Exception { + String workflowName = TestHelper.getTestMethodName(); + String job1 = JOB_NAME + "1"; + String job2 = JOB_NAME + "2"; + String job3 = JOB_NAME + "3"; + String job4 = JOB_NAME + "4"; + long workflowExpiry = 2000; + long timeout = 5000; + + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50); + JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 10); + + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName) + .setWorkflowConfig( + new WorkflowConfig.Builder(workflowName) + .setWorkFlowType(WORKFLOW_TYPE) + .setTimeout(timeout) + .setFailureThreshold(1) + .build() + ) + .addJob(job1, jobBuilder) + .addJob(job2, jobBuilder) + .addJob(job3, failedJobBuilder) + .addJob(job4, jobBuilder) + .addParentChildDependency(job1, job2) + .addParentChildDependency(job1, job3) + .addParentChildDependency(job2, job4) + .addParentChildDependency(job3, job4) + .setExpiry(workflowExpiry); + + _driver.start(workflowBuilder.build()); + + _driver.pollForWorkflowState(workflowName, 5000L, TaskState.FAILED); + + // Timeout is longer than fail time, so the failure should occur before the timeout + WorkflowContext context = _driver.getWorkflowContext(workflowName); + Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout); + + // job1 will complete + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 5000L, + TaskState.COMPLETED); + + // job2 and job3 may race, so job2 can end up in either COMPLETED or ABORTED + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job2), 5000L, + TaskState.COMPLETED, TaskState.ABORTED); + + // job3 is meant to fail +
_driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 5000L, + TaskState.FAILED); + + // Because job4 depends on job3, it will fail as well + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 5000L, + TaskState.FAILED); + + // Check MBean is updated + ObjectName objectName = getWorkflowMBeanObjectName(workflowName); + Assert.assertEquals((long) beanServer.getAttribute(objectName, "FailedWorkflowCount"), 1); + + // A failed workflow is purged once its expiry time has elapsed + Thread.sleep(workflowExpiry + 200); + verifyWorkflowCleanup( + workflowName, + getJobNameToPoll(workflowName, job1), + getJobNameToPoll(workflowName, job2), + getJobNameToPoll(workflowName, job3), + getJobNameToPoll(workflowName, job4) + ); + } + + private void verifyWorkflowCleanup(String workflowName, String... jobNames) { + Assert.assertNull(_driver.getWorkflowConfig(workflowName)); + Assert.assertNull(_driver.getWorkflowContext(workflowName)); + for (String job : jobNames) { + Assert.assertNull(_driver.getJobConfig(job)); + Assert.assertNull(_driver.getJobContext(job)); + } + } + + private static String getJobNameToPoll(String workflowName, String jobName) { + return String.format("%s_%s", workflowName, jobName); + } + + private ObjectName getWorkflowMBeanObjectName(String workflowName) + throws MalformedObjectNameException { + return new ObjectName(String + .format("%s:%s=%s, %s=%s", MonitorDomainNames.ClusterStatus.name(), "cluster", + CLUSTER_NAME, "workflowType", WORKFLOW_TYPE)); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java index 4b5b309..c454879 100644 ---
a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java @@ -86,4 +86,4 @@ public class TestJobStateOnCreation extends TaskSynchronizedTestBase { Assert.assertEquals(jobStates.get(job), TaskState.NOT_STARTED); } } -} \ No newline at end of file +}
