Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 381857173 -> f71f59d07


complete task runner run method

Closes #2210 from HappyRay/implement-run-single-
task


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f71f59d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f71f59d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f71f59d0

Branch: refs/heads/master
Commit: f71f59d0731ffac44c6316c9b50df264c21cc292
Parents: 3818571
Author: Ray Yang <[email protected]>
Authored: Fri Dec 15 10:00:34 2017 -0800
Committer: Hung Tran <[email protected]>
Committed: Fri Dec 15 10:00:44 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/SingleTaskRunner.java       | 133 +++++++++++++++++--
 .../gobblin/cluster/SingleTaskRunnerMain.java   |  15 ++-
 .../cluster/SingleTaskRunnerMainTest.java       |   3 +-
 3 files changed, 135 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 6541c1d..20226af 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -17,23 +17,140 @@
 
 package org.apache.gobblin.cluster;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.HadoopUtils;
+
+import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
 
 class SingleTaskRunner {
-  private final Path clusterConfigFilePath;
+  private static final Logger logger = 
LoggerFactory.getLogger(SingleTaskRunner.class);
+
   private final String jobId;
-  private final Path workUnitFilePath;
+  private final String workUnitFilePath;
+  private final Config clusterConfig;
+  private final Path appWorkPath;
+  private SingleHelixTask task;
+  private TaskExecutor taskExecutor;
+  private GobblinHelixTaskStateTracker taskStateTracker;
+  private ServiceManager serviceManager;
 
   SingleTaskRunner(final String clusterConfigFilePath, final String jobId,
       final String workUnitFilePath) {
-    this.clusterConfigFilePath = Paths.get(clusterConfigFilePath);
     this.jobId = jobId;
-    this.workUnitFilePath = Paths.get(workUnitFilePath);
+    this.workUnitFilePath = workUnitFilePath;
+    this.clusterConfig = ConfigFactory.parseFile(new 
File(clusterConfigFilePath));
+    final String workDir = this.clusterConfig.getString(CLUSTER_WORK_DIR);
+    this.appWorkPath = new Path(workDir);
+  }
+
+  void run()
+      throws IOException, InterruptedException {
+    logger.info("SingleTaskRunner running.");
+    startServices();
+    runTask();
+    shutdownServices();
+  }
+
+  private void startServices() {
+    logger.info("SingleTaskRunner start services.");
+    getServices();
+    this.serviceManager.startAsync();
+    try {
+      this.serviceManager.awaitHealthy(10, TimeUnit.SECONDS);
+    } catch (final TimeoutException e) {
+      throw new GobblinClusterException("Timeout waiting for services to 
start.", e);
+    }
+  }
+
+  private void shutdownServices() {
+    logger.info("SingleTaskRunner shutting down services.");
+    this.serviceManager.stopAsync();
+    try {
+      this.serviceManager.awaitStopped(1, TimeUnit.MINUTES);
+    } catch (final TimeoutException e) {
+      logger.error("Timeout waiting for services to shutdown.", e);
+    }
+  }
+
+  private void runTask()
+      throws IOException, InterruptedException {
+    logger.info("SingleTaskRunner running task.");
+    getSingleHelixTask();
+    this.task.run();
   }
 
-  void run() {
-    // tbd
+  private void getSingleHelixTask()
+      throws IOException {
+    final Path jobStateFilePath = getJobStateFilePath();
+    final FileSystem fs = getFileSystem();
+    final StateStores stateStores = new StateStores(this.clusterConfig, 
this.appWorkPath,
+        GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, 
this.appWorkPath,
+        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
+
+    final TaskAttemptBuilder taskAttemptBuilder = 
getTaskAttemptBuilder(stateStores);
+
+    this.task =
+        new SingleHelixTask(this.jobId, new Path(this.workUnitFilePath), 
jobStateFilePath, fs,
+            taskAttemptBuilder, stateStores);
+  }
+
+  private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores 
stateStores) {
+    final TaskAttemptBuilder taskAttemptBuilder =
+        new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor);
+    // No container id is set. Use the default.
+    taskAttemptBuilder.setTaskStateStore(stateStores.taskStateStore);
+    return taskAttemptBuilder;
+  }
+
+  private void getServices() {
+    final Properties properties = 
ConfigUtils.configToProperties(this.clusterConfig);
+    this.taskExecutor = new TaskExecutor(properties);
+    this.taskStateTracker = new GobblinHelixTaskStateTracker(properties);
+
+    final List<Service> services = Lists.newArrayList(this.taskExecutor, 
this.taskStateTracker);
+    this.serviceManager = new ServiceManager(services);
+  }
+
+  private Path getJobStateFilePath() {
+    final String jobStateFileName = this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME;
+    final Path jobStateFilePath = new Path(this.appWorkPath, jobStateFileName);
+    logger.info("job state file path: " + jobStateFilePath);
+    return jobStateFilePath;
+  }
+
+  private FileSystem getFileSystem()
+      throws IOException {
+    final Configuration conf = HadoopUtils.newConfiguration();
+
+    final FileSystem fs = 
this.clusterConfig.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+        
.get(URI.create(this.clusterConfig.getString(ConfigurationKeys.FS_URI_KEY)), 
conf)
+        : FileSystem.get(conf);
+
+    return fs;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java
index 1c9dac0..c010e75 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunnerMain.java
@@ -17,16 +17,16 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
 
-import org.apache.gobblin.util.JvmUtils;
-
 
 class SingleTaskRunnerMain {
   private static final Logger logger = 
LoggerFactory.getLogger(SingleTaskRunnerMain.class);
@@ -37,9 +37,8 @@ class SingleTaskRunnerMain {
     this.builder = builder;
   }
 
-  public static void main(final String[] args)
-      throws Exception {
-    logger.info(JvmUtils.getJvmInputArguments());
+  public static void main(final String[] args) {
+    logger.info("SingleTaskRunnerMain starting. args: " + 
Arrays.toString(args));
     final SingleTaskRunnerMain runnerMain = new SingleTaskRunnerMain(new 
SingleTaskRunnerBuilder());
     try {
       runnerMain.run(args);
@@ -49,12 +48,14 @@ class SingleTaskRunnerMain {
     }
   }
 
-  void run(final String[] args) {
+  void run(final String[] args)
+      throws IOException, InterruptedException {
     final OutputStreamWriter streamWriter = new OutputStreamWriter(System.out, 
Charsets.UTF_8);
     final PrintWriter writer = new PrintWriter(streamWriter, true);
     final SingleTaskRunnerMainOptions options = new 
SingleTaskRunnerMainOptions(args, writer);
     final SingleTaskRunner runner =
-        
this.builder.setClusterConfigFilePath(options.getClusterConfigFilePath()).setJobId(options.getJobId())
+        
this.builder.setClusterConfigFilePath(options.getClusterConfigFilePath())
+            .setJobId(options.getJobId())
             .setWorkUnitFilePath(options.getWorkUnitFilePath())
             .createSingleTaskRunner();
     runner.run();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f71f59d0/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java
index f10b2a5..ad9fb53 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainTest.java
@@ -31,7 +31,8 @@ import static org.mockito.Mockito.verify;
 public class SingleTaskRunnerMainTest {
 
   @Test
-  public void testRun() {
+  public void testRun()
+      throws Exception {
     final SingleTaskRunnerBuilder builder = spy(SingleTaskRunnerBuilder.class);
     final SingleTaskRunner taskRunner = mock(SingleTaskRunner.class);
     doReturn(taskRunner).when(builder).createSingleTaskRunner();

Reply via email to