Repository: incubator-gobblin Updated Branches: refs/heads/master 5a169b6eb -> a288779df
Use IntelliJ to reformat TaskRunner class to conform to the code style Closes #2223 from HappyRay/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a288779d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a288779d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a288779d Branch: refs/heads/master Commit: a288779dff19dbc7ae576c1259a15706292e7e72 Parents: 5a169b6 Author: Ray Yang <[email protected]> Authored: Tue Dec 19 15:52:29 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Dec 19 15:52:29 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinTaskRunner.java | 119 +++++++++++-------- 1 file changed, 68 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a288779d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 3274bd7..ca715ba 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -35,11 +35,9 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; @@ -52,7 +50,6 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.Message; import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskStateModelFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +64,6 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; - import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; @@ -137,8 +133,9 @@ public class GobblinTaskRunner { protected final FileSystem fs; - public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId, String taskRunnerId, Config config, - Optional<Path> appWorkDirOptional) throws Exception { + public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId, + String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) + throws Exception { this.helixInstanceName = helixInstanceName; this.config = config; this.taskRunnerId = taskRunnerId; @@ -146,47 +143,54 @@ public class GobblinTaskRunner { Configuration conf = HadoopUtils.newConfiguration(); this.fs = buildFileSystem(this.config, conf); - String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String zkConnectionString = + config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString); this.helixManager = HelixManagerFactory - .getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), helixInstanceName, - InstanceType.PARTICIPANT, zkConnectionString); + .getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), + helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); Properties properties 
= ConfigUtils.configToProperties(config); TaskExecutor taskExecutor = new TaskExecutor(properties); TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); - Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : - GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); + Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() + : GobblinClusterUtils + .getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, - new JMXReportingService(ImmutableMap.of("task.executor" ,taskExecutor.getTaskExecutorQueueMetricSet()))); + new JMXReportingService( + ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); services.addAll(getServices()); this.serviceManager = new ServiceManager(services); - this.containerMetrics = buildContainerMetrics(this.config, properties, applicationName, this.taskRunnerId); + this.containerMetrics = + buildContainerMetrics(this.config, properties, applicationName, this.taskRunnerId); URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri(); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) - .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(rootPathUri.toString())); + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, + ConfigValueFactory.fromAnyRef(rootPathUri.toString())); // Register task factory for the Helix task state model Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, - new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, appWorkDir, - stateStoreJobConfig, this.helixManager)); + new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, + appWorkDir, stateStoreJobConfig, this.helixManager)); this.taskStateModelFactory = new 
TaskStateModelFactory(this.helixManager, taskFactoryMap); - this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", this.taskStateModelFactory); + this.helixManager.getStateMachineEngine() + .registerStateModelFactory("Task", this.taskStateModelFactory); } /** * Start this {@link GobblinTaskRunner} instance. */ public void start() { - LOGGER.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); + LOGGER.info( + String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); // Add a shutdown hook so the task scheduler gets properly shutdown addShutdownHook(); @@ -196,7 +200,8 @@ public class GobblinTaskRunner { // Start metric reporting if (this.containerMetrics.isPresent()) { this.containerMetrics.get() - .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config), this.taskRunnerId); + .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config), + this.taskRunnerId); } this.serviceManager.startAsync(); @@ -250,8 +255,9 @@ public class GobblinTaskRunner { void connectHelixManager() { try { this.helixManager.connect(); - this.helixManager.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, - new ParticipantShutdownMessageHandlerFactory()); + this.helixManager.getMessagingService() + .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, + new ParticipantShutdownMessageHandlerFactory()); this.helixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), getUserDefinedMessageHandlerFactory()); @@ -289,16 +295,18 @@ public class GobblinTaskRunner { }); } - private FileSystem buildFileSystem(Config config, Configuration conf) throws IOException { - return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? 
- FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf) : - FileSystem.get(conf); + private FileSystem buildFileSystem(Config config, Configuration conf) + throws IOException { + return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem + .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf) + : FileSystem.get(conf); } - private Optional<ContainerMetrics> buildContainerMetrics(Config config, Properties properties, String applicationName, - String workerId) { + private Optional<ContainerMetrics> buildContainerMetrics(Config config, Properties properties, + String applicationName, String workerId) { if (GobblinMetrics.isEnabled(properties)) { - return Optional.of(ContainerMetrics.get(ConfigUtils.configToState(config), applicationName, workerId)); + return Optional + .of(ContainerMetrics.get(ConfigUtils.configToState(config), applicationName, workerId)); } else { return Optional.absent(); } @@ -340,12 +348,13 @@ public class GobblinTaskRunner { } @Override - public HelixTaskResult handleMessage() throws InterruptedException { + public HelixTaskResult handleMessage() + throws InterruptedException { String messageSubType = this._message.getMsgSubType(); - Preconditions - .checkArgument(messageSubType.equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), - String - .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType)); + Preconditions.checkArgument(messageSubType + .equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), String + .format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, + messageSubType)); HelixTaskResult result = new HelixTaskResult(); @@ -354,7 +363,8 @@ public class GobblinTaskRunner { return result; } - LOGGER.info("Handling message " + HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()); + LOGGER + .info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()); ScheduledExecutorService shutdownMessageHandlingCompletionWatcher = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); @@ -369,8 +379,8 @@ public class GobblinTaskRunner { HelixManager helixManager = _notificationContext.getManager(); HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); - HelixProperty helixProperty = helixDataAccessor - .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), helixManager.getInstanceName())); + HelixProperty helixProperty = helixDataAccessor.getProperty( + _message.getKey(helixDataAccessor.keyBuilder(), helixManager.getInstanceName())); // The absence of the shutdown message indicates it has been removed if (helixProperty == null) { GobblinTaskRunner.this.stop(); @@ -384,8 +394,9 @@ public class GobblinTaskRunner { @Override public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error( - String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); + LOGGER.error(String + .format("Failed to handle message with exception %s, error code %s, error type %s", e, + code, type)); } } } @@ -431,10 +442,10 @@ public class GobblinTaskRunner { } @Override - public HelixTaskResult handleMessage() throws InterruptedException { - LOGGER.warn(String - .format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(), - this._message.getMsgSubType())); + public HelixTaskResult handleMessage() + throws InterruptedException { + LOGGER.warn(String.format("No handling setup for %s message of subtype: %s", + Message.MessageType.USER_DEFINE_MSG.toString(), this._message.getMsgSubType())); HelixTaskResult helixTaskResult = new HelixTaskResult(); helixTaskResult.setSuccess(true); @@ -443,8 +454,9 @@ public class GobblinTaskRunner { @Override public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error( - 
String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); + LOGGER.error(String + .format("Failed to handle message with exception %s, error code %s, error type %s", e, + code, type)); } } } @@ -459,8 +471,10 @@ public class GobblinTaskRunner { public static Options buildOptions() { Options options = new Options(); - options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Application name"); - options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true, "Helix instance name"); + options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, + "Application name"); + options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true, + "Helix instance name"); return options; } @@ -469,7 +483,8 @@ public class GobblinTaskRunner { formatter.printHelp(GobblinClusterManager.class.getSimpleName(), options); } - public static void main(String[] args) throws Exception { + public static void main(String[] args) + throws Exception { Options options = buildOptions(); try { CommandLine cmd = new DefaultParser().parse(options, args); @@ -481,12 +496,14 @@ public class GobblinTaskRunner { LOGGER.info(JvmUtils.getJvmInputArguments()); - String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME); - String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME); + String applicationName = + cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME); + String helixInstanceName = + cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME); GobblinTaskRunner gobblinWorkUnitRunner = - new GobblinTaskRunner(applicationName, helixInstanceName, getApplicationId(), getTaskRunnerId(), - ConfigFactory.load(), Optional.<Path>absent()); + new GobblinTaskRunner(applicationName, 
helixInstanceName, getApplicationId(), + getTaskRunnerId(), ConfigFactory.load(), Optional.<Path>absent()); gobblinWorkUnitRunner.start(); } catch (ParseException pe) { printUsage(options);
