Repository: helix Updated Branches: refs/heads/master b00d65961 -> b8355b9a6
Add forcefully workflow and job delete API into HelixAdmin and Helix Rest. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b8355b9a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b8355b9a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b8355b9a Branch: refs/heads/master Commit: b8355b9a6b1071c149305f147bdb524d8a67f2d2 Parents: b00d659 Author: Lei Xia <[email protected]> Authored: Wed Apr 11 10:28:22 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Thu Apr 19 14:18:16 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 91 ++++++++++++---- .../java/org/apache/helix/task/TaskUtil.java | 27 +++-- .../integration/task/TestDeleteWorkflow.java | 104 ++++++++++++++++++- .../server/resources/helix/JobAccessor.java | 8 +- .../resources/helix/WorkflowAccessor.java | 7 +- 5 files changed, 197 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 174cae1..4a7707a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -228,12 +228,37 @@ public class TaskDriver { * @param job job name */ public void deleteJob(final String queue, final String job) { - WorkflowConfig workflowCfg = - TaskUtil.getWorkflowConfig(_accessor, queue); + deleteJob(queue, job, false); + } + + /** + * Delete a job from an existing named queue, + * the queue has to be stopped prior to this call + * + * @param queue queue name + * @param job job name + * @param 
forceDelete CAUTION: if set to true, all of the job's related zk nodes will + * be cleaned up from zookeeper even if its workflow information cannot be found. + */ + public void deleteJob(final String queue, final String job, boolean forceDelete) { + WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue); if (workflowCfg == null) { - throw new IllegalArgumentException("Queue " + queue + " does not yet exist!"); + if (forceDelete) { + // remove all job znodes if its original workflow config was already gone. + LOG.info("Forcefully removing job: " + job + " from queue: " + queue); + boolean success = TaskUtil + .removeJob(_accessor, _propertyStore, TaskUtil.getNamespacedJobName(queue, job)); + if (!success) { + LOG.info("Failed to delete job: " + job + " from queue: " + queue); + throw new HelixException("Failed to delete job: " + job + " from queue: " + queue); + } + } else { + throw new IllegalArgumentException("Queue " + queue + " does not yet exist!"); + } + return; } + if (workflowCfg.isTerminable()) { throw new IllegalArgumentException(queue + " is not a queue!"); } @@ -283,13 +308,14 @@ public class TaskDriver { } String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job); - Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName)); + Set<String> jobs = new HashSet<>(Arrays.asList(namespacedJobName)); if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) { LOG.error("Failed to delete job " + job + " from queue " + queue); throw new HelixException("Failed to delete job " + job + " from queue " + queue); } } + /** * Adds a new job to the end an existing named queue. * @@ -549,29 +575,56 @@ public class TaskDriver { * @param workflow */ public void delete(String workflow) { - // After set DELETE state, rebalancer may remove the workflow instantly. - // So record context before set DELETE state. 
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); + delete(workflow, false); + } - setWorkflowTargetState(workflow, TargetState.DELETE); + /** + * Public method to delete a workflow/queue. + * + * @param workflow + * @param forceDelete CAUTION: if set to true, the workflow and all of its jobs' related zk nodes will + * be cleaned up immediately from zookeeper, no matter whether any of its jobs + * are running or not. + */ + public void delete(String workflow, boolean forceDelete) { + WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); + if (forceDelete) { + // if forceDelete, remove the workflow and its jobs immediately from zookeeper. + LOG.info("Forcefully removing workflow: " + workflow); + removeWorkflowFromZK(workflow); + } else { + // Set the workflow target state to DELETE, and let the Helix controller remove the workflow. + // Controller may remove the workflow instantly, so record context before setting DELETE state. + setWorkflowTargetState(workflow, TargetState.DELETE); + } - // Delete all finished scheduled workflows. + // Delete all previously scheduled workflows. 
if (wCtx != null && wCtx.getScheduledWorkflows() != null) { for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) { - WorkflowContext scheduledWorkflowCtx = TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow); - if (scheduledWorkflowCtx != null && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { - Set<String> jobSet = new HashSet<String>(); - // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow - WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, scheduledWorkflow); - if (wCfg != null) { - jobSet.addAll(wCfg.getJobDag().getAllNodes()); - } - TaskUtil.removeWorkflow(_accessor, _propertyStore, scheduledWorkflow, jobSet); + WorkflowContext scheduledWorkflowCtx = + TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow); + if (scheduledWorkflowCtx != null + && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { + removeWorkflowFromZK(scheduledWorkflow); } } } } + private void removeWorkflowFromZK(String workflow) { + Set<String> jobSet = new HashSet<>(); + // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow + WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow); + if (wCfg != null) { + jobSet.addAll(wCfg.getJobDag().getAllNodes()); + } + boolean success = TaskUtil.removeWorkflow(_accessor, _propertyStore, workflow, jobSet); + if (!success) { + LOG.info("Failed to delete the workflow " + workflow); + throw new HelixException("Failed to delete the workflow " + workflow); + } + } + /** * Public synchronized method to wait for a delete operation to fully complete with timeout. 
* When this method returns, it means that a queue (workflow) has been completely deleted, meaning @@ -893,4 +946,4 @@ public class TaskDriver { "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS."); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 47a5cfd..c6d7a55 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -537,7 +537,7 @@ public class TaskUtil { String workflowJobResource) { boolean success = true; PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource); - if (accessor.getProperty(isKey) != null) { + if (accessor.getPropertyStat(isKey) != null) { if (!accessor.removeProperty(isKey)) { LOG.warn(String.format( "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.", @@ -548,7 +548,7 @@ public class TaskUtil { // Delete external view PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource); - if (accessor.getProperty(evKey) != null) { + if (accessor.getPropertyStat(evKey) != null) { if (!accessor.removeProperty(evKey)) { LOG.warn(String.format( "Error occurred while trying to remove ExternalView of resource %s. 
Failed to remove node %s.", @@ -583,17 +583,17 @@ public class TaskUtil { } } + if (!removeWorkflowConfig(accessor, workflow)) { + LOG.warn( + String.format("Error occurred while trying to remove workflow config for %s.", workflow)); + success = false; + } if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) { LOG.warn(String .format("Error occurred while trying to remove workflow idealstate/externalview for %s.", workflow)); success = false; } - if (!removeWorkflowConfig(accessor, workflow)) { - LOG.warn( - String.format("Error occurred while trying to remove workflow config for %s.", workflow)); - success = false; - } if (!removeWorkflowContext(propertyStore, workflow)) { LOG.warn(String .format("Error occurred while trying to remove workflow context for %s.", workflow)); @@ -679,16 +679,15 @@ public class TaskUtil { protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore, String job) { boolean success = true; - if (!cleanupJobIdealStateExtView(accessor, job)) { - LOG.warn(String - .format("Error occurred while trying to remove job idealstate/externalview for %s.", - job)); - success = false; - } if (!removeJobConfig(accessor, job)) { LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job)); success = false; } + if (!cleanupJobIdealStateExtView(accessor, job)) { + LOG.warn(String + .format("Error occurred while trying to remove job idealstate/externalview for %s.", job)); + success = false; + } if (!removeJobContext(propertyStore, job)) { LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job)); success = false; @@ -789,7 +788,7 @@ public class TaskUtil { private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor, String workflowJobResource) { PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource); - if (accessor.getProperty(cfgKey) != null) { + if (accessor.getPropertyStat(cfgKey) != null) { if 
(!accessor.removeProperty(cfgKey)) { LOG.warn(String.format( "Error occurred while trying to remove config for %s. Failed to remove node %s.", http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java index 91b7f32..a151827 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java @@ -1,8 +1,12 @@ package org.apache.helix.integration.task; import com.google.common.collect.ImmutableMap; +import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; @@ -14,7 +18,7 @@ import org.testng.annotations.Test; public class TestDeleteWorkflow extends TaskTestBase { - private static final int DELETE_DELAY = 3000; + private static final int DELETE_DELAY = 1000; private HelixAdmin admin; @@ -31,7 +35,7 @@ public class TestDeleteWorkflow extends TaskTestBase { JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) .setMaxAttemptsPerTask(1) .setWorkflow(jobQueueName) - .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000")); + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "100000")); JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); jobQueue.enqueueJob("job1", jobBuilder); @@ -64,4 +68,98 @@ public class TestDeleteWorkflow extends 
TaskTestBase { Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); } -} \ No newline at end of file + + @Test + public void testDeleteWorkflowForcefully() throws InterruptedException { + String jobQueueName = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) + .setMaxAttemptsPerTask(1) + .setWorkflow(jobQueueName) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000")); + + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); + jobQueue.enqueueJob("job1", jobBuilder); + _driver.start(jobQueue.build()); + _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"), + TaskState.IN_PROGRESS); + + // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue + Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); + Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + + // Delete the idealstate of workflow + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuild = accessor.keyBuilder(); + accessor.removeProperty(keyBuild.idealStates(jobQueueName)); + Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + + // Attempt the deletion and it should time out since idealstate does not exist anymore. 
+ try { + _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY); + Assert.fail("Delete must time out and throw a HelixException with the Controller paused, but did not!"); + } catch (HelixException e) { + // Pass + } + + // delete forcefully + _driver.delete(jobQueueName, true); + + Assert.assertNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); + Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + } + + @Test + public void testDeleteHangingJobs() throws InterruptedException { + String jobQueueName = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) + .setMaxAttemptsPerTask(1) + .setWorkflow(jobQueueName) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000")); + + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); + jobQueue.enqueueJob("job1", jobBuilder); + _driver.start(jobQueue.build()); + _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"), + TaskState.IN_PROGRESS); + + // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue + Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); + Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + + // Delete the idealstate, workflowconfig and context of workflow + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder 
keyBuild = accessor.keyBuilder(); + accessor.removeProperty(keyBuild.idealStates(jobQueueName)); + accessor.removeProperty(keyBuild.resourceConfig(jobQueueName)); + accessor.removeProperty(keyBuild.workflowContext(jobQueueName)); + + Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + Assert.assertNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); + + // attempt to delete the job and it should fail with an exception. + try { + _driver.deleteJob(jobQueueName, "job1"); + Assert.fail("Delete must be rejected and throw a HelixException, but did not!"); + } catch (IllegalArgumentException e) { + // Pass + } + + // delete forcefully + _driver.deleteJob(jobQueueName, "job1", true); + + Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + Assert.assertNull(admin + .getResourceIdealState(CLUSTER_NAME, TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java index 2d27f51..9a085f1 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java @@ -26,10 +26,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; 
import javax.ws.rs.core.Response; import org.apache.helix.HelixException; @@ -128,11 +130,13 @@ public class JobAccessor extends AbstractHelixResource { @DELETE @Path("{jobName}") public Response deleteJob(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName, + @QueryParam("force") @DefaultValue("false") String forceDelete) { + boolean force = Boolean.valueOf(forceDelete); TaskDriver driver = getTaskDriver(clusterId); try { - driver.deleteJob(workflowName, jobName); + driver.deleteJob(workflowName, jobName, force); } catch (Exception e) { return badRequest(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java index 5efb26c..679b3cc 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -173,10 +174,12 @@ public class WorkflowAccessor extends AbstractHelixResource { @DELETE @Path("{workflowId}") public Response deleteWorkflow(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId) { + @PathParam("workflowId") String workflowId, + @QueryParam("force") @DefaultValue("false") String forceDelete) { + boolean force = 
Boolean.valueOf(forceDelete); TaskDriver driver = getTaskDriver(clusterId); try { - driver.delete(workflowId); + driver.delete(workflowId, force); } catch (HelixException e) { return badRequest(String .format("Failed to delete workflow %s for reason : %s", workflowId, e.getMessage()));
