[GOBBLIN-617] Add distributed job launcher metrics and some refactoring.

Closes #2484 from yukuai518/distMetrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1155cdc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1155cdc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1155cdc5

Branch: refs/heads/master
Commit: 1155cdc5e1944c536ef6c9707bf947dd1a57d67b
Parents: c103a8f
Author: Kuai Yu <[email protected]>
Authored: Tue Oct 23 16:04:44 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 23 16:04:44 2018 -0700

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |   1 +
 .../gobblin/cluster/GobblinClusterManager.java  |  22 +-
 ...blinHelixDistributeJobExecutionLauncher.java |  70 +++---
 .../gobblin/cluster/GobblinHelixJobFactory.java |  26 +-
 .../cluster/GobblinHelixJobLauncher.java        |  28 +--
 .../GobblinHelixJobLauncherListener.java        |  80 +++++++
 .../cluster/GobblinHelixJobLauncherMetrics.java |  74 ++++++
 .../cluster/GobblinHelixJobScheduler.java       | 240 ++++++-------------
 .../GobblinHelixJobSchedulerMetrics.java        |  96 ++++++++
 .../gobblin/cluster/GobblinHelixJobTask.java    |  54 ++++-
 .../cluster/GobblinHelixMultiManager.java       |   6 +-
 .../gobblin/cluster/GobblinTaskRunner.java      |  38 ++-
 .../cluster/GobblinTaskRunnerMetrics.java       |  14 +-
 .../cluster/HelixRetriggeringJobCallable.java   |  98 +++++---
 .../gobblin/cluster/TaskRunnerSuiteBase.java    |   6 +-
 .../cluster/TaskRunnerSuiteProcessModel.java    |   7 +-
 .../cluster/TaskRunnerSuiteThreadModel.java     |  21 +-
 .../TaskRunnerSuiteForJobFactoryTest.java       |  25 +-
 .../instrumented/StandardMetricsBridge.java     |  16 +-
 .../apache/gobblin/runtime/api/JobCatalog.java  |  14 +-
 .../runtime/api/JobExecutionLauncher.java       |  17 +-
 .../apache/gobblin/runtime/api/SpecCatalog.java |  11 +-
 .../modules/core/GobblinServiceManager.java     |   6 +
 23 files changed, 629 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 2b1ba09..3fb665e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -77,6 +77,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob";
   public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + 
"planning.";
   public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey";
+  public static final String PLANNING_JOB_CREATE_TIME = PLANNING_CONF_PREFIX + 
"createTime";
 
   // job spec operation
   public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + 
"job.alwaysDelete";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index b1d0f43..e90a65a 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -54,7 +56,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import javax.annotation.Nonnull;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -105,9 +106,6 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinClusterManager.class);
 
-  @VisibleForTesting
-  protected GobblinHelixMultiManager multiManager;
-
   private StopStatus stopStatus = new StopStatus(false);
 
   protected ServiceBasedAppLauncher applicationLauncher;
@@ -127,12 +125,11 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   // set to true to stop the idle process thread
   private volatile boolean stopIdleProcessThread = false;
 
-  // flag to keep track of leader and avoid processing duplicate leadership 
change notifications
-  private boolean isLeader = false;
-
   private final boolean isStandaloneMode;
 
   @Getter
+  protected GobblinHelixMultiManager multiManager;
+  @Getter
   private MutableJobCatalog jobCatalog;
   @Getter
   private GobblinHelixJobScheduler jobScheduler;
@@ -142,14 +139,12 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   private final String clusterName;
   private final Config config;
   private final MetricContext metricContext;
-  private final StandardMetrics metrics;
 
   public GobblinClusterManager(String clusterName, String applicationId, 
Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     this.clusterName = clusterName;
     this.config = config;
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
-    this.metrics = new StandardMetrics();
     this.isStandaloneMode = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
 
@@ -446,11 +441,14 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   }
 
   @Override
