Repository: incubator-gobblin Updated Branches: refs/heads/master 0fabaa7a4 -> ab034478c
[GOBBLIN-349] Add gauges for number of job scheduler for gobblin cluster Change number counter to gauge type for gobblin cluster metrics Fix build Minor change Closes #2212 from yukuai518/me Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ab034478 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ab034478 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ab034478 Branch: refs/heads/master Commit: ab034478c9dfa9eaa3d1b81f698fec3606833328 Parents: 0fabaa7 Author: Kuai Yu <[email protected]> Authored: Sun Dec 24 05:59:19 2017 +0530 Committer: Abhishek Tiwari <[email protected]> Committed: Sun Dec 24 05:59:19 2017 +0530 ---------------------------------------------------------------------- .../cluster/GobblinHelixJobScheduler.java | 80 ++++++++------------ .../apache/gobblin/runtime/api/JobCatalog.java | 68 +++++++---------- .../runtime/api/JobExecutionLauncher.java | 29 +++---- .../apache/gobblin/runtime/api/SpecCatalog.java | 65 +++++++--------- .../runtime/job_catalog/FSJobCatalog.java | 2 +- .../job_catalog/NonObservingFSJobCatalog.java | 6 +- .../job_catalog/TestInMemoryJobCatalog.java | 42 +++++----- 7 files changed, 126 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 9fd5add..36ba542 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -20,15 +20,13 @@ package org.apache.gobblin.cluster; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; @@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -143,18 +140,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private class Metrics extends StandardMetrics { - private final ContextAwareCounter numJobsLaunched; - private final ContextAwareCounter numJobsCompleted; - private final ContextAwareCounter numJobsCommitted; - private final ContextAwareCounter numJobsFailed; - private final ContextAwareCounter numJobsCancelled; - private final ContextAwareHistogram histogramJobsLaunched; - private final ContextAwareHistogram histogramJobsCompleted; - private final ContextAwareHistogram histogramJobsCommitted; - private final ContextAwareHistogram histogramJobsFailed; - private final ContextAwareHistogram histogramJobsCancelled; - + private final AtomicLong totalJobsLaunched; + private final AtomicLong totalJobsCompleted; + private final AtomicLong totalJobsCommitted; + private final AtomicLong totalJobsFailed; + private final AtomicLong totalJobsCancelled; + private final ContextAwareGauge<Long> numJobsLaunched; + 
private final ContextAwareGauge<Long> numJobsCompleted; + private final ContextAwareGauge<Long> numJobsCommitted; + private final ContextAwareGauge<Long> numJobsFailed; + private final ContextAwareGauge<Long> numJobsCancelled; private final ContextAwareGauge<Integer> numJobsRunning; + private final ContextAwareTimer timeForJobCompletion; private final ContextAwareTimer timeForJobFailure; private final ContextAwareTimer timeBeforeJobScheduling; @@ -162,21 +159,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe public Metrics(final MetricContext metricContext) { // All historical counters - this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER); - this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER); - this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER); - this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER); - this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER); - - // Counters within last 1 minute - this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES); - this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES); - this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES); - this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES); - this.histogramJobsCancelled = 
metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES); - - this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE, - ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount())); + this.totalJobsLaunched = new AtomicLong(0); + this.totalJobsCompleted = new AtomicLong(0); + this.totalJobsCommitted = new AtomicLong(0); + this.totalJobsFailed = new AtomicLong(0); + this.totalJobsCancelled = new AtomicLong(0); + + this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get()); + this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get()); + this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get()); + this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get()); + this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get()); + this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING, + ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get())); this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES); this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES); @@ -201,29 +196,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public Collection<ContextAwareGauge<?>> getGauges() { - 
return Collections.singleton(numJobsRunning); + return ImmutableList.of(numJobsRunning, numJobsLaunched, numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled); } @Override public Collection<ContextAwareCounter> getCounters() { - List<ContextAwareCounter> counters = Lists.newArrayList(); - counters.add(numJobsLaunched); - counters.add(numJobsCompleted); - counters.add(numJobsCommitted); - counters.add(numJobsFailed); - counters.add(numJobsCancelled); - return counters; + return ImmutableList.of(); } @Override public Collection<ContextAwareTimer> getTimers() { return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching); } - - @Override - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted); - } } private class MetricsTrackingListener extends AbstractJobListener { @@ -239,7 +223,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe super.onJobPrepare(jobContext); jobContext.getJobState().setProp(START_TIME, Long.toString(System.nanoTime())); if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.numJobsLaunched.inc(); + metrics.totalJobsLaunched.incrementAndGet(); } } @@ -249,13 +233,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe super.onJobCompletion(jobContext); long startTime = jobContext.getJobState().getPropAsLong(START_TIME); if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.numJobsCompleted.inc(); + metrics.totalJobsCompleted.incrementAndGet(); Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { - metrics.numJobsFailed.inc(); + metrics.totalJobsFailed.incrementAndGet(); 
Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } else { - metrics.numJobsCommitted.inc(); + metrics.totalJobsCommitted.incrementAndGet(); } } } @@ -265,7 +249,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe throws Exception { super.onJobCancellation(jobContext); if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.numJobsCancelled.inc(); + metrics.totalJobsCancelled.incrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index 6c0ea5b..950b86d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -18,10 +18,9 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; -import com.codahale.metrics.Gauge; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; @@ -67,42 +65,42 @@ public interface 
JobCatalog extends JobCatalogListenersContainer, Instrumentable @Slf4j public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener { public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs"; - public static final String NUM_ADDED_JOBS = "numAddedJobs"; - public static final String NUM_DELETED_JOBS = "numDeletedJobs"; - public static final String NUM_UPDATED_JOBS = "numUpdatedJobs"; + public static final String TOTAL_ADD_CALLS = "totalAddCalls"; + public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls"; + public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls"; public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet"; - public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd"; - public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate"; - public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete"; + public static final String TRACKING_EVENT_NAME = "JobCatalogEvent"; public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded"; public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted"; public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated"; - @Getter private final ContextAwareGauge<Integer> numActiveJobs; - @Getter private final ContextAwareCounter numAddedJobs; - @Getter private final ContextAwareCounter numDeletedJobs; - @Getter private final ContextAwareCounter numUpdatedJobs; + private final MetricContext metricsContext; + @Getter private final AtomicLong totalAddedJobs; + @Getter private final AtomicLong totalDeletedJobs; + @Getter private final AtomicLong totalUpdatedJobs; @Getter private final ContextAwareTimer timeForJobCatalogGet; - @Getter private final ContextAwareHistogram histogramForJobAdd; - @Getter private final ContextAwareHistogram histogramForJobUpdate; - @Getter private final ContextAwareHistogram histogramForJobDelete; + @Getter private final 
ContextAwareGauge<Long> totalAddCalls; + @Getter private final ContextAwareGauge<Long> totalDeleteCalls; + @Getter private final ContextAwareGauge<Long> totalUpdateCalls; + @Getter private final ContextAwareGauge<Integer> numActiveJobs; public StandardMetrics(final JobCatalog jobCatalog) { - MetricContext context = jobCatalog.getMetricContext(); - this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES); - this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS); - this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS); - this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS); - this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{ + this.metricsContext = jobCatalog.getMetricContext(); + this.totalAddedJobs = new AtomicLong(0); + this.totalDeletedJobs = new AtomicLong(0); + this.totalUpdatedJobs = new AtomicLong(0); + + this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES); + this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get()); + this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get()); + this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get()); + this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{ long startTime = System.currentTimeMillis(); int size = jobCatalog.getJobs().size(); updateGetJobTime(startTime); return size; }); - this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES); - this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES); - this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES); } public void updateGetJobTime(long startTime) { @@ -111,8 +109,7 @@ public 
interface JobCatalog extends JobCatalogListenersContainer, Instrumentable } @Override public void onAddJob(JobSpec addedJob) { - this.numAddedJobs.inc(); - this.histogramForJobAdd.update(1); + this.totalAddedJobs.incrementAndGet(); submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE); } @@ -130,41 +127,34 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable .put(GobblinMetricsKeys.JOB_SPEC_VERSION_META, jobSpecVersion) .build()) .build(); - this.numAddedJobs.getContext().submitEvent(e); + this.metricsContext.submitEvent(e); } @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { - this.numDeletedJobs.inc(); - this.histogramForJobDelete.update(1); + this.totalDeletedJobs.incrementAndGet(); submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE); } @Override public void onUpdateJob(JobSpec updatedJob) { - this.numUpdatedJobs.inc(); - this.histogramForJobUpdate.update(1); + this.totalUpdatedJobs.incrementAndGet(); submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE); } @Override public Collection<ContextAwareGauge<?>> getGauges() { - return Collections.singleton(this.numActiveJobs); + return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs); } @Override public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs); + return ImmutableList.of(); } @Override public Collection<ContextAwareTimer> getTimers() { return ImmutableList.of(timeForJobCatalogGet); } - - @Override - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate); - } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index fb54139..a7e5878 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -37,17 +37,12 @@ public interface JobExecutionLauncher extends Instrumentable { StandardMetrics getMetrics(); public static class StandardMetrics { - public static final String NUM_JOBS_LAUNCHED_COUNTER = "numJobsLaunched"; - public static final String NUM_JOBS_COMPLETED_COUNTER = "numJobsCompleted"; - public static final String NUM_JOBS_COMMITTED_COUNTER = "numJobsCommitted"; - public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed"; - public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled"; - public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning"; - public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched"; - public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted"; - public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted"; - public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed"; - public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled"; + public static final String NUM_JOBS_LAUNCHED = "numJobsLaunched"; + public static final String NUM_JOBS_COMPLETED = "numJobsCompleted"; + public static final String NUM_JOBS_COMMITTED = "numJobsCommitted"; + public static final String NUM_JOBS_FAILED = "numJobsFailed"; + public static final String NUM_JOBS_CANCELLED = "numJobsCancelled"; + public static final String NUM_JOBS_RUNNING = "numJobsRunning"; public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion"; public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure"; @@ -70,12 +65,12 
@@ public interface JobExecutionLauncher extends Instrumentable { @Getter private final ContextAwareGauge<Integer> numJobsRunning; public StandardMetrics(final JobExecutionLauncher parent) { - this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED_COUNTER); - this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED_COUNTER); - this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED_COUNTER); - this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED_COUNTER); - this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED_COUNTER); - this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING_GAUGE, + this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED); + this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED); + this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED); + this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED); + this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED); + this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING, new Gauge<Integer>() { @Override public Integer getValue() { return (int)(StandardMetrics.this.getNumJobsLaunched().getCount() - http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index 4b85ea9..6e8510f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -19,10 +19,9 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; -import com.codahale.metrics.Gauge; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,7 +31,6 @@ import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; @@ -62,42 +60,41 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr @Slf4j public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener { public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs"; - public static final String NUM_ADDED_SPECS = "numAddedSpecs"; - public static final String NUM_DELETED_SPECS = "numDeletedSpecs"; - public static final String NUM_UPDATED_SPECS = "numUpdatedSpecs"; + public static final String TOTAL_ADD_CALLS = "totalAddCalls"; + public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls"; + public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls"; public static final String TRACKING_EVENT_NAME = "SpecCatalogEvent"; public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded"; public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted"; public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated"; public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet"; - public 
static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd"; - public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate"; - public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete"; + private final MetricContext metricsContext; + @Getter private final AtomicLong totalAddedSpecs; + @Getter private final AtomicLong totalDeletedSpecs; + @Getter private final AtomicLong totalUpdatedSpecs; + @Getter private final ContextAwareGauge<Long> totalAddCalls; + @Getter private final ContextAwareGauge<Long> totalDeleteCalls; + @Getter private final ContextAwareGauge<Long> totalUpdateCalls; @Getter private final ContextAwareGauge<Integer> numActiveSpecs; - @Getter private final ContextAwareCounter numAddedSpecs; - @Getter private final ContextAwareCounter numDeletedSpecs; - @Getter private final ContextAwareCounter numUpdatedSpecs; + @Getter private final ContextAwareTimer timeForSpecCatalogGet; - @Getter private final ContextAwareHistogram histogramForSpecAdd; - @Getter private final ContextAwareHistogram histogramForSpecUpdate; - @Getter private final ContextAwareHistogram histogramForSpecDelete; public StandardMetrics(final SpecCatalog specCatalog) { - MetricContext context = specCatalog.getMetricContext(); - this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES); - this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS); - this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS); - this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS); - this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{ + this.metricsContext = specCatalog.getMetricContext(); + this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES); + this.totalAddedSpecs = new AtomicLong(0); + this.totalDeletedSpecs = new AtomicLong(0); + this.totalUpdatedSpecs = new AtomicLong(0); + this.numActiveSpecs = 
metricsContext.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{ long startTime = System.currentTimeMillis(); int size = specCatalog.getSpecs().size(); updateGetSpecTime(startTime); return size; }); - this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES); - this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES); - this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES); + this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get()); + this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get()); + this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get()); } public void updateGetSpecTime(long startTime) { @@ -107,17 +104,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr @Override public Collection<ContextAwareGauge<?>> getGauges() { - return Collections.singleton(this.numActiveSpecs); + return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls); } @Override public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs); - } - - @Override - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate); + return ImmutableList.of(); } @Override @@ -126,8 +118,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr } @Override public void onAddSpec(Spec addedSpec) { - this.numAddedSpecs.inc(); - this.histogramForSpecAdd.update(1); + this.totalAddedSpecs.incrementAndGet(); submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE); } @@ -145,20 +136,18 @@ public interface SpecCatalog extends 
SpecCatalogListenersContainer, StandardMetr .put(GobblinMetricsKeys.SPEC_VERSION_META, specSpecVersion) .build()) .build(); - this.numAddedSpecs.getContext().submitEvent(e); + this.metricsContext.submitEvent(e); } @Override public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { - this.numDeletedSpecs.inc(); - this.histogramForSpecDelete.update(1); + this.totalDeletedSpecs.incrementAndGet(); submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE); } @Override public void onUpdateSpec(Spec updatedSpec) { - this.numUpdatedSpecs.inc(); - this.histogramForSpecUpdate.update(1); + this.totalUpdatedSpecs.incrementAndGet(); submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java index 8d89b97..0f99235 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java @@ -63,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class); public static final String CONF_EXTENSION = ".conf"; private static final String FS_SCHEME = "FS"; - private final MutableStandardMetrics mutableMetrics; + protected final MutableStandardMetrics mutableMetrics; /** * Initialize the JobCatalog, fetch all jobs in jobConfDirPath. 
* @param sysConfig http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java index eebf6f9..76adca4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java @@ -77,11 +77,12 @@ public class NonObservingFSJobCatalog extends FSJobCatalog { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(jobSpec); try { + long startTime = System.currentTimeMillis(); Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri()); boolean isUpdate = fs.exists(jobSpecPath); materializedJobSpec(jobSpecPath, jobSpec, this.fs); - + this.mutableMetrics.updatePutJobTime(startTime); if (isUpdate) { this.listeners.onUpdateJob(jobSpec); } else { @@ -103,12 +104,13 @@ public class NonObservingFSJobCatalog extends FSJobCatalog { public synchronized void remove(URI jobURI) { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); try { + long startTime = System.currentTimeMillis(); JobSpec jobSpec = getJobSpec(jobURI); - Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI); if (fs.exists(jobSpecPath)) { fs.delete(jobSpecPath, false); + this.mutableMetrics.updateRemoveJobTime(startTime); } else { LOGGER.warn("No file with URI:" + jobSpecPath + " is found. 
Deletion failed."); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java index 5920f78..026c888 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java @@ -94,9 +94,9 @@ public class TestInMemoryJobCatalog { cat.put(js1_1); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 0); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 0); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME), @@ -109,9 +109,9 @@ public class TestInMemoryJobCatalog { cat.put(js1_2); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1); + 
Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME), @@ -124,9 +124,9 @@ public class TestInMemoryJobCatalog { cat.put(js2); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME), @@ -139,9 +139,9 @@ public class TestInMemoryJobCatalog { cat.put(js1_3); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME), @@ -154,9 +154,9 @@ public class 
TestInMemoryJobCatalog { cat.remove(js2.getUri()); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME), @@ -169,15 +169,15 @@ public class TestInMemoryJobCatalog { cat.remove(new URI("test:dummy_job")); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1); cat.remove(js1_3.getUri()); Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 0); - Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2); - Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 2); + Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2); + Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2); + 
Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 2); ma.assertEvent(Predicates.and( MetricsAssert.eqEventNamespace(JobCatalog.class.getName()), MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
