TASK: Fix JobQueue's job-state bug. The bug was observed in TestTaskRebalancerStopResume:stopAndResumeNamedQueue(), which was flaky. In JobQueues with multiple jobs, the second job could be marked IN_PROGRESS even though the first job had not yet completed or failed, especially when the queue was being stopped and resumed. The cause was a bug in getIncompleteJobCount(): it did not count jobs in the STOPPING state. This has been fixed. In addition, a check was added right before JobDispatcher marks a job as STOPPED, so that a job whose state is NOT_STARTED is never marked STOPPED. Changelist: 1. Fix getIncompleteJobCount() to also count STOPPING jobs 2. Add a check so that we don't mark NOT_STARTED jobs as STOPPED 3. Fix a comment typo and add a short sleep in TestGetLastScheduledTaskExecInfo
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3d9c0306 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3d9c0306 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3d9c0306 Branch: refs/heads/master Commit: 3d9c03064a5c26a9ed9ad674567674f2d9eca160 Parents: 3844ad6 Author: narendly <[email protected]> Authored: Thu Nov 1 16:55:59 2018 -0700 Committer: narendly <[email protected]> Committed: Thu Nov 1 16:55:59 2018 -0700 ---------------------------------------------------------------------- helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java | 2 +- helix-core/src/main/java/org/apache/helix/task/TaskUtil.java | 3 ++- .../org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 06f6ce4..05d8e5c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -133,7 +133,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted // Update running status in workflow context if (jobTgtState == TargetState.STOP) { - if (TaskUtil.checkJobStopped(jobCtx)) { + if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) { workflowCtx.setJobState(jobName, TaskState.STOPPED); } else { workflowCtx.setJobState(jobName, TaskState.STOPPING); 
http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 958805b..b587408 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -943,7 +943,8 @@ public class TaskUtil { int inCompleteCount = 0; for (String jobName : workflowCfg.getJobDag().getAllNodes()) { TaskState jobState = workflowCtx.getJobState(jobName); - if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { + if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED + || jobState == TaskState.STOPPING) { ++inCompleteCount; } } http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java index 73fe674..25c486a 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java @@ -53,7 +53,7 @@ public class TestGetLastScheduledTaskExecInfo extends TaskTestBase { Assert.assertEquals(startTimesWithStuckTasks.get(3), lastScheduledTaskTs); Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_2_job_0"); - // Workflow 2 will stuck, so its partition state will be RUNNING + // Workflow 2 will be stuck, so its partition state will be RUNNING Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.RUNNING); 
Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs); @@ -61,6 +61,7 @@ public class TestGetLastScheduledTaskExecInfo extends TaskTestBase { // API call needs to return the most recent timestamp (value at last index) lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3"); execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3"); + Thread.sleep(200); // Let the tasks run Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1), lastScheduledTaskTs); Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");
