Repository: incubator-gobblin Updated Branches: refs/heads/master 5c03b11b5 -> f7e3ad062
[GOBBLIN-320] Add metrics for GobblinHelixJobScheduler Closes #2172 from yukuai518/sensor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f7e3ad06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f7e3ad06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f7e3ad06 Branch: refs/heads/master Commit: f7e3ad0628c145b6312f9d9b80af1c72eccf96fe Parents: 5c03b11 Author: Kuai Yu <[email protected]> Authored: Mon Nov 27 17:15:17 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 27 17:15:17 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinClusterManager.java | 12 +- .../cluster/GobblinHelixJobScheduler.java | 194 ++++++++++++++++++- .../instrumented/StandardMetricsBridge.java | 43 ++++ .../gobblin/metrics/ContextAwareHistogram.java | 2 +- .../apache/gobblin/runtime/api/JobCatalog.java | 48 ++++- .../runtime/api/JobExecutionLauncher.java | 2 + 6 files changed, 286 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 9b26aca..cdb5b29 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -86,6 +86,8 @@ import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.logs.Log4jConfigurationHelper; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import lombok.Getter; + /** * The central cluster manager for Gobblin Clusters. @@ -140,8 +142,10 @@ public class GobblinClusterManager implements ApplicationLauncher { private final boolean isStandaloneMode; + @Getter private MutableJobCatalog jobCatalog; - + @Getter + private GobblinHelixJobScheduler jobScheduler; private final String clusterName; private final Config config; @@ -193,9 +197,9 @@ public class GobblinClusterManager implements ApplicationLauncher { SchedulerService schedulerService = new SchedulerService(properties); this.applicationLauncher.addService(schedulerService); - this.applicationLauncher.addService( - buildGobblinHelixJobScheduler(config, this.appWorkDir, getMetadataTags(clusterName, applicationId), - schedulerService)); + this.jobScheduler = buildGobblinHelixJobScheduler(config, this.appWorkDir, getMetadataTags(clusterName, applicationId), + schedulerService); + this.applicationLauncher.addService(this.jobScheduler); this.applicationLauncher.addService(buildJobConfigurationManager(config)); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index d1ef74e..3a25df7 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -19,34 +19,59 @@ package org.apache.gobblin.cluster; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Gauge; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; +import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; +import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareCounter; +import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; -import org.apache.gobblin.runtime.api.MutableJobCatalog; +import org.apache.gobblin.runtime.JobContext; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.listeners.AbstractJobListener; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; -import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; -import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; -import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; import org.apache.gobblin.scheduler.SchedulerService; +import javax.annotation.Nonnull; +import lombok.AllArgsConstructor; + + /** * An extension to {@link JobScheduler} that schedules and runs Gobblin jobs on Helix using * {@link GobblinHelixJobLauncher}s. @@ -54,7 +79,7 @@ import org.apache.gobblin.scheduler.SchedulerService; * @author Yinan Li */ @Alpha -public class GobblinHelixJobScheduler extends JobScheduler { +public class GobblinHelixJobScheduler extends JobScheduler implements StandardMetricsBridge{ private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class); @@ -70,6 +95,8 @@ public class GobblinHelixJobScheduler extends JobScheduler { private final List<? extends Tag<?>> metadataTags; private final ConcurrentHashMap<String, Boolean> jobRunningMap; private final MutableJobCatalog jobCatalog; + private final MetricContext metricContext; + private final InnerStandardMetrics metrics; public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus, Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService, @@ -82,6 +109,157 @@ public class GobblinHelixJobScheduler extends JobScheduler { this.appWorkDir = appWorkDir; this.metadataTags = metadataTags; this.jobCatalog = jobCatalog; + this.metricContext = getDefaultMetricContext(properties); + this.metrics = new InnerStandardMetrics(this.metricContext); + } + + public MetricContext getDefaultMetricContext(Properties properties) { + org.apache.gobblin.configuration.State fakeState = + new org.apache.gobblin.configuration.State(properties); + List<Tag<?>> tags = new ArrayList<>(); + MetricContext res = Instrumented.getMetricContext(fakeState, GobblinHelixJobScheduler.class, tags); + return res; + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return GobblinMetrics.isEnabled(this.properties); + } + + @Override + public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { + return null; + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public StandardMetricsBridge.StandardMetrics getStandardMetrics() { + return metrics; + } + + private class InnerStandardMetrics implements StandardMetrics { + + private final ContextAwareCounter numJobsLaunched; + private final ContextAwareCounter numJobsCompleted; + private final ContextAwareCounter numJobsCommitted; + private final ContextAwareCounter numJobsFailed; + private final ContextAwareCounter numJobsCancelled; + private final ContextAwareGauge<Integer> numJobsRunning; + private final ContextAwareTimer timeForJobCompletion; + private final ContextAwareTimer timeForJobFailure; + + public InnerStandardMetrics(final MetricContext metricContext) { + this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER); + this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER); + this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER); + this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER); + this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER); + this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE, + new Gauge<Integer>() { + @Override public Integer getValue() { + return (int)(InnerStandardMetrics.this.numJobsLaunched.getCount() - + InnerStandardMetrics.this.numJobsCompleted.getCount()); + } + }); + this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION); + this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE); + } + + @Override + public String getName() { + return GobblinHelixJobScheduler.class.getName(); + } + + @Override + public Collection<ContextAwareGauge<?>> getGauges() { + return Collections.singleton(numJobsRunning); + } + + @Override + public Collection<ContextAwareCounter> getCounters() { + List<ContextAwareCounter> counters = Lists.newArrayList(); + counters.add(numJobsLaunched); + counters.add(numJobsCompleted); + counters.add(numJobsCommitted); + counters.add(numJobsFailed); + counters.add(numJobsCancelled); + return counters; + } + + @Override + public Collection<ContextAwareMeter> getMeters() { + return null; + } + + @Override + public Collection<ContextAwareTimer> getTimers() { + return ImmutableList.of(timeForJobCompletion, timeForJobFailure); + } + + @Override + public Collection<ContextAwareHistogram> getHistograms() { + return null; + } + } + + private class MetricsTrackingListener extends AbstractJobListener { + private final InnerStandardMetrics metrics; + private static final String START_TIME = "startTime"; + MetricsTrackingListener(InnerStandardMetrics metrics) { + this.metrics = metrics; + } + + @Override + public void onJobPrepare(JobContext jobContext) + throws Exception { + super.onJobPrepare(jobContext); + jobContext.getJobState().setProp(START_TIME, Long.toString(System.nanoTime())); + if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { + metrics.numJobsLaunched.inc(); + } + } + + @Override + public void onJobCompletion(JobContext jobContext) + throws Exception { + super.onJobCompletion(jobContext); + long startTime = jobContext.getJobState().getPropAsLong(START_TIME); + if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { + metrics.numJobsCompleted.inc(); + Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { + metrics.numJobsFailed.inc(); + Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + } else { + metrics.numJobsCommitted.inc(); + } + } + } + + @Override + public void onJobCancellation(JobContext jobContext) + throws Exception { + super.onJobCancellation(jobContext); + if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { + metrics.numJobsCancelled.inc(); + } + } + } @Override @@ -132,10 +310,10 @@ public class GobblinHelixJobScheduler extends JobScheduler { jobConfig.putAll(newJobArrival.getJobConfig()); if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { LOGGER.info("Scheduling job " + newJobArrival.getJobName()); - scheduleJob(jobConfig, null); + scheduleJob(jobConfig, new MetricsTrackingListener(metrics)); } else { LOGGER.info("No job schedule found, so running job " + newJobArrival.getJobName()); - this.jobExecutor.execute(new NonScheduledJobRunner(newJobArrival.getJobName(), jobConfig, null)); + this.jobExecutor.execute(new NonScheduledJobRunner(newJobArrival.getJobName(), jobConfig, new MetricsTrackingListener(metrics))); } } catch (JobException je) { LOGGER.error("Failed to schedule or run job " + newJobArrival.getJobName(), je); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java new file mode 100644 index 0000000..087cf39 --- /dev/null +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.instrumented; + +import java.util.Collection; + +import org.apache.gobblin.metrics.ContextAwareCounter; +import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ContextAwareTimer; + +/** + * This interface indicates a class will expose its metrics to some external systems. + */ +public interface StandardMetricsBridge extends Instrumentable { + + StandardMetrics getStandardMetrics(); + + interface StandardMetrics { + String getName(); + Collection<ContextAwareGauge<?>> getGauges(); + Collection<ContextAwareCounter> getCounters(); + Collection<ContextAwareMeter> getMeters(); + Collection<ContextAwareTimer> getTimers(); + Collection<ContextAwareHistogram> getHistograms(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java index 680cb3b..a940715 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java @@ -41,7 +41,7 @@ import org.apache.gobblin.metrics.metric.InnerMetric; * * @author Yinan Li */ -class ContextAwareHistogram extends Histogram implements ContextAwareMetric { +public class ContextAwareHistogram extends Histogram implements ContextAwareMetric { @Delegate private final InnerHistogram innerHistogram; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index e0264d9..df1dc44 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -17,17 +17,25 @@ package org.apache.gobblin.runtime.api; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import com.codahale.metrics.Gauge; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Service; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumentable; +import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import lombok.Getter; @@ -36,7 +44,7 @@ import lombok.Getter; * A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware of. */ @Alpha -public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable { +public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable, StandardMetricsBridge { /** Returns an immutable {@link Collection} of {@link JobSpec}s that are known to the catalog. */ Collection<JobSpec> getJobs(); @@ -44,13 +52,18 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable * ({@link #isInstrumentationEnabled()}) is false. */ StandardMetrics getMetrics(); + @Override + default StandardMetrics getStandardMetrics() { + return getMetrics(); + } + /** * Get a {@link JobSpec} by uri. * @throws JobSpecNotFoundException if no such JobSpec exists **/ JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException; - public static class StandardMetrics implements JobCatalogListener { + public static class StandardMetrics implements JobCatalogListener, StandardMetricsBridge.StandardMetrics { public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs"; public static final String NUM_ADDED_JOBS = "numAddedJobs"; public static final String NUM_DELETED_JOBS = "numDeletedJobs"; @@ -111,5 +124,36 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable this.numUpdatedJobs.inc(); submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE); } + + @Override + public String getName() { + return "JobCatalog"; + } + + @Override + public Collection<ContextAwareGauge<?>> getGauges() { + return Collections.singleton(this.numActiveJobs); + } + + @Override + public Collection<ContextAwareCounter> getCounters() { + List<ContextAwareCounter> counters = ImmutableList.of(numAddedJobs, numDeletedJobs, numDeletedJobs); + return counters; + } + + @Override + public Collection<ContextAwareMeter> getMeters() { + return null; + } + + @Override + public Collection<ContextAwareTimer> getTimers() { + return null; + } + + @Override + public Collection<ContextAwareHistogram> getHistograms() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f7e3ad06/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index 0e9e114..e63cb4a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -43,6 +43,8 @@ public interface JobExecutionLauncher extends Instrumentable { public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed"; public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled"; public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning"; + public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion"; + public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure"; public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent"; public static final String JOB_EXECID_META = "jobExecId";
