Repository: incubator-gobblin Updated Branches: refs/heads/master 56be9b230 -> 98fdc504f
[GOBBLIN-539] set expiry for helix work flow Closes #2402 from arjun4084346/workFlowNameChange Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/98fdc504 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/98fdc504 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/98fdc504 Branch: refs/heads/master Commit: 98fdc504fd1dd5e14f3f953e0dd928a98d6708df Parents: 56be9b2 Author: Arjun <[email protected]> Authored: Wed Jul 18 22:15:23 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Jul 18 22:15:23 2018 -0700 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 4 +-- ...blinHelixDistributeJobExecutionLauncher.java | 10 +++---- .../cluster/GobblinHelixJobLauncher.java | 22 ++++++++++----- .../org/apache/gobblin/cluster/HelixUtils.java | 28 +++++--------------- 4 files changed, 29 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index b0badc4..b6c11e3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -114,8 +114,8 @@ public class GobblinClusterConfigurationKeys { public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds"; public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60; - public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds"; - public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300; + public static final String HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflow.expirySeconds"; + public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60; public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index e8681b3..f887f0b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -99,7 +99,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps."; - private final long jobQueueDeleteTimeoutSeconds; + private final long workFlowExpiryTimeSeconds; private boolean jobSubmitted; @@ -122,9 +122,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME, builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME); - this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(combined, - GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, - GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); + this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined, + GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); } @Override @@ -204,7 +204,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher jobId, taskDriver, this.helixManager, - this.jobQueueDeleteTimeoutSeconds); + this.workFlowExpiryTimeSeconds); this.jobSubmitted = true; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 60a5405..cd37342 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -125,7 +125,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final ConcurrentHashMap<String, Boolean> runningMap; private final StateStores stateStores; private final Config jobConfig; - private final long jobQueueDeleteTimeoutSeconds; + private final long workFlowExpiryTimeSeconds; public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir, List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap) @@ -141,7 +141,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + Path.SEPARATOR + this.jobContext.getJobId()); - this.helixWorkFlowName = this.jobContext.getJobName(); + this.helixWorkFlowName = this.jobContext.getJobId(); this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId()); this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER); @@ -151,9 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfig = ConfigUtils.propertiesToConfig(jobProps); - this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, - GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, - GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); + this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig, + GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps) .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( @@ -215,7 +215,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { if (this.jobSubmitted) { try { log.info("[DELETE] workflow {}", this.helixWorkFlowName); - this.helixTaskDriver.delete(this.helixWorkFlowName); + if (this.cancellationRequested) { + // TODO : fix this when HELIX-1180 is completed + // work flow should never be deleted explicitly because it has a expiry time + // If cancellation is requested, we should set the job state to CANCELLED/ABORT + this.helixTaskDriver.delete(this.helixWorkFlowName); + } } catch (IllegalArgumentException e) { LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); } @@ -275,6 +280,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfigBuilder.setRebalanceRunningTask(true); } + jobConfigBuilder.setExpiry(this.jobContext.getJobState().getPropAsLong( + GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS)); + return jobConfigBuilder; } @@ -283,7 +291,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { */ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception { HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(), - this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds); + this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds); } public void launchJob(@Nullable JobListener jobListener) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/98fdc504/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index e124e81..1a11ee3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -18,6 +18,7 @@ package org.apache.gobblin.cluster; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.helix.HelixManager; @@ -106,29 +107,14 @@ public class HelixUtils { String jobName, TaskDriver helixTaskDriver, HelixManager helixManager, - long jobQueueDeleteTimeoutSeconds) throws Exception { - - WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, workFlowName); - - log.info("[DELETE] workflow {} in the beginning", workFlowName); - // If the queue is present, but in delete state then wait for cleanup before recreating the queue - if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { - // We want synchronous delete otherwise state can be deleted after we create it below due to race condition. - new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, jobQueueDeleteTimeoutSeconds); - // if we get here then the workflow was successfully deleted - workflowConfig = null; - } + long workFlowExpiryTime) throws Exception { + WorkflowConfig workFlowConfig = new WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, TimeUnit.SECONDS).build(); // Create a work flow for each job with the name being the queue name - if (workflowConfig == null) { - // Create a workflow and add the job - Workflow workflow = new Workflow.Builder(workFlowName).addJob(jobName, jobConfigBuilder).build(); - // start the workflow - helixTaskDriver.start(workflow); - log.info("Created a work flow {}", workFlowName); - } else { - log.info("Work flow {} already exists", workFlowName); - } + Workflow workFlow = new Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName, jobConfigBuilder).build(); + // start the workflow + helixTaskDriver.start(workFlow); + log.info("Created a work flow {}", workFlowName); } public static void waitJobCompletion(
