Repository: incubator-gobblin Updated Branches: refs/heads/master d6c7fe79b -> aea0e4227
[GOBBLIN-263] Fix TaskExecutor metric calculation Closes #2115 from kadaan/Fix_for_GOBBLIN-263 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/aea0e422 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/aea0e422 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/aea0e422 Branch: refs/heads/master Commit: aea0e422792b5ca35fdd13d1e4030fcb0925acf1 Parents: d6c7fe7 Author: Joel Baranick <[email protected]> Authored: Mon Sep 25 11:10:56 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Sep 25 11:10:56 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/runtime/TaskExecutor.java | 57 ++++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/aea0e422/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java index 28ea378..476282c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java @@ -92,8 +92,8 @@ public class TaskExecutor extends AbstractIdleService { // the task waited to start. private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>(); - // The timestamp for the last time the metric source data was pruned. - private long lastCleanupTime = 0; + // The timestamp for the last time the metrics were calculated. + private long lastCalculationTime = 0; // The total number of tasks currently queued and queued over the historical lookback period. private AtomicInteger queuedTaskCount = new AtomicInteger(); @@ -282,19 +282,30 @@ public class TaskExecutor extends AbstractIdleService { return this.metricSet; } - private synchronized void cleanupMetricSources() { + private synchronized void calculateMetrics() { long currentTimeMillis = System.currentTimeMillis(); - if (lastCleanupTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) { + if (lastCalculationTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) { + LOG.debug("Starting metric calculation."); int currentQueuedTaskCount = 0; + int futureQueuedTaskCount = 0; long currentQueuedTaskTotalTime = 0; for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) { if (queuedTask.getValue() <= currentTimeMillis) { currentQueuedTaskCount++; - currentQueuedTaskTotalTime += queuedTask.getValue(); + long currentQueuedTaskTime = currentTimeMillis - queuedTask.getValue(); + currentQueuedTaskTotalTime += currentQueuedTaskTime; + LOG.debug(String.format("Task %s has been waiting in the queue for %d ms.", queuedTask.getKey(), currentQueuedTaskTime)); + } else { + futureQueuedTaskCount++; } } + if (futureQueuedTaskCount > 0) { + LOG.debug(String.format("%d tasks were ignored during metric calculations because they are scheduled to run in the future.", futureQueuedTaskCount)); + } + this.currentQueuedTaskCount.set(currentQueuedTaskCount); this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime); + LOG.debug(String.format("%d current tasks have been waiting for a total of %d ms.", currentQueuedTaskCount, currentQueuedTaskTotalTime)); int historicalQueuedTaskCount = 0; long historicalQueuedTaskTotalTime = 0; @@ -304,10 +315,12 @@ public class TaskExecutor extends AbstractIdleService { try { Map.Entry<Long, Long> historicalQueuedTask = iterator.next(); if (historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= queuedTaskTimeMaxSize) { + LOG.debug(String.format("Task started at %d is before the cutoff of %d and is being removed. Queue time %d will be removed from metric calculations.", historicalQueuedTask.getKey(), cutoff, historicalQueuedTask.getValue())); iterator.remove(); } else { historicalQueuedTaskCount++; historicalQueuedTaskTotalTime += historicalQueuedTask.getValue(); + LOG.debug(String.format("Task started at %d is after cutoff. Queue time %d will be used in metric calculations.", historicalQueuedTask.getKey(), historicalQueuedTask.getValue())); } } catch (NoSuchElementException e) { LOG.warn("Ran out of items in historical task queue time set."); @@ -315,13 +328,18 @@ public class TaskExecutor extends AbstractIdleService { } this.historicalQueuedTaskCount.set(historicalQueuedTaskCount); this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime); + LOG.debug(String.format("%d historical tasks have been waiting for a total of %d ms.", historicalQueuedTaskCount, historicalQueuedTaskTotalTime)); - this.queuedTaskCount.set(currentQueuedTaskCount + historicalQueuedTaskCount); - this.queuedTaskTotalTime.set(currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime); + int totalQueuedTaskCount = currentQueuedTaskCount + historicalQueuedTaskCount; + long totalQueuedTaskTime = currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime; + this.queuedTaskCount.set(totalQueuedTaskCount); + this.queuedTaskTotalTime.set(totalQueuedTaskTime); + LOG.debug(String.format("%d tasks have been waiting for a total of %d ms.", totalQueuedTaskCount, totalQueuedTaskTime)); - this.lastCleanupTime = currentTimeMillis; + this.lastCalculationTime = currentTimeMillis; + LOG.debug("Finished metric calculation."); } else { - LOG.debug("Skipped cleanup of metrics sources because not enough time has passed since last cleanup."); + LOG.debug("Skipped metric calculation because not enough time has elapsed since the last calculation."); } } @@ -332,42 +350,42 @@ public class TaskExecutor extends AbstractIdleService { metrics.put(name("queued", "current", "count"), new Gauge<Integer>() { @Override public Integer getValue() { - cleanupMetricSources(); + calculateMetrics(); return currentQueuedTaskCount.intValue(); } }); metrics.put(name("queued", "historical", "count"), new Gauge<Integer>() { @Override public Integer getValue() { - cleanupMetricSources(); + calculateMetrics(); return historicalQueuedTaskCount.intValue(); } }); metrics.put(name("queued", "count"), new Gauge<Integer>() { @Override public Integer getValue() { - cleanupMetricSources(); + calculateMetrics(); return queuedTaskCount.intValue(); } }); metrics.put(name("queued", "current", "time", "total"), new Gauge<Long>() { @Override public Long getValue() { - cleanupMetricSources(); + calculateMetrics(); return currentQueuedTaskTotalTime.longValue(); } }); metrics.put(name("queued", "historical", "time", "total"), new Gauge<Long>() { @Override public Long getValue() { - cleanupMetricSources(); + calculateMetrics(); return historicalQueuedTaskTotalTime.longValue(); } }); metrics.put(name("queued", "time", "total"), new Gauge<Long>() { @Override public Long getValue() { - cleanupMetricSources(); + calculateMetrics(); return queuedTaskTotalTime.longValue(); } }); @@ -386,7 +404,10 @@ public class TaskExecutor extends AbstractIdleService { } public TrackingTask(Task task, long interval, TimeUnit timeUnit) { - queuedTasks.putIfAbsent(task.getTaskId(), System.currentTimeMillis() + timeUnit.toMillis(interval)); + long now = System.currentTimeMillis(); + long timeToRun = now + timeUnit.toMillis(interval); + LOG.debug(String.format("Task %s queued to run %s.", task.getTaskId(), timeToRun <= now ? "now" : "at " + timeToRun)); + queuedTasks.putIfAbsent(task.getTaskId(), timeToRun); this.underlyingTask = task; } @@ -407,7 +428,9 @@ public class TaskExecutor extends AbstractIdleService { private void onStart(long startTime) { Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId()); - queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), startTime - queueTime); + long timeInQueue = startTime - queueTime; + LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue)); + queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue); runningTaskCount.inc(); } }
