This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be50a02  [GOBBLIN-679] Refactor GobblinHelixTask metrics
be50a02 is described below

commit be50a02896bd9e8bb79a41f5f415183ba4ebaaa7
Author: Kuai Yu <[email protected]>
AuthorDate: Mon Feb 11 15:07:25 2019 -0800

    [GOBBLIN-679] Refactor GobblinHelixTask metrics
    
    Closes #2553 from yukuai518/taskm2
---
 .../gobblin/cluster/GobblinHelixJobFactory.java    |  3 +-
 .../apache/gobblin/cluster/GobblinHelixTask.java   | 14 +++-
 .../gobblin/cluster/GobblinHelixTaskFactory.java   | 24 ++++--
 .../gobblin/cluster/GobblinHelixTaskMetrics.java   | 94 ++++++++++++++++++++++
 .../gobblin/cluster/GobblinTaskRunnerMetrics.java  | 71 ----------------
 .../gobblin/cluster/TaskRunnerSuiteBase.java       |  2 -
 .../cluster/TaskRunnerSuiteProcessModel.java       |  4 +-
 .../cluster/TaskRunnerSuiteThreadModel.java        | 29 ++++---
 .../gobblin/cluster/GobblinHelixTaskTest.java      |  4 +-
 9 files changed, 144 insertions(+), 101 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index a54a8e7..8c963e4 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -73,7 +73,8 @@ class GobblinHelixJobFactory implements TaskFactory {
     this.launcherMetrics = new 
GobblinHelixJobLauncherMetrics("launcherInJobFactory",
         metricContext,
         metricsWindowSizeInMin);
-    this.jobTaskMetrics = new 
GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext,
+    this.jobTaskMetrics = new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(
+        metricContext,
         metricsWindowSizeInMin);
     this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobFactory",
         metricContext,
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index c93d9ac..8e608f0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -70,16 +70,18 @@ public class GobblinHelixTask implements Task {
   private String jobKey;
   private String taskId;
   private Path workUnitFilePath;
-
+  private GobblinHelixTaskMetrics taskMetrics;
   private SingleTask task;
 
   public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                           TaskCallbackContext taskCallbackContext,
                           TaskAttemptBuilder taskAttemptBuilder,
-                          StateStores stateStores) {
+                          StateStores stateStores,
+                          GobblinHelixTaskMetrics taskMetrics) {
     this.taskConfig = taskCallbackContext.getTaskConfig();
     this.applicationName = builder.getApplicationName();
     this.instanceName = builder.getInstanceName();
+    this.taskMetrics = taskMetrics;
     getInfoFromTaskConfig();
 
     Path jobStateFilePath = GobblinClusterUtils
@@ -107,20 +109,28 @@ public class GobblinHelixTask implements Task {
 
   @Override
   public TaskResult run() {
+    this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
+    long startTime = System.currentTimeMillis();
     log.info("Actual task {} started. [{} {}]", this.taskId, 
this.applicationName, this.instanceName);
     try (Closer closer = Closer.create()) {
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, 
this.jobName));
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, 
this.jobKey));
       this.task.run();
       log.info("Actual task {} completed.", this.taskId);
+      this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       log.error("Actual task {} interrupted.", this.taskId);
+      this.taskMetrics.helixTaskTotalCancelled.incrementAndGet(); // counter matches the CANCELED status returned below
       return new TaskResult(TaskResult.Status.CANCELED, "");
     } catch (Throwable t) {
       log.error("Actual task {} failed due to {}", this.taskId, 
t.getMessage());
+      this.taskMetrics.helixTaskTotalFailed.incrementAndGet(); // counter matches the FAILED status returned below
       return new TaskResult(TaskResult.Status.FAILED, 
Throwables.getStackTraceAsString(t));
+    } finally {
+      this.taskMetrics.helixTaskTotalRunning.decrementAndGet();
+      this.taskMetrics.updateTimeForTaskExecution(startTime);
     }
   }
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 73ca784..6bf3d5f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
@@ -29,13 +27,17 @@ import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Counter;
 import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -58,16 +60,27 @@ public class GobblinHelixTaskFactory implements TaskFactory 
{
    * A {@link Counter} to count the number of new {@link GobblinHelixTask}s 
that are created.
    */
   private final Optional<Counter> newTasksCounter;
+  @Getter
   private final TaskExecutor taskExecutor;
+  @Getter
+  private final GobblinHelixTaskMetrics taskMetrics;
   private final TaskStateTracker taskStateTracker;
   private final Path appWorkDir;
   private final StateStores stateStores;
   private final TaskAttemptBuilder taskAttemptBuilder;
 
   public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
-                                 TaskExecutor taskExecutor,
+                                 MetricContext metricContext,
                                  TaskStateTracker taskStateTracker,
                                  Config stateStoreConfig) {
+
+    // initialize task related metrics
+    int windowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
+        ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+        ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+    this.taskExecutor = new 
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
+    this.taskMetrics = new GobblinHelixTaskMetrics(taskExecutor, 
metricContext, windowSizeInMin);
+
     this.builder = builder;
     this.containerMetrics = builder.getContainerMetrics();
     this.helixManager = builder.getJobHelixManager();
@@ -76,7 +89,6 @@ public class GobblinHelixTaskFactory implements TaskFactory {
     } else {
       this.newTasksCounter = Optional.absent();
     }
-    this.taskExecutor = taskExecutor;
     this.taskStateTracker = taskStateTracker;
 
     this.appWorkDir = builder.getAppWorkPath();
@@ -101,6 +113,6 @@ public class GobblinHelixTaskFactory implements TaskFactory 
{
     if (this.newTasksCounter.isPresent()) {
       this.newTasksCounter.get().inc();
     }
-    return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores);
+    return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores, this.taskMetrics);
   }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
