This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e38ce3  [GOBBLIN-703] Allow planning job to be running in 
non-blocking mode
3e38ce3 is described below

commit 3e38ce3b963b746f1ee55e5a6261f454f8eefaa2
Author: Kuai Yu <[email protected]>
AuthorDate: Mon Mar 25 17:13:05 2019 -0700

    [GOBBLIN-703] Allow planning job to be running in non-blocking mode
    
    Closes #2573 from yukuai518/nonblock
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  2 ++
 ...GobblinHelixDistributeJobExecutionLauncher.java | 31 +++++++++++++++-------
 .../cluster/HelixRetriggeringJobCallable.java      | 25 +++++++++++++----
 .../org/apache/gobblin/cluster/HelixUtils.java     |  1 -
 .../cluster/TaskRunnerSuiteForJobFactoryTest.java  |  1 -
 5 files changed, 44 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index faeaf17..a760c65 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -151,6 +151,8 @@ public class GobblinClusterConfigurationKeys {
   public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + 
"bootup.clean.dist.jobs";
   public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
 
+  public static final String NON_BLOCKING_PLANNING_JOB_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "nonBlocking.planningJob.enabled";
+  public static final boolean DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED = 
false;
   public static final String KILL_DUPLICATE_PLANNING_JOB = 
GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob";
   public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true;
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index b14b700..36239a2 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -105,6 +105,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   private volatile boolean cancellationRequested = false;
   // A flag indicating whether a cancellation has been executed or not
   private volatile boolean cancellationExecuted = false;
+  // A flag indicating whether the launcher should skip waiting for the planning job's completion
+  private boolean nonBlockingMode = false;
+
   @Getter
   private DistributeJobMonitor jobMonitor;
 
@@ -126,6 +129,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
     this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
+    this.nonBlockingMode = ConfigUtils.getBoolean(combined,
+        GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+        
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
     this.helixMetrics = builder.helixMetrics;
     this.jobsMapping = builder.jobsMapping;
     this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
@@ -274,12 +280,18 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
         submitJobToHelix(planningId, planningId, builder);
         
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixSubmit(submitStartTime);
         long waitStartTime = System.currentTimeMillis();
-        DistributeJobResult rst = waitForJobCompletion(planningId, planningId);
-        
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime);
+
+        // we should not wait if in non-blocking mode.
+        DistributeJobResult rst = new DistributeJobResult();
+        if (!GobblinHelixDistributeJobExecutionLauncher.this.nonBlockingMode) {
+          rst = waitForJobCompletion(planningId, planningId);
+          
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime);
+        }
+
         return rst;
       } catch (Exception e) {
         log.error(planningId + " is not able to submit.");
-        return new DistributeJobResult(false);
+        return new DistributeJobResult();
       }
     }
   }
@@ -302,19 +314,20 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     } catch (TimeoutException te) {
       HelixUtils.handleJobTimeout(workFlowName, jobName,
           planningJobHelixManager, this, null);
-      return new DistributeJobResult(false);
+      return new DistributeJobResult();
     }
   }
 
-  //TODO: we will remove this logic after we change to polling model.
   protected DistributeJobResult getResultFromUserContent() {
-    return new DistributeJobResult(false);
+    return new DistributeJobResult();
   }
 
-  @Getter
-  @AllArgsConstructor
+  /**
+   * If {@link 
GobblinClusterConfigurationKeys#NON_BLOCKING_PLANNING_JOB_ENABLED} is enabled
+   * this result object contains nothing; otherwise this result object can be 
used to contain
+   * any values returned from other task-driver instances.
+   */
   static class DistributeJobResult implements ExecutionResult {
-    boolean isEarlyStopped = false;
   }
 
   private class DistributeJobMonitor extends FutureTask<ExecutionResult> 
implements JobExecutionMonitor {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 5223c71..4a6571f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixManager;
 
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.Striped;
+import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,6 +42,7 @@ import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -234,7 +236,7 @@ class HelixRetriggeringJobCallable implements Callable {
 
       // Check if any existing planning job is running
       Optional<String> planningJobIdFromStore = 
jobsMapping.getPlanningJobId(jobName);
-
+      boolean nonblocking = false;
       // start of critical section to check if a job with same job name is 
running
       Lock jobLock = locks.get(jobName);
       jobLock.lock();
@@ -290,6 +292,14 @@ class HelixRetriggeringJobCallable implements Callable {
         builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
         builder.setHelixMetrics(this.helixMetrics);
 
+        // Determine whether the distributed job launcher should wait for planning job 
completion
+
+        Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+            .withFallback(ConfigUtils.propertiesToConfig(sysProps));
+
+        nonblocking = ConfigUtils.getBoolean(combined, 
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+            
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
+
         log.info("Planning job {} started.", newPlanningId);
         GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
         closer.register(launcher);
@@ -297,7 +307,7 @@ class HelixRetriggeringJobCallable implements Callable {
         startTime = System.currentTimeMillis();
         this.currentJobMonitor = launcher.launchJob(null);
 
-        // make sure the planning job will be visible to other parallel 
running threads,
+        // make sure the planning job is initialized (or visible) to other 
parallel running threads,
         // so that the same critical section check (querying Helix for job 
completeness)
         // can be applied.
         HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, 
newPlanningId, 300_000);
@@ -310,11 +320,16 @@ class HelixRetriggeringJobCallable implements Callable {
       // we can remove the job spec from the catalog because Helix will drive 
this job to the end.
       this.deleteJobSpec();
 
+      // If we are using non-blocking mode, this get() only guarantees the 
planning job is submitted.
+      // It doesn't guarantee the job will finish because internally we won't 
wait for Helix completion.
       this.currentJobMonitor.get();
       this.currentJobMonitor = null;
-      log.info("Planning job {} finished.", newPlanningId);
-      
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
-
+      if (nonblocking) {
+        log.info("Planning job {} submitted successfully.", newPlanningId);
+      } else {
+        log.info("Planning job {} finished.", newPlanningId);
+        
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
+      }
     } catch (Exception e) {
       if (startTime != 0) {
         
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 69ae823..8b587ef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -47,7 +47,6 @@ import static org.apache.helix.task.TaskState.STOPPED;
  *
  * @author Yinan Li
  */
-@Alpha
 @Slf4j
 public class HelixUtils {
 
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 0d973af..d16a62f 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -93,7 +93,6 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
 
     protected DistributeJobResult getResultFromUserContent() {
       DistributeJobResult rst = super.getResultFromUserContent();
-      Assert.assertTrue(!rst.isEarlyStopped());
       String jobName = 
this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
       try {
         
Assert.assertFalse(this.jobsMapping.getPlanningJobId(jobName).isPresent());

Reply via email to