Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 45fc9dfca -> 870ca08de
[GOBBLIN-534] switched from helix job queue to work flow Closes #2397 from arjun4084346/helixChanges Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/870ca08d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/870ca08d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/870ca08d Branch: refs/heads/master Commit: 870ca08dee69424344bacb78d53ef2934b22dbf7 Parents: 45fc9df Author: Arjun <[email protected]> Authored: Mon Jul 16 13:21:52 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jul 16 13:21:52 2018 -0700 ---------------------------------------------------------------------- ...blinHelixDistributeJobExecutionLauncher.java | 4 +- .../cluster/GobblinHelixJobLauncher.java | 18 ++++----- .../org/apache/gobblin/cluster/HelixUtils.java | 41 +++++++++++++------- 3 files changed, 37 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 7ef24fc..e8681b3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -199,7 +199,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher */ private void submitJobToHelix(String jobName, String jobId, JobConfig.Builder jobConfigBuilder) throws Exception { TaskDriver taskDriver 
= new TaskDriver(this.helixManager); - HelixUtils.submitJobToQueue(jobConfigBuilder, + HelixUtils.submitJobToWorkFlow(jobConfigBuilder, jobName, jobId, taskDriver, @@ -243,7 +243,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher GobblinHelixDistributeJobExecutionLauncher.this.helixManager, planningName, planningId, - timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty()); + timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty()); return getResultFromUserContent(); } catch (TimeoutException te) { helixTaskDriver.waitToStop(planningName, 10L); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 8794a34..60a5405 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -106,7 +106,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final HelixManager helixManager; private final TaskDriver helixTaskDriver; - private final String helixQueueName; + private final String helixWorkFlowName; private final String jobResourceName; private JobListener jobListener; @@ -141,8 +141,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + Path.SEPARATOR + this.jobContext.getJobId()); - this.helixQueueName = this.jobContext.getJobName(); - this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixQueueName, this.jobContext.getJobId()); + this.helixWorkFlowName = 
this.jobContext.getJobName(); + this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId()); this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER); @@ -214,8 +214,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { protected void executeCancellation() { if (this.jobSubmitted) { try { - log.info("[DELETE] workflow {}", this.helixQueueName); - this.helixTaskDriver.delete(this.helixQueueName); + log.info("[DELETE] workflow {}", this.helixWorkFlowName); + this.helixTaskDriver.delete(this.helixWorkFlowName); } catch (IllegalArgumentException e) { LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); } @@ -282,7 +282,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { * Submit a job to run. */ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception { - HelixUtils.submitJobToQueue(jobConfigBuilder, this.helixQueueName, this.jobContext.getJobId(), + HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(), this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds); } @@ -368,17 +368,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { try { HelixUtils.waitJobCompletion( this.helixManager, - this.helixQueueName, + this.helixWorkFlowName, this.jobContext.getJobId(), timeoutEnabled? 
Optional.of(timeoutInSeconds) : Optional.empty()); } catch (TimeoutException te) { - helixTaskDriver.waitToStop(helixQueueName, 10L); + helixTaskDriver.waitToStop(helixWorkFlowName, 10L); try { cancelJob(this.jobListener); } catch (JobException e) { throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e); } - this.helixTaskDriver.resume(this.helixQueueName); + this.helixTaskDriver.resume(this.helixWorkFlowName); LOGGER.info("stopped the queue, deleted the job"); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 8b9d5af..e124e81 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -24,10 +24,10 @@ import org.apache.helix.HelixManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.task.JobConfig; -import org.apache.helix.task.JobQueue; import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; @@ -89,6 +89,8 @@ public class HelixUtils { return namePrefix + "_" + instanceId; } + // We have switched from Helix JobQueue to WorkFlow based job execution. 
+ @Deprecated public static void submitJobToQueue( JobConfig.Builder jobConfigBuilder, String queueName, @@ -96,33 +98,42 @@ public class HelixUtils { TaskDriver helixTaskDriver, HelixManager helixManager, long jobQueueDeleteTimeoutSeconds) throws Exception { + submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver, helixManager, jobQueueDeleteTimeoutSeconds); + } + + public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder, + String workFlowName, + String jobName, + TaskDriver helixTaskDriver, + HelixManager helixManager, + long jobQueueDeleteTimeoutSeconds) throws Exception { - WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName); + WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, workFlowName); - log.info("[DELETE] workflow {} in the beginning", queueName); + log.info("[DELETE] workflow {} in the beginning", workFlowName); // If the queue is present, but in delete state then wait for cleanup before recreating the queue if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { - new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds); + // We want synchronous delete otherwise state can be deleted after we create it below due to race condition. 
+ new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, jobQueueDeleteTimeoutSeconds); // if we get here then the workflow was successfully deleted workflowConfig = null; } - // Create one queue for each job with the job name being the queue name + // Create a work flow for each job with the name being the queue name if (workflowConfig == null) { - JobQueue jobQueue = new JobQueue.Builder(queueName).build(); - helixTaskDriver.createQueue(jobQueue); - log.info("Created job queue {}", queueName); + // Create a workflow and add the job + Workflow workflow = new Workflow.Builder(workFlowName).addJob(jobName, jobConfigBuilder).build(); + // start the workflow + helixTaskDriver.start(workflow); + log.info("Created a work flow {}", workFlowName); } else { - log.info("Job queue {} already exists", queueName); + log.info("Work flow {} already exists", workFlowName); } - - // Put the job into the queue - helixTaskDriver.enqueueJob(queueName, jobName, jobConfigBuilder); } public static void waitJobCompletion( HelixManager helixManager, - String queueName, + String workFlowName, String jobName, Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException { @@ -133,9 +144,9 @@ public class HelixUtils { } while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) { - WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName); + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); if (workflowContext != null) { - org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(queueName, jobName)); + org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)); if (helixJobState == org.apache.helix.task.TaskState.COMPLETED || helixJobState == org.apache.helix.task.TaskState.FAILED || helixJobState == org.apache.helix.task.TaskState.STOPPED) {
