Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 10318fe9b -> 2c5e25d98


[GOBBLIN-510] Decouple JobExecutionLauncher and JobExecutionDriver

Closes #2380 from yukuai518/jobExec


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2c5e25d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2c5e25d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2c5e25d9

Branch: refs/heads/master
Commit: 2c5e25d98fc0e6f388759cab56f52d2b375ab097
Parents: 10318fe
Author: Kuai Yu <[email protected]>
Authored: Thu Jun 7 11:22:24 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu Jun 7 11:22:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/runtime/JobState.java    |  3 +-
 .../gobblin/runtime/api/ExecutionResult.java    | 26 ++++++
 .../gobblin/runtime/api/JobExecutionDriver.java |  2 -
 .../runtime/api/JobExecutionLauncher.java       | 20 ++++-
 .../runtime/api/JobExecutionMonitor.java        | 29 +++++++
 .../gobblin/runtime/api/JobExecutionResult.java |  2 +-
 .../gobblin/runtime/api/JobExecutionStatus.java |  7 +-
 .../gobblin/runtime/api/MonitoredObject.java    | 24 ++++++
 .../DefaultGobblinInstanceDriverImpl.java       | 13 ++-
 .../job_exec/JobLauncherExecutionDriver.java    | 89 +++++++++++++++++---
 .../gobblin/runtime/JobBrokerInjectionTest.java |  9 --
 .../TestStandardGobblinInstanceLauncher.java    |  9 +-
 .../TestJobLauncherExecutionDriver.java         | 16 +++-
 13 files changed, 210 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 47356fc..aa09d2c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.rest.Metric;
 import org.apache.gobblin.rest.MetricArray;
 import org.apache.gobblin.rest.MetricTypeEnum;
 import org.apache.gobblin.rest.TaskExecutionInfoArray;
