Repository: incubator-gobblin Updated Branches: refs/heads/master 10318fe9b -> 2c5e25d98
[GOBBLIN-510] Decouple JobExecutionLauncher and JobExecutionDriver Closes #2380 from yukuai518/jobExec Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2c5e25d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2c5e25d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2c5e25d9 Branch: refs/heads/master Commit: 2c5e25d98fc0e6f388759cab56f52d2b375ab097 Parents: 10318fe Author: Kuai Yu <[email protected]> Authored: Thu Jun 7 11:22:24 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Jun 7 11:22:24 2018 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/runtime/JobState.java | 3 +- .../gobblin/runtime/api/ExecutionResult.java | 26 ++++++ .../gobblin/runtime/api/JobExecutionDriver.java | 2 - .../runtime/api/JobExecutionLauncher.java | 20 ++++- .../runtime/api/JobExecutionMonitor.java | 29 +++++++ .../gobblin/runtime/api/JobExecutionResult.java | 2 +- .../gobblin/runtime/api/JobExecutionStatus.java | 7 +- .../gobblin/runtime/api/MonitoredObject.java | 24 ++++++ .../DefaultGobblinInstanceDriverImpl.java | 13 ++- .../job_exec/JobLauncherExecutionDriver.java | 89 +++++++++++++++++--- .../gobblin/runtime/JobBrokerInjectionTest.java | 9 -- .../TestStandardGobblinInstanceLauncher.java | 9 +- .../TestJobLauncherExecutionDriver.java | 16 +++- 13 files changed, 210 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java index 47356fc..aa09d2c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java @@ -54,6 +54,7 @@ import org.apache.gobblin.rest.Metric; import org.apache.gobblin.rest.MetricArray; import org.apache.gobblin.rest.MetricTypeEnum; import org.apache.gobblin.rest.TaskExecutionInfoArray; +import org.apache.gobblin.runtime.api.MonitoredObject; import org.apache.gobblin.runtime.util.JobMetrics; import org.apache.gobblin.runtime.util.MetricGroup; import org.apache.gobblin.source.extractor.JobCommitPolicy; @@ -85,7 +86,7 @@ public class JobState extends SourceState { * <li> SUCCESSFUL => CANCELLED (cancelled before committing) * </ul> */ - public enum RunningState { + public enum RunningState implements MonitoredObject { /** Pending creation of {@link WorkUnit}s. */ PENDING, /** Starting the execution of {@link WorkUnit}s. */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java new file mode 100644 index 0000000..3f77493 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +/** + * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionFuture#get()} + * + * @see JobExecutionResult as a derived class. + */ +public interface ExecutionResult { +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java index 09962be..32db440 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionDriver.java @@ -16,8 +16,6 @@ */ package org.apache.gobblin.runtime.api; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index 7bc9cc0..e414cf3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -16,6 +16,8 @@ */ package org.apache.gobblin.runtime.api; +import java.util.concurrent.Future; + import com.codahale.metrics.Gauge; import org.apache.gobblin.annotation.Alpha; @@ -30,10 +32,24 @@ import lombok.Getter; */ @Alpha public interface JobExecutionLauncher extends Instrumentable { - JobExecutionDriver launchJob(JobSpec jobSpec); + /** + * This method is to launch the job specified by {@param jobSpec} + * The simplest way is to run a {@link JobExecutionDriver} and upon completion return a + * {@link org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver.JobExecutionMonitorAndDriver}. + * + * If {@link JobExecutionDriver} does not run within the same process/node of {@link JobExecutionLauncher}, a simple monitoring + * future object ({@link JobExecutionMonitor}) can be returned. This object can do two things: + * + * 1) Wait for computation of final {@link ExecutionResult} by invoking {@link Future#get()}. + * 2) Monitor current job running status by invoking {@link JobExecutionMonitor#getRunningState()}. + * + * @see JobExecutionMonitor + */ + JobExecutionMonitor launchJob(JobSpec jobSpec); /** - * Common metrics for all launcher implementations. */ + * Common metrics for all launcher implementations. + */ StandardMetrics getMetrics(); public static class StandardMetrics { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java new file mode 100644 index 0000000..604f746 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionMonitor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.util.concurrent.Future; + +/** + * A simple monitoring and future object. This object can be used as a normal {@link Future} object to get {@link ExecutionResult}. + * + * It can also be used to get current job running status, which is described by {@link MonitoredObject}. + */ +public interface JobExecutionMonitor extends Future<ExecutionResult> { + MonitoredObject getRunningState(); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java index 66493a6..c209ffc 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionResult.java @@ -29,7 +29,7 @@ import lombok.Getter; */ @AllArgsConstructor(access=AccessLevel.PROTECTED) @Getter -public class JobExecutionResult { +public class JobExecutionResult implements ExecutionResult { private final RunningState finalState; private final Throwable errorCause; // TODO add TaskExecutionResults http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java index 9b96e7f..b2c6d93 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionStatus.java @@ -16,13 +16,12 @@ */ package org.apache.gobblin.runtime.api; -import org.apache.gobblin.runtime.JobState; - public interface JobExecutionStatus { public static final String UKNOWN_STAGE = "unkown"; - JobExecution getJobExecution(); - JobState.RunningState getRunningState(); + + MonitoredObject getRunningState(); + /** Arbitrary execution stage, e.g. setup, workUnitGeneration, taskExecution, publishing */ String getStage(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java new file mode 100644 index 0000000..e9125e3 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MonitoredObject.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +/** + * An object which is currently monitored. This object can be retrieved by {@link JobExecutionMonitor} + */ +public interface MonitoredObject { +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java index 65f30d8..c6ef0a5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/instance/DefaultGobblinInstanceDriverImpl.java @@ -36,17 +36,20 @@ import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobState.RunningState; import org.apache.gobblin.runtime.api.Configurable; +import org.apache.gobblin.runtime.api.ExecutionResult; import org.apache.gobblin.runtime.api.GobblinInstanceDriver; import org.apache.gobblin.runtime.api.GobblinInstanceLauncher.ConfigAccessor; import org.apache.gobblin.runtime.api.JobCatalog; import org.apache.gobblin.runtime.api.JobExecutionDriver; import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.api.JobExecutionState; import org.apache.gobblin.runtime.api.JobLifecycleListener; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobSpecMonitorFactory; import org.apache.gobblin.runtime.api.JobSpecScheduler; import org.apache.gobblin.runtime.api.MutableJobCatalog; +import org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl; import org.apache.gobblin.runtime.std.DefaultJobExecutionStateListenerImpl; @@ -206,9 +209,13 @@ public class DefaultGobblinInstanceDriverImpl extends AbstractIdleService @Override public void run() { try { - JobExecutionDriver driver = _jobLauncher.launchJob(new ResolvedJobSpec(_jobSpec, _instanceDriver)); - _callbacksDispatcher.onJobLaunch(driver); - driver.registerStateListener(new JobStateTracker()); + JobExecutionMonitor monitor = _jobLauncher.launchJob(new ResolvedJobSpec(_jobSpec, _instanceDriver)); + if (!(monitor instanceof JobLauncherExecutionDriver.JobExecutionMonitorAndDriver)) { + throw new UnsupportedOperationException(JobLauncherExecutionDriver.JobExecutionMonitorAndDriver.class.getName() + " is expected."); + } + JobExecutionDriver driver = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) monitor).getDriver(); + _callbacksDispatcher.onJobLaunch(driver); + driver.registerStateListener(new JobStateTracker()); ExecutorsUtils.newThreadFactory(Optional.of(_log), Optional.of("gobblin-instance-driver")).newThread(driver).start(); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java index cd01cca..8e16ca6 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -53,20 +54,22 @@ import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobLauncherFactory; import org.apache.gobblin.runtime.JobLauncherFactory.JobLauncherType; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.JobState.RunningState; import org.apache.gobblin.runtime.api.Configurable; import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; import org.apache.gobblin.runtime.api.JobExecution; import org.apache.gobblin.runtime.api.JobExecutionDriver; import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.api.JobExecutionResult; import org.apache.gobblin.runtime.api.JobExecutionState; import org.apache.gobblin.runtime.api.JobExecutionStateListener; import org.apache.gobblin.runtime.api.JobExecutionStatus; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.MonitoredObject; import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.runtime.listeners.AbstractJobListener; import org.apache.gobblin.runtime.std.DefaultConfigurableImpl; @@ -95,16 +98,18 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i /** * Creates a new JobExecutionDriver which acts as an adapter to the legacy {@link JobLauncher} API. - * @param sysConfig the system/environment config - * @param jobSpec the JobSpec to be executed - * @param jobLauncherType an optional jobLauncher type; the value follows the convention of - * {@link JobLauncherFactory#newJobLauncher(java.util.Properties, java.util.Properties, String). + * @param sysConfig the system/environment config + * @param jobSpec the JobSpec to be executed + * @param jobLauncherType an optional jobLauncher type; the value follows the convention of + * {@link JobLauncherFactory#newJobLauncher(Properties, Properties)}. * If absent, {@link JobLauncherFactory#newJobLauncher(java.util.Properties, java.util.Properties)} * will be used which looks for the {@link ConfigurationKeys#JOB_LAUNCHER_TYPE_KEY} * in the system configuration. - * @param jobExecStateListener an optional listener to listen for state changes in the execution. - * @param log an optional logger to be used; if none is specified, a default one - * will be instantiated. + * @param log an optional logger to be used; if none is specified, a default one + * will be instantiated. + * @param instrumentationEnabled a flag to control if metrics should be enabled. + * @param launcherMetrics an object to contain metrics related to jobLauncher. + * @param instanceBroker a broker to create difference resources from the same instance scope. */ public static JobLauncherExecutionDriver create(Configurable sysConfig, JobSpec jobSpec, Optional<JobLauncherFactory.JobLauncherType> jobLauncherType, @@ -441,7 +446,8 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i return res; } - @Override public JobExecutionDriver launchJob(JobSpec jobSpec) { + @Override + public JobExecutionMonitor launchJob(JobSpec jobSpec) { Preconditions.checkNotNull(jobSpec); if (!(jobSpec instanceof ResolvedJobSpec)) { try { @@ -450,8 +456,10 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i throw new RuntimeException("Can't launch job " + jobSpec.getUri(), exc); } } - return JobLauncherExecutionDriver.create(getSysConfig(), jobSpec, _jobLauncherType, + + JobLauncherExecutionDriver driver = JobLauncherExecutionDriver.create(getSysConfig(), jobSpec, _jobLauncherType, Optional.of(getLog(jobSpec)), isInstrumentationEnabled(), getMetrics(), getInstanceBroker()); + return new JobExecutionMonitorAndDriver(driver); } @Override public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { @@ -519,20 +527,75 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i } + /** + * Old {@link JobExecutionLauncher#launchJob(JobSpec)} returns a {@link JobExecutionDriver} but new API returns a {@link JobExecutionMonitor}. + * For backward compatibility we wraps {@link JobExecutionDriver} inside of a new {@link JobExecutionMonitorAndDriver}. + */ + @AllArgsConstructor + public static class JobExecutionMonitorAndDriver implements JobExecutionMonitor { + @Getter + JobLauncherExecutionDriver driver; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return this.driver.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return this.driver.isCancelled(); + } + + @Override + public boolean isDone() { + return this.driver.isDone(); + } + + @Override + public JobExecutionResult get() + throws InterruptedException, ExecutionException { + return this.driver.get(); + } + + @Override + public JobExecutionResult get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return this.driver.get(timeout, unit); + } + + @Override + public MonitoredObject getRunningState() { + return this.driver._jobState.getRunningState(); + } + } + @Override public void registerWeakStateListener(JobExecutionStateListener listener) { _callbackDispatcher.registerWeakStateListener(listener); } @Override public boolean isDone() { - RunningState runState = getJobExecutionStatus().getRunningState(); + RunningState runState = fetchRunningState(); + return runState == null ? false : runState.isDone() ; } + private RunningState fetchRunningState() { + MonitoredObject monitoredObject = getJobExecutionStatus().getRunningState(); + if (monitoredObject == null) { + return null; + } + if (!(monitoredObject instanceof RunningState)) { + throw new UnsupportedOperationException("Cannot process monitored object other than " + JobState.RunningState.class.getName()); + } + + return (RunningState) monitoredObject; + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { // FIXME there is a race condition here as the job may complete successfully before we // call cancelJob() below. There isn't an easy way to fix that right now. + RunningState runState = fetchRunningState(); - RunningState runState = getJobExecutionStatus().getRunningState(); if (runState.isCancelled()) { return true; } @@ -549,7 +612,7 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i } @Override public boolean isCancelled() { - return getJobExecutionStatus().getRunningState().isCancelled(); + return fetchRunningState().isCancelled(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java index b724391..9b42e06 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobBrokerInjectionTest.java @@ -155,15 +155,6 @@ public class JobBrokerInjectionTest { Assert.assertEquals(seenTaskObjectIds.size(), 10); } - private void launchJob(StandardGobblinInstanceLauncher instanceLauncher, JobSpec js1, - GobblinInstanceDriver instance) throws TimeoutException, InterruptedException, ExecutionException { - JobExecutionDriver jobDriver = instance.getJobLauncher().launchJob(js1); - new Thread(jobDriver).run(); - JobExecutionResult jobResult = jobDriver.get(5, TimeUnit.SECONDS); - - Assert.assertTrue(jobResult.isSuccessful()); - } - public static class JobBrokerConverter extends Converter<String, String, String, MyRecord> { private MySharedObject instanceSharedObject; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java index 9b97fe9..5b71de7 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceLauncher.java @@ -33,10 +33,12 @@ import com.typesafe.config.ConfigFactory; import org.apache.gobblin.runtime.api.GobblinInstanceDriver; import org.apache.gobblin.runtime.api.JobExecutionDriver; import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.api.JobExecutionResult; import org.apache.gobblin.runtime.api.JobLifecycleListener; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.instance.DefaultGobblinInstanceDriverImpl.JobSpecRunnable; +import org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.runtime.std.DefaultJobLifecycleListenerImpl; import org.apache.gobblin.runtime.std.FilteredJobLifecycleListener; @@ -100,7 +102,12 @@ public class TestStandardGobblinInstanceLauncher { private void checkLaunchJob(StandardGobblinInstanceLauncher instanceLauncher, JobSpec js1, GobblinInstanceDriver instance) throws TimeoutException, InterruptedException, ExecutionException { - JobExecutionDriver jobDriver = instance.getJobLauncher().launchJob(js1); + JobExecutionDriver jobDriver = null; + JobExecutionMonitor monitor = instance.getJobLauncher().launchJob(js1); + if (monitor instanceof JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) { + jobDriver = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) monitor).getDriver(); + } + new Thread(jobDriver).run(); JobExecutionResult jobResult = jobDriver.get(5, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2c5e25d9/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java index 4aa5084..b521de6 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_exec/TestJobLauncherExecutionDriver.java @@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobLauncherFactory; import org.apache.gobblin.runtime.api.JobExecution; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.local.LocalJobLauncher; import org.apache.gobblin.runtime.mapreduce.MRJobLauncher; @@ -70,7 +71,12 @@ public class TestJobLauncherExecutionDriver { .withJobLauncherType(JobLauncherFactory.JobLauncherType.LOCAL) .withLog(log); - JobLauncherExecutionDriver jled = (JobLauncherExecutionDriver)launcher.launchJob(jobSpec1); + JobLauncherExecutionDriver jled = null; + JobExecutionMonitor monitor = launcher.launchJob(jobSpec1); + if (monitor instanceof JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) { + jled = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) monitor).getDriver(); + } + Assert.assertTrue(jled.getLegacyLauncher() instanceof LocalJobLauncher); JobExecution jex1 = jled.getJobExecution(); Assert.assertEquals(jex1.getJobSpecURI(), jobSpec1.getUri()); @@ -95,10 +101,14 @@ public class TestJobLauncherExecutionDriver { .withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false)); JobSpec jobSpec2 = JobSpec.builder().withConfig(jobConf2).build(); - - jled = (JobLauncherExecutionDriver)launcher + jled = null; + monitor = launcher .withJobLauncherType(JobLauncherFactory.JobLauncherType.MAPREDUCE) .launchJob(jobSpec2); + if (monitor instanceof JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) { + jled = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver) monitor).getDriver(); + } + Assert.assertTrue(jled.getLegacyLauncher() instanceof MRJobLauncher); JobExecution jex2 = jled.getJobExecution();
