Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5a169b6eb -> a288779df


Use IntelliJ to reformat TaskRunner class to conform to the code style

Closes #2223 from HappyRay/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a288779d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a288779d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a288779d

Branch: refs/heads/master
Commit: a288779dff19dbc7ae576c1259a15706292e7e72
Parents: 5a169b6
Author: Ray Yang <[email protected]>
Authored: Tue Dec 19 15:52:29 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Dec 19 15:52:29 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinTaskRunner.java      | 119 +++++++++++--------
 1 file changed, 68 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a288779d/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 3274bd7..ca715ba 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -35,11 +35,9 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -52,7 +50,6 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +64,6 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
@@ -137,8 +133,9 @@ public class GobblinTaskRunner {
 
   protected final FileSystem fs;
 
-  public GobblinTaskRunner(String applicationName, String helixInstanceName, 
String applicationId, String taskRunnerId, Config config,
-      Optional<Path> appWorkDirOptional) throws Exception {
+  public GobblinTaskRunner(String applicationName, String helixInstanceName, 
String applicationId,
+      String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
+      throws Exception {
     this.helixInstanceName = helixInstanceName;
     this.config = config;
     this.taskRunnerId = taskRunnerId;
@@ -146,47 +143,54 @@ public class GobblinTaskRunner {
     Configuration conf = HadoopUtils.newConfiguration();
     this.fs = buildFileSystem(this.config, conf);
 
-    String zkConnectionString = 
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String zkConnectionString =
+        
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
 
     this.helixManager = HelixManagerFactory
-        
.getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
 helixInstanceName,
-            InstanceType.PARTICIPANT, zkConnectionString);
+        
.getZKHelixManager(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
+            helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
 
     Properties properties = ConfigUtils.configToProperties(config);
 
     TaskExecutor taskExecutor = new TaskExecutor(properties);
     TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
 
-    Path appWorkDir = appWorkDirOptional.isPresent() ? 
appWorkDirOptional.get() :
-        GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, 
applicationName, applicationId);
+    Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
+        : GobblinClusterUtils
+            .getAppWorkDirPathFromConfig(config, this.fs, applicationName, 
applicationId);
 
     List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
-        new JMXReportingService(ImmutableMap.of("task.executor" 
,taskExecutor.getTaskExecutorQueueMetricSet())));
+        new JMXReportingService(
+            ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
     services.addAll(getServices());
 
     this.serviceManager = new ServiceManager(services);
 
-    this.containerMetrics = buildContainerMetrics(this.config, properties, 
applicationName, this.taskRunnerId);
+    this.containerMetrics =
+        buildContainerMetrics(this.config, properties, applicationName, 
this.taskRunnerId);
 
     URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
-        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
 
     // Register task factory for the Helix task state model
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
     taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME,
-        new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs, appWorkDir,
-            stateStoreJobConfig, this.helixManager));
+        new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs,
+            appWorkDir, stateStoreJobConfig, this.helixManager));
     this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, 
taskFactoryMap);
-    
this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", 
this.taskStateModelFactory);
+    this.helixManager.getStateMachineEngine()
+        .registerStateModelFactory("Task", this.taskStateModelFactory);
   }
 
   /**
    * Start this {@link GobblinTaskRunner} instance.
    */
   public void start() {
-    LOGGER.info(String.format("Starting %s in container %s", 
this.helixInstanceName, this.taskRunnerId));
+    LOGGER.info(
+        String.format("Starting %s in container %s", this.helixInstanceName, 
this.taskRunnerId));
 
     // Add a shutdown hook so the task scheduler gets properly shutdown
     addShutdownHook();
@@ -196,7 +200,8 @@ public class GobblinTaskRunner {
     // Start metric reporting
     if (this.containerMetrics.isPresent()) {
       this.containerMetrics.get()
-          
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config), 
this.taskRunnerId);
+          
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config),
+              this.taskRunnerId);
     }
 
     this.serviceManager.startAsync();
@@ -250,8 +255,9 @@ public class GobblinTaskRunner {
   void connectHelixManager() {
     try {
       this.helixManager.connect();
-      
this.helixManager.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-          new ParticipantShutdownMessageHandlerFactory());
+      this.helixManager.getMessagingService()
+          
.registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+              new ParticipantShutdownMessageHandlerFactory());
       this.helixManager.getMessagingService()
           
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
               getUserDefinedMessageHandlerFactory());
@@ -289,16 +295,18 @@ public class GobblinTaskRunner {
     });
   }
 
-  private FileSystem buildFileSystem(Config config, Configuration conf) throws 
IOException {
-    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ?
-        
FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), 
conf) :
-        FileSystem.get(conf);
+  private FileSystem buildFileSystem(Config config, Configuration conf)
+      throws IOException {
+    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+        : FileSystem.get(conf);
   }
 