-  public StandardMetrics getStandardMetrics() {
-    return this.metrics;
+  public Collection<StandardMetrics> getStandardMetricsCollection() {
+    List<StandardMetrics> list = new ArrayList();
+    list.addAll(this.jobScheduler.getStandardMetricsCollection());
+    list.addAll(this.multiManager.getStandardMetricsCollection());
+    list.addAll(this.jobCatalog.getStandardMetricsCollection());
+    return list;
   }
 
-  @Nonnull
   @Override
   public MetricContext getMetricContext() {
     return this.metricContext;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index eb78938..f7b0a83 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -43,6 +43,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
@@ -53,7 +54,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.api.ExecutionResult;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
@@ -63,19 +63,19 @@ import org.apache.gobblin.runtime.api.MonitoredObject;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
- * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs 
on the same instance (node), this
- * {@link JobExecutionLauncher} can distribute the original job (called 
planning job) to Helix. Helix will
- * assign this job to one participant. The participant can parse the original 
job properties and run the task driver.
+ * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs 
on the same
+ * instance (manager), this {@link JobExecutionLauncher} will distribute the 
original job
+ * to one of the worker (participant) nodes. The original job will be launched 
there.
  *
  * <p>
  *   For job submission, the Helix workflow name will be the original job name 
with prefix
- *   {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The 
Helix job name will be the auto-generated planning
- *   job ID with prefix {@link 
GobblinClusterConfigurationKeys#PLANNING_ID_KEY}.
+ *   {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The 
Helix job name
+ *   will be the auto-generated planning job ID with prefix
+ *   {@link GobblinClusterConfigurationKeys#PLANNING_ID_KEY}.
  * </p>
  *
  * <p>
@@ -87,10 +87,11 @@ import org.apache.gobblin.util.PropertiesUtils;
 @Alpha
 @Slf4j
 class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher, Closeable {
+
   protected HelixManager helixManager;
   protected TaskDriver helixTaskDriver;
-  protected Properties sysProperties;
-  protected Properties jobProperties;
+  protected Properties sysProps;
+  protected Properties jobPlanningProps;
   protected StateStores stateStores;
 
   protected static final String PLANNING_WORK_UNIT_DIR_NAME = 
"_plan_workunits";
@@ -115,11 +116,12 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws 
Exception {
     this.helixManager = builder.manager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
-    this.sysProperties = builder.sysProperties;
-    this.jobProperties = builder.jobProperties;
+    this.sysProps = builder.sysProps;
+    this.jobPlanningProps = builder.jobPlanningProps;
     this.jobSubmitted = false;
-    Config combined = ConfigUtils.propertiesToConfig(jobProperties)
-        .withFallback(ConfigUtils.propertiesToConfig(sysProperties));
+
+    Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+        .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
     Config stateStoreJobConfig = combined
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
@@ -141,8 +143,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   }
 
   private void executeCancellation() {
-    String planningName = getPlanningJobId(this.jobProperties);
     if (this.jobSubmitted) {
+      String planningName = getPlanningJobId(this.jobPlanningProps);
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
           // TODO : fix this when HELIX-1180 is completed
@@ -159,8 +161,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
 
   @Setter
   public static class Builder {
-    Properties sysProperties;
-    Properties jobProperties;
+    Properties sysProps;
+    Properties jobPlanningProps;
     HelixManager manager;
     Path appWorkDir;
     public GobblinHelixDistributeJobExecutionLauncher build() throws Exception 
{
@@ -168,18 +170,12 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     }
   }
 
-  private String getPlanningJobName (Properties jobProps) {
-    String jobName = JobState.getJobNameFromProps(jobProps);
-    return GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + jobName;
-  }
-
-  protected String getPlanningJobId (Properties jobProps) {
-    if (jobProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) 
{
-      return 
jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+  protected String getPlanningJobId (Properties jobPlanningProps) {
+    if 
(jobPlanningProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) 
{
+      return 
jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+    } else {
+      throw new RuntimeException("Cannot find planning id");
     }
-    String planningId = 
JobLauncherUtils.newJobId(getPlanningJobName(jobProps));
-    jobProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
-    return planningId;
   }
 
   /**
@@ -193,7 +189,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
    *
    * In short, the planning job will run once and requires no timeout.
    */
-  private JobConfig.Builder createPlanningJob (Properties jobProps) {
+  private JobConfig.Builder createJobBuilder (Properties jobProps) {
     // Create a single task for job planning
     String planningId = getPlanningJobId(jobProps);
     Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
@@ -224,7 +220,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   }
 
   /**
-   * Submit job to helix so that it can be re-assigned to one of its 
participants.
+   * Submit a planning job to helix so that it can be launched from a remote node.
    * @param jobName A planning job name which has prefix {@link 
GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}.
    * @param jobId   A planning job id created by {@link 
GobblinHelixDistributeJobExecutionLauncher#getPlanningJobId}.
    * @param jobConfigBuilder A job config builder which contains a single task.
@@ -241,19 +237,19 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
   }
 
   @Override
-  public DistributeJobMonitor launchJob(JobSpec jobSpec) {
-    this.jobMonitor = new DistributeJobMonitor(new 
DistributeJobCallable(this.jobProperties));
+  public DistributeJobMonitor launchJob(@Nullable  JobSpec jobSpec) {
+    this.jobMonitor = new DistributeJobMonitor(new 
DistributeJobCallable(this.jobPlanningProps));
     return this.jobMonitor;
   }
 
   @AllArgsConstructor
   private class DistributeJobCallable implements Callable<ExecutionResult> {
-    Properties jobProps;
+    Properties jobPlanningProps;
     @Override
     public DistributeJobResult call()
         throws Exception {
-      String planningId = getPlanningJobId(this.jobProps);
-      JobConfig.Builder builder = createPlanningJob(this.jobProps);
+      String planningId = getPlanningJobId(this.jobPlanningProps);
+      JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps);
       try {
         submitJobToHelix(planningId, planningId, builder);
         return waitForJobCompletion(planningId, planningId);
@@ -265,9 +261,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   }
 
   private DistributeJobResult waitForJobCompletion(String workFlowName, String 
jobName) throws InterruptedException {
-    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
-    long timeoutInSeconds = 
Long.parseLong(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+    long timeoutInSeconds = 
Long.parseLong(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
     try {
@@ -287,7 +283,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   //TODO: change below to Helix UserConentStore
   @VisibleForTesting
   protected DistributeJobResult getResultFromUserContent() {
-    String planningId = getPlanningJobId(this.jobProperties);
+    String planningId = getPlanningJobId(this.jobPlanningProps);
     try {
       TaskState taskState = 
this.stateStores.getTaskStateStore().get(planningId, planningId, planningId);
       return new DistributeJobResult(Optional.of(taskState.getProperties()), 
Optional.empty());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index 83821d4..5bee4e0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -27,10 +27,13 @@ import org.apache.helix.task.TaskFactory;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
 
@@ -38,10 +41,14 @@ import org.apache.gobblin.util.PathUtils;
  * An implementation of Helix's {@link TaskFactory} for {@link 
GobblinHelixJobTask}s.
  */
 @Slf4j
-public class GobblinHelixJobFactory implements TaskFactory {
+class GobblinHelixJobFactory implements TaskFactory {
   protected StateStores stateStores;
 
   protected TaskRunnerSuiteBase.Builder builder;
+  @Getter
+  protected GobblinHelixJobLauncherMetrics launcherMetrics;
+  @Getter
+  protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics;
 
   private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) {
     Config sysConfig = builder.getConfig();
@@ -57,16 +64,29 @@ public class GobblinHelixJobFactory implements TaskFactory {
         appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME);
   }
 
-  public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
+  public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder, 
MetricContext metricContext) {
+
     this.builder = builder;
     // TODO: We can remove below initialization once Helix allow us to persist 
job resut in userContentStore
     initializeStateStore(this.builder);
+    // initialize job related metrics (planning jobs)
+    int metricsWindowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
+        ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+        ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+
+    this.launcherMetrics = new 
GobblinHelixJobLauncherMetrics("launcherInJobFactory",
+        metricContext,
+        metricsWindowSizeInMin);
+    this.jobTaskMetrics = new 
GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext,
+        metricsWindowSizeInMin);
   }
 
   @Override
   public Task createNewTask(TaskCallbackContext context) {
     return new GobblinHelixJobTask(context,
         this.stateStores,
-        this.builder);
+        this.builder,
+        this.launcherMetrics,
+        this.jobTaskMetrics);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 8523e21..8d6d7b2 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -35,7 +35,6 @@ import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +44,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -108,8 +106,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final HelixManager helixManager;
   private final TaskDriver helixTaskDriver;
   private final String helixWorkFlowName;
-  private final String jobResourceName;
-  @Getter
   private JobListener jobListener;
 
   private final FileSystem fs;
@@ -123,15 +119,17 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final TaskStateCollectorService taskStateCollectorService;
 
   private volatile boolean jobSubmitted = false;
-  private volatile boolean jobComplete = false;
   private final ConcurrentHashMap<String, Boolean> runningMap;
   private final StateStores stateStores;
   private final Config jobConfig;
   private final long workFlowExpiryTimeSeconds;
 
-  public GobblinHelixJobLauncher(Properties jobProps, final HelixManager 
helixManager, Path appWorkDir,
-      List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap)
-      throws Exception {
+  public GobblinHelixJobLauncher (Properties jobProps,
+                                  final HelixManager helixManager,
+                                  Path appWorkDir,
+                                  List<? extends Tag<?>> metadataTags,
+                                  ConcurrentHashMap<String, Boolean> 
runningMap) throws Exception {
+
     super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
     LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
 
@@ -144,8 +142,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         + Path.SEPARATOR + this.jobContext.getJobId());
 
     this.helixWorkFlowName = this.jobContext.getJobId();
-    this.jobResourceName = 
TaskUtil.getNamespacedJobName(this.helixWorkFlowName, 
this.jobContext.getJobId());
-
     this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
 
     this.stateSerDeRunnerThreads = 
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
@@ -170,8 +166,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
     this.fs = FileSystem.get(fsUri, new Configuration());
 
-    this.taskStateCollectorService = new TaskStateCollectorService(jobProps, 
this.jobContext.getJobState(),
-        this.eventBus, this.stateStores.getTaskStateStore(), 
outputTaskStateDir);
+    this.taskStateCollectorService = new TaskStateCollectorService(jobProps,
+        this.jobContext.getJobState(),
+        this.eventBus,
+        this.stateStores.getTaskStateStore(),
+        this.outputTaskStateDir);
 
     startCancellationExecutor();
   }
@@ -212,7 +211,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       waitForJobCompletion();
       jobRunTimer.stop();
       LOGGER.info(String.format("Job %s completed", 
this.jobContext.getJobId()));
-      this.jobComplete = true;
     } finally {
       // The last iteration of output TaskState collecting will run when the 
collector service gets stopped
       this.taskStateCollectorService.stopAsync().awaitTerminated();
@@ -291,7 +289,9 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
 
     if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) {
-      
jobConfigBuilder.setInstanceGroupTag(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY));
+      String jobTag = 
this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY);
+      log.info("Job {} has tags associated : {}", this.jobContext.getJobId(), 
jobTag);
+      jobConfigBuilder.setInstanceGroupTag(jobTag);
     }
 
     if 
(Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING))
 {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java
new file mode 100644
index 0000000..b19d301
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.runtime.listeners.JobListener;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * The {@link GobblinHelixJobLauncherMetrics} will always be passed in because
+ * it will be updated accordingly.
+ */
+class GobblinHelixJobLauncherListener extends AbstractJobListener {
+
+  private final GobblinHelixJobLauncherMetrics jobLauncherMetrics;
+  private static final String JOB_START_TIME = "jobStartTime";
+
+  GobblinHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics) {
+    this.jobLauncherMetrics = jobLauncherMetrics;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    jobContext.getJobState().setProp(JOB_START_TIME, 
Long.toString(System.nanoTime()));
+    jobLauncherMetrics.totalJobsLaunched.incrementAndGet();
+  }
+
+  /**
+   * From {@link 
org.apache.gobblin.runtime.AbstractJobLauncher#launchJob(JobListener)}, the 
final
+   * job state should only be FAILED or COMMITTED. This means the completed 
jobs metrics covers
+   * both failed jobs and committed jobs.
+   */
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    long startTime = jobContext.getJobState().getPropAsLong(JOB_START_TIME);
+    jobLauncherMetrics.totalJobsCompleted.incrementAndGet();
+    
Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForCompletedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobLauncherMetrics.totalJobsFailed.incrementAndGet();
+      
Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForFailedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+    } else {
+      jobLauncherMetrics.totalJobsCommitted.incrementAndGet();
+      
Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForCommittedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext)
+      throws Exception {
+    super.onJobCancellation(jobContext);
+    jobLauncherMetrics.totalJobsCancelled.incrementAndGet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java
new file mode 100644
index 0000000..b33edae
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+
+/**
+ * Metrics that relate to jobs launched by {@link GobblinHelixJobLauncher}: running totals
+ * exposed as gauges, plus timers for completed, failed and committed jobs.
+ */
+class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics {
+  private final String metricsName;
+
+  // All historical counters; package-visible so the job listener can increment them.
+  final AtomicLong totalJobsLaunched = new AtomicLong(0);
+  final AtomicLong totalJobsCompleted = new AtomicLong(0);
+  final AtomicLong totalJobsCommitted = new AtomicLong(0);
+  final AtomicLong totalJobsFailed = new AtomicLong(0);
+  final AtomicLong totalJobsCancelled = new AtomicLong(0);
+
+  final ContextAwareTimer timeForCompletedJobs;
+  final ContextAwareTimer timeForFailedJobs;
+  final ContextAwareTimer timeForCommittedJobs;
+
+  /**
+   * Creates the gauges and timers and adds them to this object's metric collection.
+   *
+   * @param metricsName name returned by {@link #getName()}
+   * @param metricContext context used to create the gauges and timers
+   * @param windowSizeInMin window size, in minutes, passed to each timer
+   */
+  public GobblinHelixJobLauncherMetrics(String metricsName, final MetricContext metricContext, int windowSizeInMin) {
+    this.metricsName = metricsName;
+
+    // One gauge per historical counter.
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, this.totalJobsLaunched::get));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, this.totalJobsCompleted::get));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, this.totalJobsCommitted::get));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, this.totalJobsFailed::get));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, this.totalJobsCancelled::get));
+
+    // Running jobs are derived: launched minus completed, narrowed to int.
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(
+        JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
+        () -> (int) (this.totalJobsLaunched.get() - this.totalJobsCompleted.get())));
+
+    this.timeForCompletedJobs = metricContext.contextAwareTimer(
+        JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSizeInMin, TimeUnit.MINUTES);
+    this.timeForFailedJobs = metricContext.contextAwareTimer(
+        JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSizeInMin, TimeUnit.MINUTES);
+    this.timeForCommittedJobs = metricContext.contextAwareTimer(
+        JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSizeInMin, TimeUnit.MINUTES);
+
+    this.contextAwareMetrics.add(this.timeForCommittedJobs);
+    this.contextAwareMetrics.add(this.timeForCompletedJobs);
+    this.contextAwareMetrics.add(this.timeForFailedJobs);
+  }
+
+  /** Returns the metrics name supplied at construction. */
+  @Override
+  public String getName() {
+    return this.metricsName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index e29fe61..a215ffa 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -19,22 +19,21 @@ package org.apache.gobblin.cluster;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
@@ -48,16 +47,11 @@ import 
org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.api.JobExecutionLauncher;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.listeners.AbstractJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
@@ -66,10 +60,19 @@ import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
- * An extension to {@link JobScheduler} that schedules and runs Gobblin jobs 
on Helix using
- * {@link GobblinHelixJobLauncher}s.
+ * An extension to {@link JobScheduler} that schedules and runs
+ * Gobblin jobs on Helix.
  *
- * @author Yinan Li
+ * <p> The actual job running logic is handled by
+ * {@link HelixRetriggeringJobCallable}. This callable will first
+ * determine if this job should be launched from the same node
+ * where the scheduler is running, or from a remote node.
+ *
+ * <p> If the job should be launched from the scheduler node,
+ * {@link GobblinHelixJobLauncher} is invoked. Else the
+ * {@link GobblinHelixDistributeJobExecutionLauncher} is invoked.
+ *
+ * <p> More details can be found at {@link HelixRetriggeringJobCallable}.
  */
 @Alpha
 public class GobblinHelixJobScheduler extends JobScheduler implements 
StandardMetricsBridge{
@@ -84,12 +87,19 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   private final ConcurrentHashMap<String, Boolean> jobRunningMap;
   private final MutableJobCatalog jobCatalog;
   private final MetricContext metricContext;
-  private final Metrics metrics;
+
+  final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
+  final GobblinHelixJobLauncherMetrics launcherMetrics;
+
   private boolean startServicesCompleted;
 
-  public GobblinHelixJobScheduler(Properties properties, HelixManager 
helixManager, EventBus eventBus,
-      Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService 
schedulerService,
-      MutableJobCatalog jobCatalog) throws Exception {
+  public GobblinHelixJobScheduler(Properties properties,
+                                  HelixManager helixManager,
+                                  EventBus eventBus,
+                                  Path appWorkDir, List<? extends Tag<?>> 
metadataTags,
+                                  SchedulerService schedulerService,
+                                  MutableJobCatalog jobCatalog) throws 
Exception {
+
     super(properties, schedulerService);
     this.properties = properties;
     this.helixManager = helixManager;
@@ -99,7 +109,19 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     this.metadataTags = metadataTags;
     this.jobCatalog = jobCatalog;
     this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(properties), this.getClass());
-    this.metrics = new Metrics(this.metricContext);
+
+    int metricsWindowSizeInMin = 
ConfigUtils.getInt(ConfigUtils.propertiesToConfig(this.properties),
+                                                    
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+                                                    
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+
+    this.launcherMetrics = new 
GobblinHelixJobLauncherMetrics("launcherInScheduler",
+                                                              
this.metricContext,
+                                                              
metricsWindowSizeInMin);
+
+    this.jobSchedulerMetrics = new 
GobblinHelixJobSchedulerMetrics(this.jobExecutor,
+                                                                   
this.metricContext,
+                                                                   
metricsWindowSizeInMin);
+
     this.startServicesCompleted = false;
   }
 
@@ -115,137 +137,8 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
   }
 
   @Override
-  public StandardMetrics getStandardMetrics() {
-    return metrics;
-  }
-
-  private class Metrics extends StandardMetrics {
-
-    private final AtomicLong totalJobsLaunched;
-    private final AtomicLong totalJobsCompleted;
-    private final AtomicLong totalJobsCommitted;
-    private final AtomicLong totalJobsFailed;
-    private final AtomicLong totalJobsCancelled;
-
-    private final ContextAwareTimer timeForCompletedJobs;
-    private final ContextAwareTimer timeForFailedJobs;
-    private final ContextAwareTimer timeForCommittedJobs;
-    private final ContextAwareTimer timeBeforeJobScheduling;
-    private final ContextAwareTimer timeBeforeJobLaunching;
-    private final ContextAwareTimer timeBetwenJobSchedulingAndLaunching;
-
-    private final ThreadPoolExecutor threadPoolExecutor;
-
-    public Metrics(final MetricContext metricContext) {
-      // Thread executor reference from job scheduler
-      this.threadPoolExecutor = 
(ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor;
-
-      // timer duration setup
-      int windowSize = 
ConfigUtils.getInt(ConfigUtils.propertiesToConfig(GobblinHelixJobScheduler.this.properties),
-          ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
-          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
-
-      // All historical counters
-      this.totalJobsLaunched = new AtomicLong(0);
-      this.totalJobsCompleted = new AtomicLong(0);
-      this.totalJobsCommitted = new AtomicLong(0);
-      this.totalJobsFailed = new AtomicLong(0);
-      this.totalJobsCancelled = new AtomicLong(0);
-
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED,
 ()->this.totalJobsLaunched.get()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED,
 ()->this.totalJobsCompleted.get()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED,
 ()->this.totalJobsCommitted.get()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED,
 ()->this.totalJobsFailed.get()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED,
 ()->this.totalJobsCancelled.get()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
-          ()->(int)(Metrics.this.totalJobsLaunched.get() - 
Metrics.this.totalJobsCompleted.get())));
-
-      this.timeForCompletedJobs = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS,
 windowSize, TimeUnit.MINUTES);
-      this.timeForFailedJobs = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS,
 windowSize, TimeUnit.MINUTES);
-      this.timeForCommittedJobs = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS,
 windowSize, TimeUnit.MINUTES);
-      this.timeBeforeJobScheduling = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING,
 windowSize, TimeUnit.MINUTES);
