Repository: incubator-gobblin Updated Branches: refs/heads/master dc96e3e78 -> 79878f992
[GOBBLIN-652] Add helix metrics Closes #2521 from kyuamazon/helixmetrics Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/79878f99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/79878f99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/79878f99 Branch: refs/heads/master Commit: 79878f992db27e60865bd09b6295737d31e2fe8e Parents: dc96e3e Author: Kuai Yu <[email protected]> Authored: Tue Dec 11 15:23:04 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Dec 11 15:23:04 2018 -0800 ---------------------------------------------------------------------- ...blinHelixDistributeJobExecutionLauncher.java | 13 +++- .../gobblin/cluster/GobblinHelixJobFactory.java | 8 ++- .../cluster/GobblinHelixJobLauncher.java | 18 +++++- .../cluster/GobblinHelixJobScheduler.java | 19 +++++- .../gobblin/cluster/GobblinHelixJobTask.java | 8 ++- .../gobblin/cluster/GobblinHelixMetrics.java | 64 ++++++++++++++++++++ .../GobblinHelixPlanningJobLauncherMetrics.java | 10 --- .../cluster/HelixRetriggeringJobCallable.java | 4 ++ .../cluster/TaskRunnerSuiteThreadModel.java | 5 +- .../cluster/GobblinHelixJobLauncherTest.java | 15 +++-- .../TaskRunnerSuiteForJobFactoryTest.java | 9 ++- 11 files changed, 143 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 1d592c4..b4405c3 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -63,7 +63,7 @@ import org.apache.gobblin.util.PropertiesUtils; /** * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs on the same * instance (manager), this {@link JobExecutionLauncher} will distribute the original job - * to one of the worker (participant) node. The original job will be launched there. + * to one of the task driver instances. The original task driver logic will be launched there. * * <p> * For job submission, the Helix workflow name will be the original job name with prefix @@ -88,6 +88,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher protected Properties jobPlanningProps; protected HelixJobsMapping jobsMapping; protected GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; + protected GobblinHelixMetrics helixMetrics; protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps."; @@ -125,6 +126,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); + this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics; + this.helixMetrics = builder.helixMetrics; } @Override @@ -156,6 +159,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher Optional<HelixManager> taskDriverHelixManager; Path appWorkDir; GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; + GobblinHelixMetrics helixMetrics; public GobblinHelixDistributeJobExecutionLauncher build() throws Exception { return new GobblinHelixDistributeJobExecutionLauncher(this); } @@ -249,10 +253,13 @@ class 
GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher String planningId = getPlanningJobId(this.jobPlanningProps); JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps); try { + long submitStartTime = System.currentTimeMillis(); + GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.submitMeter.mark(); submitJobToHelix(planningId, planningId, builder); - long startTime = System.currentTimeMillis(); + GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixSubmit(submitStartTime); + long waitStartTime = System.currentTimeMillis(); DistributeJobResult rst = waitForJobCompletion(planningId, planningId); - GobblinHelixDistributeJobExecutionLauncher.this.planningJobLauncherMetrics.updateTimeForHelixWait(startTime); + GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime); return rst; } catch (Exception e) { log.error(planningId + " is not able to submit."); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java index f631abf..a54a8e7 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java @@ -48,6 +48,8 @@ class GobblinHelixJobFactory implements TaskFactory { protected GobblinHelixJobLauncherMetrics launcherMetrics; @Getter protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics; + @Getter + protected GobblinHelixMetrics helixMetrics; private void initJobMapping(TaskRunnerSuiteBase.Builder builder) { Config sysConfig = builder.getConfig(); @@ -73,6 +75,9 @@ class 
GobblinHelixJobFactory implements TaskFactory { metricsWindowSizeInMin); this.jobTaskMetrics = new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, metricsWindowSizeInMin); + this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobFactory", + metricContext, + metricsWindowSizeInMin); } @Override @@ -81,6 +86,7 @@ class GobblinHelixJobFactory implements TaskFactory { this.jobsMapping, this.builder, this.launcherMetrics, - this.jobTaskMetrics); + this.jobTaskMetrics, + this.helixMetrics); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 5798da0..3f72781 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -117,7 +117,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final int stateSerDeRunnerThreads; private final TaskStateCollectorService taskStateCollectorService; - + private final Optional<GobblinHelixMetrics> helixMetrics; private volatile boolean jobSubmitted = false; private final ConcurrentHashMap<String, Boolean> runningMap; private final StateStores stateStores; @@ -128,11 +128,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { final HelixManager helixManager, Path appWorkDir, List<? 
extends Tag<?>> metadataTags, - ConcurrentHashMap<String, Boolean> runningMap) throws Exception { + ConcurrentHashMap<String, Boolean> runningMap, + Optional<GobblinHelixMetrics> helixMetrics) throws Exception { super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags)); LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", jobProps, appWorkDir); - this.helixManager = helixManager; this.helixTaskDriver = new TaskDriver(this.helixManager); this.runningMap = runningMap; @@ -172,6 +172,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.stateStores.getTaskStateStore(), this.outputTaskStateDir); + this.helixMetrics = helixMetrics; startCancellationExecutor(); } @@ -202,7 +203,14 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { synchronized (this.cancellationRequest) { if (!this.cancellationRequested) { + long submitStart = System.currentTimeMillis(); + if (helixMetrics.isPresent()) { + helixMetrics.get().submitMeter.mark(); + } submitJobToHelix(createJob(workUnits)); + if (helixMetrics.isPresent()) { + this.helixMetrics.get().updateTimeForHelixSubmit(submitStart); + } jobSubmissionTimer.stop(); LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId())); this.jobSubmitted = true; @@ -212,7 +220,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN); + long waitStart = System.currentTimeMillis(); waitForJobCompletion(); + if (helixMetrics.isPresent()) { + this.helixMetrics.get().updateTimeForHelixWait(waitStart); + } jobRunTimer.stop(); LOGGER.info(String.format("Job %s completed", this.jobContext.getJobId())); } finally { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 134d382..9cf757c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -87,6 +87,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final MutableJobCatalog jobCatalog; private final MetricContext metricContext; + final GobblinHelixMetrics helixMetrics; final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics; final GobblinHelixJobLauncherMetrics launcherMetrics; final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; @@ -128,12 +129,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.metricContext, metricsWindowSizeInMin); + this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler", + this.metricContext, + metricsWindowSizeInMin); + this.startServicesCompleted = false; } @Override public Collection<StandardMetrics> getStandardMetricsCollection() { - return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics, this.planningJobLauncherMetrics); + return ImmutableList.of(this.launcherMetrics, + this.jobSchedulerMetrics, + this.planningJobLauncherMetrics, + this.helixMetrics); } @Override @@ -172,6 +180,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe jobProps, jobListener, this.planningJobLauncherMetrics, + this.helixMetrics, this.appWorkDir, this.jobHelixManager, this.taskDriverHelixManager).call(); @@ -184,7 +193,12 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe combinedProps.putAll(properties); combinedProps.putAll(jobProps); - return new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, this.appWorkDir, this.metadataTags, 
this.jobRunningMap); + return new GobblinHelixJobLauncher(combinedProps, + this.jobHelixManager, + this.appWorkDir, + this.metadataTags, + this.jobRunningMap, + Optional.of(this.helixMetrics)); } public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) { @@ -193,6 +207,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe jobProps, jobListener, this.planningJobLauncherMetrics, + this.helixMetrics, this.appWorkDir, this.jobHelixManager, this.taskDriverHelixManager); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index 9447b94..ff61ea6 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -68,16 +68,19 @@ class GobblinHelixJobTask implements Task { private final List<? 
extends Tag<?>> metadataTags; private GobblinHelixJobLauncher launcher; private GobblinHelixJobTaskMetrics jobTaskMetrics; + private GobblinHelixMetrics helixMetrics; private GobblinHelixJobLauncherListener jobLauncherListener; public GobblinHelixJobTask (TaskCallbackContext context, HelixJobsMapping jobsMapping, TaskRunnerSuiteBase.Builder builder, GobblinHelixJobLauncherMetrics launcherMetrics, - GobblinHelixJobTaskMetrics jobTaskMetrics) { + GobblinHelixJobTaskMetrics jobTaskMetrics, + GobblinHelixMetrics helixMetrics) { this.applicationName = builder.getApplicationName(); this.instanceName = builder.getInstanceName(); this.jobTaskMetrics = jobTaskMetrics; + this.helixMetrics = helixMetrics; this.taskConfig = context.getTaskConfig(); this.sysConfig = builder.getConfig(); this.jobHelixManager = builder.getJobHelixManager(); @@ -131,7 +134,8 @@ class GobblinHelixJobTask implements Task { this.jobHelixManager, this.appWorkDir, this.metadataTags, - new ConcurrentHashMap<>()); + new ConcurrentHashMap<>(), + Optional.of(this.helixMetrics)); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java new file mode 100644 index 0000000..81f49d4 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMetrics.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.MetricContext; + + +public class GobblinHelixMetrics extends StandardMetricsBridge.StandardMetrics { + public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait"; + public static final String TIMER_FOR_HELIX_SUBMIT = "timeForHelixSubmit"; + public static final String METER_FOR_HELIX_SUBMIT = "meterForHelixSubmit"; + final String metricsName; + final ContextAwareTimer timeForHelixWait; + final ContextAwareTimer timeForHelixSubmit; + final ContextAwareMeter submitMeter; + + public GobblinHelixMetrics(String metricsName, final MetricContext metricContext, int windowSizeInMin) { + this.metricsName = metricsName; + this.timeForHelixWait = metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, TimeUnit.MINUTES); + this.timeForHelixSubmit = metricContext.contextAwareTimer(TIMER_FOR_HELIX_SUBMIT, windowSizeInMin, TimeUnit.MINUTES); + this.submitMeter = metricContext.contextAwareMeter(METER_FOR_HELIX_SUBMIT); + this.contextAwareMetrics.add(timeForHelixWait); + this.contextAwareMetrics.add(timeForHelixSubmit); + 
this.contextAwareMetrics.add(submitMeter); + } + + public void updateTimeForHelixSubmit(long startTime) { + Instrumented.updateTimer( + com.google.common.base.Optional.of(this.timeForHelixSubmit), + System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + public void updateTimeForHelixWait(long startTime) { + Instrumented.updateTimer( + com.google.common.base.Optional.of(this.timeForHelixWait), + System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + @Override + public String getName() { + return this.metricsName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java index a2e0991..593e72d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java @@ -30,11 +30,9 @@ public class GobblinHelixPlanningJobLauncherMetrics extends StandardMetricsBridg public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = "timeForCompletedPlanningJobs"; public static final String TIMER_FOR_FAILED_PLANNING_JOBS = "timeForFailedPlanningJobs"; - public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait"; final ContextAwareTimer timeForCompletedPlanningJobs; final ContextAwareTimer timeForFailedPlanningJobs; - final ContextAwareTimer timeForHelixWait; public GobblinHelixPlanningJobLauncherMetrics(String metricsName, final MetricContext metricContext, @@ -44,11 +42,9 @@ public class GobblinHelixPlanningJobLauncherMetrics extends StandardMetricsBridg 
this.timeForCompletedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); this.timeForFailedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); - this.timeForHelixWait = metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, TimeUnit.MINUTES); this.contextAwareMetrics.add(timeForCompletedPlanningJobs); this.contextAwareMetrics.add(timeForFailedPlanningJobs); - this.contextAwareMetrics.add(timeForHelixWait); } public void updateTimeForCompletedPlanningJobs(long startTime) { @@ -63,12 +59,6 @@ public class GobblinHelixPlanningJobLauncherMetrics extends StandardMetricsBridg System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - public void updateTimeForHelixWait(long startTime) { - Instrumented.updateTimer( - com.google.common.base.Optional.of(this.timeForHelixWait), - System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); - } - @Override public String getName() { return this.metricsName; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index 4cda2ed..f563198 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -87,6 +87,7 @@ class HelixRetriggeringJobCallable implements Callable { private final Properties jobProps; private final JobListener jobListener; private final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; + private final GobblinHelixMetrics helixMetrics; 
private final Path appWorkDir; private final HelixManager jobHelixManager; private final Optional<HelixManager> taskDriverHelixManager; @@ -101,6 +102,7 @@ class HelixRetriggeringJobCallable implements Callable { Properties jobProps, JobListener jobListener, GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics, + GobblinHelixMetrics helixMetrics, Path appWorkDir, HelixManager jobHelixManager, Optional<HelixManager> taskDriverHelixManager) { @@ -109,6 +111,7 @@ class HelixRetriggeringJobCallable implements Callable { this.jobProps = jobProps; this.jobListener = jobListener; this.planningJobLauncherMetrics = planningJobLauncherMetrics; + this.helixMetrics = helixMetrics; this.appWorkDir = appWorkDir; this.jobHelixManager = jobHelixManager; this.taskDriverHelixManager = taskDriverHelixManager; @@ -221,6 +224,7 @@ class HelixRetriggeringJobCallable implements Callable { builder.setTaskDriverHelixManager(this.taskDriverHelixManager); builder.setAppWorkDir(this.appWorkDir); builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics); + builder.setHelixMetrics(this.helixMetrics); try (Closer closer = Closer.create()) { log.info("Planning job {} started.", planningId); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java index 92d3427..e375736 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -59,7 +59,10 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { @Override protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() { - return ImmutableList.of(this.taskExecutionMetrics, this.jobFactory.getJobTaskMetrics(), this.jobFactory.getLauncherMetrics()); + return ImmutableList.of(this.taskExecutionMetrics, + this.jobFactory.getJobTaskMetrics(), + this.jobFactory.getLauncherMetrics(), + this.jobFactory.getHelixMetrics()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 1e1db0e..1f98807 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -205,7 +205,8 @@ public class GobblinHelixJobLauncherTest { // Normal job launcher final Properties properties = generateJobProperties(this.baseConfig, "1", "_1504201348470"); final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register( - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap, + java.util.Optional.empty())); gobblinHelixJobLauncher.launchJob(null); @@ -254,12 +255,14 @@ public class GobblinHelixJobLauncherTest { // Job launcher(1) to test parallel job running final Properties properties1 = generateJobProperties(this.baseConfig, "2", "_1504201348471"); final GobblinHelixJobLauncher gobblinHelixJobLauncher1 = this.closer.register( - new GobblinHelixJobLauncher(properties1, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + new 
GobblinHelixJobLauncher(properties1, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap, + java.util.Optional.empty())); // Job launcher(2) to test parallel job running final Properties properties2 = generateJobProperties(this.baseConfig, "2", "_1504201348472"); final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = this.closer.register( - new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap, + java.util.Optional.empty())); CountDownLatch stg1 = new CountDownLatch(1); CountDownLatch stg2 = new CountDownLatch(1); @@ -288,11 +291,13 @@ public class GobblinHelixJobLauncherTest { final Properties properties = generateJobProperties(this.baseConfig, "3", "_1504201348473"); final GobblinHelixJobLauncher gobblinHelixJobLauncher = - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap); + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap, + java.util.Optional.empty()); final Properties properties2 = generateJobProperties(this.baseConfig, "33", "_1504201348474"); final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = - new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap); + new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap, + java.util.Optional.empty()); gobblinHelixJobLauncher.launchJob(null); gobblinHelixJobLauncher2.launchJob(null); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/79878f99/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java index 2e98cfb..dc88a4e 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -63,7 +63,8 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel jobsMapping, builder, new GobblinHelixJobLauncherMetrics("launcherInJobFactory", metricContext, 5), - new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 5)); + new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 5), + new GobblinHelixMetrics("helixMetricsInJobFactory", metricContext, 5)); } } @@ -72,12 +73,14 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel HelixJobsMapping jobsMapping, TaskRunnerSuiteBase.Builder builder, GobblinHelixJobLauncherMetrics launcherMetrics, - GobblinHelixJobTaskMetrics jobTaskMetrics) { + GobblinHelixJobTaskMetrics jobTaskMetrics, + GobblinHelixMetrics helixMetrics) { super(context, jobsMapping, builder, launcherMetrics, - jobTaskMetrics); + jobTaskMetrics, + helixMetrics); } }
