Helix job should fail immediately if the target resource is disabled. For targeted jobs, once the target resource has been disabled, Helix should not keep scheduling the job; it should fail the job instead. If the job is already in progress, it is marked as failed, but tasks that are already running will not actually be cancelled.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/68fe74e6 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/68fe74e6 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/68fe74e6 Branch: refs/heads/master Commit: 68fe74e696c92d332627935148f645f64b42d389 Parents: eac4940 Author: Junkai Xue <[email protected]> Authored: Thu May 31 17:55:36 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Thu Jul 12 16:53:24 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/JobRebalancer.java | 6 ++- .../TestFailTargetJobWhenResourceDisabled.java | 53 ++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/68fe74e6/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 0f09166..11abb25 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -258,8 +258,10 @@ public class JobRebalancer extends TaskRebalancer { addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg); - if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg - .getFailureThreshold()) { + if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold() + || (jobCfg.getTargetResource() != null + && cache.getIdealState(jobCfg.getTargetResource()) != null && !cache + .getIdealState(jobCfg.getTargetResource()).isEnabled())) { if (isJobFinished(jobCtx, jobResource, currStateOutput)) { failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap()); return buildEmptyAssignment(jobResource, currStateOutput); http://git-wip-us.apache.org/repos/asf/helix/blob/68fe74e6/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java new file mode 100644 index 0000000..676c053 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java @@ -0,0 +1,53 @@ +package org.apache.helix.integration.task; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.model.IdealState; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.Workflow; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestFailTargetJobWhenResourceDisabled extends TaskTestBase { + private JobConfig.Builder _jobCfg; + private String _jobName; + + @BeforeClass + public void beforeClass() throws Exception { + setSingleTestEnvironment(); + super.beforeClass(); + _jobName = "TestJob"; + _jobCfg = new JobConfig.Builder().setJobId(_jobName).setCommand(MockTask.TASK_COMMAND) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB); + } + + @Test + public void testJobScheduleAfterResourceDisabled() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + _gSetupTool.getClusterManagementTool() + .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false); + Workflow.Builder workflow = new Workflow.Builder(workflowName); + workflow.addJob(_jobName, _jobCfg); + _driver.start(workflow.build()); + 
+ _driver.pollForWorkflowState(workflowName, TaskState.FAILED); + } + + @Test + public void testJobScheduleBeforeResourceDisabled() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + Workflow.Builder workflow = new Workflow.Builder(workflowName); + _jobCfg.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000000")); + workflow.addJob(_jobName, _jobCfg); + _driver.start(workflow.build()); + Thread.sleep(1000); + _gSetupTool.getClusterManagementTool() + .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false); + _driver.pollForWorkflowState(workflowName, TaskState.FAILED); + } + + +}
