[GOBBLIN-419] Add more metrics for cluster job monitoring

Closes #2296 from yukuai518/metrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5e6bfb07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5e6bfb07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5e6bfb07 Branch: refs/heads/0.12.0 Commit: 5e6bfb079cfdd3d2026e7cd674dd6673933437d3 Parents: 4c15fde Author: Kuai Yu <[email protected]> Authored: Wed Feb 28 09:04:36 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Feb 28 09:04:36 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 5 + .../gobblin/cluster/GobblinClusterManager.java | 31 ++--- .../cluster/GobblinHelixJobLauncher.java | 3 + .../cluster/GobblinHelixJobScheduler.java | 125 +++++++------------ .../gobblin/cluster/GobblinTaskRunner.java | 82 +++++++++--- .../cluster/GobblinTaskRunnerMetrics.java | 78 ++++++++++++ .../instrumented/StandardMetricsBridge.java | 52 ++++---- .../service/StreamingKafkaSpecConsumer.java | 44 ++----- .../apache/gobblin/runtime/TaskExecutor.java | 33 ++++- .../apache/gobblin/runtime/api/JobCatalog.java | 39 +++--- .../runtime/api/JobExecutionLauncher.java | 6 +- .../gobblin/runtime/api/MutableJobCatalog.java | 23 ++-- .../gobblin/runtime/api/MutableSpecCatalog.java | 23 ++-- .../apache/gobblin/runtime/api/SpecCatalog.java | 39 +++--- .../runtime/job_catalog/FSJobCatalog.java | 4 +- .../job_catalog/ImmutableFSJobCatalog.java | 2 +- .../runtime/job_catalog/JobCatalogBase.java | 12 +- .../runtime/spec_catalog/FlowCatalog.java | 2 +- .../runtime/spec_catalog/TopologyCatalog.java | 2 +- .../modules/core/GobblinServiceManager.java | 28 +---- 20 files changed, 359 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index c80ceaf..612fd8b 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -169,6 +169,9 @@ public class ConfigurationKeys { public static final String WORK_UNIT_RETRY_POLICY_KEY = "workunit.retry.policy"; public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled"; + public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis"; + public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval"; + public static final String JOB_RUN_ONCE_KEY = "job.runonce"; public static final String JOB_DISABLED_KEY = "job.disabled"; public static final String JOB_JAR_FILES_KEY = "job.jars"; @@ -631,6 +634,8 @@ public class ConfigurationKeys { public static final String METRICS_REPORT_INTERVAL_KEY = METRICS_CONFIGURATIONS_PREFIX + "report.interval"; public static final String DEFAULT_METRICS_REPORT_INTERVAL = Long.toString(TimeUnit.SECONDS.toMillis(30)); public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name"; + public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes"; + public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15; // File-based reporting public static final String METRICS_REPORTING_FILE_ENABLED_KEY = 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 3393df6..d57c61e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -40,6 +40,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.hadoop.conf.Configuration; @@ -155,7 +156,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private GobblinHelixJobScheduler jobScheduler; @Getter private JobConfigurationManager jobConfigurationManager; - + private final String clusterName; private final Config config; private final MetricContext metricContext; @@ -165,7 +166,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri this.clusterName = clusterName; this.config = config; this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); - this.metrics = new Metrics(this.metricContext); + this.metrics = new Metrics(this.metricContext, this.config); this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE); @@ -557,21 +558,6 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config)); } - @Override - public List<Tag<?>> generateTags(State state) { - return ImmutableList.of(); - } - - @Override - public void switchMetricContext(List<Tag<?>> tags) { - throw new UnsupportedOperationException(); - } - - @Override - public void switchMetricContext(MetricContext context) { - throw new UnsupportedOperationException(); - } - /** * A custom implementation of {@link LiveInstanceChangeListener}. */ @@ -588,19 +574,16 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private class Metrics extends StandardMetrics { public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange"; private ContextAwareHistogram clusterLeadershipChange; - public Metrics(final MetricContext metricContext) { - clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES); + public Metrics(final MetricContext metricContext, final Config config) { + int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(clusterLeadershipChange); } @Override public String getName() { return GobblinClusterManager.class.getName(); } - - @Override - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(this.clusterLeadershipChange); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 62c9b3f..d502462 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -183,6 +183,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { @Override protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception { try { + long workUnitStartTime = System.currentTimeMillis(); + workUnits.forEach((k) -> k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, workUnitStartTime)); + // Start the output TaskState collector service this.taskStateCollectorService.startAsync().awaitRunning(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 141e3d1..48b12f2 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -28,13 +28,13 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; @@ -47,7 +47,7 @@ import 
org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareCounter; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinMetrics; @@ -121,21 +121,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } @Override - public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { - return null; - } - - @Override - public void switchMetricContext(List<Tag<?>> tags) { - throw new UnsupportedOperationException(); - } - - @Override - public void switchMetricContext(MetricContext context) { - throw new UnsupportedOperationException(); - } - - @Override public StandardMetrics getStandardMetrics() { return metrics; } @@ -147,29 +132,25 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final AtomicLong totalJobsCommitted; private final AtomicLong totalJobsFailed; private final AtomicLong totalJobsCancelled; - private final ContextAwareGauge<Long> numJobsLaunched; - private final ContextAwareGauge<Long> numJobsCompleted; - private final ContextAwareGauge<Long> numJobsCommitted; - private final ContextAwareGauge<Long> numJobsFailed; - private final ContextAwareGauge<Long> numJobsCancelled; - private final ContextAwareGauge<Integer> numJobsRunning; - - private final ContextAwareTimer timeForJobCompletion; - private final ContextAwareTimer timeForJobFailure; + + private final ContextAwareTimer timeForCompletedJobs; + private final ContextAwareTimer timeForFailedJobs; + private final ContextAwareTimer timeForCommittedJobs; private final ContextAwareTimer timeBeforeJobScheduling; private final ContextAwareTimer 
timeBeforeJobLaunching; + private final ContextAwareTimer timeBetwenJobSchedulingAndLaunching; private final ThreadPoolExecutor threadPoolExecutor; - private final ContextAwareGauge<Integer> executorActiveCount; - private final ContextAwareGauge<Integer> executorMaximumPoolSize; - private final ContextAwareGauge<Integer> executorPoolSize; - private final ContextAwareGauge<Integer> executorCorePoolSize; - private final ContextAwareGauge<Integer> executorQueueSize; public Metrics(final MetricContext metricContext) { // Thread executor reference from job scheduler this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor; + // timer duration setup + int windowSize = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(GobblinHelixJobScheduler.this.properties), + ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + // All historical counters this.totalJobsLaunched = new AtomicLong(0); this.totalJobsCompleted = new AtomicLong(0); @@ -177,25 +158,34 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.totalJobsFailed = new AtomicLong(0); this.totalJobsCancelled = new AtomicLong(0); - this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get()); - this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get()); - this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get()); - this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get()); - this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get()); - 
this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING, - ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get())); - - this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES); - this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES); - this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES); - this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING, + ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()))); + + this.timeForCompletedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSize, TimeUnit.MINUTES); + this.timeForFailedJobs = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSize, TimeUnit.MINUTES); + this.timeForCommittedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSize, TimeUnit.MINUTES); + this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, windowSize, TimeUnit.MINUTES); + this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, windowSize, TimeUnit.MINUTES); + this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING, windowSize, TimeUnit.MINUTES); // executor metrics - this.executorActiveCount = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount()); - this.executorMaximumPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize()); - this.executorPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize()); - this.executorCorePoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize()); - this.executorQueueSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, 
()->this.threadPoolExecutor.getMaximumPoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size())); + + this.contextAwareMetrics.add(timeForCommittedJobs); + this.contextAwareMetrics.add(timeForCompletedJobs); + this.contextAwareMetrics.add(timeForFailedJobs); + this.contextAwareMetrics.add(timeBeforeJobScheduling); + this.contextAwareMetrics.add(timeBeforeJobLaunching); + this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching); } private void updateTimeBeforeJobScheduling (Properties jobConfig) { @@ -208,36 +198,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS); } - @Override - public String getName() { - return GobblinHelixJobScheduler.class.getName(); - } - - @Override - public Collection<ContextAwareGauge<?>> getGauges() { - List<ContextAwareGauge<?>> list = Lists.newArrayList(); - list.add(numJobsRunning); - list.add(numJobsLaunched); - list.add(numJobsCompleted); - list.add(numJobsCommitted); - list.add(numJobsFailed); - list.add(numJobsCancelled); - list.add(executorActiveCount); - list.add(executorMaximumPoolSize); - list.add(executorPoolSize); - list.add(executorCorePoolSize); - list.add(executorQueueSize); - return list; + private void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) { + Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), launchingTime - 
scheduledTime, TimeUnit.MILLISECONDS); } @Override - public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(); - } - - @Override - public Collection<ContextAwareTimer> getTimers() { - return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching); + public String getName() { + return GobblinHelixJobScheduler.class.getName(); } } @@ -265,12 +232,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe long startTime = jobContext.getJobState().getPropAsLong(START_TIME); if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { metrics.totalJobsCompleted.incrementAndGet(); - Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + Instrumented.updateTimer(Optional.of(metrics.timeForCompletedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { metrics.totalJobsFailed.incrementAndGet(); - Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + Instrumented.updateTimer(Optional.of(metrics.timeForFailedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } else { metrics.totalJobsCommitted.incrementAndGet(); + Instrumented.updateTimer(Optional.of(metrics.timeForCommittedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } } } @@ -382,17 +350,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final String jobUri; private final Properties jobConfig; private final JobListener jobListener; + private final Long creationTimeInMillis; public NonScheduledJobRunner(String jobUri, Properties jobConfig, JobListener jobListener) { this.jobUri = jobUri; this.jobConfig = jobConfig; this.jobListener = jobListener; + this.creationTimeInMillis = System.currentTimeMillis(); } @Override public void run() { 
try { ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig); + ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener); // remove non-scheduled job catalog once done so it won't be re-executed http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index a3fddab..3ec40dc 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -36,6 +36,11 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -81,6 +86,9 @@ import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.PathUtils; +import javax.annotation.Nonnull; +import lombok.Getter; + import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; @@ -109,7 +117,7 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER * @author Yinan Li */ @Alpha -public 
class GobblinTaskRunner { +public class GobblinTaskRunner implements StandardMetricsBridge { private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class); static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); @@ -141,6 +149,8 @@ public class GobblinTaskRunner { private final String applicationName; private final String applicationId; private final Path appWorkPath; + private final MetricContext metricContext; + private final StandardMetricsBridge.StandardMetrics metrics; public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId, String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) @@ -160,8 +170,10 @@ public class GobblinTaskRunner { initHelixManager(); this.containerMetrics = buildContainerMetrics(); - - this.taskStateModelFactory = registerHelixTaskFactory(); + TaskFactoryBuilder builder = new TaskFactoryBuilder(this.config); + this.taskStateModelFactory = createTaskStateModelFactory(builder.build()); + this.metrics = builder.getTaskMetrics(); + this.metricContext = builder.getMetricContext(); services.addAll(getServices()); if (services.isEmpty()) { @@ -174,6 +186,38 @@ public class GobblinTaskRunner { applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional); } + private class TaskFactoryBuilder { + private final boolean isRunTaskInSeparateProcessEnabled; + private final TaskFactory taskFactory; + @Getter + private final MetricContext metricContext; + @Getter + private StandardMetricsBridge.StandardMetrics taskMetrics; + + public TaskFactoryBuilder(Config config) { + isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(config); + metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); + if (isRunTaskInSeparateProcessEnabled) { + logger.info("Running a task in a separate process is enabled."); + taskFactory = new 
HelixTaskFactory(GobblinTaskRunner.this.containerMetrics, CLUSTER_CONF_PATH, config); + taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics(); + } else { + Properties properties = ConfigUtils.configToProperties(config); + TaskExecutor taskExecutor = new TaskExecutor(properties); + taskFactory = getInProcessTaskFactory(taskExecutor); + taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); + } + } + + public TaskFactory build(){ + return taskFactory; + } + + private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) { + return ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false); + } + } + private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) { return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId); @@ -189,19 +233,10 @@ public class GobblinTaskRunner { this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); } - private TaskStateModelFactory registerHelixTaskFactory() { + private TaskStateModelFactory createTaskStateModelFactory(TaskFactory factory) { Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); - boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(); - TaskFactory taskFactory; - if (isRunTaskInSeparateProcessEnabled) { - logger.info("Running a task in a separate process is enabled."); - taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH, config); - } else { - taskFactory = getInProcessTaskFactory(); - } - - taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory); + taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, factory); TaskStateModelFactory taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine() @@ -209,14 +244,13 @@ public class GobblinTaskRunner { 
return taskStateModelFactory; } - private TaskFactory getInProcessTaskFactory() { + private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor) { Properties properties = ConfigUtils.configToProperties(this.config); URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri(); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(rootPathUri.toString())); - TaskExecutor taskExecutor = new TaskExecutor(properties); TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); services.add(taskExecutor); @@ -385,6 +419,22 @@ public class GobblinTaskRunner { } } + @Override + public StandardMetrics getStandardMetrics() { + return this.metrics; + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return GobblinMetrics.isEnabled(this.config); + } + /** * A custom {@link MessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages * of type "SHUTDOWN" for shutting down the participants. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java new file mode 100644 index 0000000..51e8b36 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import com.codahale.metrics.Metric; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareMetric; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.runtime.TaskExecutor; + + +public class GobblinTaskRunnerMetrics { + + static class InProcessTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics { + private TaskExecutor taskExecutor; + private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount"; + private static String HISTORICAL_QUEUED_TASK_COUNT = "historicalQueuedTaskCount"; + private static String QUEUED_TASK_COUNT = "queuedTaskCount"; + private static String CURRENT_QUEUED_TASK_TOTAL_TIME = "currentQueuedTaskTotalTime"; + private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME = "historicalQueuedTaskTotalTime"; + private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime"; + private static String FAILED_TASK_COUNT = "failedTaskCount"; + private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount"; + private static String RUNNING_TASK_COUNT = "runningTaskCount"; + + public 
InProcessTaskRunnerMetrics (TaskExecutor executor, MetricContext context) { + taskExecutor = executor; + contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, ()->this.taskExecutor.getCurrentQueuedTaskCount().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT, ()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT, ()->this.taskExecutor.getQueuedTaskCount().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getQueuedTaskTotalTime().longValue())); + contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, ()->this.taskExecutor.getFailedTaskCount().getCount())); + contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, ()->this.taskExecutor.getSuccessfulTaskCount().getCount())); + contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, ()->this.taskExecutor.getRunningTaskCount().getCount())); + + this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, this.taskExecutor.getTaskCreateAndRunTimer()); + } + + @Override + public String getName() { + return InProcessTaskRunnerMetrics.class.getName(); + } + } + + static class JvmTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics { + //TODO: add metrics to monitor the process execution status (will be revisited after process isolation work is done) + @Override + public String getName() { + return JvmTaskRunnerMetrics.class.getName(); + } + + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java index 3993dce..355139b 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java @@ -18,15 +18,18 @@ package org.apache.gobblin.instrumented; import java.util.Collection; +import java.util.List; +import java.util.Map; -import org.apache.gobblin.metrics.ContextAwareCounter; -import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareHistogram; -import org.apache.gobblin.metrics.ContextAwareMeter; -import org.apache.gobblin.metrics.ContextAwareTimer; - +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.gobblin.metrics.ContextAwareMetric; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; /** * This interface indicates a class will expose its metrics to some external systems. 
@@ -35,30 +38,37 @@ public interface StandardMetricsBridge extends Instrumentable { StandardMetrics getStandardMetrics(); - public class StandardMetrics { + default void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } - public String getName() { - return this.getClass().getName(); - } + default void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } - public Collection<ContextAwareGauge<?>> getGauges() { - return ImmutableList.of(); - } + default List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { + return ImmutableList.of(); + } + + public class StandardMetrics implements MetricSet { + protected final List<ContextAwareMetric> contextAwareMetrics; + protected final Map<String, Metric> rawMetrics; - public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(); + public StandardMetrics() { + this.contextAwareMetrics = Lists.newArrayList(); + this.rawMetrics = Maps.newHashMap(); } - public Collection<ContextAwareMeter> getMeters() { - return ImmutableList.of(); + public String getName() { + return this.getClass().getName(); } - public Collection<ContextAwareTimer> getTimers() { - return ImmutableList.of(); + public Collection<ContextAwareMetric> getContextAwareMetrics() { + return contextAwareMetrics; } - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(); + public Map<String, Metric> getMetrics() { + return rawMetrics; } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java 
index 5fd5413..6d8de39 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; @@ -199,12 +200,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S } private class Metrics extends StandardMetricsBridge.StandardMetrics { - private ContextAwareGauge<Integer> jobSpecQueueSize; - private ContextAwareGauge<Long> jobSpecEnq; - private ContextAwareGauge<Long> jobSpecDeq; - private ContextAwareGauge<Long> jobSpecConsumed; - private ContextAwareGauge<Long> jobSpecParseFailures; - private AtomicLong jobSpecEnqCount = new AtomicLong(0); private AtomicLong jobSpecDeqCount = new AtomicLong(0); @@ -215,12 +210,12 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures"; public Metrics(MetricContext context) { - this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()); - this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()); - this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()); - this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED, - ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()); - 
this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()); + this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size())); + this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get())); + this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get())); + this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED, + ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures())); + this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures())); } private long getNewSpecs() { @@ -237,16 +232,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null? 
StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0; } - - public Collection<ContextAwareGauge<?>> getGauges() { - List list = Lists.newArrayList(); - list.add(jobSpecQueueSize); - list.add(jobSpecEnq); - list.add(jobSpecDeq); - list.add(jobSpecConsumed); - list.add(jobSpecParseFailures); - return list; - } } @Override @@ -264,19 +249,4 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S public boolean isInstrumentationEnabled() { return _isInstrumentedEnabled; } - - @Override - public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { - return ImmutableList.of(); - } - - @Override - public void switchMetricContext(List<Tag<?>> tags) { - throw new UnsupportedOperationException(); - } - - @Override - public void switchMetricContext(MetricContext context) { - throw new UnsupportedOperationException(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java index be78275..b868893 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java @@ -39,6 +39,8 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -75,12 +77,15 @@ public class TaskExecutor extends AbstractIdleService { private final ExecutorService 
forkExecutor; // Task retry interval + @Getter private final long retryIntervalInSeconds; // The maximum number of items in the queued task time map. + @Getter private final int queuedTaskTimeMaxSize; // The maximum age of the items in the queued task time map. + @Getter private final long queuedTaskTimeMaxAge ; // Map of queued task ids to queue times. The key is the task id, the value is the time the task was queued. If the @@ -96,32 +101,44 @@ public class TaskExecutor extends AbstractIdleService { private long lastCalculationTime = 0; // The total number of tasks currently queued and queued over the historical lookback period. + @Getter private AtomicInteger queuedTaskCount = new AtomicInteger(); // The total number of tasks currently queued. + @Getter private AtomicInteger currentQueuedTaskCount = new AtomicInteger(); // The total number of tasks queued over the historical lookback period. + @Getter private AtomicInteger historicalQueuedTaskCount = new AtomicInteger(); // The total time tasks have currently been in the queue and were in the queue during the historical lookback period. + @Getter private AtomicLong queuedTaskTotalTime = new AtomicLong(); // The total time tasks have currently been in the queue. + @Getter private AtomicLong currentQueuedTaskTotalTime = new AtomicLong(); // The total time tasks have been in the queue during the historical lookback period. + @Getter private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong(); // Count of running tasks. + @Getter private final Counter runningTaskCount = new Counter(); // Count of failed tasks. + @Getter private final Meter successfulTaskCount = new Meter(); // Count of failed tasks. + @Getter private final Meter failedTaskCount = new Meter(); + @Getter + private final Timer taskCreateAndRunTimer; + // The metric set exposed from the task executor. 
private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet(); @@ -129,7 +146,7 @@ public class TaskExecutor extends AbstractIdleService { * Constructor used internally. */ private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds, - int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) { + int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge, int timerWindowSize) { Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive"); Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive"); Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive"); @@ -143,6 +160,7 @@ public class TaskExecutor extends AbstractIdleService { this.retryIntervalInSeconds = retryIntervalInSeconds; this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize; this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge; + this.taskCreateAndRunTimer = new Timer(new SlidingTimeWindowReservoir(timerWindowSize, TimeUnit.MINUTES)); this.forkExecutor = ExecutorsUtils.loggingDecorator( new ThreadPoolExecutor( @@ -175,7 +193,9 @@ public class TaskExecutor extends AbstractIdleService { Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE, Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))), Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE, - Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE)))); + Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))), + Integer.parseInt(properties.getProperty(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + Integer.toString(ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES)))); } /** @@ -191,7 +211,9 @@ public class TaskExecutor extends AbstractIdleService { conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE, 
ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE), conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE, - ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE)); + ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE), + conf.getInt(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES)); } @Override @@ -430,7 +452,12 @@ public class TaskExecutor extends AbstractIdleService { private void onStart(long startTime) { Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId()); + long workUnitCreationTime = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, 0); long timeInQueue = startTime - queueTime; + long timeSinceWorkUnitCreation = startTime - workUnitCreationTime; + + taskCreateAndRunTimer.update(timeSinceWorkUnitCreation, TimeUnit.MILLISECONDS); + LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue)); queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue); runningTaskCount.inc(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index 42ecef3..6afa94a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -18,23 +18,27 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; -import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.util.ConfigUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -76,6 +80,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated"; private final MetricContext metricsContext; + protected final int timeWindowSizeInMinutes; @Getter private final AtomicLong totalAddedJobs; @Getter private final AtomicLong totalDeletedJobs; @Getter private final AtomicLong totalUpdatedJobs; @@ -85,17 +90,28 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable @Getter private final ContextAwareGauge<Long> totalUpdateCalls; @Getter private final ContextAwareGauge<Integer> numActiveJobs; - public StandardMetrics(final JobCatalog jobCatalog) { + public StandardMetrics(final JobCatalog jobCatalog, Optional<Config> sysConfig) { + // timer window size + this.timeWindowSizeInMinutes = sysConfig.isPresent()? 
+ ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) : + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES; + this.metricsContext = jobCatalog.getMetricContext(); this.totalAddedJobs = new AtomicLong(0); this.totalDeletedJobs = new AtomicLong(0); this.totalUpdatedJobs = new AtomicLong(0); - this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES); + this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES); this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get()); this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get()); this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get()); this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->(int)(totalAddedJobs.get() - totalDeletedJobs.get())); + + this.contextAwareMetrics.add(timeForJobCatalogGet); + this.contextAwareMetrics.add(totalAddCalls); + this.contextAwareMetrics.add(totalDeleteCalls); + this.contextAwareMetrics.add(totalUpdateCalls); + this.contextAwareMetrics.add(numActiveJobs); } public void updateGetJobTime(long startTime) { @@ -136,20 +152,5 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable this.totalUpdatedJobs.incrementAndGet(); submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE); } - - @Override - public Collection<ContextAwareGauge<?>> getGauges() { - return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs); - } - - @Override - public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(); - } - - @Override - public Collection<ContextAwareTimer> getTimers() { - return 
ImmutableList.of(timeForJobCatalogGet); - } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index 3f50ee7..7bc9cc0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -44,10 +44,12 @@ public interface JobExecutionLauncher extends Instrumentable { public static final String NUM_JOBS_CANCELLED = "numJobsCancelled"; public static final String NUM_JOBS_RUNNING = "numJobsRunning"; - public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion"; - public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure"; + public static final String TIMER_FOR_COMPLETED_JOBS = "timeForCompletedJobs"; + public static final String TIMER_FOR_FAILED_JOBS = "timeForFailedJobs"; + public static final String TIMER_FOR_COMMITTED_JOBS = "timerForCommittedJobs"; public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling"; public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching"; + public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching"; public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount"; public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java ---------------------------------------------------------------------- diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java index 8b6e98c..57dfce5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java @@ -22,10 +22,14 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.util.ConfigUtils; import com.google.common.base.Optional; +import com.typesafe.config.Config; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -55,10 +59,12 @@ public interface MutableJobCatalog extends JobCatalog { public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut"; @Getter private final ContextAwareTimer timeForJobCatalogPut; @Getter private final ContextAwareTimer timeForJobCatalogRemove; - public MutableStandardMetrics(JobCatalog catalog) { - super(catalog); - timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES); - timeForJobCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES); + public MutableStandardMetrics(JobCatalog catalog, Optional<Config> sysConfig) { + super(catalog, sysConfig); + timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, timeWindowSizeInMinutes, TimeUnit.MINUTES); + timeForJobCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(timeForJobCatalogPut); + 
this.contextAwareMetrics.add(timeForJobCatalogRemove); } public void updatePutJobTime(long startTime) { @@ -70,14 +76,5 @@ public interface MutableJobCatalog extends JobCatalog { log.info("updateRemoveJobTime..."); Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - - @Override - public Collection<ContextAwareTimer> getTimers() { - Collection<ContextAwareTimer> all = new ArrayList<>(); - all.addAll(super.getTimers()); - all.add(this.timeForJobCatalogPut); - all.add(this.timeForJobCatalogRemove); - return all; - } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java index 3aa16be..108a324 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java @@ -22,10 +22,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.util.ConfigUtils; import com.google.common.base.Optional; +import com.typesafe.config.Config; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -55,10 +59,12 @@ public interface MutableSpecCatalog extends SpecCatalog { public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut"; @Getter private final ContextAwareTimer timeForSpecCatalogPut; @Getter private final 
ContextAwareTimer timeForSpecCatalogRemove; - public MutableStandardMetrics(SpecCatalog catalog) { - super(catalog); - timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES); - timeForSpecCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES); + public MutableStandardMetrics(SpecCatalog catalog, Optional<Config> sysConfig) { + super(catalog, sysConfig); + timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, this.timeWindowSizeInMinutes, TimeUnit.MINUTES); + timeForSpecCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(timeForSpecCatalogPut); + this.contextAwareMetrics.add(timeForSpecCatalogRemove); } public void updatePutSpecTime(long startTime) { @@ -70,14 +76,5 @@ public interface MutableSpecCatalog extends SpecCatalog { log.info("updateRemoveSpecTime..."); Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - - @Override - public Collection<ContextAwareTimer> getTimers() { - Collection<ContextAwareTimer> all = new ArrayList<>(); - all.addAll(super.getTimers()); - all.add(this.timeForSpecCatalogPut); - all.add(this.timeForSpecCatalogRemove); - return all; - } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index 6e8510f..457be9a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -19,21 +19,25 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.util.ConfigUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -70,6 +74,8 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet"; private final MetricContext metricsContext; + protected final int timeWindowSizeInMinutes; + @Getter private final AtomicLong totalAddedSpecs; @Getter private final AtomicLong totalDeletedSpecs; @Getter private final AtomicLong totalUpdatedSpecs; @@ -80,9 +86,13 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr @Getter private final ContextAwareTimer timeForSpecCatalogGet; - public StandardMetrics(final SpecCatalog specCatalog) { + public StandardMetrics(final SpecCatalog specCatalog, Optional<Config> sysConfig) { + this.timeWindowSizeInMinutes = sysConfig.isPresent()? 
+ ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) : + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES; + this.metricsContext = specCatalog.getMetricContext(); - this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES); + this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES); this.totalAddedSpecs = new AtomicLong(0); this.totalDeletedSpecs = new AtomicLong(0); this.totalUpdatedSpecs = new AtomicLong(0); @@ -95,6 +105,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get()); this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get()); this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get()); + + this.contextAwareMetrics.add(numActiveSpecs); + this.contextAwareMetrics.add(totalAddCalls); + this.contextAwareMetrics.add(totalUpdateCalls); + this.contextAwareMetrics.add(totalDeleteCalls); + this.contextAwareMetrics.add(timeForSpecCatalogGet); } public void updateGetSpecTime(long startTime) { @@ -102,21 +118,6 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - @Override - public Collection<ContextAwareGauge<?>> getGauges() { - return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls); - } - - @Override - public Collection<ContextAwareCounter> getCounters() { - return ImmutableList.of(); - } - - @Override - public Collection<ContextAwareTimer> getTimers() { - return 
ImmutableList.of(this.timeForSpecCatalogGet); - } - @Override public void onAddSpec(Spec addedSpec) { this.totalAddedSpecs.incrementAndGet(); submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java index 0f99235..f48b42c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java @@ -87,9 +87,9 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat } @Override - protected JobCatalog.StandardMetrics createStandardMetrics() { + protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config> sysConfig) { log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName()); - return new MutableStandardMetrics(this); + return new MutableStandardMetrics(this, sysConfig); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java index 7162c81..626cac2 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java 
@@ -102,7 +102,7 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog public ImmutableFSJobCatalog(Config sysConfig, PathAlterationObserver observer, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) throws IOException { - super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled); + super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig)); this.sysConfig = sysConfig; ConfigAccessor cfgAccessor = new ConfigAccessor(this.sysConfig); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java index d40c962..dcb0723 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; @@ -63,13 +64,18 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) { + this(log, parentMetricContext, instrumentationEnabled, Optional.absent()); + } + + public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext, + boolean instrumentationEnabled, Optional<Config> sysConfig) { this.log = 
log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); this.listeners = new JobCatalogListenersList(log); if (instrumentationEnabled) { MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build(); - this.metrics = createStandardMetrics(); + this.metrics = createStandardMetrics(sysConfig); this.addListener(this.metrics); } else { @@ -78,8 +84,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC } } - protected StandardMetrics createStandardMetrics() { - return new StandardMetrics(this); + protected StandardMetrics createStandardMetrics(Optional<Config> sysConfig) { + return new StandardMetrics(this, sysConfig); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 482825f..a91baed 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -87,7 +87,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build(); - this.metrics = new MutableStandardMetrics(this); + this.metrics = new MutableStandardMetrics(this, Optional.of(config)); this.addListener(this.metrics); } else { 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java index c334d2b..5c25a67 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java @@ -94,7 +94,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build(); - this.metrics = new SpecCatalog.StandardMetrics(this); + this.metrics = new SpecCatalog.StandardMetrics(this, Optional.of(config)); this.addListener(this.metrics); } else { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 2cbb113..3137c21 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -21,6 +21,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import 
org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.service.FlowId; @@ -155,7 +156,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri } this.config = config; this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); - this.metrics = new Metrics(this.metricContext); + this.metrics = new Metrics(this.metricContext, this.config); this.serviceId = serviceId; this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName); @@ -463,31 +464,14 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri return false; } - @Override - public List<Tag<?>> generateTags(State state) { - return null; - } - - @Override - public void switchMetricContext(List<Tag<?>> tags) { - throw new UnsupportedOperationException(); - } - - @Override - public void switchMetricContext(MetricContext context) { - throw new UnsupportedOperationException(); - } - private class Metrics extends StandardMetrics { public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange"; private ContextAwareHistogram serviceLeadershipChange; - public Metrics(final MetricContext metricContext) { - serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES); - } - @Override - public Collection<ContextAwareHistogram> getHistograms() { - return ImmutableList.of(this.serviceLeadershipChange); + public Metrics(final MetricContext metricContext, Config config) { + int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + this.serviceLeadershipChange = 
metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(this.serviceLeadershipChange); } }