-      this.timeBeforeJobLaunching = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING,
 windowSize, TimeUnit.MINUTES);
-      this.timeBetwenJobSchedulingAndLaunching = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING,
 windowSize, TimeUnit.MINUTES);
-
-      // executor metrics
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT,
 ()->this.threadPoolExecutor.getActiveCount()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE,
 ()->this.threadPoolExecutor.getMaximumPoolSize()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE,
 ()->this.threadPoolExecutor.getPoolSize()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE,
 ()->this.threadPoolExecutor.getCorePoolSize()));
-      
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE,
 ()->this.threadPoolExecutor.getQueue().size()));
-
-      this.contextAwareMetrics.add(timeForCommittedJobs);
-      this.contextAwareMetrics.add(timeForCompletedJobs);
-      this.contextAwareMetrics.add(timeForFailedJobs);
-      this.contextAwareMetrics.add(timeBeforeJobScheduling);
-      this.contextAwareMetrics.add(timeBeforeJobLaunching);
-      this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching);
-    }
-
-    private void updateTimeBeforeJobScheduling (Properties jobConfig) {
-      long jobCreationTime = 
Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
"0"));
-      Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), 
System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
-    }
-
-    private void updateTimeBeforeJobLaunching (Properties jobConfig) {
-      long jobCreationTime = 
Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
"0"));
-      Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), 
System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
-    }
-
-    private void updateTimeBetweenJobSchedulingAndJobLaunching (long 
scheduledTime, long launchingTime) {
-      
Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), 
launchingTime - scheduledTime, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public String getName() {
-      return GobblinHelixJobScheduler.class.getName();
-    }
-  }
-
-  private class MetricsTrackingListener extends AbstractJobListener {
-    private final Metrics metrics;
-    private static final String START_TIME = "startTime";
-    MetricsTrackingListener(Metrics metrics) {
-      this.metrics = metrics;
-    }
-
-    @Override
-    public void onJobPrepare(JobContext jobContext)
-        throws Exception {
-      super.onJobPrepare(jobContext);
-      jobContext.getJobState().setProp(START_TIME, 
Long.toString(System.nanoTime()));
-      if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.totalJobsLaunched.incrementAndGet();
-      }
-    }
-
-    @Override
-    public void onJobCompletion(JobContext jobContext)
-        throws Exception {
-      super.onJobCompletion(jobContext);
-      long startTime = jobContext.getJobState().getPropAsLong(START_TIME);
-      if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.totalJobsCompleted.incrementAndGet();
-        Instrumented.updateTimer(Optional.of(metrics.timeForCompletedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-        if (jobContext.getJobState().getState() == 
JobState.RunningState.FAILED) {
-            metrics.totalJobsFailed.incrementAndGet();
-            Instrumented.updateTimer(Optional.of(metrics.timeForFailedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-        } else {
-            metrics.totalJobsCommitted.incrementAndGet();
-            
Instrumented.updateTimer(Optional.of(metrics.timeForCommittedJobs), 
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-        }
-      }
-    }
-
-    @Override
-    public void onJobCancellation(JobContext jobContext)
-        throws Exception {
-      super.onJobCancellation(jobContext);
-      if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.totalJobsCancelled.incrementAndGet();
-      }
-    }
-
+  public Collection<StandardMetrics> getStandardMetricsCollection() {
+    return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics);
   }
 
   @Override
@@ -262,7 +155,12 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         LOGGER.info("{} service is not fully up, waiting here...", 
this.getClass().getName());
         Thread.sleep(1000);
       }
-      scheduleJob(jobProps, jobListener, Maps.newHashMap(), 
GobblinHelixJob.class);
+
+      scheduleJob(jobProps,
+                  jobListener,
+                  Maps.newHashMap(),
+                  GobblinHelixJob.class);
+
     } catch (Exception e) {
       throw new JobException("Failed to schedule job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
@@ -274,7 +172,12 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
-    new HelixRetriggeringJobCallable(this, this.properties, jobProps, 
jobListener, this.appWorkDir, this.helixManager).call();
+    new HelixRetriggeringJobCallable(this,
+        this.properties,
+        jobProps,
+        jobListener,
+        this.appWorkDir,
+        this.helixManager).call();
   }
 
   @Override
@@ -341,17 +244,19 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
   public void handleNewJobConfigArrival(NewJobConfigArrivalEvent 
newJobArrival) {
     LOGGER.info("Received new job configuration of job " + 
newJobArrival.getJobName());
     try {
-      Properties jobConfig = new Properties();
-      jobConfig.putAll(newJobArrival.getJobConfig());
+      Properties jobProps = new Properties();
+      jobProps.putAll(newJobArrival.getJobConfig());
 
-      metrics.updateTimeBeforeJobScheduling(jobConfig);
+      this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
 
-      if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + newJobArrival.getJobName());
-        scheduleJob(jobConfig, new MetricsTrackingListener(metrics));
+        scheduleJob(jobProps,
+                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
       } else {
         LOGGER.info("No job schedule found, so running job " + 
newJobArrival.getJobName());
-        this.jobExecutor.execute(new 
NonScheduledJobRunner(newJobArrival.getJobName(), jobConfig, new 
MetricsTrackingListener(metrics)));
+        this.jobExecutor.execute(new 
NonScheduledJobRunner(newJobArrival.getJobName(), jobProps,
+                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + 
newJobArrival.getJobName(), je);
@@ -391,13 +296,16 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
   class NonScheduledJobRunner implements Runnable {
 
     private final String jobUri;
-    private final Properties jobConfig;
-    private final JobListener jobListener;
+    private final Properties jobProps;
+    private final GobblinHelixJobLauncherListener jobListener;
     private final Long creationTimeInMillis;
 
-    public NonScheduledJobRunner(String jobUri, Properties jobConfig, 
JobListener jobListener) {
+    public NonScheduledJobRunner(String jobUri,
+                                 Properties jobProps,
+                                 GobblinHelixJobLauncherListener jobListener) {
+
       this.jobUri = jobUri;
-      this.jobConfig = jobConfig;
+      this.jobProps = jobProps;
       this.jobListener = jobListener;
       this.creationTimeInMillis = System.currentTimeMillis();
     }
@@ -414,13 +322,15 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
 
     @Override
     public void run() {
-      boolean alwaysDelete = PropertiesUtils
-          .getPropAsBoolean(this.jobConfig, 
GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false");
+      boolean alwaysDelete = PropertiesUtils.getPropAsBoolean(this.jobProps,
+                                                              
GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE,
+                                                              "false");
       boolean isDeleted = false;
+
       try {
-        
((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
-        
((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
 System.currentTimeMillis());
-        GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener);
+        
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps);
+        
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
 System.currentTimeMillis());
+        GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
 
         // remove non-scheduled job catalog once done so it won't be 
re-executed
         if (GobblinHelixJobScheduler.this.jobCatalog != null) {
@@ -433,7 +343,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         }
       } catch (JobException je) {
         deleteJobSpec(alwaysDelete, isDeleted);
-        LOGGER.error("Failed to run job " + 
this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+        LOGGER.error("Failed to run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
       } catch (Exception e) {
         deleteJobSpec(alwaysDelete, isDeleted);
         throw e;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java
new file mode 100644
index 0000000..1045647
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+
+
+/**
+ * Scheduler-level metrics used by {@link GobblinHelixJobScheduler}: schedule-cancellation
+ * counters, timers for the delays before a job is scheduled and launched, and gauges over the
+ * job executor's thread pool.
+ */
+class GobblinHelixJobSchedulerMetrics extends StandardMetricsBridge.StandardMetrics {
+  public static final String SCHEDULE_CANCELLATION_START = "scheduleCancellationStart";
+  // Must differ from SCHEDULE_CANCELLATION_START; otherwise both gauges share one metric name.
+  public static final String SCHEDULE_CANCELLATION_END = "scheduleCancellationEnd";
+  public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
+  public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
+  // NOTE(review): "Betwen" is misspelt, but the string is the published metric name; left
+  // unchanged here so existing consumers of the metric are not broken.
+  public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching";
+
+  final AtomicLong numCancellationStart;
+  final AtomicLong numCancellationComplete;
+  final ContextAwareTimer timeBeforeJobScheduling;
+  final ContextAwareTimer timeBeforeJobLaunching;
+  final ContextAwareTimer timeBetwenJobSchedulingAndLaunching;
+
+  final ThreadPoolExecutor threadPoolExecutor;
+
+  /**
+   * Creates the timers, counters and executor gauges and adds them to this object's metric
+   * collection.
+   *
+   * @param jobExecutor executor whose pool statistics are exposed; it is cast to
+   *        {@link ThreadPoolExecutor}, so any other implementation fails with a
+   *        {@link ClassCastException}
+   * @param metricContext context used to create the gauges and timers
+   * @param windowSizeInMin window size, in minutes, passed to each timer
+   */
+  public GobblinHelixJobSchedulerMetrics (final ExecutorService jobExecutor, final MetricContext metricContext, int windowSizeInMin) {
+    this.timeBeforeJobScheduling = metricContext.contextAwareTimer(TIMER_BEFORE_JOB_SCHEDULING,
+        windowSizeInMin, TimeUnit.MINUTES);
+    this.timeBeforeJobLaunching = metricContext.contextAwareTimer(TIMER_BEFORE_JOB_LAUNCHING,
+        windowSizeInMin, TimeUnit.MINUTES);
+    this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING,
+        windowSizeInMin, TimeUnit.MINUTES);
+    this.numCancellationStart = new AtomicLong(0);
+    this.numCancellationComplete = new AtomicLong(0);
+
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(SCHEDULE_CANCELLATION_START, ()->this.numCancellationStart.get()));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(SCHEDULE_CANCELLATION_END, ()->this.numCancellationComplete.get()));
+    this.contextAwareMetrics.add(timeBeforeJobScheduling);
+    this.contextAwareMetrics.add(timeBeforeJobLaunching);
+    this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching);
+
+    this.threadPoolExecutor = (ThreadPoolExecutor) jobExecutor;
+
+    // executor metrics
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount()));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize()));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize()));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize()));
+    this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()));
+  }
+
+  /**
+   * Reads the job creation time from the flow execution ID property, which the callers below
+   * interpret as epoch milliseconds. Defaults to 0 when the property is absent.
+   *
+   * @throws NumberFormatException if the property is present but not a valid long
+   */
+  private static long getJobCreationTime(Properties jobProps) {
+    return Long.parseLong(jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
+  }
+
+  /**
+   * Records the time elapsed between job creation and now, taken as the moment of scheduling.
+   *
+   * @param jobProps job properties carrying the flow execution ID
+   */
+  void updateTimeBeforeJobScheduling (Properties jobProps) {
+    Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling),
+        System.currentTimeMillis() - getJobCreationTime(jobProps),
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Records the time elapsed between job creation and now, taken as the moment of launching.
+   *
+   * @param jobProps job properties carrying the flow execution ID
+   */
+  void updateTimeBeforeJobLaunching (Properties jobProps) {
+    Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching),
+        System.currentTimeMillis() - getJobCreationTime(jobProps),
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Records the delay between a job being scheduled and being launched.
+   *
+   * @param scheduledTime scheduling time in milliseconds
+   * @param launchingTime launching time in milliseconds
+   */
+  void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) {
+    Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching),
+        launchingTime - scheduledTime,
+        TimeUnit.MILLISECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 4fb198c..2f5bcdf 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
@@ -40,6 +42,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.TaskState;
@@ -51,7 +57,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * An implementation of Helix's {@link org.apache.helix.task.Task} that runs 
original {@link GobblinHelixJobLauncher}.
  */
 @Slf4j
