This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new c08f9d3  TASK: Fix forceDelete for jobs in JobQueue
c08f9d3 is described below

commit c08f9d31522befe525e02e64d06d92996f11ca58
Author: Hunter Lee <[email protected]>
AuthorDate: Wed Sep 4 12:26:10 2019 -0700

    TASK: Fix forceDelete for jobs in JobQueue
    
    We observed that the force delete functionality doesn't actually work when 
the job is running: the call fails with an error saying that the job is 
currently running. Force delete should go through regardless of the current 
job status.
    Changelist:
    1. Change the semantics in deleteJobFromQueue
    2. Add an integration test: TestDeleteJobFromJobQueue
---
 .../java/org/apache/helix/task/TaskDriver.java     | 79 ++++++++++++--------
 .../task/TestDeleteJobFromJobQueue.java            | 83 ++++++++++++++++++++++
 .../rest/server/resources/helix/JobAccessor.java   |  2 +-
 3 files changed, 132 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 98e7b58..dfca045 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -262,52 +262,69 @@ public class TaskDriver {
    * @param queue queue name
    * @param job job name: namespaced job name
    * @param forceDelete CAUTION: if set true, all job's related zk nodes will
-   *          be clean up from zookeeper even if its workflow information can 
not be found.
+   *          be removed from zookeeper even if its JobQueue information can 
not be found.
    */
   public void deleteNamespacedJob(final String queue, final String job, 
boolean forceDelete) {
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
-
-    if (workflowCfg == null) {
-      if (forceDelete) {
-        // remove all job znodes if its original workflow config was already 
gone.
-        LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
-        boolean success = TaskUtil.removeJob(_accessor, _propertyStore, job);
-        if (!success) {
-          LOG.info("Failed to delete job: " + job + " from queue: " + queue);
-          throw new HelixException("Failed to delete job: " + job + " from 
queue: " + queue);
+    WorkflowConfig jobQueueConfig = TaskUtil.getWorkflowConfig(_accessor, 
queue);
+    boolean isRecurringWorkflow;
+
+    // Force deletion of a job
+    if (forceDelete) {
+      // remove all job-related ZNodes
+      LOG.info("Forcefully removing job: {} from queue: {}", job, queue);
+      if (!TaskUtil.removeJob(_accessor, _propertyStore, job)) {
+        LOG.info("Failed to delete job: {} from queue: {}", job, queue);
+        throw new HelixException("Failed to delete job: " + job + " from 
queue: " + queue);
+      }
+      // In case this was a recurrent workflow, remove it from last scheduled 
queue as well
+      if (jobQueueConfig != null) {
+        isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null
+            && jobQueueConfig.getScheduleConfig().isRecurring();
+        if (isRecurringWorkflow) {
+          deleteJobFromLastScheduledQueue(queue, 
TaskUtil.getDenamespacedJobName(queue, job));
         }
-      } else {
-        throw new IllegalArgumentException("Queue " + queue + " does not yet 
exist!");
       }
       return;
     }
 
-    if (!workflowCfg.isJobQueue()) {
-      throw new IllegalArgumentException(queue + " is not a queue!");
+    // Regular, non-force, deletion of a job
+    if (jobQueueConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("JobQueue %s's config is not found!", queue));
     }
-
-    boolean isRecurringWorkflow =
-        (workflowCfg.getScheduleConfig() != null && 
workflowCfg.getScheduleConfig().isRecurring());
-
+    if (!jobQueueConfig.isJobQueue()) {
+      throw new IllegalArgumentException(String.format("%s is not a queue!", 
queue));
+    }
+    isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null
+        && jobQueueConfig.getScheduleConfig().isRecurring();
     String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job);
     if (isRecurringWorkflow) {
-      // delete job from the last scheduled queue if there exists one.
-      WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, 
queue);
-      String lastScheduledQueue = null;
-      if (wCtx != null) {
-        lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-      }
-      if (lastScheduledQueue != null) {
-        WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, 
lastScheduledQueue);
-        if (lastWorkflowCfg != null) {
-          deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
-        }
-      }
+      deleteJobFromLastScheduledQueue(queue, denamespacedJob);
     }
     deleteJobFromQueue(queue, denamespacedJob);
   }
 
   /**
+   * Delete the given job from the last-scheduled queue for recurrent 
workflows.
+   * @param queue
+   * @param denamespacedJob
+   */
+  private void deleteJobFromLastScheduledQueue(String queue, String 
denamespacedJob) {
+    // delete job from the last scheduled queue if there exists one.
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+    String lastScheduledQueue = null;
+    if (wCtx != null) {
+      lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    }
+    if (lastScheduledQueue != null) {
+      WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, 
lastScheduledQueue);
+      if (lastWorkflowCfg != null) {
+        deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
+      }
+    }
+  }
+
+  /**
    * Delete a job from a scheduled (non-recurrent) queue.
    * @param queue
    * @param job this must be a namespaced job name
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
new file mode 100644
index 0000000..dfe6d4e
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java
@@ -0,0 +1,83 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDeleteJobFromJobQueue extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testForceDeleteJobFromJobQueue() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+
+    // Create two jobs: job1 will complete fast, and job2 will be stuck in 
progress (taking a long
+    // time to finish). The idea is to force-delete a stuck job (job2).
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10"));
+    JobConfig.Builder jobBuilder2 = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, 
"100000")).setTimeout(100000);
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    jobQueue.enqueueJob("job2", jobBuilder2);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, 
TaskUtil.getNamespacedJobName(jobQueueName, "job2"),
+        TaskState.IN_PROGRESS);
+
+    try {
+      _driver.deleteJob(jobQueueName, "job2");
+      Assert.fail("Regular, non-force deleteJob should fail since the workflow 
is in progress!");
+    } catch (IllegalStateException e) {
+      // Expect IllegalStateException because job2 is still in progress
+    }
+
+    // Check that the job ZNodes have not been deleted by regular deleteJob 
call
+    
Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job2")));
+    Assert
+        
.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job2")));
+    
Assert.assertNotNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2")));
+
+    // The following force delete for the job should go through without 
getting an exception
+    _driver.deleteJob(jobQueueName, "job2", true);
+
+    // Check that the job has been force-deleted (fully gone from ZK)
+    
Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job2")));
+    
Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job2")));
+    
Assert.assertNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2")));
+  }
+}
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
index 5548bd3..a63025b 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -133,7 +133,7 @@ public class JobAccessor extends AbstractHelixResource {
   public Response deleteJob(@PathParam("clusterId") String clusterId,
       @PathParam("workflowName") String workflowName, @PathParam("jobName") 
String jobName,
       @QueryParam("force") @DefaultValue("false") String forceDelete) {
-    boolean force = Boolean.valueOf(forceDelete);
+    boolean force = Boolean.parseBoolean(forceDelete);
     TaskDriver driver = getTaskDriver(clusterId);
 
     try {

Reply via email to