new file mode 100644
index 0000000..44df9f5
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+
+
+public class GobblinHelixTaskMetrics extends 
StandardMetricsBridge.StandardMetrics {
+
+    private TaskExecutor taskExecutor;
+    private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
+    private static String HISTORICAL_QUEUED_TASK_COUNT = 
"historicalQueuedTaskCount";
+    private static String QUEUED_TASK_COUNT = "queuedTaskCount";
+    private static String CURRENT_QUEUED_TASK_TOTAL_TIME = 
"currentQueuedTaskTotalTime";
+    private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME = 
"historicalQueuedTaskTotalTime";
+    private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
+    private static String FAILED_TASK_COUNT = "failedTaskCount";
+    private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
+    private static String RUNNING_TASK_COUNT = "runningTaskCount";
+    private static String TIMER_FOR_TASK_EXEC = "timeForTaskExec";
+
+    private static String HELIX_TASK_TOTAL_COMPLETED = 
"helixTaskTotalCompleted";
+    private static String HELIX_TASK_TOTAL_FAILED = "helixTaskTotalFailed";
+    private static String HELIX_TASK_TOTAL_CANCELLED = 
"helixTaskTotalCancelled";
+    private static String HELIX_TASK_TOTAL_RUNNING = "helixTaskTotalRunning";
+
+    private final ContextAwareTimer timeForTaskExecution;
+
+    AtomicLong helixTaskTotalCompleted;
+    AtomicLong helixTaskTotalCancelled;
+    AtomicLong helixTaskTotalFailed;
+    AtomicLong helixTaskTotalRunning;
+
+    public GobblinHelixTaskMetrics (TaskExecutor executor, MetricContext 
context, int windowSizeInMin) {
+      this.taskExecutor = executor;
+      this.helixTaskTotalCompleted = new AtomicLong(0);
+      this.helixTaskTotalFailed = new AtomicLong(0);
+      this.helixTaskTotalRunning = new AtomicLong(0);
+      this.helixTaskTotalCancelled = new AtomicLong(0);
+      this.timeForTaskExecution = 
context.contextAwareTimer(TIMER_FOR_TASK_EXEC, windowSizeInMin, 
TimeUnit.MINUTES);
+
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT,
 ()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT,
 ()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT, 
()->this.taskExecutor.getQueuedTaskCount().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, 
()->this.taskExecutor.getFailedTaskCount().getCount()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT,
 ()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, 
()->this.taskExecutor.getRunningTaskCount().getCount()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_COMPLETED,
 ()->this.helixTaskTotalCompleted.get()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_FAILED,
 ()->this.helixTaskTotalFailed.get()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_CANCELLED,
 ()->this.helixTaskTotalCancelled.get()));
+      
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_RUNNING,
 ()->this.helixTaskTotalRunning.get()));
