This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new c08f9d3 TASK: Fix forceDelete for jobs in JobQueue
c08f9d3 is described below
commit c08f9d31522befe525e02e64d06d92996f11ca58
Author: Hunter Lee <[email protected]>
AuthorDate: Wed Sep 4 12:26:10 2019 -0700
TASK: Fix forceDelete for jobs in JobQueue
We observed that the force-delete functionality does not actually work when
the job is running: the call fails with an error saying that the job is
currently running. Force delete should go through regardless of the current job status.
Changelist:
1. Change the semantics in deleteJobFromQueue
2. Add an integration test: TestDeleteJobFromJobQueue
---
.../java/org/apache/helix/task/TaskDriver.java | 79 ++++++++++++--------
.../task/TestDeleteJobFromJobQueue.java | 83 ++++++++++++++++++++++
.../rest/server/resources/helix/JobAccessor.java | 2 +-
3 files changed, 132 insertions(+), 32 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 98e7b58..dfca045 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -262,52 +262,69 @@ public class TaskDriver {
* @param queue queue name
* @param job job name: namespaced job name
* @param forceDelete CAUTION: if set true, all job's related zk nodes will
- * be clean up from zookeeper even if its workflow information can
not be found.
+ * be removed from zookeeper even if its JobQueue information can
not be found.
*/
public void deleteNamespacedJob(final String queue, final String job,
boolean forceDelete) {
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
-
- if (workflowCfg == null) {
- if (forceDelete) {
- // remove all job znodes if its original workflow config was already
gone.
- LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
- boolean success = TaskUtil.removeJob(_accessor, _propertyStore, job);
- if (!success) {
- LOG.info("Failed to delete job: " + job + " from queue: " + queue);
- throw new HelixException("Failed to delete job: " + job + " from
queue: " + queue);
+ WorkflowConfig jobQueueConfig = TaskUtil.getWorkflowConfig(_accessor,
queue);
+ boolean isRecurringWorkflow;
+
+ // Force deletion of a job
+ if (forceDelete) {
+ // remove all job-related ZNodes
+ LOG.info("Forcefully removing job: {} from queue: {}", job, queue);
+ if (!TaskUtil.removeJob(_accessor, _propertyStore, job)) {
+ LOG.info("Failed to delete job: {} from queue: {}", job, queue);
+ throw new HelixException("Failed to delete job: " + job + " from
queue: " + queue);
+ }
+ // In case this was a recurrent workflow, remove it from last scheduled
queue as well
+ if (jobQueueConfig != null) {
+ isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null
+ && jobQueueConfig.getScheduleConfig().isRecurring();
+ if (isRecurringWorkflow) {
+ deleteJobFromLastScheduledQueue(queue,
TaskUtil.getDenamespacedJobName(queue, job));
}
- } else {
- throw new IllegalArgumentException("Queue " + queue + " does not yet
exist!");
}
return;
}
- if (!workflowCfg.isJobQueue()) {
- throw new IllegalArgumentException(queue + " is not a queue!");
+ // Regular, non-force, deletion of a job
+ if (jobQueueConfig == null) {
+ throw new IllegalArgumentException(
+ String.format("JobQueue %s's config is not found!", queue));
}
-
- boolean isRecurringWorkflow =
- (workflowCfg.getScheduleConfig() != null &&
workflowCfg.getScheduleConfig().isRecurring());
-
+ if (!jobQueueConfig.isJobQueue()) {
+ throw new IllegalArgumentException(String.format("%s is not a queue!",
queue));
+ }
+ isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null
+ && jobQueueConfig.getScheduleConfig().isRecurring();
String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job);
if (isRecurringWorkflow) {
- // delete job from the last scheduled queue if there exists one.
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore,
queue);
- String lastScheduledQueue = null;
- if (wCtx != null) {
- lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
- }
- if (lastScheduledQueue != null) {
- WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor,
lastScheduledQueue);
- if (lastWorkflowCfg != null) {
- deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
- }
- }
+ deleteJobFromLastScheduledQueue(queue, denamespacedJob);
}
deleteJobFromQueue(queue, denamespacedJob);
}
/**
+ * Delete the given job from the last-scheduled queue for recurrent
workflows.
+ * @param queue
+ * @param denamespacedJob
+ */
+ private void deleteJobFromLastScheduledQueue(String queue, String
denamespacedJob) {
+ // delete job from the last scheduled queue if there exists one.
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ String lastScheduledQueue = null;
+ if (wCtx != null) {
+ lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ }
+ if (lastScheduledQueue != null) {
+ WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor,
lastScheduledQueue);
+ if (lastWorkflowCfg != null) {
+ deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
+ }
+ }
+ }
+
+ /**
* Delete a job from a scheduled (non-recurrent) queue.
* @param queue
* @param job this must be a namespaced job name
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
new file mode 100644
index 0000000..dfe6d4e
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
@@ -0,0 +1,83 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDeleteJobFromJobQueue extends TaskTestBase {
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numPartitions = 1;
+ super.beforeClass();
+ }
+
+ @Test
+ public void testForceDeleteJobFromJobQueue() throws InterruptedException {
+ String jobQueueName = TestHelper.getTestMethodName();
+
+ // Create two jobs: job1 will complete fast, and job2 will be stuck in
progress (taking a long
+ // time to finish). The idea is to force-delete a stuck job (job2).
+ JobConfig.Builder jobBuilder =
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10"));
+ JobConfig.Builder jobBuilder2 =
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY,
"100000")).setTimeout(100000);
+
+ JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+ jobQueue.enqueueJob("job1", jobBuilder);
+ jobQueue.enqueueJob("job2", jobBuilder2);
+ _driver.start(jobQueue.build());
+ _driver.pollForJobState(jobQueueName,
TaskUtil.getNamespacedJobName(jobQueueName, "job2"),
+ TaskState.IN_PROGRESS);
+
+ try {
+ _driver.deleteJob(jobQueueName, "job2");
+ Assert.fail("Regular, non-force deleteJob should fail since the workflow
is in progress!");
+ } catch (IllegalStateException e) {
+ // Expect IllegalStateException because job2 is still in progress
+ }
+
+ // Check that the job ZNodes have not been deleted by regular deleteJob
call
+
Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
"job2")));
+ Assert
+
.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
"job2")));
+
Assert.assertNotNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+ TaskUtil.getNamespacedJobName(jobQueueName, "job2")));
+
+ // The following force delete for the job should go through without
getting an exception
+ _driver.deleteJob(jobQueueName, "job2", true);
+
+ // Check that the job has been force-deleted (fully gone from ZK)
+
Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
"job2")));
+
Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
"job2")));
+
Assert.assertNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+ TaskUtil.getNamespacedJobName(jobQueueName, "job2")));
+ }
+}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
index 5548bd3..a63025b 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -133,7 +133,7 @@ public class JobAccessor extends AbstractHelixResource {
public Response deleteJob(@PathParam("clusterId") String clusterId,
@PathParam("workflowName") String workflowName, @PathParam("jobName")
String jobName,
@QueryParam("force") @DefaultValue("false") String forceDelete) {
- boolean force = Boolean.valueOf(forceDelete);
+ boolean force = Boolean.parseBoolean(forceDelete);
TaskDriver driver = getTaskDriver(clusterId);
try {