Repository: incubator-gobblin Updated Branches: refs/heads/master 35b894c2c -> 1be745524
[GOBBLIN-326] Add more metrics for GaaS and Gobblin Cluster Closes #2178 from yukuai518/timer Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1be74552 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1be74552 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1be74552 Branch: refs/heads/master Commit: 1be7455246d5c35900c8715c32559bd75b6c7bde Parents: 35b894c Author: Kuai Yu <[email protected]> Authored: Thu Dec 7 15:28:16 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Dec 7 15:28:16 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinClusterManager.java | 68 +++++++++++++++- .../cluster/GobblinHelixJobScheduler.java | 85 +++++++++++--------- .../instrumented/StandardMetricsBridge.java | 35 ++++++-- .../gobblin/metrics/ContextAwareHistogram.java | 8 ++ .../metrics/ContextAwareMetricFactory.java | 23 +++++- .../metrics/ContextAwareMetricFactoryArgs.java | 47 +++++++++++ .../gobblin/metrics/ContextAwareTimer.java | 9 +++ .../apache/gobblin/metrics/InnerHistogram.java | 17 ++++ .../gobblin/metrics/InnerMetricContext.java | 17 ++++ .../org/apache/gobblin/metrics/InnerTimer.java | 14 ++++ .../apache/gobblin/metrics/MetricContext.java | 27 ++++--- .../apache/gobblin/runtime/api/JobCatalog.java | 77 ++++++++++-------- .../runtime/api/JobExecutionLauncher.java | 8 ++ .../gobblin/runtime/api/MutableJobCatalog.java | 43 ++++++++++ .../gobblin/runtime/api/MutableSpecCatalog.java | 42 ++++++++++ .../apache/gobblin/runtime/api/SpecCatalog.java | 85 ++++++++++++++++---- .../runtime/job_catalog/FSJobCatalog.java | 17 +++- .../runtime/job_catalog/JobCatalogBase.java | 20 ++++- .../runtime/job_catalog/StaticJobCatalog.java | 3 + .../runtime/spec_catalog/FlowCatalog.java | 27 +++++-- .../runtime/spec_catalog/TopologyCatalog.java | 1 + .../modules/core/GobblinServiceManager.java | 62 +++++++++++++- .../scheduler/GobblinServiceJobScheduler.java | 2 +- 23 files changed, 615 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 7948a8a..6b53c6c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -20,6 +20,8 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,6 +37,12 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.MetricContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -83,9 +91,9 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JvmUtils; -import org.apache.gobblin.util.logs.Log4jConfigurationHelper; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import javax.annotation.Nonnull; import lombok.Getter; @@ -112,7 +120,7 @@ import lombok.Getter; * @author Yinan Li */ @Alpha -public class GobblinClusterManager implements ApplicationLauncher { +public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class); @@ -148,12 +156,14 @@ public class GobblinClusterManager implements ApplicationLauncher { private GobblinHelixJobScheduler jobScheduler; private final String clusterName; private final Config config; - + private final MetricContext metricContext; + private final Metrics metrics; public GobblinClusterManager(String clusterName, String applicationId, Config config, Optional<Path> appWorkDirOptional) throws Exception { this.clusterName = clusterName; this.config = config; - + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); + this.metrics = new Metrics(this.metricContext); this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE); @@ -239,6 +249,7 @@ public class GobblinClusterManager implements ApplicationLauncher { */ @VisibleForTesting void handleLeadershipChange(NotificationContext changeContext) { + this.metrics.clusterLeadershipChange.update(1); if (this.helixManager.isLeader()) { // can get multiple notifications on a leadership change, so only start the application launcher the first time // the notification is received @@ -531,6 +542,37 @@ public class GobblinClusterManager implements ApplicationLauncher { this.applicationLauncher.close(); } + @Override + public StandardMetrics getStandardMetrics() { + return this.metrics; + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config)); + } + + @Override + public List<Tag<?>> generateTags(State state) { + return ImmutableList.of(); + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } + /** * A custom implementation of {@link LiveInstanceChangeListener}. */ @@ -544,6 +586,24 @@ public class GobblinClusterManager implements ApplicationLauncher { } } + private class Metrics extends StandardMetrics { + public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange"; + private ContextAwareHistogram clusterLeadershipChange; + public Metrics(final MetricContext metricContext) { + clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES); + } + + @Override + public String getName() { + return GobblinClusterManager.class.getName(); + } + + @Override + public Collection<ContextAwareHistogram> getHistograms() { + return ImmutableList.of(this.clusterLeadershipChange); + } + } + /** * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type * "SHUTDOWN" for shutting down the controller. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 3a25df7..9fd5add 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -19,7 +19,6 @@ package org.apache.gobblin.cluster; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -28,12 +27,13 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Gauge; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -50,8 +50,6 @@ import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareHistogram; -import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; @@ -69,7 +67,6 @@ import org.apache.gobblin.scheduler.SchedulerService; import javax.annotation.Nonnull; -import lombok.AllArgsConstructor; /** @@ -96,7 +93,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final ConcurrentHashMap<String, Boolean> jobRunningMap; private final MutableJobCatalog jobCatalog; private final MetricContext metricContext; - private final InnerStandardMetrics metrics; + private final Metrics metrics; public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus, Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService, @@ -109,16 +106,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.appWorkDir = appWorkDir; this.metadataTags = metadataTags; this.jobCatalog = jobCatalog; - this.metricContext = getDefaultMetricContext(properties); - this.metrics = new InnerStandardMetrics(this.metricContext); - } - - public MetricContext getDefaultMetricContext(Properties properties) { - org.apache.gobblin.configuration.State fakeState = - new org.apache.gobblin.configuration.State(properties); - List<Tag<?>> tags = new ArrayList<>(); - MetricContext res = Instrumented.getMetricContext(fakeState, GobblinHelixJobScheduler.class, tags); - return res; + this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass()); + this.metrics = new Metrics(this.metricContext); } @Nonnull @@ -148,36 +137,61 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } @Override - public StandardMetricsBridge.StandardMetrics getStandardMetrics() { + public StandardMetrics getStandardMetrics() { return metrics; } - private class InnerStandardMetrics implements StandardMetrics { + private class Metrics extends StandardMetrics { private final ContextAwareCounter numJobsLaunched; private final ContextAwareCounter numJobsCompleted; private final ContextAwareCounter numJobsCommitted; private final ContextAwareCounter numJobsFailed; private final ContextAwareCounter numJobsCancelled; + private final ContextAwareHistogram histogramJobsLaunched; + private final ContextAwareHistogram histogramJobsCompleted; + private final ContextAwareHistogram histogramJobsCommitted; + private final ContextAwareHistogram histogramJobsFailed; + private final ContextAwareHistogram histogramJobsCancelled; + private final ContextAwareGauge<Integer> numJobsRunning; private final ContextAwareTimer timeForJobCompletion; private final ContextAwareTimer timeForJobFailure; + private final ContextAwareTimer timeBeforeJobScheduling; + private final ContextAwareTimer timeBeforeJobLaunching; - public InnerStandardMetrics(final MetricContext metricContext) { + public Metrics(final MetricContext metricContext) { + // All historical counters this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER); this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER); this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER); this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER); this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER); + + // Counters within last 1 minute + this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES); + this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES); + this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES); + this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES); + this.histogramJobsCancelled = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES); + this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE, - new Gauge<Integer>() { - @Override public Integer getValue() { - return (int)(InnerStandardMetrics.this.numJobsLaunched.getCount() - - InnerStandardMetrics.this.numJobsCompleted.getCount()); - } - }); - this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION); - this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE); + ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount())); + + this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES); + this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES); + this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES); + this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES); + } + + private void updateTimeBeforeJobScheduling (Properties jobConfig) { + long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); + Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS); + } + + private void updateTimeBeforeJobLaunching (Properties jobConfig) { + long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); + Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS); } @Override @@ -202,25 +216,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } @Override - public Collection<ContextAwareMeter> getMeters() { - return null; - } - - @Override public Collection<ContextAwareTimer> getTimers() { - return ImmutableList.of(timeForJobCompletion, timeForJobFailure); + return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching); } @Override public Collection<ContextAwareHistogram> getHistograms() { - return null; + return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted); } } private class MetricsTrackingListener extends AbstractJobListener { - private final InnerStandardMetrics metrics; + private final Metrics metrics; private static final String START_TIME = "startTime"; - MetricsTrackingListener(InnerStandardMetrics metrics) { + MetricsTrackingListener(Metrics metrics) { this.metrics = metrics; } @@ -308,6 +317,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe Properties jobConfig = new Properties(); jobConfig.putAll(this.properties); jobConfig.putAll(newJobArrival.getJobConfig()); + + metrics.updateTimeBeforeJobScheduling(jobConfig); + if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { LOGGER.info("Scheduling job " + newJobArrival.getJobName()); scheduleJob(jobConfig, new MetricsTrackingListener(metrics)); @@ -365,6 +377,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void run() { try { + ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig); GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener); // remove non-scheduled job catalog once done so it won't be re-executed http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java index 087cf39..3993dce 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java @@ -25,6 +25,9 @@ import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.ContextAwareTimer; +import com.google.common.collect.ImmutableList; + + /** * This interface indicates a class will expose its metrics to some external systems. */ @@ -32,12 +35,30 @@ public interface StandardMetricsBridge extends Instrumentable { StandardMetrics getStandardMetrics(); - interface StandardMetrics { - String getName(); - Collection<ContextAwareGauge<?>> getGauges(); - Collection<ContextAwareCounter> getCounters(); - Collection<ContextAwareMeter> getMeters(); - Collection<ContextAwareTimer> getTimers(); - Collection<ContextAwareHistogram> getHistograms(); + public class StandardMetrics { + + public String getName() { + return this.getClass().getName(); + } + + public Collection<ContextAwareGauge<?>> getGauges() { + return ImmutableList.of(); + } + + public Collection<ContextAwareCounter> getCounters() { + return ImmutableList.of(); + } + + public Collection<ContextAwareMeter> getMeters() { + return ImmutableList.of(); + } + + public Collection<ContextAwareTimer> getTimers() { + return ImmutableList.of(); + } + + public Collection<ContextAwareHistogram> getHistograms() { + return ImmutableList.of(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java index a940715..a45f928 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java @@ -17,10 +17,13 @@ package org.apache.gobblin.metrics; +import java.util.concurrent.TimeUnit; + import lombok.experimental.Delegate; import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; import org.apache.gobblin.metrics.metric.InnerMetric; @@ -52,7 +55,12 @@ public class ContextAwareHistogram extends Histogram implements ContextAwareMetr super(new ExponentiallyDecayingReservoir()); this.innerHistogram = new InnerHistogram(context, name, this); this.context = context; + } + ContextAwareHistogram(MetricContext context, String name, long windowSize, TimeUnit unit) { + super(new SlidingTimeWindowReservoir(windowSize, unit)); + this.innerHistogram = new InnerHistogram(context, name, this, windowSize, unit); + this.context = context; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java index d699b06..e9f6d62 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java @@ -23,7 +23,6 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.Timer; - /** * An interface for factory classes for {@link ContextAwareMetric}s. * @@ -49,6 +48,10 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> { */ public T newMetric(MetricContext context, String name); + default public T newMetric(ContextAwareMetricFactoryArgs args) { + return null; + } + /** * Check if a given metric is an instance of the type of context-aware metrics created by this * {@link ContextAwareMetricFactory}. @@ -102,6 +105,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> { } @Override + public ContextAwareHistogram newMetric(ContextAwareMetricFactoryArgs args) { + if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) { + ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args; + return new ContextAwareHistogram(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit()); + } + throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareHistogram"); + } + + @Override public boolean isInstance(Metric metric) { return Histogram.class.isInstance(metric); } @@ -118,6 +130,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> { } @Override + public ContextAwareTimer newMetric(ContextAwareMetricFactoryArgs args) { + if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) { + ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args; + return new ContextAwareTimer(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit()); + } + throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareTimer"); + } + + @Override public boolean isInstance(Metric metric) { return Timer.class.isInstance(metric); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java new file mode 100644 index 0000000..1e7e077 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics; + +import java.util.concurrent.TimeUnit; + +import lombok.AllArgsConstructor; +import lombok.Getter; + + +/** + * A class which wraps all arguments required by {@link ContextAwareMetricFactory}s. + * + * A concrete {@link ContextAwareMetricFactory} knows how to interpret this class into its corresponding sub-type. + */ +@AllArgsConstructor +@Getter +public class ContextAwareMetricFactoryArgs { + protected final MetricContext context; + protected final String name; + + @Getter + public static class SlidingTimeWindowArgs extends ContextAwareMetricFactoryArgs { + protected final long windowSize; + protected final TimeUnit unit; + public SlidingTimeWindowArgs(MetricContext context, String name, long windowSize, TimeUnit unit) { + super(context, name); + this.windowSize = windowSize; + this.unit = unit; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java index 33d68ed..839df20 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java @@ -17,6 +17,9 @@ package org.apache.gobblin.metrics; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.SlidingTimeWindowReservoir; import com.codahale.metrics.Timer; import org.apache.gobblin.metrics.metric.InnerMetric; @@ -51,6 +54,12 @@ public class ContextAwareTimer extends Timer implements ContextAwareMetric { this.context = context; } + ContextAwareTimer(MetricContext context, String name, long windowSize, TimeUnit unit) { + super(new SlidingTimeWindowReservoir(windowSize, unit)); + this.innerTimer = new InnerTimer(context, name, this, windowSize, unit); + this.context = context; + } + @Override public MetricContext getContext() { return this.context; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java index 7fb5401..66f8fd2 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java @@ -18,9 +18,11 @@ package org.apache.gobblin.metrics; import java.lang.ref.WeakReference; +import java.util.concurrent.TimeUnit; import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; import com.google.common.base.Optional; import org.apache.gobblin.metrics.metric.InnerMetric; @@ -50,6 +52,21 @@ public class InnerHistogram extends Histogram implements InnerMetric { this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram); } + InnerHistogram(MetricContext context, String name, ContextAwareHistogram contextAwareHistogram, long windowSize, TimeUnit unit) { + super(new SlidingTimeWindowReservoir(windowSize, unit)); + + this.name = name; + + Optional<MetricContext> parentContext = context.getParent(); + if (parentContext.isPresent()) { + this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name, windowSize, unit)); + } else { + this.parentHistogram = Optional.absent(); + } + + this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram); + } + @Override public void update(int value) { update((long) value); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java index 70502ec..2738974 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java @@ -368,6 +368,23 @@ public class InnerMetricContext extends MetricRegistry implements ReportableCont return newMetric; } + @SuppressWarnings("unchecked") + protected synchronized <T extends ContextAwareMetric> T getOrCreate( + ContextAwareMetricFactory<T> factory, ContextAwareMetricFactoryArgs args) { + String name = args.getName(); + InnerMetric metric = this.contextAwareMetrics.get(name); + if (metric != null) { + if (factory.isInstance(metric)) { + return (T) metric.getContextAwareMetric(); + } + throw new IllegalArgumentException(name + " is already used for a different type of metric"); + } + + T newMetric = factory.newMetric(args); + this.register(name, newMetric); + return newMetric; + } + private boolean removeChildrenMetrics(String name) { boolean removed = true; for (MetricContext child : getChildContextsAsMap().values()) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java index db4fc0a..be3517f 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java @@ -20,6 +20,7 @@ package org.apache.gobblin.metrics; import java.lang.ref.WeakReference; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.SlidingTimeWindowReservoir; import com.codahale.metrics.Timer; import com.google.common.base.Optional; @@ -47,6 +48,19 @@ public class InnerTimer extends Timer implements InnerMetric { this.timer = new WeakReference<>(contextAwareTimer); } + InnerTimer(MetricContext context, String name, ContextAwareTimer contextAwareTimer, long windowSize, TimeUnit unit) { + super(new SlidingTimeWindowReservoir(windowSize, unit)); + this.name = name; + + Optional<MetricContext> parentContext = context.getParent(); + if (parentContext.isPresent()) { + this.parentTimer = Optional.fromNullable(parentContext.get().contextAwareTimer(name, windowSize, unit)); + } else { + this.parentTimer = Optional.absent(); + } + this.timer = new WeakReference<>(contextAwareTimer); + } + @Override public void update(long duration, TimeUnit unit) { super.update(duration, unit); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java index dcc1029..46f8ab1 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java @@ -465,19 +465,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext, * @return the {@link ContextAwareHistogram} with the given name */ public ContextAwareHistogram contextAwareHistogram(String name) { - return contextAwareHistogram(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY); + return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY); } /** - * Get a {@link ContextAwareHistogram} with a given name. + * Get a {@link ContextAwareHistogram} with a given name and a customized {@link com.codahale.metrics.SlidingTimeWindowReservoir} * * @param name name of the {@link ContextAwareHistogram} - * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareHistogram}s + * @param windowSize normally the duration of the time window + * @param unit the unit of time * @return the {@link ContextAwareHistogram} with the given name */ - public ContextAwareHistogram contextAwareHistogram(String name, - ContextAwareMetricFactory<ContextAwareHistogram> factory) { - return this.innerMetricContext.getOrCreate(name, factory); + public ContextAwareHistogram contextAwareHistogram(String name, long windowSize, TimeUnit unit) { + ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs args = new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs( + this.innerMetricContext.getMetricContext().get(), name, windowSize, unit); + return this.innerMetricContext.getOrCreate(ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY, args); } /** @@ -487,18 +489,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext, * @return the {@link ContextAwareTimer} with the given name */ public ContextAwareTimer contextAwareTimer(String name) { - return contextAwareTimer(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY); + return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY); } /** - * Get a {@link ContextAwareTimer} with a given name. + * Get a {@link ContextAwareTimer} with a given name and a customized {@link com.codahale.metrics.SlidingTimeWindowReservoir} * * @param name name of the {@link ContextAwareTimer} - * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareTimer}s + * @param windowSize normally the duration of the time window + * @param unit the unit of time * @return the {@link ContextAwareTimer} with the given name */ - public ContextAwareTimer contextAwareTimer(String name, ContextAwareMetricFactory<ContextAwareTimer> factory) { - return this.innerMetricContext.getOrCreate(name, factory); + public ContextAwareTimer contextAwareTimer(String name, long windowSize, TimeUnit unit) { + ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs args = new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs( + this.innerMetricContext.getMetricContext().get(), name, windowSize, unit); + return this.innerMetricContext.getOrCreate(ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY, args); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index df1dc44..6c0ea5b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -17,28 +17,30 @@ package org.apache.gobblin.runtime.api; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.TimeUnit; import com.codahale.metrics.Gauge; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Service; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumentable; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareHistogram; -import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + /** * A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware of. @@ -50,10 +52,9 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable /** Metrics for the job catalog; null if * ({@link #isInstrumentationEnabled()}) is false. */ - StandardMetrics getMetrics(); + JobCatalog.StandardMetrics getMetrics(); - @Override - default StandardMetrics getStandardMetrics() { + default StandardMetricsBridge.StandardMetrics getStandardMetrics() { return getMetrics(); } @@ -63,11 +64,16 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable **/ JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException; - public static class StandardMetrics implements JobCatalogListener, StandardMetricsBridge.StandardMetrics { + @Slf4j + public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener { public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs"; public static final String NUM_ADDED_JOBS = "numAddedJobs"; public static final String NUM_DELETED_JOBS = "numDeletedJobs"; public static final String NUM_UPDATED_JOBS = "numUpdatedJobs"; + public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet"; + public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd"; + public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate"; + public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete"; public static final String TRACKING_EVENT_NAME = "JobCatalogEvent"; public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded"; public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted"; @@ -77,22 +83,36 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable @Getter private final ContextAwareCounter numAddedJobs; @Getter private final ContextAwareCounter numDeletedJobs; @Getter private final ContextAwareCounter numUpdatedJobs; - - public StandardMetrics(final JobCatalog parent) { - this.numAddedJobs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_JOBS); - this.numDeletedJobs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_JOBS); - this.numUpdatedJobs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_JOBS); - this.numActiveJobs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, - new Gauge<Integer>() { - @Override public Integer getValue() { - return parent.getJobs().size(); - } + @Getter private final ContextAwareTimer timeForJobCatalogGet; + @Getter private final ContextAwareHistogram histogramForJobAdd; + @Getter private final ContextAwareHistogram histogramForJobUpdate; + @Getter private final ContextAwareHistogram histogramForJobDelete; + + public StandardMetrics(final JobCatalog jobCatalog) { + MetricContext context = jobCatalog.getMetricContext(); + this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES); + this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS); + this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS); + this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS); + this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{ + long startTime = System.currentTimeMillis(); + int size = jobCatalog.getJobs().size(); + updateGetJobTime(startTime); + return size; }); - parent.addListener(this); + this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES); + this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES); + this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES); + } + + public void updateGetJobTime(long startTime) { + log.info("updateGetJobTime..."); + Instrumented.updateTimer(Optional.of(this.timeForJobCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } @Override public void onAddJob(JobSpec addedJob) { this.numAddedJobs.inc(); + this.histogramForJobAdd.update(1); submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE); } @@ -116,44 +136,35 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { this.numDeletedJobs.inc(); + this.histogramForJobDelete.update(1); submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE); } @Override public void onUpdateJob(JobSpec updatedJob) { this.numUpdatedJobs.inc(); + this.histogramForJobUpdate.update(1); submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE); } @Override - public String getName() { - return "JobCatalog"; - } - - @Override public Collection<ContextAwareGauge<?>> getGauges() { return Collections.singleton(this.numActiveJobs); } @Override public Collection<ContextAwareCounter> getCounters() { - List<ContextAwareCounter> counters = ImmutableList.of(numAddedJobs, numDeletedJobs, numDeletedJobs); - return counters; - } - - @Override - public Collection<ContextAwareMeter> getMeters() { - return null; + return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs); } @Override public Collection<ContextAwareTimer> getTimers() { - return null; + return ImmutableList.of(timeForJobCatalogGet); } @Override public Collection<ContextAwareHistogram> getHistograms() { - return null; + return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index e63cb4a..fb54139 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -43,8 +43,16 @@ public interface JobExecutionLauncher extends Instrumentable { public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed"; public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled"; public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning"; + public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched"; + public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted"; + public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted"; + public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed"; + public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled"; + public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion"; public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure"; + public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling"; + public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching"; public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent"; public static final String JOB_EXECID_META = "jobExecId"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java index b5c82b7..8b6e98c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java @@ -17,8 +17,19 @@ package org.apache.gobblin.runtime.api; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.ContextAwareTimer; + +import com.google.common.base.Optional; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + /** * A {@link JobCatalog} that can have its {@link Collection} of {@link JobSpec}s modified @@ -37,4 +48,36 @@ public interface MutableJobCatalog extends JobCatalog { * Removes an existing JobSpec with the given URI. A no-op if such JobSpec does not exist. */ void remove(URI uri); + + @Slf4j + public static class MutableStandardMetrics extends JobCatalog.StandardMetrics { + public static final String TIME_FOR_JOB_CATALOG_REMOVE = "timeForJobCatalogRemove"; + public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut"; + @Getter private final ContextAwareTimer timeForJobCatalogPut; + @Getter private final ContextAwareTimer timeForJobCatalogRemove; + public MutableStandardMetrics(JobCatalog catalog) { + super(catalog); + timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES); + timeForJobCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES); + } + + public void updatePutJobTime(long startTime) { + log.info("updatePutJobTime..."); + Instrumented.updateTimer(Optional.of(this.timeForJobCatalogPut), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + public void updateRemoveJobTime(long startTime) { + log.info("updateRemoveJobTime..."); + Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + @Override + public Collection<ContextAwareTimer> getTimers() { + Collection<ContextAwareTimer> all = new ArrayList<>(); + all.addAll(super.getTimers()); + all.add(this.timeForJobCatalogPut); + all.add(this.timeForJobCatalogRemove); + return all; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java index f63600a..3aa16be 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java @@ -18,7 +18,17 @@ package org.apache.gobblin.runtime.api; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.ContextAwareTimer; + +import com.google.common.base.Optional; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** @@ -38,4 +48,36 @@ public interface MutableSpecCatalog extends SpecCatalog { * Throws SpecNotFoundException if such {@link Spec} does not exist. */ void remove(URI uri) throws SpecNotFoundException; + + @Slf4j + public static class MutableStandardMetrics extends StandardMetrics { + public static final String TIME_FOR_SPEC_CATALOG_REMOVE = "timeForSpecCatalogRemove"; + public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut"; + @Getter private final ContextAwareTimer timeForSpecCatalogPut; + @Getter private final ContextAwareTimer timeForSpecCatalogRemove; + public MutableStandardMetrics(SpecCatalog catalog) { + super(catalog); + timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES); + timeForSpecCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES); + } + + public void updatePutSpecTime(long startTime) { + log.info("updatePutSpecTime..."); + Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogPut), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + public void updateRemoveSpecTime(long startTime) { + log.info("updateRemoveSpecTime..."); + Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + @Override + public Collection<ContextAwareTimer> getTimers() { + Collection<ContextAwareTimer> all = new ArrayList<>(); + all.addAll(super.getTimers()); + all.add(this.timeForSpecCatalogPut); + all.add(this.timeForSpecCatalogRemove); + return all; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index d2af9b8..4b85ea9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -19,20 +19,29 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import com.codahale.metrics.Gauge; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.gobblin.instrumented.GobblinMetricsKeys; -import org.apache.gobblin.instrumented.Instrumentable; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; -public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentable { +public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetricsBridge { /** Returns an immutable {@link Collection} of {@link Spec}s that are known to the catalog. */ Collection<Spec> getSpecs(); @@ -40,13 +49,18 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab * ({@link #isInstrumentationEnabled()}) is false. */ SpecCatalog.StandardMetrics getMetrics(); + default StandardMetricsBridge.StandardMetrics getStandardMetrics() { + return this.getMetrics(); + } + /** * Get a {@link Spec} by uri. * @throws SpecNotFoundException if no such Spec exists **/ Spec getSpec(URI uri) throws SpecNotFoundException; - public static class StandardMetrics implements SpecCatalogListener { + @Slf4j + public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener { public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs"; public static final String NUM_ADDED_SPECS = "numAddedSpecs"; public static final String NUM_DELETED_SPECS = "numDeletedSpecs"; @@ -55,28 +69,65 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded"; public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted"; public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated"; + public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet"; + public static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd"; + public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate"; + public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete"; - @Getter - private final ContextAwareGauge<Integer> numActiveSpecs; + @Getter private final ContextAwareGauge<Integer> numActiveSpecs; @Getter private final ContextAwareCounter numAddedSpecs; @Getter private final ContextAwareCounter numDeletedSpecs; @Getter private final ContextAwareCounter numUpdatedSpecs; + @Getter private final ContextAwareTimer timeForSpecCatalogGet; + @Getter private final ContextAwareHistogram histogramForSpecAdd; + @Getter private final ContextAwareHistogram histogramForSpecUpdate; + @Getter private final ContextAwareHistogram histogramForSpecDelete; + + public StandardMetrics(final SpecCatalog specCatalog) { + MetricContext context = specCatalog.getMetricContext(); + this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES); + this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS); + this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS); + this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS); + this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{ + long startTime = System.currentTimeMillis(); + int size = specCatalog.getSpecs().size(); + updateGetSpecTime(startTime); + return size; + }); + this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES); + this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES); + this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES); + } + + public void updateGetSpecTime(long startTime) { + log.info("updateGetSpecTime..."); + Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + @Override + public Collection<ContextAwareGauge<?>> getGauges() { + return Collections.singleton(this.numActiveSpecs); + } - public StandardMetrics(final SpecCatalog parent) { - this.numAddedSpecs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_SPECS); - this.numDeletedSpecs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_SPECS); - this.numUpdatedSpecs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_SPECS); - this.numActiveSpecs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, - new Gauge<Integer>() { - @Override public Integer getValue() { - return parent.getSpecs().size(); - } - }); - parent.addListener(this); + @Override + public Collection<ContextAwareCounter> getCounters() { + return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs); + } + + @Override + public Collection<ContextAwareHistogram> getHistograms() { + return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate); + } + + @Override + public Collection<ContextAwareTimer> getTimers() { + return ImmutableList.of(this.timeForSpecCatalogGet); } @Override public void onAddSpec(Spec addedSpec) { this.numAddedSpecs.inc(); + this.histogramForSpecAdd.update(1); submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE); } @@ -100,12 +151,14 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab @Override public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { this.numDeletedSpecs.inc(); + this.histogramForSpecDelete.update(1); submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE); } @Override public void onUpdateSpec(Spec updatedSpec) { this.numUpdatedSpecs.inc(); + this.histogramForSpecUpdate.update(1); submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java index be06b1e..8d89b97 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime.job_catalog; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobCatalog; import org.apache.gobblin.runtime.api.JobCatalogWithTemplates; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -62,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class); public static final String CONF_EXTENSION = ".conf"; private static final String FS_SCHEME = "FS"; - + private final MutableStandardMetrics mutableMetrics; /** * Initialize the JobCatalog, fetch all jobs in jobConfDirPath. * @param sysConfig @@ -71,15 +72,24 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat public FSJobCatalog(Config sysConfig) throws IOException { super(sysConfig); + this.mutableMetrics = (MutableStandardMetrics)metrics; } public FSJobCatalog(GobblinInstanceEnvironment env) throws IOException { super(env); + this.mutableMetrics = (MutableStandardMetrics)metrics; } public FSJobCatalog(Config sysConfig, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) throws IOException{ super(sysConfig, null, parentMetricContext, instrumentationEnabled); + this.mutableMetrics = (MutableStandardMetrics)metrics; + } + + @Override + protected JobCatalog.StandardMetrics createStandardMetrics() { + log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName()); + return new MutableStandardMetrics(this); } /** @@ -94,6 +104,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat protected FSJobCatalog(Config sysConfig, PathAlterationObserver observer) throws IOException { super(sysConfig, observer); + this.mutableMetrics = (MutableStandardMetrics)this.metrics; } /** @@ -108,8 +119,10 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(jobSpec); try { + long startTime = System.currentTimeMillis(); Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri()); materializedJobSpec(jobSpecPath, jobSpec, this.fs); + this.mutableMetrics.updatePutJobTime(startTime); } catch (IOException e) { throw new RuntimeException("When persisting a new JobSpec, unexpected issues happen:" + e.getMessage()); } catch (JobSpecNotFoundException e) { @@ -126,10 +139,12 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat public synchronized void remove(URI jobURI) { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); try { + long startTime = System.currentTimeMillis(); Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI); if (fs.exists(jobSpecPath)) { fs.delete(jobSpecPath, false); + this.mutableMetrics.updateRemoveJobTime(startTime); } else { LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed."); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java index ec33226..d40c962 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java @@ -18,6 +18,7 @@ package org.apache.gobblin.runtime.job_catalog; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -68,7 +69,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build(); - this.metrics = new StandardMetrics(this); + this.metrics = createStandardMetrics(); + this.addListener(this.metrics); } else { this.metricContext = null; @@ -76,6 +78,10 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC } } + protected StandardMetrics createStandardMetrics() { + return new StandardMetrics(this); + } + @Override protected void startUp() throws IOException { notifyAllListeners(); @@ -87,11 +93,19 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC } protected void notifyAllListeners() { - for (JobSpec jobSpec : getJobs()) { + Collection<JobSpec> jobSpecs = getJobsWithTimeUpdate(); + for (JobSpec jobSpec : jobSpecs) { this.listeners.onAddJob(jobSpec); } } + private Collection<JobSpec> getJobsWithTimeUpdate() { + long startTime = System.currentTimeMillis(); + Collection<JobSpec> jobSpecs = getJobs(); + this.metrics.updateGetJobTime(startTime); + return jobSpecs; + } + /**{@inheritDoc}*/ @Override public synchronized void addListener(JobCatalogListener jobListener) { @@ -99,7 +113,7 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC this.listeners.addListener(jobListener); if (state() == State.RUNNING) { - for (JobSpec jobSpec : getJobs()) { + for (JobSpec jobSpec : getJobsWithTimeUpdate()) { JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec); this.listeners.callbackOneListener(addJobCallback, jobListener); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java index 80d5955..091984f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java @@ -68,6 +68,9 @@ public class StaticJobCatalog extends JobCatalogBase { return mapBuilder.build(); } + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR", + justification = "Uninitialized variable has been checked.") @Override public void addListener(JobCatalogListener jobListener) { if (this.jobs == null) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index ecfe036..482825f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -61,7 +61,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut protected final SpecCatalogListenersList listeners; protected final Logger log; protected final MetricContext metricContext; - protected final FlowCatalog.StandardMetrics metrics; + protected final MutableStandardMetrics metrics; protected final SpecStore specStore; private final ClassAliasResolver<SpecStore> aliasResolver; @@ -87,7 +87,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build(); - this.metrics = new StandardMetrics(this); + this.metrics = new MutableStandardMetrics(this); + this.addListener(this.metrics); } else { this.metricContext = null; @@ -133,7 +134,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut /**************************************************/ protected void notifyAllListeners() { - for (Spec spec : getSpecs()) { + for (Spec spec : getSpecsWithTimeUpdate()) { this.listeners.onAddSpec(spec); } } @@ -144,7 +145,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut this.listeners.addListener(specListener); if (state() == State.RUNNING) { - for (Spec spec : getSpecs()) { + for (Spec spec : getSpecsWithTimeUpdate()) { SpecCatalogListener.AddSpecCallback addJobCallback = new SpecCatalogListener.AddSpecCallback(spec); this.listeners.callbackOneListener(addJobCallback, specListener); } @@ -192,7 +193,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } @Override - public StandardMetrics getMetrics() { + public SpecCatalog.StandardMetrics getMetrics() { return this.metrics; } @@ -209,6 +210,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } } + public Collection<Spec> getSpecsWithTimeUpdate() { + try { + long startTime = System.currentTimeMillis(); + Collection<Spec> specs = specStore.getSpecs(); + this.metrics.updateGetSpecTime(startTime); + return specs; + } catch (IOException e) { + throw new RuntimeException("Cannot retrieve Specs from Spec store", e); + } + } + public boolean exists(URI uri) { try { return specStore.exists(uri); @@ -232,9 +244,11 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(spec); + long startTime = System.currentTimeMillis(); log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(), ((FlowSpec) spec).getConfigAsProperties())); specStore.addSpec(spec); + metrics.updatePutSpecTime(startTime); this.listeners.onAddSpec(spec); } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e); @@ -246,9 +260,10 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); - + long startTime = System.currentTimeMillis(); log.info(String.format("Removing FlowSpec with URI: %s", uri)); specStore.deleteSpec(uri); + this.metrics.updateRemoveSpecTime(startTime); this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java index 7bb8b9c..c334d2b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java @@ -95,6 +95,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build(); this.metrics = new SpecCatalog.StandardMetrics(this); + this.addListener(this.metrics); } else { this.metricContext = null; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index a13ed28..2cbb113 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -17,6 +17,12 @@ package org.apache.gobblin.service.modules.core; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.Schedule; import java.io.IOException; @@ -28,7 +34,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.commons.cli.CommandLine; @@ -37,7 +45,6 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.gobblin.util.logs.Log4jConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.eventbus.EventBus; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -94,7 +102,7 @@ import org.apache.gobblin.util.ConfigUtils; @Alpha -public class GobblinServiceManager implements ApplicationLauncher { +public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge{ private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class); @@ -134,6 +142,9 @@ public class GobblinServiceManager implements ApplicationLauncher { @Getter protected Config config; + private final MetricContext metricContext; + private final Metrics metrics; + public GobblinServiceManager(String serviceName, String serviceId, Config config, Optional<Path> serviceWorkDirOptional) throws Exception { @@ -143,7 +154,8 @@ public class GobblinServiceManager implements ApplicationLauncher { properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300)); } this.config = config; - + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); + this.metrics = new Metrics(this.metricContext); this.serviceId = serviceId; this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName); @@ -435,6 +447,50 @@ public class GobblinServiceManager implements ApplicationLauncher { this.serviceLauncher.close(); } + @Override + public StandardMetrics getStandardMetrics() { + return this.metrics; + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return false; + } + + @Override + public List<Tag<?>> generateTags(State state) { + return null; + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } + + private class Metrics extends StandardMetrics { + public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange"; + private ContextAwareHistogram serviceLeadershipChange; + public Metrics(final MetricContext metricContext) { + serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES); + } + + @Override + public Collection<ContextAwareHistogram> getHistograms() { + return ImmutableList.of(this.serviceLeadershipChange); + } + } + /** * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 0c45daf..5c26445 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -109,7 +109,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata // the onAddSpec will forward specs to the leader, which is itself. this.isActive = isActive; if (this.flowCatalog.isPresent()) { - Collection<Spec> specs = this.flowCatalog.get().getSpecs(); + Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate(); for (Spec spec : specs) { onAddSpec(spec); }
