Repository: incubator-gobblin
Updated Branches:
  refs/heads/master dc96e3e78 -> 79878f992


[GOBBLIN-652] Add helix metrics

Closes #2521 from kyuamazon/helixmetrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/79878f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/79878f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/79878f99

Branch: refs/heads/master
Commit: 79878f992db27e60865bd09b6295737d31e2fe8e
Parents: dc96e3e
Author: Kuai Yu <[email protected]>
Authored: Tue Dec 11 15:23:04 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Dec 11 15:23:04 2018 -0800

----------------------------------------------------------------------
 ...blinHelixDistributeJobExecutionLauncher.java | 13 +++-
 .../gobblin/cluster/GobblinHelixJobFactory.java |  8 ++-
 .../cluster/GobblinHelixJobLauncher.java        | 18 +++++-
 .../cluster/GobblinHelixJobScheduler.java       | 19 +++++-
 .../gobblin/cluster/GobblinHelixJobTask.java    |  8 ++-
 .../gobblin/cluster/GobblinHelixMetrics.java    | 64 ++++++++++++++++++++
 .../GobblinHelixPlanningJobLauncherMetrics.java | 10 ---
 .../cluster/HelixRetriggeringJobCallable.java   |  4 ++
 .../cluster/TaskRunnerSuiteThreadModel.java     |  5 +-
 .../cluster/GobblinHelixJobLauncherTest.java    | 15 +++--
 .../TaskRunnerSuiteForJobFactoryTest.java       |  9 ++-
 11 files changed, 143 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 1d592c4..b4405c3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -63,7 +63,7 @@ import org.apache.gobblin.util.PropertiesUtils;
 /**
  * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs 
on the same
  * instance (manager), this {@link JobExecutionLauncher} will distribute the 
original job
- * to one of the worker (participant) node. The original job will be launched 
there.
+ * to one of the task driver instance. The original task driver logic will be 
launched there.
  *
  * <p>
  *   For job submission, the Helix workflow name will be the original job name 
with prefix
@@ -88,6 +88,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   protected Properties jobPlanningProps;
   protected HelixJobsMapping jobsMapping;
   protected GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
+  protected GobblinHelixMetrics helixMetrics;
 
   protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
 
@@ -125,6 +126,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
     this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
+    this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
+    this.helixMetrics = builder.helixMetrics;
   }
 
   @Override
@@ -156,6 +159,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
     Optional<HelixManager> taskDriverHelixManager;
     Path appWorkDir;
     GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
+    GobblinHelixMetrics helixMetrics;
     public GobblinHelixDistributeJobExecutionLauncher build() throws Exception 
{
       return new GobblinHelixDistributeJobExecutionLauncher(this);
     }
@@ -249,10 +253,13 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
       String planningId = getPlanningJobId(this.jobPlanningProps);
       JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps);
       try {
+        long submitStartTime = System.currentTimeMillis();
+        
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.submitMeter.mark();
         submitJobToHelix(planningId, planningId, builder);
-        long startTime = System.currentTimeMillis();
+        
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixSubmit(submitStartTime);
+        long waitStartTime = System.currentTimeMillis();
         DistributeJobResult rst = waitForJobCompletion(planningId, planningId);
-        
GobblinHelixDistributeJobExecutionLauncher.this.planningJobLauncherMetrics.updateTimeForHelixWait(startTime);
+        
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime);
         return rst;
       } catch (Exception e) {
         log.error(planningId + " is not able to submit.");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index f631abf..a54a8e7 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -48,6 +48,8 @@ class GobblinHelixJobFactory implements TaskFactory {
   protected GobblinHelixJobLauncherMetrics launcherMetrics;
   @Getter
   protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics;
+  @Getter
+  protected GobblinHelixMetrics helixMetrics;
 
   private void initJobMapping(TaskRunnerSuiteBase.Builder builder) {
     Config sysConfig = builder.getConfig();
@@ -73,6 +75,9 @@ class GobblinHelixJobFactory implements TaskFactory {
         metricsWindowSizeInMin);
     this.jobTaskMetrics = new 
GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext,
         metricsWindowSizeInMin);
+    this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobFactory",
+        metricContext,
+        metricsWindowSizeInMin);
   }
 
   @Override
@@ -81,6 +86,7 @@ class GobblinHelixJobFactory implements TaskFactory {
         this.jobsMapping,
         this.builder,
         this.launcherMetrics,
-        this.jobTaskMetrics);
+        this.jobTaskMetrics,
+        this.helixMetrics);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 5798da0..3f72781 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -117,7 +117,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final int stateSerDeRunnerThreads;
 
   private final TaskStateCollectorService taskStateCollectorService;
-
+  private final Optional<GobblinHelixMetrics> helixMetrics;
   private volatile boolean jobSubmitted = false;
   private final ConcurrentHashMap<String, Boolean> runningMap;
   private final StateStores stateStores;
@@ -128,11 +128,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
                                   final HelixManager helixManager,
                                   Path appWorkDir,
                                   List<? extends Tag<?>> metadataTags,
-                                  ConcurrentHashMap<String, Boolean> 
runningMap) throws Exception {
+                                  ConcurrentHashMap<String, Boolean> 
runningMap,
+                                  Optional<GobblinHelixMetrics> helixMetrics) 
throws Exception {
 
     super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
     LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
-
     this.helixManager = helixManager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
     this.runningMap = runningMap;
@@ -172,6 +172,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         this.stateStores.getTaskStateStore(),
         this.outputTaskStateDir);
 
+    this.helixMetrics = helixMetrics;
     startCancellationExecutor();
   }
 
@@ -202,7 +203,14 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
       synchronized (this.cancellationRequest) {
         if (!this.cancellationRequested) {
+          long submitStart = System.currentTimeMillis();
+          if (helixMetrics.isPresent()) {
+            helixMetrics.get().submitMeter.mark();
+          }
           submitJobToHelix(createJob(workUnits));
+          if (helixMetrics.isPresent()) {
+            this.helixMetrics.get().updateTimeForHelixSubmit(submitStart);
+          }
           jobSubmissionTimer.stop();
           LOGGER.info(String.format("Submitted job %s to Helix", 
this.jobContext.getJobId()));
           this.jobSubmitted = true;
@@ -212,7 +220,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       }
 
       TimingEvent jobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
+      long waitStart = System.currentTimeMillis();
       waitForJobCompletion();
+      if (helixMetrics.isPresent()) {
+        this.helixMetrics.get().updateTimeForHelixWait(waitStart);
+      }
       jobRunTimer.stop();
       LOGGER.info(String.format("Job %s completed", 
this.jobContext.getJobId()));
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 134d382..9cf757c 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -87,6 +87,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   private final MutableJobCatalog jobCatalog;
   private final MetricContext metricContext;
 
+  final GobblinHelixMetrics helixMetrics;
   final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
   final GobblinHelixJobLauncherMetrics launcherMetrics;
   final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
@@ -128,12 +129,19 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
                                                                           
this.metricContext,
                                                                           
metricsWindowSizeInMin);
 
+    this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler",
+                                                  this.metricContext,
+                                                  metricsWindowSizeInMin);
+
     this.startServicesCompleted = false;
   }
 
   @Override
   public Collection<StandardMetrics> getStandardMetricsCollection() {
-    return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics, 
this.planningJobLauncherMetrics);
+    return ImmutableList.of(this.launcherMetrics,
+                            this.jobSchedulerMetrics,
+                            this.planningJobLauncherMetrics,
+                            this.helixMetrics);
   }
 
   @Override
@@ -172,6 +180,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         jobProps,
         jobListener,
         this.planningJobLauncherMetrics,
+        this.helixMetrics,
         this.appWorkDir,
         this.jobHelixManager,
         this.taskDriverHelixManager).call();
@@ -184,7 +193,12 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     combinedProps.putAll(properties);
     combinedProps.putAll(jobProps);
 
-    return new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, 
this.appWorkDir, this.metadataTags, this.jobRunningMap);
+    return new GobblinHelixJobLauncher(combinedProps,
+        this.jobHelixManager,
+        this.appWorkDir,
+        this.metadataTags,
+        this.jobRunningMap,
+        Optional.of(this.helixMetrics));
   }
 
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener 
jobListener) {
@@ -193,6 +207,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         jobProps,
         jobListener,
         this.planningJobLauncherMetrics,
+        this.helixMetrics,
         this.appWorkDir,
         this.jobHelixManager,
         this.taskDriverHelixManager);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 9447b94..ff61ea6 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -68,16 +68,19 @@ class GobblinHelixJobTask implements Task {
   private final List<? extends Tag<?>> metadataTags;
   private GobblinHelixJobLauncher launcher;
   private GobblinHelixJobTaskMetrics jobTaskMetrics;
+  private GobblinHelixMetrics helixMetrics;
   private GobblinHelixJobLauncherListener jobLauncherListener;
 
   public GobblinHelixJobTask (TaskCallbackContext context,
                               HelixJobsMapping jobsMapping,
                               TaskRunnerSuiteBase.Builder builder,
                               GobblinHelixJobLauncherMetrics launcherMetrics,
-                              GobblinHelixJobTaskMetrics jobTaskMetrics) {
+                              GobblinHelixJobTaskMetrics jobTaskMetrics,
+                              GobblinHelixMetrics helixMetrics) {
     this.applicationName = builder.getApplicationName();
     this.instanceName = builder.getInstanceName();
     this.jobTaskMetrics = jobTaskMetrics;
+    this.helixMetrics = helixMetrics;
     this.taskConfig = context.getTaskConfig();
     this.sysConfig = builder.getConfig();
     this.jobHelixManager = builder.getJobHelixManager();
@@ -131,7 +134,8 @@ class GobblinHelixJobTask implements Task {
         this.jobHelixManager,
         this.appWorkDir,
         this.metadataTags,
-        new ConcurrentHashMap<>());
+        new ConcurrentHashMap<>(),
+        Optional.of(this.helixMetrics));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java
new file mode 100644
index 0000000..81f49d4
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+
+
+public class GobblinHelixMetrics extends StandardMetricsBridge.StandardMetrics {
+  public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait";
+  public static final String TIMER_FOR_HELIX_SUBMIT = "timeForHelixSubmit";
+  public static final String METER_FOR_HELIX_SUBMIT = "meterForHelixSubmit";
+  final String metricsName;
+  final ContextAwareTimer timeForHelixWait;
+  final ContextAwareTimer timeForHelixSubmit;
+  final ContextAwareMeter submitMeter;
+
+  public GobblinHelixMetrics(String metricsName, final MetricContext metricContext, int windowSizeInMin) {
+    this.metricsName = metricsName;
+    this.timeForHelixWait = metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, TimeUnit.MINUTES);
+    this.timeForHelixSubmit = metricContext.contextAwareTimer(TIMER_FOR_HELIX_SUBMIT, windowSizeInMin, TimeUnit.MINUTES);
+    this.submitMeter = metricContext.contextAwareMeter(METER_FOR_HELIX_SUBMIT);
+    this.contextAwareMetrics.add(timeForHelixWait);
+    this.contextAwareMetrics.add(timeForHelixSubmit);
+    this.contextAwareMetrics.add(submitMeter);
+  }
+
+  public void updateTimeForHelixSubmit(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForHelixSubmit),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  public void updateTimeForHelixWait(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForHelixWait),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public String getName() {
+    return this.metricsName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
index a2e0991..593e72d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
@@ -30,11 +30,9 @@ public class GobblinHelixPlanningJobLauncherMetrics extends 
StandardMetricsBridg
 
   public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = 
"timeForCompletedPlanningJobs";
   public static final String TIMER_FOR_FAILED_PLANNING_JOBS = 
"timeForFailedPlanningJobs";
-  public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait";
 
   final ContextAwareTimer timeForCompletedPlanningJobs;
   final ContextAwareTimer timeForFailedPlanningJobs;
-  final ContextAwareTimer timeForHelixWait;
 
   public GobblinHelixPlanningJobLauncherMetrics(String metricsName,
       final MetricContext metricContext,
@@ -44,11 +42,9 @@ public class GobblinHelixPlanningJobLauncherMetrics extends 
StandardMetricsBridg
 
     this.timeForCompletedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
     this.timeForFailedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
-    this.timeForHelixWait = 
metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, 
TimeUnit.MINUTES);
 
     this.contextAwareMetrics.add(timeForCompletedPlanningJobs);
     this.contextAwareMetrics.add(timeForFailedPlanningJobs);
-    this.contextAwareMetrics.add(timeForHelixWait);
   }
 
   public void updateTimeForCompletedPlanningJobs(long startTime) {
@@ -63,12 +59,6 @@ public class GobblinHelixPlanningJobLauncherMetrics extends 
StandardMetricsBridg
         System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
   }
 
-  public void updateTimeForHelixWait(long startTime) {
-    Instrumented.updateTimer(
-        com.google.common.base.Optional.of(this.timeForHelixWait),
-        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
-  }
-
   @Override
   public String getName() {
     return this.metricsName;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 4cda2ed..f563198 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -87,6 +87,7 @@ class HelixRetriggeringJobCallable implements Callable {
   private final Properties jobProps;
   private final JobListener jobListener;
   private final GobblinHelixPlanningJobLauncherMetrics 
planningJobLauncherMetrics;
+  private final GobblinHelixMetrics helixMetrics;
   private final Path appWorkDir;
   private final HelixManager jobHelixManager;
   private final Optional<HelixManager> taskDriverHelixManager;
@@ -101,6 +102,7 @@ class HelixRetriggeringJobCallable implements Callable {
       Properties jobProps,
       JobListener jobListener,
       GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics,
+      GobblinHelixMetrics helixMetrics,
       Path appWorkDir,
       HelixManager jobHelixManager,
       Optional<HelixManager> taskDriverHelixManager) {
@@ -109,6 +111,7 @@ class HelixRetriggeringJobCallable implements Callable {
     this.jobProps = jobProps;
     this.jobListener = jobListener;
     this.planningJobLauncherMetrics = planningJobLauncherMetrics;
+    this.helixMetrics = helixMetrics;
     this.appWorkDir = appWorkDir;
     this.jobHelixManager = jobHelixManager;
     this.taskDriverHelixManager = taskDriverHelixManager;
@@ -221,6 +224,7 @@ class HelixRetriggeringJobCallable implements Callable {
       builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
       builder.setAppWorkDir(this.appWorkDir);
       builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
+      builder.setHelixMetrics(this.helixMetrics);
 
       try (Closer closer = Closer.create()) {
         log.info("Planning job {} started.", planningId);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index 92d3427..e375736 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -59,7 +59,10 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase 
{
 
   @Override
   protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() {
-    return ImmutableList.of(this.taskExecutionMetrics, 
this.jobFactory.getJobTaskMetrics(), this.jobFactory.getLauncherMetrics());
+    return ImmutableList.of(this.taskExecutionMetrics,
+                            this.jobFactory.getJobTaskMetrics(),
+                            this.jobFactory.getLauncherMetrics(),
+                            this.jobFactory.getHelixMetrics());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 1e1db0e..1f98807 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -205,7 +205,8 @@ public class GobblinHelixJobLauncherTest {
     // Normal job launcher
     final Properties properties = generateJobProperties(this.baseConfig, "1", 
"_1504201348470");
     final GobblinHelixJobLauncher gobblinHelixJobLauncher = 
this.closer.register(
-        new GobblinHelixJobLauncher(properties, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+        new GobblinHelixJobLauncher(properties, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
 
     gobblinHelixJobLauncher.launchJob(null);
 
@@ -254,12 +255,14 @@ public class GobblinHelixJobLauncherTest {
     // Job launcher(1) to test parallel job running
     final Properties properties1 = generateJobProperties(this.baseConfig, "2", 
"_1504201348471");
     final GobblinHelixJobLauncher gobblinHelixJobLauncher1 = 
this.closer.register(
-        new GobblinHelixJobLauncher(properties1, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+        new GobblinHelixJobLauncher(properties1, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
 
     // Job launcher(2) to test parallel job running
     final Properties properties2 = generateJobProperties(this.baseConfig, "2", 
"_1504201348472");
     final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = 
this.closer.register(
-        new GobblinHelixJobLauncher(properties2, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+        new GobblinHelixJobLauncher(properties2, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
 
     CountDownLatch stg1 = new CountDownLatch(1);
     CountDownLatch stg2 = new CountDownLatch(1);
@@ -288,11 +291,13 @@ public class GobblinHelixJobLauncherTest {
 
     final Properties properties = generateJobProperties(this.baseConfig, "3", 
"_1504201348473");
     final GobblinHelixJobLauncher gobblinHelixJobLauncher =
-        new GobblinHelixJobLauncher(properties, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap);
+        new GobblinHelixJobLauncher(properties, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty());
 
     final Properties properties2 = generateJobProperties(this.baseConfig, 
"33", "_1504201348474");
     final GobblinHelixJobLauncher gobblinHelixJobLauncher2 =
-        new GobblinHelixJobLauncher(properties2, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap);
+        new GobblinHelixJobLauncher(properties2, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty());
 
     gobblinHelixJobLauncher.launchJob(null);
     gobblinHelixJobLauncher2.launchJob(null);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 2e98cfb..dc88a4e 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -63,7 +63,8 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
           jobsMapping,
           builder,
           new GobblinHelixJobLauncherMetrics("launcherInJobFactory", 
metricContext, 5),
-          new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 
5));
+          new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 5),
+          new GobblinHelixMetrics("helixMetricsInJobFactory", metricContext, 
5));
     }
   }
 
@@ -72,12 +73,14 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
                             HelixJobsMapping jobsMapping,
                             TaskRunnerSuiteBase.Builder builder,
                             GobblinHelixJobLauncherMetrics launcherMetrics,
-                            GobblinHelixJobTaskMetrics jobTaskMetrics) {
+                            GobblinHelixJobTaskMetrics jobTaskMetrics,
+                            GobblinHelixMetrics helixMetrics) {
       super(context,
             jobsMapping,
             builder,
             launcherMetrics,
-            jobTaskMetrics);
+            jobTaskMetrics,
+            helixMetrics);
     }
   }
 

Reply via email to