Repository: incubator-gobblin Updated Branches: refs/heads/master 2e6ace666 -> 0fabaa7a4
[GOBBLIN-336] Only start necessary services in cluster workers Closes #2225 from HappyRay/improve-task-runner Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0fabaa7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0fabaa7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0fabaa7a Branch: refs/heads/master Commit: 0fabaa7a4cb692222456ba3a41e6948f64684825 Parents: 2e6ace6 Author: Ray Yang <[email protected]> Authored: Wed Dec 20 14:59:23 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Dec 20 14:59:23 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinTaskRunner.java | 123 +++++++++++-------- 1 file changed, 73 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0fabaa7a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 5f2802c..2580b0e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -111,14 +111,14 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER @Alpha public class GobblinTaskRunner { - private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class); + private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class); static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory"; private final String helixInstanceName; - private final HelixManager helixManager; + private HelixManager helixManager; private final ServiceManager serviceManager; @@ -137,67 +137,90 @@ public class GobblinTaskRunner { protected final Config config; protected final FileSystem fs; + private final List<Service> services = Lists.newArrayList(); + private final String applicationName; + private final String applicationId; + private final Path appWorkPath; public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId, String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) throws Exception { this.helixInstanceName = helixInstanceName; this.taskRunnerId = taskRunnerId; + this.applicationName = applicationName; + this.applicationId = applicationId; Configuration conf = HadoopUtils.newConfiguration(); this.fs = buildFileSystem(config, conf); - Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() - : GobblinClusterUtils - .getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); - this.config = saveConfigToFile(config, appWorkDir); + this.appWorkPath = initAppWorkDir(config, appWorkDirOptional); - String zkConnectionString = - config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); - LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString); + this.config = saveConfigToFile(config); - this.helixManager = HelixManagerFactory - .getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), - helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); + initHelixManager(); - Properties properties = ConfigUtils.configToProperties(config); + this.containerMetrics = buildContainerMetrics(); - TaskExecutor taskExecutor = new TaskExecutor(properties); - TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); + this.taskStateModelFactory = registerHelixTaskFactory(); - List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, - new JMXReportingService( - ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); services.addAll(getServices()); - this.serviceManager = new ServiceManager(services); + } - this.containerMetrics = - buildContainerMetrics(this.config, properties, applicationName, this.taskRunnerId); + private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) { + return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils + .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId); + } - URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri(); - Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) - .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, - ConfigValueFactory.fromAnyRef(rootPathUri.toString())); + private void initHelixManager() { + String zkConnectionString = + this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + logger.info("Using ZooKeeper connection string: " + zkConnectionString); + + this.helixManager = HelixManagerFactory.getZKHelixManager( + this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), + this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); + } - // Register task factory for the Helix task state model + private TaskStateModelFactory registerHelixTaskFactory() { Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); - Boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(); + boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(); TaskFactory taskFactory; if (isRunTaskInSeparateProcessEnabled) { - LOGGER.info("Running a task in a separate process is enabled."); + logger.info("Running a task in a separate process is enabled."); taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH); } else { - taskFactory = - new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, - this.fs, appWorkDir, stateStoreJobConfig, this.helixManager); + taskFactory = getInProcessTaskFactory(); } taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory); - this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); + TaskStateModelFactory taskStateModelFactory = + new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine() - .registerStateModelFactory("Task", this.taskStateModelFactory); + .registerStateModelFactory("Task", taskStateModelFactory); + return taskStateModelFactory; + } + + private TaskFactory getInProcessTaskFactory() { + Properties properties = ConfigUtils.configToProperties(this.config); + URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri(); + Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, + ConfigValueFactory.fromAnyRef(rootPathUri.toString())); + + TaskExecutor taskExecutor = new TaskExecutor(properties); + TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); + + services.add(taskExecutor); + services.add(taskStateTracker); + services.add(new JMXReportingService( + ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); + + TaskFactory taskFactory = + new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, + this.appWorkPath, stateStoreJobConfig, this.helixManager); + return taskFactory; } private Boolean getIsRunTaskInSeparateProcessEnabled() { @@ -209,10 +232,10 @@ public class GobblinTaskRunner { return enabled; } - private Config saveConfigToFile(Config config, Path appWorkDir) + private Config saveConfigToFile(Config config) throws IOException { - Config newConf = - config.withValue(CLUSTER_WORK_DIR, ConfigValueFactory.fromAnyRef(appWorkDir.toString())); + Config newConf = config + .withValue(CLUSTER_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString())); ConfigUtils configUtils = new ConfigUtils(new FileUtils()); configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH); return newConf; @@ -222,7 +245,7 @@ public class GobblinTaskRunner { * Start this {@link GobblinTaskRunner} instance. */ public void start() { - LOGGER.info( + logger.info( String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); // Add a shutdown hook so the task scheduler gets properly shutdown @@ -248,7 +271,7 @@ public class GobblinTaskRunner { this.stopInProgress = true; - LOGGER.info("Stopping the Gobblin Task runner"); + logger.info("Stopping the Gobblin Task runner"); // Stop metric reporting if (this.containerMetrics.isPresent()) { @@ -259,7 +282,7 @@ public class GobblinTaskRunner { // Give the services 5 minutes to stop to ensure that we are responsive to shutdown requests this.serviceManager.stopAsync().awaitStopped(5, TimeUnit.MINUTES); } catch (TimeoutException te) { - LOGGER.error("Timeout in stopping the service manager", te); + logger.error("Timeout in stopping the service manager", te); } finally { this.taskStateModelFactory.shutdown(); @@ -295,7 +318,7 @@ public class GobblinTaskRunner { .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), getUserDefinedMessageHandlerFactory()); } catch (Exception e) { - LOGGER.error("HelixManager failed to connect", e); + logger.error("HelixManager failed to connect", e); throw Throwables.propagate(e); } } @@ -322,7 +345,7 @@ public class GobblinTaskRunner { @Override public void run() { - LOGGER.info("Running the shutdown hook"); + logger.info("Running the shutdown hook"); GobblinTaskRunner.this.stop(); } }); @@ -335,11 +358,11 @@ public class GobblinTaskRunner { : FileSystem.get(conf); } - private Optional<ContainerMetrics> buildContainerMetrics(Config config, Properties properties, - String applicationName, String workerId) { + private Optional<ContainerMetrics> buildContainerMetrics() { + Properties properties = ConfigUtils.configToProperties(this.config); if (GobblinMetrics.isEnabled(properties)) { - return Optional - .of(ContainerMetrics.get(ConfigUtils.configToState(config), applicationName, workerId)); + return Optional.of(ContainerMetrics + .get(ConfigUtils.configToState(config), this.applicationName, this.taskRunnerId)); } else { return Optional.absent(); } @@ -396,7 +419,7 @@ public class GobblinTaskRunner { return result; } - LOGGER + logger .info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()); ScheduledExecutorService shutdownMessageHandlingCompletionWatcher = @@ -427,7 +450,7 @@ public class GobblinTaskRunner { @Override public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error(String + logger.error(String .format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); } @@ -477,7 +500,7 @@ public class GobblinTaskRunner { @Override public HelixTaskResult handleMessage() throws InterruptedException { - LOGGER.warn(String.format("No handling setup for %s message of subtype: %s", + logger.warn(String.format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(), this._message.getMsgSubType())); HelixTaskResult helixTaskResult = new HelixTaskResult(); @@ -487,7 +510,7 @@ public class GobblinTaskRunner { @Override public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error(String + logger.error(String .format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); } @@ -527,7 +550,7 @@ public class GobblinTaskRunner { System.exit(1); } - LOGGER.info(JvmUtils.getJvmInputArguments()); + logger.info(JvmUtils.getJvmInputArguments()); String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