-  private Optional<ContainerMetrics> buildContainerMetrics(Config config, 
Properties properties, String applicationName,
-      String workerId) {
+  private Optional<ContainerMetrics> buildContainerMetrics(Config config, 
Properties properties,
+      String applicationName, String workerId) {
     if (GobblinMetrics.isEnabled(properties)) {
-      return 
Optional.of(ContainerMetrics.get(ConfigUtils.configToState(config), 
applicationName, workerId));
+      return Optional
+          .of(ContainerMetrics.get(ConfigUtils.configToState(config), 
applicationName, workerId));
     } else {
       return Optional.absent();
     }
@@ -340,12 +348,13 @@ public class GobblinTaskRunner {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage()
+          throws InterruptedException {
         String messageSubType = this._message.getMsgSubType();
-        Preconditions
-            
.checkArgument(messageSubType.equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()),
-                String
-                    .format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
+        Preconditions.checkArgument(messageSubType
+            
.equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), 
String
+            .format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+                messageSubType));
 
         HelixTaskResult result = new HelixTaskResult();
 
@@ -354,7 +363,8 @@ public class GobblinTaskRunner {
           return result;
         }
 
-        LOGGER.info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
+        LOGGER
+            .info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
 
         ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
             MoreExecutors.getExitingScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1));
@@ -369,8 +379,8 @@ public class GobblinTaskRunner {
             HelixManager helixManager = _notificationContext.getManager();
             HelixDataAccessor helixDataAccessor = 
helixManager.getHelixDataAccessor();
 
-            HelixProperty helixProperty = helixDataAccessor
-                .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), 
helixManager.getInstanceName()));
+            HelixProperty helixProperty = helixDataAccessor.getProperty(
+                _message.getKey(helixDataAccessor.keyBuilder(), 
helixManager.getInstanceName()));
             // The absence of the shutdown message indicates it has been 
removed
             if (helixProperty == null) {
               GobblinTaskRunner.this.stop();
@@ -384,8 +394,9 @@ public class GobblinTaskRunner {
 
       @Override
       public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(
-            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
+        LOGGER.error(String
+            .format("Failed to handle message with exception %s, error code 
%s, error type %s", e,
+                code, type));
       }
     }
   }
@@ -431,10 +442,10 @@ public class GobblinTaskRunner {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        LOGGER.warn(String
-            .format("No handling setup for %s message of subtype: %s", 
Message.MessageType.USER_DEFINE_MSG.toString(),
-                this._message.getMsgSubType()));
+      public HelixTaskResult handleMessage()
+          throws InterruptedException {
+        LOGGER.warn(String.format("No handling setup for %s message of 
subtype: %s",
+            Message.MessageType.USER_DEFINE_MSG.toString(), 
this._message.getMsgSubType()));
 
         HelixTaskResult helixTaskResult = new HelixTaskResult();
         helixTaskResult.setSuccess(true);
@@ -443,8 +454,9 @@ public class GobblinTaskRunner {
 
       @Override
       public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(
-            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
+        LOGGER.error(String
+            .format("Failed to handle message with exception %s, error code 
%s, error type %s", e,
+                code, type));
       }
     }
   }
@@ -459,8 +471,10 @@ public class GobblinTaskRunner {
 
   public static Options buildOptions() {
     Options options = new Options();
-    options.addOption("a", 
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, 
"Application name");
-    options.addOption("i", 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true, "Helix 
instance name");
+    options.addOption("a", 
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
+        "Application name");
+    options.addOption("i", 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
+        "Helix instance name");
     return options;
   }
 
@@ -469,7 +483,8 @@ public class GobblinTaskRunner {
     formatter.printHelp(GobblinClusterManager.class.getSimpleName(), options);
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args)
+      throws Exception {
     Options options = buildOptions();
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
@@ -481,12 +496,14 @@ public class GobblinTaskRunner {
 
       LOGGER.info(JvmUtils.getJvmInputArguments());
 
-      String applicationName = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
-      String helixInstanceName = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
+      String applicationName =
+          
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+      String helixInstanceName =
+          
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
 
       GobblinTaskRunner gobblinWorkUnitRunner =
-          new GobblinTaskRunner(applicationName, helixInstanceName, 
getApplicationId(), getTaskRunnerId(),
-              ConfigFactory.load(), Optional.<Path>absent());
+          new GobblinTaskRunner(applicationName, helixInstanceName, 
getApplicationId(),
+              getTaskRunnerId(), ConfigFactory.load(), 
Optional.<Path>absent());
       gobblinWorkUnitRunner.start();
     } catch (ParseException pe) {
       printUsage(options);

Reply via email to