Repository: incubator-gobblin Updated Branches: refs/heads/master d0ece1a04 -> dad2a8c2a
[GOBBLIN-202] Added JMX reporting of task execution queue size and time. This can be used as a custom metric to autoscale the cluster Closes #2039 from kadaan/JMX_metrics_for_Task_Executor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dad2a8c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dad2a8c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dad2a8c2 Branch: refs/heads/master Commit: dad2a8c2a75e2b76edf0ee74195c4ba8e6a13f90 Parents: d0ece1a Author: Joel Baranick <[email protected]> Authored: Tue Aug 15 12:30:19 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Aug 15 12:30:19 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 5 + .../gobblin/cluster/GobblinTaskRunner.java | 5 +- .../apache/gobblin/runtime/TaskExecutor.java | 233 ++++++++++++++++++- .../runtime/services/JMXReportingService.java | 13 ++ 4 files changed, 244 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 36921be..dd386ef 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -176,6 +176,11 @@ public class ConfigurationKeys { public static final boolean DEFAULT_CLEANUP_STAGING_DATA_PER_TASK = true; public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = 
"cleanup.staging.data.by.initializer"; + public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size"; // max number of historical queued-task-time samples TaskExecutor retains + public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048; + public static final String QUEUED_TASK_TIME_MAX_AGE = "taskexecutor.queued_task_time.history.max_age"; // max age, in milliseconds, of retained queued-task-time samples + public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1); + /** Optional, for user to specified which template to use, inside .job file */ public static final String JOB_TEMPLATE_PATH = "job.template" ; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 838abe6..3ddbf55 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -60,6 +60,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; @@ -69,6 +70,7 @@ import com.google.common.util.concurrent.ServiceManager; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -159,7 +161,8 @@ public class GobblinTaskRunner { 
Path appWorkDir = appWorkDirOptional.isPresent() ? 
appWorkDirOptional.get() : GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, applicationId); - List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, new JMXReportingService()); + List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, + new JMXReportingService(ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); services.addAll(getServices()); this.serviceManager = new ServiceManager(services); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java index 2abf131..28ea378 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java @@ -17,7 +17,13 @@ package org.apache.gobblin.runtime; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Properties; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -25,22 +31,32 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.gobblin.runtime.fork.Fork; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; 
+import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.runtime.fork.Fork; import org.apache.gobblin.util.ExecutorsUtils; import lombok.Getter; +import static com.codahale.metrics.MetricRegistry.name; + /** * A class for executing {@link Task}s and retrying failed ones as well as for executing {@link Fork}s. @@ -61,12 +77,63 @@ public class TaskExecutor extends AbstractIdleService { // Task retry interval private final long retryIntervalInSeconds; + // The maximum number of entries retained in the historical queued task time map. + private final int queuedTaskTimeMaxSize; + + // The maximum age, in milliseconds, of the entries retained in the historical queued task time map. + private final long queuedTaskTimeMaxAge; + + // Map of queued task ids to queue times. The key is the task id, the value is the time the task was queued. If the + // task is being retried, the time may be in the future. Entries with time in the future will not be counted as + // queued until the time is in the past. + private final Map<String, Long> queuedTasks = Maps.newConcurrentMap(); + + // Map of historical queued task times. The key is the UTC epoch time the task started, the value is the milliseconds + // the task waited to start. + private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>(); + + // The timestamp for the last time the metric source data was pruned. + private long lastCleanupTime = 0; + + // The total number of tasks currently queued and queued over the historical lookback period. 
+ private final AtomicInteger queuedTaskCount = new AtomicInteger(); + + // The total number of tasks currently queued. + private final AtomicInteger currentQueuedTaskCount = new AtomicInteger(); + + // The total number of tasks queued over the historical lookback period. + private final AtomicInteger historicalQueuedTaskCount = new AtomicInteger(); + + // The total time tasks have currently been in the queue and were in the queue during the historical lookback period. + private final AtomicLong queuedTaskTotalTime = new AtomicLong(); + + // The total time tasks have currently been in the queue. + private final AtomicLong currentQueuedTaskTotalTime = new AtomicLong(); + + // The total time tasks have been in the queue during the historical lookback period. + private final AtomicLong historicalQueuedTaskTotalTime = new AtomicLong(); + + // Count of running tasks. + private final Counter runningTaskCount = new Counter(); + + // Count of successful tasks. + private final Meter successfulTaskCount = new Meter(); + + // Count of failed tasks. + private final Meter failedTaskCount = new Meter(); + + // The metric set exposed from the task executor. + private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet(); + /** * Constructor used internally. 
*/ - private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds) { + private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds, + int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) { Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive"); Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive"); + Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive"); + Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive"); // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later. this.taskExecutor = Executors.newScheduledThreadPool( @@ -74,6 +141,8 @@ public class TaskExecutor extends AbstractIdleService { ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))); this.retryIntervalInSeconds = retryIntervalInSeconds; + this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize; + this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge; this.forkExecutor = new ThreadPoolExecutor( // The core thread pool size is equal to that of the task executor as there's at least one fork per task @@ -100,7 +169,11 @@ public class TaskExecutor extends AbstractIdleService { Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY, Integer.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE))), Long.parseLong(properties.getProperty(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY, - Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC)))); + Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))), + Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE, + Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))), + 
Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE, + Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE)))); } /** @@ -112,7 +185,11 @@ public class TaskExecutor extends AbstractIdleService { conf.getInt(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY, ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE), conf.getLong(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY, - ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC)); + ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC), + conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE, + ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE), + conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE, + ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE)); } @Override @@ -145,7 +222,7 @@ public class TaskExecutor extends AbstractIdleService { */ public void execute(Task task) { LOG.info(String.format("Executing task %s", task.getTaskId())); - this.taskExecutor.execute(task); + this.taskExecutor.execute(new TrackingTask(task)); } /** @@ -156,7 +233,7 @@ public class TaskExecutor extends AbstractIdleService { */ public Future<?> submit(Task task) { LOG.info(String.format("Submitting task %s", task.getTaskId())); - return this.taskExecutor.submit(task); + return this.taskExecutor.submit(new TrackingTask(task)); } /** @@ -196,8 +273,142 @@ public class TaskExecutor extends AbstractIdleService { // Task retry interval increases linearly with number of retries long interval = task.getRetryCount() * this.retryIntervalInSeconds; // Schedule the retry of the failed task - this.taskExecutor.schedule(task, interval, TimeUnit.SECONDS); + this.taskExecutor.schedule(new TrackingTask(task, interval, TimeUnit.SECONDS), interval, TimeUnit.SECONDS); LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), interval)); task.incrementRetryCount(); } + + public MetricSet getTaskExecutorQueueMetricSet() { + return this.metricSet; + } + 
+ private synchronized void cleanupMetricSources() { + long currentTimeMillis = System.currentTimeMillis(); + if (lastCleanupTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) { + int currentQueuedTaskCount = 0; + long currentQueuedTaskTotalTime = 0; + for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) { + if (queuedTask.getValue() <= currentTimeMillis) { + currentQueuedTaskCount++; + currentQueuedTaskTotalTime += currentTimeMillis - queuedTask.getValue(); + } + } + this.currentQueuedTaskCount.set(currentQueuedTaskCount); + this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime); + + int historicalQueuedTaskCount = 0; + long historicalQueuedTaskTotalTime = 0; + long cutoff = currentTimeMillis - queuedTaskTimeMaxAge; + Iterator<Map.Entry<Long, Long>> iterator = queuedTaskTimeHistorical.descendingMap().entrySet().iterator(); + while (iterator.hasNext()) { + try { + Map.Entry<Long, Long> historicalQueuedTask = iterator.next(); + if (historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= queuedTaskTimeMaxSize) { + iterator.remove(); + } else { + historicalQueuedTaskCount++; + historicalQueuedTaskTotalTime += historicalQueuedTask.getValue(); + } + } catch (NoSuchElementException e) { + LOG.warn("Ran out of items in historical task queue time set."); + } + } + this.historicalQueuedTaskCount.set(historicalQueuedTaskCount); + this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime); + + this.queuedTaskCount.set(currentQueuedTaskCount + historicalQueuedTaskCount); + this.queuedTaskTotalTime.set(currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime); + + this.lastCleanupTime = currentTimeMillis; + } else { + LOG.debug("Skipped cleanup of metrics sources because not enough time has passed since last cleanup."); + } + } + + private class TaskExecutorQueueMetricSet implements MetricSet { + @Override + public Map<String, Metric> getMetrics() { + final Map<String, Metric> metrics = new HashMap<>(); + metrics.put(name("queued", 
"current", "count"), new Gauge<Integer>() { + @Override + public Integer getValue() { + cleanupMetricSources(); + return currentQueuedTaskCount.intValue(); + } + }); + metrics.put(name("queued", "historical", "count"), new Gauge<Integer>() { + @Override + public Integer getValue() { + cleanupMetricSources(); + return historicalQueuedTaskCount.intValue(); + } + }); + metrics.put(name("queued", "count"), new Gauge<Integer>() { + @Override + public Integer getValue() { + cleanupMetricSources(); + return queuedTaskCount.intValue(); + } + }); + metrics.put(name("queued", "current", "time", "total"), new Gauge<Long>() { + @Override + public Long getValue() { + cleanupMetricSources(); + return currentQueuedTaskTotalTime.longValue(); + } + }); + metrics.put(name("queued", "historical", "time", "total"), new Gauge<Long>() { + @Override + public Long getValue() { + cleanupMetricSources(); + return historicalQueuedTaskTotalTime.longValue(); + } + }); + metrics.put(name("queued", "time", "total"), new Gauge<Long>() { + @Override + public Long getValue() { + cleanupMetricSources(); + return queuedTaskTotalTime.longValue(); + } + }); + metrics.put(name("running", "count"), runningTaskCount); + metrics.put(name("successful", "count"), successfulTaskCount); + metrics.put(name("failed", "count"), failedTaskCount); + return Collections.unmodifiableMap(metrics); + } + } + + private class TrackingTask implements Runnable { + private final Task underlyingTask; + + public TrackingTask(Task task) { + this(task, 0, TimeUnit.SECONDS); + } + + public TrackingTask(Task task, long interval, TimeUnit timeUnit) { + queuedTasks.putIfAbsent(task.getTaskId(), System.currentTimeMillis() + timeUnit.toMillis(interval)); + this.underlyingTask = task; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + onStart(startTime); + try { + this.underlyingTask.run(); + successfulTaskCount.mark(); + } catch (Exception e) { + failedTaskCount.mark(); + throw e; + } finally { + 
runningTaskCount.dec(); + } + } + + private void onStart(long startTime) { + Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId()); + if (queueTime != null) { queuedTaskTimeHistorical.putIfAbsent(startTime, Math.max(0L, startTime - queueTime)); } + runningTaskCount.inc(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java index 2f3800b..99d62ae 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java @@ -29,6 +29,7 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.codahale.metrics.jvm.ThreadStatesGaugeSet; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.AbstractIdleService; @@ -51,11 +52,20 @@ import com.google.common.util.concurrent.AbstractIdleService; public class JMXReportingService extends AbstractIdleService { private final MetricRegistry metricRegistry = new MetricRegistry(); + private final Map<String, MetricSet> additionalMetricSets; private final JmxReporter jmxReporter = JmxReporter.forRegistry(this.metricRegistry) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); + public JMXReportingService() { + this(ImmutableMap.of()); + } + + public JMXReportingService(Map<String, MetricSet> additionalMetricSets) { + this.additionalMetricSets = ImmutableMap.copyOf(additionalMetricSets); + } + @Override protected void startUp() throws Exception { registerJvmMetrics(); @@ -72,6 +82,9 @@ public class 
JMXReportingService extends 
AbstractIdleService { registerMetricSetWithPrefix("jvm.memory", new MemoryUsageGaugeSet()); registerMetricSetWithPrefix("jvm.threads", new ThreadStatesGaugeSet()); this.metricRegistry.register("jvm.fileDescriptorRatio", new FileDescriptorRatioGauge()); + for (Map.Entry<String, MetricSet> metricSet : this.additionalMetricSets.entrySet()) { + registerMetricSetWithPrefix(metricSet.getKey(), metricSet.getValue()); + } } private void registerMetricSetWithPrefix(String prefix, MetricSet metricSet) {
