Repository: helix
Updated Branches:
  refs/heads/master b00d65961 -> b8355b9a6


Add force-delete APIs for workflows and jobs to TaskDriver and Helix REST.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b8355b9a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b8355b9a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b8355b9a

Branch: refs/heads/master
Commit: b8355b9a6b1071c149305f147bdb524d8a67f2d2
Parents: b00d659
Author: Lei Xia <[email protected]>
Authored: Wed Apr 11 10:28:22 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Thu Apr 19 14:18:16 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  91 ++++++++++++----
 .../java/org/apache/helix/task/TaskUtil.java    |  27 +++--
 .../integration/task/TestDeleteWorkflow.java    | 104 ++++++++++++++++++-
 .../server/resources/helix/JobAccessor.java     |   8 +-
 .../resources/helix/WorkflowAccessor.java       |   7 +-
 5 files changed, 197 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 174cae1..4a7707a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -228,12 +228,37 @@ public class TaskDriver {
    * @param job  job name
    */
   public void deleteJob(final String queue, final String job) {
-    WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowConfig(_accessor, queue);
+    deleteJob(queue, job, false);
+  }
+
+  /**
+   * Delete a job from an existing named queue,
+   * the queue has to be stopped prior to this call
+   *
+   * @param queue queue name
+   * @param job  job name
+   * @param forceDelete  CAUTION: if set true, all job's related zk nodes will
+   *                     be clean up from zookeeper even if its workflow 
information can not be found.
+   */
+  public void deleteJob(final String queue, final String job, boolean 
forceDelete) {
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queue + " does not yet 
exist!");
+      if (forceDelete) {
+        // remove all job znodes if its original workflow config was already 
gone.
+        LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
+        boolean success = TaskUtil
+            .removeJob(_accessor, _propertyStore, 
TaskUtil.getNamespacedJobName(queue, job));
+        if (!success) {
+          LOG.info("Failed to delete job: " + job + " from queue: " + queue);
+          throw new HelixException("Failed to delete job: " + job + " from 
queue: " + queue);
+        }
+      } else {
+        throw new IllegalArgumentException("Queue " + queue + " does not yet 
exist!");
+      }
+      return;
     }
+
     if (workflowCfg.isTerminable()) {
       throw new IllegalArgumentException(queue + " is not a queue!");
     }
@@ -283,13 +308,14 @@ public class TaskDriver {
     }
 
     String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-    Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+    Set<String> jobs = new HashSet<>(Arrays.asList(namespacedJobName));
     if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, 
jobs, true)) {
       LOG.error("Failed to delete job " + job + " from queue " + queue);
       throw new HelixException("Failed to delete job " + job + " from queue " 
+ queue);
     }
   }
 
+
   /**
    * Adds a new job to the end an existing named queue.
    *
@@ -549,29 +575,56 @@ public class TaskDriver {
    * @param workflow
    */
   public void delete(String workflow) {
-    // After set DELETE state, rebalancer may remove the workflow instantly.
-    // So record context before set DELETE state.
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, 
workflow);
+    delete(workflow, false);
+  }
 
-    setWorkflowTargetState(workflow, TargetState.DELETE);
+  /**
+   * Public method to delete a workflow/queue.
+   *
+   * @param workflow
+   * @param forceDelete, CAUTION: if set true, workflow and all of its jobs' 
related zk nodes will
+   *                     be clean up immediately from zookeeper, no matter 
whether there are jobs
+   *                     are running or not.
+   */
+  public void delete(String workflow, boolean forceDelete) {
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, 
workflow);
+    if (forceDelete) {
+      // if forceDelete, remove the workflow and its jobs immediately from 
zookeeper.
+      LOG.info("Forcefully removing workflow: " + workflow);
+      removeWorkflowFromZK(workflow);
+    } else {
+      // Set the workflow target state to DELETE, and let Helix controller to 
remove the workflow.
+      // Controller may remove the workflow instantly, so record context 
before set DELETE state.
+      setWorkflowTargetState(workflow, TargetState.DELETE);
+    }
 