-public class GobblinHelixJobTask implements Task {
+class GobblinHelixJobTask implements Task {
 
   private final TaskConfig taskConfig;
   private final Config sysConfig;
@@ -61,15 +67,22 @@ public class GobblinHelixJobTask implements Task {
   private final HelixManager helixManager;
   private final Path appWorkDir;
   private final List<? extends Tag<?>> metadataTags;
-
   private GobblinHelixJobLauncher launcher;
-  public GobblinHelixJobTask(TaskCallbackContext context,
-      StateStores stateStores,
-      TaskRunnerSuiteBase.Builder builder) {
+  private GobblinHelixJobTaskMetrics jobTaskMetrics;
+  private GobblinHelixJobLauncherListener jobLauncherListener;
+
+  public GobblinHelixJobTask (TaskCallbackContext context,
+                              StateStores stateStores,
+                              TaskRunnerSuiteBase.Builder builder,
+                              GobblinHelixJobLauncherMetrics launcherMetrics,
+                              GobblinHelixJobTaskMetrics jobTaskMetrics) {
+    this.jobTaskMetrics = jobTaskMetrics;
     this.taskConfig = context.getTaskConfig();
     this.sysConfig = builder.getConfig();
     this.helixManager = builder.getHelixManager();
     this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig);
+    this.jobLauncherListener = new 
GobblinHelixJobLauncherListener(launcherMetrics);
+
     Map<String, String> configMap = this.taskConfig.getConfigMap();
     for (Map.Entry<String, String> entry: configMap.entrySet()) {
       if 
(entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX))
 {
@@ -79,7 +92,7 @@ public class GobblinHelixJobTask implements Task {
     }
 
     if 
(!jobPlusSysConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY))
 {
-      throw new RuntimeException("Job doesn't have plannning ID");
+      throw new RuntimeException("Job doesn't have planning ID");
     }
 
     this.planningJobId = 
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
@@ -91,6 +104,25 @@ public class GobblinHelixJobTask implements Task {
         .build());
   }
 
+   static class GobblinHelixJobTaskMetrics extends 
StandardMetricsBridge.StandardMetrics {
+    static final String TIME_BETWEEN_JOB_SUBMISSION_AND_EXECUTION = 
"timeBetweenJobSubmissionAndExecution";
+    final ContextAwareTimer timeBetweenJobSubmissionAndExecution;
+
+    public GobblinHelixJobTaskMetrics(MetricContext metricContext, int 
windowSizeInMin) {
+      timeBetweenJobSubmissionAndExecution = 
metricContext.contextAwareTimer(TIME_BETWEEN_JOB_SUBMISSION_AND_EXECUTION,
+          windowSizeInMin, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(timeBetweenJobSubmissionAndExecution);
+    }
+
+    public void updateTimeBetweenJobSubmissionAndExecution(Properties 
jobProps) {
+      long jobSubmitTime = 
Long.parseLong(jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 "0"));
+      if (jobSubmitTime != 0) {
+        
Instrumented.updateTimer(Optional.of(this.timeBetweenJobSubmissionAndExecution),
+            System.currentTimeMillis() - jobSubmitTime,
+            TimeUnit.MILLISECONDS);
+      }
+    }
+  }
 
   private GobblinHelixJobLauncher createJobLauncher()
       throws Exception {
@@ -101,14 +133,16 @@ public class GobblinHelixJobTask implements Task {
         new ConcurrentHashMap<>());
   }
 
+  /**
+   * Launch the actual {@link GobblinHelixJobLauncher}.
+   */
   @Override
   public TaskResult run() {
     log.info("Running planning job {}", this.planningJobId);
-    // Launch the job
+    
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
     try (Closer closer = Closer.create()) {
       this.launcher = createJobLauncher();
-      //TODO: we will provide additional listener
-      closer.register(launcher).launchJob(null);
+      closer.register(launcher).launchJob(this.jobLauncherListener);
       setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, 
"false"));
     } catch (Exception e) {
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for 
job " + planningJobId + ":" + ExceptionUtils
@@ -135,7 +169,7 @@ public class GobblinHelixJobTask implements Task {
     log.info("Cancelling planning job {}", this.planningJobId);
     if (launcher != null) {
       try {
-        launcher.cancelJob(launcher.getJobListener());
+        launcher.cancelJob(this.jobLauncherListener);
       } catch (JobException e) {
         throw new RuntimeException("Unable to cancel planning job " + 
this.planningJobId + ": ", e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index f659978..c59625e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import org.apache.helix.task.WorkflowConfig;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -488,7 +490,7 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
   }
 
   @Override
-  public StandardMetrics getStandardMetrics() {
-    return this.metrics;
+  public Collection<StandardMetrics> getStandardMetricsCollection() {
+    return ImmutableList.of(this.metrics);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 9d0e6cc..3f63e2d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -148,11 +149,15 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   private final Path appWorkPath;
 
   private final MetricContext metricContext;
-  private final StandardMetricsBridge.StandardMetrics metrics;
+  private final Collection<StandardMetricsBridge.StandardMetrics> 
metricsCollection;
+
+  public GobblinTaskRunner(String applicationName,
+      String helixInstanceName,
+      String applicationId,
+      String taskRunnerId,
+      Config config,
+      Optional<Path> appWorkDirOptional) throws Exception {
 
-  public GobblinTaskRunner(String applicationName, String helixInstanceName, 
String applicationId,
-      String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
-      throws Exception {
     this.helixInstanceName = helixInstanceName;
     this.taskRunnerId = taskRunnerId;
     this.applicationName = applicationName;
@@ -168,9 +173,13 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
     this.containerMetrics = buildContainerMetrics();
 
-    String builderStr = ConfigUtils.getString(this.config, 
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, 
TaskRunnerSuiteBase.Builder.class.getName());
+    String builderStr = ConfigUtils.getString(this.config,
+        GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
+        TaskRunnerSuiteBase.Builder.class.getName());
+
     TaskRunnerSuiteBase.Builder builder = 
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
-          new 
ClassAliasResolver(TaskRunnerSuiteBase.Builder.class).resolveClass(builderStr), 
this.config);
+          new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+              .resolveClass(builderStr), this.config);
 
     TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
         .setContainerMetrics(this.containerMetrics)
@@ -181,7 +190,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         .build();
 
     this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactoryMap());
-    this.metrics = suite.getTaskMetrics();
+    this.metricsCollection = suite.getMetricsCollection();
     this.metricContext = suite.getMetricContext();
     this.services.addAll(suite.getServices());
 
@@ -193,7 +202,12 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     }
 
     logger.debug("GobblinTaskRunner: applicationName {}, helixInstanceName {}, 
applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
-        applicationName, helixInstanceName, applicationId, taskRunnerId, 
config, appWorkDirOptional);
+        applicationName,
+        helixInstanceName,
+        applicationId,
+        taskRunnerId,
+        config,
+        appWorkDirOptional);
   }
 
   private Path initAppWorkDir(Config config, Optional<Path> 
appWorkDirOptional) {
@@ -231,8 +245,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
    * Start this {@link GobblinTaskRunner} instance.
    */
   public void start() {
-    logger.info(
-        String.format("Starting %s in container %s", this.helixInstanceName, 
this.taskRunnerId));
+    logger.info(String.format("Starting %s in container %s", 
this.helixInstanceName, this.taskRunnerId));
 
     // Add a shutdown hook so the task scheduler gets properly shutdown
     addShutdownHook();
@@ -329,6 +342,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   private void addInstanceTags() {
     if (this.helixManager.isConnected()) {
       List<String> tags = ConfigUtils.getStringList(this.config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
+      logger.info("Adding tags binding " + tags);
       tags.forEach(tag -> 
helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, 
this.helixInstanceName, tag));
     }
   }
@@ -379,8 +393,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   @Override
-  public StandardMetrics getStandardMetrics() {
-    return this.metrics;
+  public Collection<StandardMetrics> getStandardMetricsCollection() {
+    return this.metricsCollection;
   }
 
   @Nonnull

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
index 6435ff4..9fc8bc0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
@@ -25,7 +25,10 @@ import org.apache.gobblin.runtime.TaskExecutor;
 
 public class GobblinTaskRunnerMetrics {
 
-  static class InProcessTaskRunnerMetrics extends 
StandardMetricsBridge.StandardMetrics {
+  /**
+   * These metrics show the task execution that correlates to a work unit.
+   */
+  static class TaskExecutionMetrics extends 
StandardMetricsBridge.StandardMetrics {
     private TaskExecutor taskExecutor;
     private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
     private static String HISTORICAL_QUEUED_TASK_COUNT = 
"historicalQueuedTaskCount";
@@ -37,7 +40,7 @@ public class GobblinTaskRunnerMetrics {
     private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
     private static String RUNNING_TASK_COUNT = "runningTaskCount";
 
-    public InProcessTaskRunnerMetrics (TaskExecutor executor, MetricContext 
context) {
+    public TaskExecutionMetrics (TaskExecutor executor, MetricContext context) 
{
       taskExecutor = executor;
       
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, 
()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
       
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
@@ -48,21 +51,20 @@ public class GobblinTaskRunnerMetrics {
       contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, 
()->this.taskExecutor.getFailedTaskCount().getCount()));
       
contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, 
()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
       contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, 
()->this.taskExecutor.getRunningTaskCount().getCount()));
-
       
this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, 
this.taskExecutor.getTaskCreateAndRunTimer());
     }
 
     @Override
     public String getName() {
-      return InProcessTaskRunnerMetrics.class.getName();
+      return TaskExecutionMetrics.class.getName();
     }
   }
 
-  static class JvmTaskRunnerMetrics extends 
StandardMetricsBridge.StandardMetrics {
+  static class JvmTaskMetrics extends StandardMetricsBridge.StandardMetrics {
     //TODO: add metrics to monitor the process execution status (will be 
revisited after process isolation work is done)
     @Override
     public String getName() {
-      return JvmTaskRunnerMetrics.class.getName();
+      return JvmTaskMetrics.class.getName();
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 7b9fd3c..6b400d3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -30,11 +30,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.api.ExecutionResult;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -44,39 +45,48 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  *  1) Re-triggering is enabled and
  *  2) Job stops early.
  *
- * Moreover based on the job properties, a job can be processed immediately 
(non-distributed) or forwarded to a remote
- * node (distributed) for handling. Details are illustrated as follows:
+ * Based on the job properties, a job can be processed immediately 
(non-distribution mode) or forwarded to a remote
+ * node (distribution mode). Details are as follows:
  *
- * <p>
- *   If {@link 
GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the 
job will be handled
- *   by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which 
simply submits the job to Helix for execution.
+ * <p> Non-Distribution Mode:
+ *    If {@link 
GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the 
job will be handled
+ *    by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which 
simply launches {@link GobblinHelixJobLauncher}
+ *    and submits the work units to Helix. Helix will dispatch the work units 
to different worker nodes. The worker node will
+ *    handle the work units by {@link GobblinHelixTask}.
  *
- *   See {@link GobblinHelixJobLauncher} for job launcher details.
+ *    See {@link GobblinHelixJobLauncher} for job launcher details.
+ *    See {@link GobblinHelixTask} for work unit handling details.
  * </p>
  *
- * <p>
+ * <p> Distribution Mode:
  *   If {@link 
GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the 
job will be handled
- *   by {@link 
HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}. It will first 
create a planning job with
- *   {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} pre-configured, so 
that Helix can forward this planning job to
- *   any nodes that has implemented the Helix task factory model matching the 
same name. See {@link TaskRunnerSuiteThreadModel}
- *   implementation of how task factory model is setup.
+ *   by {@link 
HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}, which simply 
launches
+ *   {@link GobblinHelixDistributeJobExecutionLauncher} and submits a planning 
job to Helix. Helix will dispatch this
+ *   planning job to a worker node. The worker node will handle this planning 
job by {@link GobblinHelixJobTask}.
  *
- *   Once the planning job reaches to the remote end, it will be handled by 
{@link GobblinHelixJobTask} which is
- *   created by {@link GobblinHelixJobTask}. The actual handling is similar to 
the non-distributed mode, where
- *   {@link GobblinHelixJobLauncher} is invoked.
+ *   The {@link GobblinHelixJobTask} will launch {@link 
GobblinHelixJobLauncher} and it will again submit the actual
+ *   work units to Helix. Helix will dispatch the work units to other worker 
nodes. Similar to Non-Distribution Mode,
+ *   some worker nodes will handle those work units by {@link 
GobblinHelixTask}.
+ *
+ *    See {@link GobblinHelixDistributeJobExecutionLauncher} for planning job 
launcher details.
+ *    See {@link GobblinHelixJobTask} for planning job handling details.
+ *    See {@link GobblinHelixJobLauncher} for job launcher details.
+ *    See {@link GobblinHelixTask} for work unit handling details.
  * </p>
  */
 @Slf4j
 @Alpha
 class HelixRetriggeringJobCallable implements Callable {
-  private GobblinHelixJobScheduler jobScheduler;
-  private Properties sysProps;
-  private Properties jobProps;
-  private JobListener jobListener;
-  private JobLauncher currentJobLauncher = null;
+  private final GobblinHelixJobScheduler jobScheduler;
+  private final Properties sysProps;
+  private final Properties jobProps;
+  private final JobListener jobListener;
+  private final Path appWorkDir;
+  private final HelixManager helixManager;
+
+  private GobblinHelixJobLauncher currentJobLauncher = null;
   private JobExecutionMonitor currentJobMonitor = null;
-  private Path appWorkDir;
-  private HelixManager helixManager;
+  private boolean isDistributeJobEnabled = false;
 
   public HelixRetriggeringJobCallable(
       GobblinHelixJobScheduler jobScheduler,
@@ -91,6 +101,7 @@ class HelixRetriggeringJobCallable implements Callable {
     this.jobListener = jobListener;
     this.appWorkDir = appWorkDir;
     this.helixManager = helixManager;
+    this.isDistributeJobEnabled = isDistributeJobEnabled();
   }
 
   private boolean isRetriggeringEnabled() {
@@ -109,7 +120,7 @@ class HelixRetriggeringJobCallable implements Callable {
 
   @Override
   public Void call() throws JobException {
-    if (isDistributeJobEnabled()) {
+    if (this.isDistributeJobEnabled) {
       launchJobExecutionLauncherLoop();
     } else {
       launchJobLauncherLoop();
@@ -140,12 +151,25 @@ class HelixRetriggeringJobCallable implements Callable {
   private void launchJobExecutionLauncherLoop() throws JobException {
     try {
       while (true) {
-        String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
 GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
-        GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
-            new 
ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
+        String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
+            
GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
+
+        GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils
+            
.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new
 ClassAliasResolver(
+                
GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
+
+        // Make a separate copy because we could update some of the attributes in 
job properties (like adding planning id).
+        Properties jobPlanningProps = new Properties();
+        jobPlanningProps.putAll(this.jobProps);
 
-        builder.setSysProperties(this.sysProps);
-        builder.setJobProperties(this.jobProps);
+        // Inject planning id and start time
+        String planningId = 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
+            + JobState.getJobNameFromProps(jobPlanningProps));
+        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
+        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 String.valueOf(System.currentTimeMillis()));
+
+        builder.setSysProps(this.sysProps);
+        builder.setJobPlanningProps(jobPlanningProps);
         builder.setManager(this.helixManager);
         builder.setAppWorkDir(this.appWorkDir);
 
@@ -172,11 +196,19 @@ class HelixRetriggeringJobCallable implements Callable {
     }
   }
 
-  public void cancel() throws JobException {
-    if (currentJobLauncher != null) {
-      currentJobLauncher.cancelJob(this.jobListener);
-    } else if (currentJobMonitor != null) {
-      currentJobMonitor.cancel(false);
+  void cancel() throws JobException {
+    
this.jobScheduler.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
+
+    if (isDistributeJobEnabled) {
+      if (currentJobMonitor != null) {
+        currentJobMonitor.cancel(false);
+      }
+    } else {
+      if (currentJobLauncher != null) {
+        currentJobLauncher.cancelJob(this.jobListener);
+      }
     }
+
+    
this.jobScheduler.jobSchedulerMetrics.numCancellationComplete.incrementAndGet();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index e65b968..182cd07 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -51,11 +52,10 @@ import org.apache.gobblin.util.ConfigUtils;
 @Alpha
 public abstract class TaskRunnerSuiteBase {
   protected TaskFactory taskFactory;
-  protected TaskFactory jobFactory;
+  protected GobblinHelixJobFactory jobFactory;
   protected MetricContext metricContext;
   protected String applicationId;
   protected String applicationName;
-  protected StandardMetricsBridge.StandardMetrics taskMetrics;
   protected List<Service> services = Lists.newArrayList();
 
   protected TaskRunnerSuiteBase(Builder builder) {
@@ -68,7 +68,7 @@ public abstract class TaskRunnerSuiteBase {
     return this.metricContext;
   }
 
-  protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics();
+  protected abstract Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection();
 
   protected abstract Map<String, TaskFactory> getTaskFactoryMap();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
index bf21a4a..72ed17b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
@@ -17,12 +17,14 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Service;
 
@@ -44,12 +46,11 @@ class TaskRunnerSuiteProcessModel extends 
TaskRunnerSuiteBase {
     taskFactory = new HelixTaskFactory(builder.getContainerMetrics(),
         GobblinTaskRunner.CLUSTER_CONF_PATH,
         builder.getConfig());
-    taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics();
   }
 
   @Override
-  protected StandardMetricsBridge.StandardMetrics getTaskMetrics() {
-    return this.taskMetrics;
+  protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() {
+    return ImmutableList.of(new GobblinTaskRunnerMetrics.JvmTaskMetrics());
   }
 
   @Override

Reply via email to