+import org.apache.gobblin.runtime.api.MonitoredObject;
 import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.runtime.util.MetricGroup;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -85,7 +86,7 @@ public class JobState extends SourceState {
    *    <li> SUCCESSFUL => CANCELLED  (cancelled before committing)
    * </ul>
    */
-  public enum RunningState {
+  public enum RunningState implements MonitoredObject {
     /** Pending creation of {@link WorkUnit}s. */
     PENDING,
     /** Starting the execution of {@link WorkUnit}s. */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
new file mode 100644
index 0000000..3f77493
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An object which describes the result after job completion. This can be 
retrieved by {@link JobExecutionFuture#get()}
+ *
+ * @see JobExecutionResult as a derived class.
+ */
+public interface ExecutionResult {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java
index 09962be..32db440 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gobblin.runtime.api;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index 7bc9cc0..e414cf3 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gobblin.runtime.api;
 
+import java.util.concurrent.Future;
+
 import com.codahale.metrics.Gauge;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -30,10 +32,24 @@ import lombok.Getter;
  */
 @Alpha
 public interface JobExecutionLauncher extends Instrumentable {
-  JobExecutionDriver launchJob(JobSpec jobSpec);
+  /**
+   * This method is to launch the job specified by {@param jobSpec}
+   * The simplest way is to run a {@link JobExecutionDriver} and upon 
completion return a
+   * {@link 
org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver.JobExecutionMonitorAndDriver}.
+   *
+   * If {@link JobExecutionDriver} does not run within the same process/node 
of {@link JobExecutionLauncher}, a simple monitoring
+   * future object ({@link JobExecutionMonitor}) can be returned. This object 
can do two things:
+   *
+   * 1) Wait for computation of final {@link ExecutionResult} by invoking 
{@link Future#get()}.
+   * 2) Monitor current job running status by invoking {@link 
JobExecutionMonitor#getRunningState()}.
+   *
+   * @see JobExecutionMonitor
+   */
+  JobExecutionMonitor launchJob(JobSpec jobSpec);
 
   /**
-   * Common metrics for all launcher implementations. */
+   * Common metrics for all launcher implementations.
+   */
   StandardMetrics getMetrics();
 
   public static class StandardMetrics {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java
new file mode 100644
index 0000000..604f746
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.concurrent.Future;
+
+/**
+ * A simple monitoring and future object. This object can be used as a normal 
{@link Future} object to get {@link ExecutionResult}.
+ *
+ * It can also be used to get current job running status, which is described 
by {@link MonitoredObject}.
+ */
+public interface JobExecutionMonitor extends Future<ExecutionResult> {
+  MonitoredObject getRunningState();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java
index 66493a6..c209ffc 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java
@@ -29,7 +29,7 @@ import lombok.Getter;
  */
 @AllArgsConstructor(access=AccessLevel.PROTECTED)
 @Getter
-public class JobExecutionResult {
+public class JobExecutionResult implements ExecutionResult {
   private final RunningState finalState;
   private final Throwable errorCause;
   // TODO add TaskExecutionResults

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java
index 9b96e7f..b2c6d93 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java
@@ -16,13 +16,12 @@
  */
 package org.apache.gobblin.runtime.api;
 
-import org.apache.gobblin.runtime.JobState;
-
 public interface JobExecutionStatus {
   public static final String UKNOWN_STAGE = "unkown";
-
   JobExecution getJobExecution();
-  JobState.RunningState getRunningState();
+
+  MonitoredObject getRunningState();
+
   /** Arbitrary execution stage, e.g. setup, workUnitGeneration, 
taskExecution, publishing */
   String getStage();
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java
new file mode 100644
index 0000000..e9125e3
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An object which is currently monitored. This object can be retrieved by 
{@link JobExecutionMonitor}
+ */
+public interface MonitoredObject {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java
index 65f30d8..c6ef0a5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java
@@ -36,17 +36,20 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobState.RunningState;
 import org.apache.gobblin.runtime.api.Configurable;
+import org.apache.gobblin.runtime.api.ExecutionResult;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
 import org.apache.gobblin.runtime.api.GobblinInstanceLauncher.ConfigAccessor;
 import org.apache.gobblin.runtime.api.JobCatalog;
 import org.apache.gobblin.runtime.api.JobExecutionDriver;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobExecutionState;
 import org.apache.gobblin.runtime.api.JobLifecycleListener;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
 import org.apache.gobblin.runtime.api.JobSpecScheduler;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
 import org.apache.gobblin.runtime.std.DefaultJobExecutionStateListenerImpl;
@@ -206,9 +209,13 @@ public class DefaultGobblinInstanceDriverImpl extends 
AbstractIdleService
     @Override
     public void run() {
       try {
-         JobExecutionDriver driver = _jobLauncher.launchJob(new 
ResolvedJobSpec(_jobSpec, _instanceDriver));
-         _callbacksDispatcher.onJobLaunch(driver);
-         driver.registerStateListener(new JobStateTracker());
+        JobExecutionMonitor monitor = _jobLauncher.launchJob(new 
ResolvedJobSpec(_jobSpec, _instanceDriver));
+        if (!(monitor instanceof 
JobLauncherExecutionDriver.JobExecutionMonitorAndDriver)) {
+          throw new 
UnsupportedOperationException(JobLauncherExecutionDriver.JobExecutionMonitorAndDriver.class.getName()
 + " is expected.");
+        }
+        JobExecutionDriver driver = 
((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) monitor).getDriver();
+        _callbacksDispatcher.onJobLaunch(driver);
+        driver.registerStateListener(new JobStateTracker());
         ExecutorsUtils.newThreadFactory(Optional.of(_log), 
Optional.of("gobblin-instance-driver")).newThread(driver).start();
       }
       catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
index cd01cca..8e16ca6 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -53,20 +54,22 @@ import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobLauncherFactory;
 import org.apache.gobblin.runtime.JobLauncherFactory.JobLauncherType;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.JobState.RunningState;
 import org.apache.gobblin.runtime.api.Configurable;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.api.JobExecution;
 import org.apache.gobblin.runtime.api.JobExecutionDriver;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.api.JobExecutionState;
 import org.apache.gobblin.runtime.api.JobExecutionStateListener;
 import org.apache.gobblin.runtime.api.JobExecutionStatus;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.MonitoredObject;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.runtime.listeners.AbstractJobListener;
 import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
@@ -95,16 +98,18 @@ public class JobLauncherExecutionDriver extends 
FutureTask<JobExecutionResult> i
 
   /**
    * Creates a new JobExecutionDriver which acts as an adapter to the legacy 
{@link JobLauncher} API.
-   * @param sysConfig             the system/environment config
-   * @param jobSpec               the JobSpec to be executed
-   * @param jobLauncherType       an optional jobLauncher type; the value 
follows the convention of
-   *        {@link JobLauncherFactory#newJobLauncher(java.util.Properties, 
java.util.Properties, String).
+   * @param sysConfig               the system/environment config
+   * @param jobSpec                 the JobSpec to be executed
+   * @param jobLauncherType         an optional jobLauncher type; the value 
follows the convention of
+   *        {@link JobLauncherFactory#newJobLauncher(Properties, Properties)}.
    *        If absent, {@link 
JobLauncherFactory#newJobLauncher(java.util.Properties, java.util.Properties)}
    *        will be used which looks for the {@link 
ConfigurationKeys#JOB_LAUNCHER_TYPE_KEY}
    *        in the system configuration.
-   * @param jobExecStateListener  an optional listener to listen for state 
changes in the execution.
-   * @param log                   an optional logger to be used; if none is 
specified, a default one
-   *                              will be instantiated.
+   * @param log                     an optional logger to be used; if none is 
specified, a default one
+   *                                will be instantiated.
+   * @param instrumentationEnabled  a flag to control if metrics should be 
enabled.
+   * @param launcherMetrics         an object to contain metrics related to 
jobLauncher.
+   * @param instanceBroker          a broker to create difference resources 
from the same instance scope.
    */
   public static JobLauncherExecutionDriver create(Configurable sysConfig, 
JobSpec jobSpec,
       Optional<JobLauncherFactory.JobLauncherType> jobLauncherType,
@@ -441,7 +446,8 @@ public class JobLauncherExecutionDriver extends 
FutureTask<JobExecutionResult> i
       return res;
     }
 
-    @Override public JobExecutionDriver launchJob(JobSpec jobSpec) {
+    @Override
+    public JobExecutionMonitor launchJob(JobSpec jobSpec) {
       Preconditions.checkNotNull(jobSpec);
       if (!(jobSpec instanceof ResolvedJobSpec)) {
         try {
@@ -450,8 +456,10 @@ public class JobLauncherExecutionDriver extends 
FutureTask<JobExecutionResult> i
           throw new RuntimeException("Can't launch job " + jobSpec.getUri(), 
exc);
         }
       }
-      return JobLauncherExecutionDriver.create(getSysConfig(), jobSpec, 
_jobLauncherType,
+
+      JobLauncherExecutionDriver driver = 
JobLauncherExecutionDriver.create(getSysConfig(), jobSpec, _jobLauncherType,
           Optional.of(getLog(jobSpec)), isInstrumentationEnabled(), 
getMetrics(), getInstanceBroker());
+      return new JobExecutionMonitorAndDriver(driver);
     }
 
     @Override public List<Tag<?>> 
generateTags(org.apache.gobblin.configuration.State state) {
@@ -519,20 +527,75 @@ public class JobLauncherExecutionDriver extends 
FutureTask<JobExecutionResult> i
 
   }
 
+  /**
+   * Old {@link JobExecutionLauncher#launchJob(JobSpec)} returns a {@link 
JobExecutionDriver} but new API returns a {@link JobExecutionMonitor}.
+   * For backward compatibility we wraps {@link JobExecutionDriver} inside of 
a new {@link JobExecutionMonitorAndDriver}.
+   */
+  @AllArgsConstructor
+  public static class JobExecutionMonitorAndDriver implements 
JobExecutionMonitor {
+    @Getter
+    JobLauncherExecutionDriver driver;
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return this.driver.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return this.driver.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+      return this.driver.isDone();
+    }
+
+    @Override
+    public JobExecutionResult get()
+        throws InterruptedException, ExecutionException {
+      return this.driver.get();
+    }
+
+    @Override
+    public JobExecutionResult get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return this.driver.get(timeout, unit);
+    }
+
+    @Override
+    public MonitoredObject getRunningState() {
+      return this.driver._jobState.getRunningState();
+    }
+  }
+
   @Override public void registerWeakStateListener(JobExecutionStateListener 
listener) {
     _callbackDispatcher.registerWeakStateListener(listener);
   }
 
   @Override public boolean isDone() {
-    RunningState runState = getJobExecutionStatus().getRunningState();
+    RunningState runState = fetchRunningState();
+
     return runState == null ? false : runState.isDone() ;
   }
 
+  private RunningState fetchRunningState() {
+    MonitoredObject monitoredObject = 
getJobExecutionStatus().getRunningState();
+    if (monitoredObject == null) {
+      return null;
+    }
+    if (!(monitoredObject instanceof RunningState)) {
+      throw new UnsupportedOperationException("Cannot process monitored object 
other than " + JobState.RunningState.class.getName());
+    }
+
+    return (RunningState) monitoredObject;
+  }
+
   @Override public boolean cancel(boolean mayInterruptIfRunning) {
     // FIXME there is a race condition here as the job may complete 
successfully before we
     // call cancelJob() below. There isn't an easy way to fix that right now.
+    RunningState runState = fetchRunningState();
 
-    RunningState runState = getJobExecutionStatus().getRunningState();
     if (runState.isCancelled()) {
       return true;
     }
@@ -549,7 +612,7 @@ public class JobLauncherExecutionDriver extends 
FutureTask<JobExecutionResult> i
   }
 
   @Override public boolean isCancelled() {
-    return getJobExecutionStatus().getRunningState().isCancelled();
+    return fetchRunningState().isCancelled();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java
index b724391..9b42e06 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java
@@ -155,15 +155,6 @@ public class JobBrokerInjectionTest {
     Assert.assertEquals(seenTaskObjectIds.size(), 10);
   }
 
-  private void launchJob(StandardGobblinInstanceLauncher instanceLauncher, 
JobSpec js1,
-      GobblinInstanceDriver instance) throws TimeoutException, 
InterruptedException, ExecutionException {
-    JobExecutionDriver jobDriver = instance.getJobLauncher().launchJob(js1);
-    new Thread(jobDriver).run();
-    JobExecutionResult jobResult = jobDriver.get(5, TimeUnit.SECONDS);
-
-    Assert.assertTrue(jobResult.isSuccessful());
-  }
-
   public static class JobBrokerConverter extends Converter<String, String, 
String, MyRecord> {
 
     private MySharedObject instanceSharedObject;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java
index 9b97fe9..5b71de7 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java
@@ -33,10 +33,12 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
 import org.apache.gobblin.runtime.api.JobExecutionDriver;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.api.JobLifecycleListener;
 import org.apache.gobblin.runtime.api.JobSpec;
 import 
org.apache.gobblin.runtime.instance.DefaultGobblinInstanceDriverImpl.JobSpecRunnable;
+import org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.runtime.std.DefaultJobLifecycleListenerImpl;
 import org.apache.gobblin.runtime.std.FilteredJobLifecycleListener;
@@ -100,7 +102,12 @@ public class TestStandardGobblinInstanceLauncher {
 
   private void checkLaunchJob(StandardGobblinInstanceLauncher 
instanceLauncher, JobSpec js1,
       GobblinInstanceDriver instance) throws TimeoutException, 
InterruptedException, ExecutionException {
-    JobExecutionDriver jobDriver = instance.getJobLauncher().launchJob(js1);
+    JobExecutionDriver jobDriver = null;
+    JobExecutionMonitor monitor = instance.getJobLauncher().launchJob(js1);
+    if (monitor instanceof 
JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) {
+      jobDriver = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) 
monitor).getDriver();
+    }
+
     new Thread(jobDriver).run();
     JobExecutionResult jobResult = jobDriver.get(5, TimeUnit.SECONDS);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java
index 4aa5084..b521de6 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobLauncherFactory;
 import org.apache.gobblin.runtime.api.JobExecution;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.local.LocalJobLauncher;
 import org.apache.gobblin.runtime.mapreduce.MRJobLauncher;
@@ -70,7 +71,12 @@ public class TestJobLauncherExecutionDriver {
               .withJobLauncherType(JobLauncherFactory.JobLauncherType.LOCAL)
               .withLog(log);
 
-      JobLauncherExecutionDriver jled = 
(JobLauncherExecutionDriver)launcher.launchJob(jobSpec1);
+      JobLauncherExecutionDriver jled = null;
+      JobExecutionMonitor monitor = launcher.launchJob(jobSpec1);
+      if (monitor instanceof 
JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) {
+        jled = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) 
monitor).getDriver();
+      }
+
       Assert.assertTrue(jled.getLegacyLauncher() instanceof LocalJobLauncher);
       JobExecution jex1 = jled.getJobExecution();
       Assert.assertEquals(jex1.getJobSpecURI(), jobSpec1.getUri());
@@ -95,10 +101,14 @@ public class TestJobLauncherExecutionDriver {
           .withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, 
ConfigValueFactory.fromAnyRef(false));
 
       JobSpec jobSpec2 = JobSpec.builder().withConfig(jobConf2).build();
-
-      jled = (JobLauncherExecutionDriver)launcher
+      jled = null;
+      monitor = launcher
           .withJobLauncherType(JobLauncherFactory.JobLauncherType.MAPREDUCE)
           .launchJob(jobSpec2);
+      if (monitor instanceof 
JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) {
+        jled = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) 
monitor).getDriver();
+      }
+
       Assert.assertTrue(jled.getLegacyLauncher() instanceof MRJobLauncher);
       JobExecution jex2 = jled.getJobExecution();
 

Reply via email to