Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2e6ace666 -> 0fabaa7a4


[GOBBLIN-336] Only start necessary services in cluster workers

Closes #2225 from HappyRay/improve-task-runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0fabaa7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0fabaa7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0fabaa7a

Branch: refs/heads/master
Commit: 0fabaa7a4cb692222456ba3a41e6948f64684825
Parents: 2e6ace6
Author: Ray Yang <[email protected]>
Authored: Wed Dec 20 14:59:23 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Wed Dec 20 14:59:23 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinTaskRunner.java      | 123 +++++++++++--------
 1 file changed, 73 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0fabaa7a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 5f2802c..2580b0e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -111,14 +111,14 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
 @Alpha
 public class GobblinTaskRunner {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinTaskRunner.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(GobblinTaskRunner.class);
   static final java.nio.file.Path CLUSTER_CONF_PATH = 
Paths.get("generated-gobblin-cluster.conf");
 
   static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";
 
   private final String helixInstanceName;
 
-  private final HelixManager helixManager;
+  private HelixManager helixManager;
 
   private final ServiceManager serviceManager;
 
@@ -137,67 +137,90 @@ public class GobblinTaskRunner {
   protected final Config config;
 
   protected final FileSystem fs;
+  private final List<Service> services = Lists.newArrayList();
+  private final String applicationName;
+  private final String applicationId;
+  private final Path appWorkPath;
 
   public GobblinTaskRunner(String applicationName, String helixInstanceName, 
String applicationId,
       String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
       throws Exception {
     this.helixInstanceName = helixInstanceName;
     this.taskRunnerId = taskRunnerId;
+    this.applicationName = applicationName;
+    this.applicationId = applicationId;
 
     Configuration conf = HadoopUtils.newConfiguration();
     this.fs = buildFileSystem(config, conf);
-    Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
-        : GobblinClusterUtils
-            .getAppWorkDirPathFromConfig(config, this.fs, applicationName, 
applicationId);
 
-    this.config = saveConfigToFile(config, appWorkDir);
+    this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
 
-    String zkConnectionString =
-        
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
+    this.config = saveConfigToFile(config);
 
-    this.helixManager = HelixManagerFactory
-        
.getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
-            helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
+    initHelixManager();
 
-    Properties properties = ConfigUtils.configToProperties(config);
+    this.containerMetrics = buildContainerMetrics();
 
-    TaskExecutor taskExecutor = new TaskExecutor(properties);
-    TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
+    this.taskStateModelFactory = registerHelixTaskFactory();
 
-    List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
-        new JMXReportingService(
-            ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
     services.addAll(getServices());
-
     this.serviceManager = new ServiceManager(services);
+  }
 
-    this.containerMetrics =
-        buildContainerMetrics(this.config, properties, applicationName, 
this.taskRunnerId);
+  private Path initAppWorkDir(Config config, Optional<Path> 
appWorkDirOptional) {
+    return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : 
GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, 
this.applicationId);
+  }
 
-    URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
-    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
-        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
-            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+  private void initHelixManager() {
+    String zkConnectionString =
+        
this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    logger.info("Using ZooKeeper connection string: " + zkConnectionString);
+
+    this.helixManager = HelixManagerFactory.getZKHelixManager(
+        
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
+        this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
+  }
 
-    // Register task factory for the Helix task state model
+  private TaskStateModelFactory registerHelixTaskFactory() {
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
 
-    Boolean isRunTaskInSeparateProcessEnabled = 
getIsRunTaskInSeparateProcessEnabled();
+    boolean isRunTaskInSeparateProcessEnabled = 
getIsRunTaskInSeparateProcessEnabled();
     TaskFactory taskFactory;
     if (isRunTaskInSeparateProcessEnabled) {
-      LOGGER.info("Running a task in a separate process is enabled.");
+      logger.info("Running a task in a separate process is enabled.");
       taskFactory = new HelixTaskFactory(this.containerMetrics, 
CLUSTER_CONF_PATH);
     } else {
-      taskFactory =
-          new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker,
-              this.fs, appWorkDir, stateStoreJobConfig, this.helixManager);
+      taskFactory = getInProcessTaskFactory();
     }
 
     taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory);
-    this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, 
taskFactoryMap);
+    TaskStateModelFactory taskStateModelFactory =
+        new TaskStateModelFactory(this.helixManager, taskFactoryMap);
     this.helixManager.getStateMachineEngine()
-        .registerStateModelFactory("Task", this.taskStateModelFactory);
+        .registerStateModelFactory("Task", taskStateModelFactory);
+    return taskStateModelFactory;
+  }
+
+  private TaskFactory getInProcessTaskFactory() {
+    Properties properties = ConfigUtils.configToProperties(this.config);
+    URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri();
+    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+
+    TaskExecutor taskExecutor = new TaskExecutor(properties);
+    TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
+
+    services.add(taskExecutor);
+    services.add(taskStateTracker);
+    services.add(new JMXReportingService(
+        ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
+
+    TaskFactory taskFactory =
+        new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs,
+            this.appWorkPath, stateStoreJobConfig, this.helixManager);
+    return taskFactory;
   }
 
   private Boolean getIsRunTaskInSeparateProcessEnabled() {
@@ -209,10 +232,10 @@ public class GobblinTaskRunner {
     return enabled;
   }
 
-  private Config saveConfigToFile(Config config, Path appWorkDir)
+  private Config saveConfigToFile(Config config)
       throws IOException {
-    Config newConf =
-        config.withValue(CLUSTER_WORK_DIR, 
ConfigValueFactory.fromAnyRef(appWorkDir.toString()));
+    Config newConf = config
+        .withValue(CLUSTER_WORK_DIR, 
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
     ConfigUtils configUtils = new ConfigUtils(new FileUtils());
     configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
     return newConf;
@@ -222,7 +245,7 @@ public class GobblinTaskRunner {
    * Start this {@link GobblinTaskRunner} instance.
    */
   public void start() {
-    LOGGER.info(
+    logger.info(
         String.format("Starting %s in container %s", this.helixInstanceName, 
this.taskRunnerId));
 
     // Add a shutdown hook so the task scheduler gets properly shutdown
@@ -248,7 +271,7 @@ public class GobblinTaskRunner {
 
     this.stopInProgress = true;
 
-    LOGGER.info("Stopping the Gobblin Task runner");
+    logger.info("Stopping the Gobblin Task runner");
 
     // Stop metric reporting
     if (this.containerMetrics.isPresent()) {
@@ -259,7 +282,7 @@ public class GobblinTaskRunner {
       // Give the services 5 minutes to stop to ensure that we are responsive 
to shutdown requests
       this.serviceManager.stopAsync().awaitStopped(5, TimeUnit.MINUTES);
     } catch (TimeoutException te) {
-      LOGGER.error("Timeout in stopping the service manager", te);
+      logger.error("Timeout in stopping the service manager", te);
     } finally {
       this.taskStateModelFactory.shutdown();
 
@@ -295,7 +318,7 @@ public class GobblinTaskRunner {
           
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
               getUserDefinedMessageHandlerFactory());
     } catch (Exception e) {
-      LOGGER.error("HelixManager failed to connect", e);
+      logger.error("HelixManager failed to connect", e);
       throw Throwables.propagate(e);
     }
   }
@@ -322,7 +345,7 @@ public class GobblinTaskRunner {
 
       @Override
       public void run() {
-        LOGGER.info("Running the shutdown hook");
+        logger.info("Running the shutdown hook");
         GobblinTaskRunner.this.stop();
       }
     });
@@ -335,11 +358,11 @@ public class GobblinTaskRunner {
         : FileSystem.get(conf);
   }
 
-  private Optional<ContainerMetrics> buildContainerMetrics(Config config, 
Properties properties,
-      String applicationName, String workerId) {
+  private Optional<ContainerMetrics> buildContainerMetrics() {
+    Properties properties = ConfigUtils.configToProperties(this.config);
     if (GobblinMetrics.isEnabled(properties)) {
-      return Optional
-          .of(ContainerMetrics.get(ConfigUtils.configToState(config), 
applicationName, workerId));
+      return Optional.of(ContainerMetrics
+          .get(ConfigUtils.configToState(config), this.applicationName, 
this.taskRunnerId));
     } else {
       return Optional.absent();
     }
@@ -396,7 +419,7 @@ public class GobblinTaskRunner {
           return result;
         }
 
-        LOGGER
+        logger
             .info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
 
         ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
@@ -427,7 +450,7 @@ public class GobblinTaskRunner {
 
       @Override
       public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(String
+        logger.error(String
             .format("Failed to handle message with exception %s, error code 
%s, error type %s", e,
                 code, type));
       }
@@ -477,7 +500,7 @@ public class GobblinTaskRunner {
       @Override
       public HelixTaskResult handleMessage()
           throws InterruptedException {
-        LOGGER.warn(String.format("No handling setup for %s message of 
subtype: %s",
+        logger.warn(String.format("No handling setup for %s message of 
subtype: %s",
             Message.MessageType.USER_DEFINE_MSG.toString(), 
this._message.getMsgSubType()));
 
         HelixTaskResult helixTaskResult = new HelixTaskResult();
@@ -487,7 +510,7 @@ public class GobblinTaskRunner {
 
       @Override
       public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(String
+        logger.error(String
             .format("Failed to handle message with exception %s, error code 
%s, error type %s", e,
                 code, type));
       }
@@ -527,7 +550,7 @@ public class GobblinTaskRunner {
         System.exit(1);
       }
 
-      LOGGER.info(JvmUtils.getJvmInputArguments());
+      logger.info(JvmUtils.getJvmInputArguments());
 
       String applicationName =
           
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);

Reply via email to