TASK: Fix JobQueue's job state-related bug

The bug was observed in TestTaskRebalancerStopResume:stopAndResumeNamedQueue(), 
which was unstable.
It was observed that for JobQueues with multiple jobs, the second job would get 
marked as IN_PROGRESS even though the first job hadn't completed or failed, 
especially when the queue was being stopped and resumed. This was caused by a 
bug in getIncompleteJobCount(), which did not count jobs in the STOPPING state. 
That method was fixed, and a check was added right before JobDispatcher marks a 
job as STOPPED so that it does not mark the job STOPPED if its state is 
NOT_STARTED.
Changelist:
1. Fix getIncompleteJobCount()
2. Add a check so that we don't mark NOT_STARTED jobs as STOPPED


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3d9c0306
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3d9c0306
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3d9c0306

Branch: refs/heads/master
Commit: 3d9c03064a5c26a9ed9ad674567674f2d9eca160
Parents: 3844ad6
Author: narendly <[email protected]>
Authored: Thu Nov 1 16:55:59 2018 -0700
Committer: narendly <[email protected]>
Committed: Thu Nov 1 16:55:59 2018 -0700

----------------------------------------------------------------------
 helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java | 2 +-
 helix-core/src/main/java/org/apache/helix/task/TaskUtil.java      | 3 ++-
 .../org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java   | 3 ++-
 3 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 06f6ce4..05d8e5c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -133,7 +133,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks 
are being aborted
       // Update running status in workflow context
       if (jobTgtState == TargetState.STOP) {
-        if (TaskUtil.checkJobStopped(jobCtx)) {
+        if (jobState != TaskState.NOT_STARTED && 
TaskUtil.checkJobStopped(jobCtx)) {
           workflowCtx.setJobState(jobName, TaskState.STOPPED);
         } else {
           workflowCtx.setJobState(jobName, TaskState.STOPPING);

http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 958805b..b587408 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -943,7 +943,8 @@ public class TaskUtil {
     int inCompleteCount = 0;
     for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
       TaskState jobState = workflowCtx.getJobState(jobName);
-      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED
+          || jobState == TaskState.STOPPING) {
         ++inCompleteCount;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/3d9c0306/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
index 73fe674..25c486a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -53,7 +53,7 @@ public class TestGetLastScheduledTaskExecInfo extends 
TaskTestBase {
     Assert.assertEquals(startTimesWithStuckTasks.get(3), lastScheduledTaskTs);
 
     Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_2_job_0");
-    // Workflow 2 will stuck, so its partition state will be RUNNING
+    // Workflow 2 will be stuck, so its partition state will be RUNNING
     Assert.assertEquals(execInfo.getTaskPartitionState(), 
TaskPartitionState.RUNNING);
     Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs);
 
@@ -61,6 +61,7 @@ public class TestGetLastScheduledTaskExecInfo extends 
TaskTestBase {
     // API call needs to return the most recent timestamp (value at last index)
     lastScheduledTaskTs = 
_driver.getLastScheduledTaskTimestamp("TestWorkflow_3");
     execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
+    Thread.sleep(200); // Let the tasks run
 
     Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 
1), lastScheduledTaskTs);
     Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");

Reply via email to