+      this.contextAwareMetrics.add(this.timeForTaskExecution);
+
+      
this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, 
this.taskExecutor.getTaskCreateAndRunTimer());
+    }
+
+  public void updateTimeForTaskExecution(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForTaskExecution),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+    public String getName() {
+      return GobblinHelixTaskMetrics.class.getName();
+    }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
deleted file mode 100644
index 9fc8bc0..0000000
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.cluster;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.runtime.TaskExecutor;
-
-
-public class GobblinTaskRunnerMetrics {
-
-  /**
-   * This metrics shows the task execution that correlates to a work unit.
-   */
-  static class TaskExecutionMetrics extends 
StandardMetricsBridge.StandardMetrics {
-    private TaskExecutor taskExecutor;
-    private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
-    private static String HISTORICAL_QUEUED_TASK_COUNT = 
"historicalQueuedTaskCount";
-    private static String QUEUED_TASK_COUNT = "queuedTaskCount";
-    private static String CURRENT_QUEUED_TASK_TOTAL_TIME = 
"currentQueuedTaskTotalTime";
-    private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME = 
"historicalQueuedTaskTotalTime";
-    private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
-    private static String FAILED_TASK_COUNT = "failedTaskCount";
-    private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
-    private static String RUNNING_TASK_COUNT = "runningTaskCount";
-
-    public TaskExecutionMetrics (TaskExecutor executor, MetricContext context) 
{
-      taskExecutor = executor;
-      
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, 
()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
-      
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
-      
contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT,
 ()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
-      
contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME,
 ()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
-      contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT, 
()->this.taskExecutor.getQueuedTaskCount().longValue()));
-      
contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME, 
()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
-      contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, 
()->this.taskExecutor.getFailedTaskCount().getCount()));
-      
contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, 
()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
-      contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, 
()->this.taskExecutor.getRunningTaskCount().getCount()));
-      
this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, 
this.taskExecutor.getTaskCreateAndRunTimer());
-    }
-
-    @Override
-    public String getName() {
-      return TaskExecutionMetrics.class.getName();
-    }
-  }
-
-  static class JvmTaskMetrics extends StandardMetricsBridge.StandardMetrics {
-    //TODO: add metrics to monitor the process execution status (will be 
revisited after process isolation work is done)
-    @Override
-    public String getName() {
-      return JvmTaskMetrics.class.getName();
-    }
-
-  }
-}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 1ae8639..92c309a 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -51,8 +51,6 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 @Alpha
 public abstract class TaskRunnerSuiteBase {
-  protected TaskFactory taskFactory;
-  protected GobblinHelixJobFactory jobFactory;
   protected MetricContext metricContext;
   protected String applicationId;
   protected String applicationName;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
index 72ed17b..0833685 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.instrumented.StandardMetricsBridge;
  */
 @Slf4j
 class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase {
-
+  private final HelixTaskFactory taskFactory;
   TaskRunnerSuiteProcessModel(TaskRunnerSuiteBase.Builder builder) {
     super(builder);
     log.info("Running a task in a separate process is enabled.");
@@ -50,7 +50,7 @@ class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase 
{
 
   @Override
   protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() {
-    return ImmutableList.of(new GobblinTaskRunnerMetrics.JvmTaskMetrics());
+    return ImmutableList.of();
   }
 
   @Override
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index e375736..9bec51d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -34,6 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.services.JMXReportingService;
@@ -44,22 +45,18 @@ import org.apache.gobblin.util.PathUtils;
  * A sub-type of {@link TaskRunnerSuiteBase} suite which runs tasks in a 
thread pool.
  */
 class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
-  private final TaskExecutor taskExecutor;
-  private final GobblinTaskRunnerMetrics.TaskExecutionMetrics 
taskExecutionMetrics;
+  protected final GobblinHelixTaskFactory taskFactory;
+  protected final GobblinHelixJobFactory jobFactory;
 
   TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
     super(builder);
-
-    // initialize task related metrics
-    this.taskExecutor = new 
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
-    this.taskExecutionMetrics = new 
GobblinTaskRunnerMetrics.TaskExecutionMetrics(taskExecutor, metricContext);
-    this.taskFactory = generateTaskFactory(taskExecutor, builder);
+    this.taskFactory = createTaskFactory(builder, this.metricContext);
     this.jobFactory = new GobblinHelixJobFactory(builder, this.metricContext);
   }
 
   @Override
   protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() {
-    return ImmutableList.of(this.taskExecutionMetrics,
+    return ImmutableList.of(this.taskFactory.getTaskMetrics(),
                             this.jobFactory.getJobTaskMetrics(),
                             this.jobFactory.getLauncherMetrics(),
                             this.jobFactory.getHelixMetrics());
@@ -78,7 +75,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
     return this.services;
   }
 
-  private GobblinHelixTaskFactory generateTaskFactory(TaskExecutor 
taskExecutor, Builder builder) {
+  private GobblinHelixTaskFactory createTaskFactory(Builder builder, 
MetricContext metricContext) {
     Properties properties = 
ConfigUtils.configToProperties(builder.getConfig());
     URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
@@ -87,14 +84,16 @@ class TaskRunnerSuiteThreadModel extends 
TaskRunnerSuiteBase {
 
     TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
 
-    services.add(taskExecutor);
+    GobblinHelixTaskFactory taskFactory = new GobblinHelixTaskFactory(builder,
+        metricContext,
+        taskStateTracker,
+        stateStoreJobConfig);
+
+    services.add(taskFactory.getTaskExecutor());
     services.add(taskStateTracker);
     services.add(new JMXReportingService(
-        ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
+        ImmutableMap.of("task.executor", 
taskFactory.getTaskExecutor().getTaskExecutorQueueMetricSet())));
 
-    return new GobblinHelixTaskFactory(builder,
-                                       taskExecutor,
-                                       taskStateTracker,
-                                       stateStoreJobConfig);
+    return taskFactory;
   }
 }
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index e91c984..9386bf6 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -144,7 +144,7 @@ public class GobblinHelixTaskTest {
 
 
     TaskRunnerSuiteBase.Builder builder = new 
TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
-    builder.setInstanceName("TestInstance")
+    TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
         .setApplicationName("TestApplication")
         .setAppWorkPath(appWorkDir)
         .setContainerMetrics(Optional.absent())
@@ -155,7 +155,7 @@ public class GobblinHelixTaskTest {
 
     GobblinHelixTaskFactory gobblinHelixTaskFactory =
         new GobblinHelixTaskFactory(builder,
-                                    this.taskExecutor,
+                                    sb.metricContext,
                                     this.taskStateTracker,
                                     ConfigFactory.empty());
 

Reply via email to