-    // Delete all finished scheduled workflows.
+    // Delete all previously scheduled workflows.
     if (wCtx != null && wCtx.getScheduledWorkflows() != null) {
       for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) {
-        WorkflowContext scheduledWorkflowCtx = 
TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
-        if (scheduledWorkflowCtx != null && 
scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-          Set<String> jobSet = new HashSet<String>();
-          // Note that even WorkflowConfig is null, if WorkflowContext exists, 
still need to remove workflow
-          WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, 
scheduledWorkflow);
-          if (wCfg != null) {
-            jobSet.addAll(wCfg.getJobDag().getAllNodes());
-          }
-          TaskUtil.removeWorkflow(_accessor, _propertyStore, 
scheduledWorkflow, jobSet);
+        WorkflowContext scheduledWorkflowCtx =
+            TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
+        if (scheduledWorkflowCtx != null
+            && scheduledWorkflowCtx.getFinishTime() != 
WorkflowContext.UNFINISHED) {
+          removeWorkflowFromZK(scheduledWorkflow);
         }
       }
     }
   }
 
+  private void removeWorkflowFromZK(String workflow) {
+    Set<String> jobSet = new HashSet<>();
+    // Note that even WorkflowConfig is null, if WorkflowContext exists, still 
need to remove workflow
+    WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow);
+    if (wCfg != null) {
+      jobSet.addAll(wCfg.getJobDag().getAllNodes());
+    }
+    boolean success = TaskUtil.removeWorkflow(_accessor, _propertyStore, 
workflow, jobSet);
+    if (!success) {
+      LOG.info("Failed to delete the workflow " + workflow);
+      throw new HelixException("Failed to delete the workflow " + workflow);
+    }
+  }
+
   /**
    * Public synchronized method to wait for a delete operation to fully 
complete with timeout.
    * When this method returns, it means that a queue (workflow) has been 
completely deleted, meaning
@@ -893,4 +946,4 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too 
many items created in the path CONFIGS.");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 47a5cfd..c6d7a55 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -537,7 +537,7 @@ public class TaskUtil {
       String workflowJobResource) {
     boolean success = true;
     PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
-    if (accessor.getProperty(isKey) != null) {
+    if (accessor.getPropertyStat(isKey) != null) {
       if (!accessor.removeProperty(isKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove IdealState for %s. Failed 
to remove node %s.",
@@ -548,7 +548,7 @@ public class TaskUtil {
 
     // Delete external view
     PropertyKey evKey = 
accessor.keyBuilder().externalView(workflowJobResource);
-    if (accessor.getProperty(evKey) != null) {
+    if (accessor.getPropertyStat(evKey) != null) {
       if (!accessor.removeProperty(evKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove ExternalView of resource 
%s. Failed to remove node %s.",
@@ -583,17 +583,17 @@ public class TaskUtil {
       }
     }
 
+    if (!removeWorkflowConfig(accessor, workflow)) {
+      LOG.warn(
+          String.format("Error occurred while trying to remove workflow config 
for %s.", workflow));
+      success = false;
+    }
     if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
       LOG.warn(String
           .format("Error occurred while trying to remove workflow 
idealstate/externalview for %s.",
               workflow));
       success = false;
     }
-    if (!removeWorkflowConfig(accessor, workflow)) {
-      LOG.warn(
-          String.format("Error occurred while trying to remove workflow config 
for %s.", workflow));
-      success = false;
-    }
     if (!removeWorkflowContext(propertyStore, workflow)) {
       LOG.warn(String
           .format("Error occurred while trying to remove workflow context for 
%s.", workflow));
@@ -679,16 +679,15 @@ public class TaskUtil {
   protected static boolean removeJob(HelixDataAccessor accessor, 
HelixPropertyStore propertyStore,
       String job) {
     boolean success = true;
-    if (!cleanupJobIdealStateExtView(accessor, job)) {
-      LOG.warn(String
-          .format("Error occurred while trying to remove job 
idealstate/externalview for %s.",
-              job));
-      success = false;
-    }
     if (!removeJobConfig(accessor, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job config 
for %s.", job));
       success = false;
     }
+    if (!cleanupJobIdealStateExtView(accessor, job)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove job 
idealstate/externalview for %s.", job));
+      success = false;
+    }
     if (!removeJobContext(propertyStore, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job 
context for %s.", job));
       success = false;
@@ -789,7 +788,7 @@ public class TaskUtil {
   private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
       String workflowJobResource) {
     PropertyKey cfgKey = 
accessor.keyBuilder().resourceConfig(workflowJobResource);
-    if (accessor.getProperty(cfgKey) != null) {
+    if (accessor.getPropertyStat(cfgKey) != null) {
       if (!accessor.removeProperty(cfgKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove config for %s. Failed to 
remove node %s.",

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
index 91b7f32..a151827 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -1,8 +1,12 @@
 package org.apache.helix.integration.task;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
@@ -14,7 +18,7 @@ import org.testng.annotations.Test;
 
 
 public class TestDeleteWorkflow extends TaskTestBase  {
-  private static final int DELETE_DELAY = 3000;
+  private static final int DELETE_DELAY = 1000;
 
   private HelixAdmin admin;
 
@@ -31,7 +35,7 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
         .setMaxAttemptsPerTask(1)
         .setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, 
"1000000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, 
"100000"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1", jobBuilder);
@@ -64,4 +68,98 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
     Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testDeleteWorkflowForcefully() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, 
"1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, 
TaskUtil.getNamespacedJobName(jobQueueName, "job1"),
+        TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed 
created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    
Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    
Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, 
jobQueueName));
+
+    // Delete the idealstate of workflow
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuild = accessor.keyBuilder();
+    accessor.removeProperty(keyBuild.idealStates(jobQueueName));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Attempt the deletion and and it should time out since idealstate does 
not exist anymore.
+    try {
+      _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+      Assert.fail("Delete must time out and throw a HelixException with the 
Controller paused, but did not!");
+    } catch (HelixException e) {
+      // Pass
+    }
+
+    // delete forcefully
+    _driver.delete(jobQueueName, true);
+
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+    
Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    
Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+  }
+
+  @Test
+  public void testDeleteHangingJobs() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, 
"1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, 
TaskUtil.getNamespacedJobName(jobQueueName, "job1"),
+        TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed 
created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    
Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    
Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, 
jobQueueName));
+
+    // Delete the idealstate, workflowconfig and context of workflow
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuild = accessor.keyBuilder();
+    accessor.removeProperty(keyBuild.idealStates(jobQueueName));
+    accessor.removeProperty(keyBuild.resourceConfig(jobQueueName));
+    accessor.removeProperty(keyBuild.workflowContext(jobQueueName));
+
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+
+    // attemp to delete the job and it should fail with exception.
+    try {
+      _driver.deleteJob(jobQueueName, "job1");
+      Assert.fail("Delete must be rejected and throw a HelixException, but did 
not!");
+    } catch (IllegalArgumentException e) {
+      // Pass
+    }
+
+    // delete forcefully
+    _driver.deleteJob(jobQueueName, "job1", true);
+
+    
Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    
Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName,
 "job1")));
+    Assert.assertNull(admin
+        .getResourceIdealState(CLUSTER_NAME, 
TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
----------------------------------------------------------------------
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
index 2d27f51..9a085f1 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -26,10 +26,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
 
 import org.apache.helix.HelixException;
@@ -128,11 +130,13 @@ public class JobAccessor extends AbstractHelixResource {
   @DELETE
   @Path("{jobName}")
   public Response deleteJob(@PathParam("clusterId") String clusterId,
-      @PathParam("workflowName") String workflowName, @PathParam("jobName") 
String jobName) {
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") 
String jobName,
+      @QueryParam("force") @DefaultValue("false") String forceDelete) {
+    boolean force = Boolean.valueOf(forceDelete);
     TaskDriver driver = getTaskDriver(clusterId);
 
     try {
-      driver.deleteJob(workflowName, jobName);
+      driver.deleteJob(workflowName, jobName, force);
     } catch (Exception e) {
       return badRequest(e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 5efb26c..679b3cc 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -173,10 +174,12 @@ public class WorkflowAccessor extends 
AbstractHelixResource {
   @DELETE
   @Path("{workflowId}")
   public Response deleteWorkflow(@PathParam("clusterId") String clusterId,
-      @PathParam("workflowId") String workflowId) {
+      @PathParam("workflowId") String workflowId,
+      @QueryParam("force") @DefaultValue("false") String forceDelete) {
+    boolean force = Boolean.valueOf(forceDelete);
     TaskDriver driver = getTaskDriver(clusterId);
     try {
-      driver.delete(workflowId);
+      driver.delete(workflowId, force);
     } catch (HelixException e) {
       return badRequest(String
           .format("Failed to delete workflow %s for reason : %s", workflowId, 
e.getMessage()));

Reply via email to