Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 45fc9dfca -> 870ca08de


[GOBBLIN-534] switched from helix job queue to work flow

Closes #2397 from arjun4084346/helixChanges


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/870ca08d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/870ca08d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/870ca08d

Branch: refs/heads/master
Commit: 870ca08dee69424344bacb78d53ef2934b22dbf7
Parents: 45fc9df
Author: Arjun <[email protected]>
Authored: Mon Jul 16 13:21:52 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Jul 16 13:21:52 2018 -0700

----------------------------------------------------------------------
 ...blinHelixDistributeJobExecutionLauncher.java |  4 +-
 .../cluster/GobblinHelixJobLauncher.java        | 18 ++++-----
 .../org/apache/gobblin/cluster/HelixUtils.java  | 41 +++++++++++++-------
 3 files changed, 37 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 7ef24fc..e8681b3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -199,7 +199,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
    */
   private void submitJobToHelix(String jobName, String jobId, JobConfig.Builder jobConfigBuilder) throws Exception {
     TaskDriver taskDriver = new TaskDriver(this.helixManager);
-    HelixUtils.submitJobToQueue(jobConfigBuilder,
+    HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
         jobName,
         jobId,
         taskDriver,
@@ -243,7 +243,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
           GobblinHelixDistributeJobExecutionLauncher.this.helixManager,
           planningName,
           planningId,
-          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
+          timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
       return getResultFromUserContent();
     } catch (TimeoutException te) {
       helixTaskDriver.waitToStop(planningName, 10L);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 8794a34..60a5405 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -106,7 +106,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
   private final HelixManager helixManager;
   private final TaskDriver helixTaskDriver;
-  private final String helixQueueName;
+  private final String helixWorkFlowName;
   private final String jobResourceName;
   private JobListener jobListener;
 
@@ -141,8 +141,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
         + Path.SEPARATOR + this.jobContext.getJobId());
 
-    this.helixQueueName = this.jobContext.getJobName();
-    this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixQueueName, this.jobContext.getJobId());
+    this.helixWorkFlowName = this.jobContext.getJobName();
+    this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId());
 
     this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
 
@@ -214,8 +214,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   protected void executeCancellation() {
     if (this.jobSubmitted) {
       try {
-        log.info("[DELETE] workflow {}", this.helixQueueName);
-        this.helixTaskDriver.delete(this.helixQueueName);
+        log.info("[DELETE] workflow {}", this.helixWorkFlowName);
+        this.helixTaskDriver.delete(this.helixWorkFlowName);
       } catch (IllegalArgumentException e) {
         LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
       }
@@ -282,7 +282,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
-    HelixUtils.submitJobToQueue(jobConfigBuilder, this.helixQueueName, this.jobContext.getJobId(),
+    HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(),
         this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds);
   }
 
@@ -368,17 +368,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     try {
       HelixUtils.waitJobCompletion(
           this.helixManager,
-          this.helixQueueName,
+          this.helixWorkFlowName,
           this.jobContext.getJobId(),
           timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
     } catch (TimeoutException te) {
-      helixTaskDriver.waitToStop(helixQueueName, 10L);
+      helixTaskDriver.waitToStop(helixWorkFlowName, 10L);
       try {
         cancelJob(this.jobListener);
       } catch (JobException e) {
         throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e);
       }
-      this.helixTaskDriver.resume(this.helixQueueName);
+      this.helixTaskDriver.resume(this.helixWorkFlowName);
       LOGGER.info("stopped the queue, deleted the job");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/870ca08d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 8b9d5af..e124e81 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -24,10 +24,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
@@ -89,6 +89,8 @@ public class HelixUtils {
     return namePrefix + "_" + instanceId;
   }
 
+  // We have switched from Helix JobQueue to WorkFlow based job execution.
+  @Deprecated
   public static void submitJobToQueue(
       JobConfig.Builder jobConfigBuilder,
       String queueName,
@@ -96,33 +98,42 @@ public class HelixUtils {
       TaskDriver helixTaskDriver,
       HelixManager helixManager,
       long jobQueueDeleteTimeoutSeconds) throws Exception {
+    submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver, helixManager, jobQueueDeleteTimeoutSeconds);
+  }
+
+  public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
+      String workFlowName,
+      String jobName,
+      TaskDriver helixTaskDriver,
+      HelixManager helixManager,
+      long jobQueueDeleteTimeoutSeconds) throws Exception {
 
-    WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName);
+    WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, workFlowName);
 
-    log.info("[DELETE] workflow {} in the beginning", queueName);
+    log.info("[DELETE] workflow {} in the beginning", workFlowName);
     // If the queue is present, but in delete state then wait for cleanup before recreating the queue
     if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
-      new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds);
+      // We want synchronous delete otherwise state can be deleted after we create it below due to race condition.
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, jobQueueDeleteTimeoutSeconds);
       // if we get here then the workflow was successfully deleted
       workflowConfig = null;
     }
 
-    // Create one queue for each job with the job name being the queue name
+    // Create a work flow for each job with the name being the queue name
     if (workflowConfig == null) {
-      JobQueue jobQueue = new JobQueue.Builder(queueName).build();
-      helixTaskDriver.createQueue(jobQueue);
-      log.info("Created job queue {}", queueName);
+      // Create a workflow and add the job
+      Workflow workflow = new Workflow.Builder(workFlowName).addJob(jobName, jobConfigBuilder).build();
+      // start the workflow
+      helixTaskDriver.start(workflow);
+      log.info("Created a work flow {}", workFlowName);
     } else {
-      log.info("Job queue {} already exists", queueName);
+      log.info("Work flow {} already exists", workFlowName);
     }
-
-    // Put the job into the queue
-    helixTaskDriver.enqueueJob(queueName, jobName, jobConfigBuilder);
   }
 
   public static void waitJobCompletion(
       HelixManager helixManager,
-      String queueName,
+      String workFlowName,
       String jobName,
       Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
 
@@ -133,9 +144,9 @@ public class HelixUtils {
     }
 
     while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
-      WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName);
+      WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       if (workflowContext != null) {
-        org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(queueName, jobName));
+        org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName));
         if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
             helixJobState == org.apache.helix.task.TaskState.FAILED ||
             helixJobState == org.apache.helix.task.TaskState.STOPPED) {

Reply via email to