Repository: incubator-gobblin Updated Branches: refs/heads/master 381857173 -> f71f59d07
complete task runner run method Closes #2210 from HappyRay/implement-run-single-task Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f71f59d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f71f59d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f71f59d0 Branch: refs/heads/master Commit: f71f59d0731ffac44c6316c9b50df264c21cc292 Parents: 3818571 Author: Ray Yang <[email protected]> Authored: Fri Dec 15 10:00:34 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Dec 15 10:00:44 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/SingleTaskRunner.java | 133 +++++++++++++++++-- .../gobblin/cluster/SingleTaskRunnerMain.java | 15 ++- .../cluster/SingleTaskRunnerMainTest.java | 3 +- 3 files changed, 135 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java index 6541c1d..20226af 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java @@ -17,23 +17,140 @@ package org.apache.gobblin.cluster; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ServiceManager; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.runtime.TaskExecutor; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.HadoopUtils; + +import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; class SingleTaskRunner { - private final Path clusterConfigFilePath; + private static final Logger logger = LoggerFactory.getLogger(SingleTaskRunner.class); + private final String jobId; - private final Path workUnitFilePath; + private final String workUnitFilePath; + private final Config clusterConfig; + private final Path appWorkPath; + private SingleHelixTask task; + private TaskExecutor taskExecutor; + private GobblinHelixTaskStateTracker taskStateTracker; + private ServiceManager serviceManager; SingleTaskRunner(final String clusterConfigFilePath, final String jobId, final String workUnitFilePath) { - this.clusterConfigFilePath = Paths.get(clusterConfigFilePath); this.jobId = jobId; - this.workUnitFilePath = Paths.get(workUnitFilePath); + this.workUnitFilePath = workUnitFilePath; + this.clusterConfig = ConfigFactory.parseFile(new File(clusterConfigFilePath)); + final String workDir = this.clusterConfig.getString(CLUSTER_WORK_DIR); + this.appWorkPath = new Path(workDir); + } + + void run() + throws IOException, InterruptedException { + logger.info("SingleTaskRunner running."); + startServices(); + runTask(); + shutdownServices(); + } + + private void startServices() { + logger.info("SingleTaskRunner start 
services."); + getServices(); + this.serviceManager.startAsync(); + try { + this.serviceManager.awaitHealthy(10, TimeUnit.SECONDS); + } catch (final TimeoutException e) { + throw new GobblinClusterException("Timeout waiting for services to start.", e); + } + } + + private void shutdownServices() { + logger.info("SingleTaskRunner shutting down services."); + this.serviceManager.stopAsync(); + try { + this.serviceManager.awaitStopped(1, TimeUnit.MINUTES); + } catch (final TimeoutException e) { + logger.error("Timeout waiting for services to shutdown.", e); + } + } + + private void runTask() + throws IOException, InterruptedException { + logger.info("SingleTaskRunner running task."); + getSingleHelixTask(); + this.task.run(); } - void run() { - // tbd + private void getSingleHelixTask() + throws IOException { + final Path jobStateFilePath = getJobStateFilePath(); + final FileSystem fs = getFileSystem(); + final StateStores stateStores = new StateStores(this.clusterConfig, this.appWorkPath, + GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, this.appWorkPath, + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); + + final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores); + + this.task = + new SingleHelixTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs, + taskAttemptBuilder, stateStores); + } + + private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores stateStores) { + final TaskAttemptBuilder taskAttemptBuilder = + new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor); + // No container id is set. Use the default. 
+ taskAttemptBuilder.setTaskStateStore(stateStores.taskStateStore); + return taskAttemptBuilder; + } + + private void getServices() { + final Properties properties = ConfigUtils.configToProperties(this.clusterConfig); + this.taskExecutor = new TaskExecutor(properties); + this.taskStateTracker = new GobblinHelixTaskStateTracker(properties); + + final List<Service> services = Lists.newArrayList(this.taskExecutor, this.taskStateTracker); + this.serviceManager = new ServiceManager(services); + } + + private Path getJobStateFilePath() { + final String jobStateFileName = this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME; + final Path jobStateFilePath = new Path(this.appWorkPath, jobStateFileName); + logger.info("job state file path: " + jobStateFilePath); + return jobStateFilePath; + } + + private FileSystem getFileSystem() + throws IOException { + final Configuration conf = HadoopUtils.newConfiguration(); + + final FileSystem fs = this.clusterConfig.hasPath(ConfigurationKeys.FS_URI_KEY) ? 
FileSystem + .get(URI.create(this.clusterConfig.getString(ConfigurationKeys.FS_URI_KEY)), conf) + : FileSystem.get(conf); + + return fs; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java index 1c9dac0..c010e75 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java @@ -17,16 +17,16 @@ package org.apache.gobblin.cluster; +import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; -import org.apache.gobblin.util.JvmUtils; - class SingleTaskRunnerMain { private static final Logger logger = LoggerFactory.getLogger(SingleTaskRunnerMain.class); @@ -37,9 +37,8 @@ class SingleTaskRunnerMain { this.builder = builder; } - public static void main(final String[] args) - throws Exception { - logger.info(JvmUtils.getJvmInputArguments()); + public static void main(final String[] args) { + logger.info("SingleTaskRunnerMain starting. 
args: " + Arrays.toString(args)); final SingleTaskRunnerMain runnerMain = new SingleTaskRunnerMain(new SingleTaskRunnerBuilder()); try { runnerMain.run(args); @@ -49,12 +48,14 @@ class SingleTaskRunnerMain { } } - void run(final String[] args) { + void run(final String[] args) + throws IOException, InterruptedException { final OutputStreamWriter streamWriter = new OutputStreamWriter(System.out, Charsets.UTF_8); final PrintWriter writer = new PrintWriter(streamWriter, true); final SingleTaskRunnerMainOptions options = new SingleTaskRunnerMainOptions(args, writer); final SingleTaskRunner runner = - this.builder.setClusterConfigFilePath(options.getClusterConfigFilePath()).setJobId(options.getJobId()) + this.builder.setClusterConfigFilePath(options.getClusterConfigFilePath()) + .setJobId(options.getJobId()) .setWorkUnitFilePath(options.getWorkUnitFilePath()) .createSingleTaskRunner(); runner.run(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java index f10b2a5..ad9fb53 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java @@ -31,7 +31,8 @@ import static org.mockito.Mockito.verify; public class SingleTaskRunnerMainTest { @Test - public void testRun() { + public void testRun() + throws Exception { final SingleTaskRunnerBuilder builder = spy(SingleTaskRunnerBuilder.class); final SingleTaskRunner taskRunner = mock(SingleTaskRunner.class); doReturn(taskRunner).when(builder).createSingleTaskRunner();
