This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be50a02 [GOBBLIN-679] Refactor GobblinHelixTask metrics
be50a02 is described below
commit be50a02896bd9e8bb79a41f5f415183ba4ebaaa7
Author: Kuai Yu <[email protected]>
AuthorDate: Mon Feb 11 15:07:25 2019 -0800
[GOBBLIN-679] Refactor GobblinHelixTask metrics
Closes #2553 from yukuai518/taskm2
---
.../gobblin/cluster/GobblinHelixJobFactory.java | 3 +-
.../apache/gobblin/cluster/GobblinHelixTask.java | 14 +++-
.../gobblin/cluster/GobblinHelixTaskFactory.java | 24 ++++--
.../gobblin/cluster/GobblinHelixTaskMetrics.java | 94 ++++++++++++++++++++++
.../gobblin/cluster/GobblinTaskRunnerMetrics.java | 71 ----------------
.../gobblin/cluster/TaskRunnerSuiteBase.java | 2 -
.../cluster/TaskRunnerSuiteProcessModel.java | 4 +-
.../cluster/TaskRunnerSuiteThreadModel.java | 29 ++++---
.../gobblin/cluster/GobblinHelixTaskTest.java | 4 +-
9 files changed, 144 insertions(+), 101 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index a54a8e7..8c963e4 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -73,7 +73,8 @@ class GobblinHelixJobFactory implements TaskFactory {
this.launcherMetrics = new
GobblinHelixJobLauncherMetrics("launcherInJobFactory",
metricContext,
metricsWindowSizeInMin);
- this.jobTaskMetrics = new
GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext,
+ this.jobTaskMetrics = new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(
+ metricContext,
metricsWindowSizeInMin);
this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobFactory",
metricContext,
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index c93d9ac..8e608f0 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -70,16 +70,18 @@ public class GobblinHelixTask implements Task {
private String jobKey;
private String taskId;
private Path workUnitFilePath;
-
+ private GobblinHelixTaskMetrics taskMetrics;
private SingleTask task;
public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
TaskAttemptBuilder taskAttemptBuilder,
- StateStores stateStores) {
+ StateStores stateStores,
+ GobblinHelixTaskMetrics taskMetrics) {
this.taskConfig = taskCallbackContext.getTaskConfig();
this.applicationName = builder.getApplicationName();
this.instanceName = builder.getInstanceName();
+ this.taskMetrics = taskMetrics;
getInfoFromTaskConfig();
Path jobStateFilePath = GobblinClusterUtils
@@ -107,20 +109,28 @@ public class GobblinHelixTask implements Task {
@Override
public TaskResult run() {
+ this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
+ long startTime = System.currentTimeMillis();
log.info("Actual task {} started. [{} {}]", this.taskId,
this.applicationName, this.instanceName);
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY,
this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY,
this.jobKey));
this.task.run();
log.info("Actual task {} completed.", this.taskId);
+ this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
return new TaskResult(TaskResult.Status.COMPLETED, "");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Actual task {} interrupted.", this.taskId);
+ this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
return new TaskResult(TaskResult.Status.CANCELED, "");
} catch (Throwable t) {
log.error("Actual task {} failed due to {}", this.taskId,
t.getMessage());
+ this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
return new TaskResult(TaskResult.Status.FAILED,
Throwables.getStackTraceAsString(t));
+ } finally {
+ this.taskMetrics.helixTaskTotalRunning.decrementAndGet();
+ this.taskMetrics.updateTimeForTaskExecution(startTime);
}
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 73ca784..6bf3d5f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.cluster;
-import java.io.IOException;
-
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
@@ -29,13 +27,17 @@ import org.slf4j.LoggerFactory;
import com.codahale.metrics.Counter;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.typesafe.config.Config;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -58,16 +60,27 @@ public class GobblinHelixTaskFactory implements TaskFactory
{
* A {@link Counter} to count the number of new {@link GobblinHelixTask}s
that are created.
*/
private final Optional<Counter> newTasksCounter;
+ @Getter
private final TaskExecutor taskExecutor;
+ @Getter
+ private final GobblinHelixTaskMetrics taskMetrics;
private final TaskStateTracker taskStateTracker;
private final Path appWorkDir;
private final StateStores stateStores;
private final TaskAttemptBuilder taskAttemptBuilder;
public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
- TaskExecutor taskExecutor,
+ MetricContext metricContext,
TaskStateTracker taskStateTracker,
Config stateStoreConfig) {
+
+ // initialize task related metrics
+ int windowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
+ ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+ this.taskExecutor = new
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
+ this.taskMetrics = new GobblinHelixTaskMetrics(taskExecutor,
metricContext, windowSizeInMin);
+
this.builder = builder;
this.containerMetrics = builder.getContainerMetrics();
this.helixManager = builder.getJobHelixManager();
@@ -76,7 +89,6 @@ public class GobblinHelixTaskFactory implements TaskFactory {
} else {
this.newTasksCounter = Optional.absent();
}
- this.taskExecutor = taskExecutor;
this.taskStateTracker = taskStateTracker;
this.appWorkDir = builder.getAppWorkPath();
@@ -101,6 +113,6 @@ public class GobblinHelixTaskFactory implements TaskFactory
{
if (this.newTasksCounter.isPresent()) {
this.newTasksCounter.get().inc();
}
- return new GobblinHelixTask(builder, context, this.taskAttemptBuilder,
this.stateStores);
+ return new GobblinHelixTask(builder, context, this.taskAttemptBuilder,
this.stateStores, this.taskMetrics);
}
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
new file mode 100644
index 0000000..44df9f5
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+
+
+public class GobblinHelixTaskMetrics extends
StandardMetricsBridge.StandardMetrics {
+
+ private TaskExecutor taskExecutor;
+ private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
+ private static String HISTORICAL_QUEUED_TASK_COUNT =
"historicalQueuedTaskCount";
+ private static String QUEUED_TASK_COUNT = "queuedTaskCount";
+ private static String CURRENT_QUEUED_TASK_TOTAL_TIME =
"currentQueuedTaskTotalTime";
+ private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME =
"historicalQueuedTaskTotalTime";
+ private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
+ private static String FAILED_TASK_COUNT = "failedTaskCount";
+ private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
+ private static String RUNNING_TASK_COUNT = "runningTaskCount";
+ private static String TIMER_FOR_TASK_EXEC = "timeForTaskExec";
+
+ private static String HELIX_TASK_TOTAL_COMPLETED =
"helixTaskTotalCompleted";
+ private static String HELIX_TASK_TOTAL_FAILED = "helixTaskTotalFailed";
+ private static String HELIX_TASK_TOTAL_CANCELLED =
"helixTaskTotalCancelled";
+ private static String HELIX_TASK_TOTAL_RUNNING = "helixTaskTotalRunning";
+
+ private final ContextAwareTimer timeForTaskExecution;
+
+ AtomicLong helixTaskTotalCompleted;
+ AtomicLong helixTaskTotalCancelled;
+ AtomicLong helixTaskTotalFailed;
+ AtomicLong helixTaskTotalRunning;
+
+ public GobblinHelixTaskMetrics (TaskExecutor executor, MetricContext
context, int windowSizeInMin) {
+ this.taskExecutor = executor;
+ this.helixTaskTotalCompleted = new AtomicLong(0);
+ this.helixTaskTotalFailed = new AtomicLong(0);
+ this.helixTaskTotalRunning = new AtomicLong(0);
+ this.helixTaskTotalCancelled = new AtomicLong(0);
+ this.timeForTaskExecution =
context.contextAwareTimer(TIMER_FOR_TASK_EXEC, windowSizeInMin,
TimeUnit.MINUTES);
+
+
this.contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT,
()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT,
()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT,
()->this.taskExecutor.getQueuedTaskCount().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT,
()->this.taskExecutor.getFailedTaskCount().getCount()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT,
()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT,
()->this.taskExecutor.getRunningTaskCount().getCount()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_COMPLETED,
()->this.helixTaskTotalCompleted.get()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_FAILED,
()->this.helixTaskTotalFailed.get()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_CANCELLED,
()->this.helixTaskTotalCancelled.get()));
+
this.contextAwareMetrics.add(context.newContextAwareGauge(HELIX_TASK_TOTAL_RUNNING,
()->this.helixTaskTotalRunning.get()));
+ this.contextAwareMetrics.add(this.timeForTaskExecution);
+
+
this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL,
this.taskExecutor.getTaskCreateAndRunTimer());
+ }
+
+ public void updateTimeForTaskExecution(long startTime) {
+ Instrumented.updateTimer(
+ com.google.common.base.Optional.of(this.timeForTaskExecution),
+ System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public String getName() {
+ return GobblinHelixTaskMetrics.class.getName();
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
deleted file mode 100644
index 9fc8bc0..0000000
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.cluster;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.runtime.TaskExecutor;
-
-
-public class GobblinTaskRunnerMetrics {
-
- /**
- * This metrics shows the task execution that correlates to a work unit.
- */
- static class TaskExecutionMetrics extends
StandardMetricsBridge.StandardMetrics {
- private TaskExecutor taskExecutor;
- private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
- private static String HISTORICAL_QUEUED_TASK_COUNT =
"historicalQueuedTaskCount";
- private static String QUEUED_TASK_COUNT = "queuedTaskCount";
- private static String CURRENT_QUEUED_TASK_TOTAL_TIME =
"currentQueuedTaskTotalTime";
- private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME =
"historicalQueuedTaskTotalTime";
- private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
- private static String FAILED_TASK_COUNT = "failedTaskCount";
- private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
- private static String RUNNING_TASK_COUNT = "runningTaskCount";
-
- public TaskExecutionMetrics (TaskExecutor executor, MetricContext context)
{
- taskExecutor = executor;
-
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT,
()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
-
contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
-
contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT,
()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
-
contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
- contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT,
()->this.taskExecutor.getQueuedTaskCount().longValue()));
-
contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME,
()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
- contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT,
()->this.taskExecutor.getFailedTaskCount().getCount()));
-
contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT,
()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
- contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT,
()->this.taskExecutor.getRunningTaskCount().getCount()));
-
this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL,
this.taskExecutor.getTaskCreateAndRunTimer());
- }
-
- @Override
- public String getName() {
- return TaskExecutionMetrics.class.getName();
- }
- }
-
- static class JvmTaskMetrics extends StandardMetricsBridge.StandardMetrics {
- //TODO: add metrics to monitor the process execution status (will be
revisited after process isolation work is done)
- @Override
- public String getName() {
- return JvmTaskMetrics.class.getName();
- }
-
- }
-}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 1ae8639..92c309a 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -51,8 +51,6 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
@Alpha
public abstract class TaskRunnerSuiteBase {
- protected TaskFactory taskFactory;
- protected GobblinHelixJobFactory jobFactory;
protected MetricContext metricContext;
protected String applicationId;
protected String applicationName;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
index 72ed17b..0833685 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.instrumented.StandardMetricsBridge;
*/
@Slf4j
class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase {
-
+ private final HelixTaskFactory taskFactory;
TaskRunnerSuiteProcessModel(TaskRunnerSuiteBase.Builder builder) {
super(builder);
log.info("Running a task in a separate process is enabled.");
@@ -50,7 +50,7 @@ class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase
{
@Override
protected Collection<StandardMetricsBridge.StandardMetrics>
getMetricsCollection() {
- return ImmutableList.of(new GobblinTaskRunnerMetrics.JvmTaskMetrics());
+ return ImmutableList.of();
}
@Override
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index e375736..9bec51d 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -34,6 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.services.JMXReportingService;
@@ -44,22 +45,18 @@ import org.apache.gobblin.util.PathUtils;
* A sub-type of {@link TaskRunnerSuiteBase} suite which runs tasks in a
thread pool.
*/
class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
- private final TaskExecutor taskExecutor;
- private final GobblinTaskRunnerMetrics.TaskExecutionMetrics
taskExecutionMetrics;
+ protected final GobblinHelixTaskFactory taskFactory;
+ protected final GobblinHelixJobFactory jobFactory;
TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
super(builder);
-
- // initialize task related metrics
- this.taskExecutor = new
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
- this.taskExecutionMetrics = new
GobblinTaskRunnerMetrics.TaskExecutionMetrics(taskExecutor, metricContext);
- this.taskFactory = generateTaskFactory(taskExecutor, builder);
+ this.taskFactory = createTaskFactory(builder, this.metricContext);
this.jobFactory = new GobblinHelixJobFactory(builder, this.metricContext);
}
@Override
protected Collection<StandardMetricsBridge.StandardMetrics>
getMetricsCollection() {
- return ImmutableList.of(this.taskExecutionMetrics,
+ return ImmutableList.of(this.taskFactory.getTaskMetrics(),
this.jobFactory.getJobTaskMetrics(),
this.jobFactory.getLauncherMetrics(),
this.jobFactory.getHelixMetrics());
@@ -78,7 +75,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
return this.services;
}
- private GobblinHelixTaskFactory generateTaskFactory(TaskExecutor
taskExecutor, Builder builder) {
+ private GobblinHelixTaskFactory createTaskFactory(Builder builder,
MetricContext metricContext) {
Properties properties =
ConfigUtils.configToProperties(builder.getConfig());
URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
@@ -87,14 +84,16 @@ class TaskRunnerSuiteThreadModel extends
TaskRunnerSuiteBase {
TaskStateTracker taskStateTracker = new
GobblinHelixTaskStateTracker(properties);
- services.add(taskExecutor);
+ GobblinHelixTaskFactory taskFactory = new GobblinHelixTaskFactory(builder,
+ metricContext,
+ taskStateTracker,
+ stateStoreJobConfig);
+
+ services.add(taskFactory.getTaskExecutor());
services.add(taskStateTracker);
services.add(new JMXReportingService(
- ImmutableMap.of("task.executor",
taskExecutor.getTaskExecutorQueueMetricSet())));
+ ImmutableMap.of("task.executor",
taskFactory.getTaskExecutor().getTaskExecutorQueueMetricSet())));
- return new GobblinHelixTaskFactory(builder,
- taskExecutor,
- taskStateTracker,
- stateStoreJobConfig);
+ return taskFactory;
}
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index e91c984..9386bf6 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -144,7 +144,7 @@ public class GobblinHelixTaskTest {
TaskRunnerSuiteBase.Builder builder = new
TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
- builder.setInstanceName("TestInstance")
+ TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
.setApplicationName("TestApplication")
.setAppWorkPath(appWorkDir)
.setContainerMetrics(Optional.absent())
@@ -155,7 +155,7 @@ public class GobblinHelixTaskTest {
GobblinHelixTaskFactory gobblinHelixTaskFactory =
new GobblinHelixTaskFactory(builder,
- this.taskExecutor,
+ sb.metricContext,
this.taskStateTracker,
ConfigFactory.empty());