Repository: incubator-gobblin Updated Branches: refs/heads/master d0784cad9 -> 1d0ec852c
[GOBBLIN-369] Clean up the helix job queue after the job execution is… Closes #2244 from htran1/helix_queue_cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1d0ec852 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1d0ec852 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1d0ec852 Branch: refs/heads/master Commit: 1d0ec852c84037ac7c8f24f6694f4757dae21a00 Parents: d0784ca Author: Hung Tran <[email protected]> Authored: Thu Jan 11 17:58:09 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Jan 11 17:58:17 2018 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 3 ++ .../cluster/GobblinHelixJobLauncher.java | 8 +++++- .../gobblin/cluster/GobblinHelixTaskDriver.java | 29 ++++++++++++++++++++ .../cluster/GobblinHelixJobLauncherTest.java | 9 ++++++ 4 files changed, 48 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 8fd9bfd..5e25194 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -82,4 +82,7 @@ public class GobblinClusterConfigurationKeys { public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds"; public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60; + + public 
static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds"; + public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 73e5330..79f3b9e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -126,6 +126,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final ConcurrentHashMap<String, Boolean> runningMap; private final StateStores stateStores; private final Config jobConfig; + private final long jobQueueDeleteTimeoutSeconds; public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir, List<? 
extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap) @@ -150,6 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfig = ConfigUtils.propertiesToConfig(jobProps); + this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); + Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps) .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(), @@ -240,7 +244,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { GobblinHelixTaskDriver taskDriver = new GobblinHelixTaskDriver(this.helixManager); taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId()); LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId()); - } catch (IllegalArgumentException e) { + + taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds); + } catch (InterruptedException | IllegalArgumentException e) { LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java index bb5c551..a39c5ca 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java @@ -26,6 +26,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import 
org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; @@ -268,4 +269,32 @@ public class GobblinHelixTaskDriver { LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName); } } + + /** + * Delete the workflow + * + * @param workflow The workflow name + * @param timeout The timeout for deleting the workflow/queue in seconds + */ + public void deleteWorkflow(String workflow, long timeout) throws InterruptedException { + _taskDriver.delete(workflow); + + long endTime = System.currentTimeMillis() + (timeout * 1000); + + // check for completion of deletion request + while (System.currentTimeMillis() <= endTime) { + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow); + + if (workflowContext != null) { + Thread.sleep(1000); + } else { + // Successfully deleted + return; + } + } + + // Failed to complete deletion within timeout + throw new HelixException(String + .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout)); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 243c652..cc327fc 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -40,6 +40,8 @@ import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import 
org.apache.helix.task.TaskDriver; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -303,6 +305,13 @@ public class GobblinHelixJobLauncherTest { // job context should have been deleted Assert.assertNull(jobContext); + // job queue should have been deleted + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(jobName); + Assert.assertNull(workflowConfig); + + WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName); + Assert.assertNull(workflowContext); + // check that workunit and taskstate directory for the job are cleaned up final File workunitsDir = new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME
