Repository: ambari Updated Branches: refs/heads/branch-2.2 77e1988bb -> d0b96cd95
AMBARI-15621 : Cluster Second aggregator taking more than 2 mins to execute on large clusters, thereby causing lag (avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d0b96cd9 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d0b96cd9 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d0b96cd9 Branch: refs/heads/branch-2.2 Commit: d0b96cd95f5a5c9e3d672e820b2551a85c8ca41e Parents: 77e1988 Author: Aravindan Vijayan <avija...@hortonworks.com> Authored: Wed Mar 30 10:53:44 2016 -0700 Committer: Aravindan Vijayan <avija...@hortonworks.com> Committed: Wed Mar 30 10:53:44 2016 -0700 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 19 +++-- .../aggregators/AbstractTimelineAggregator.java | 82 +++++++++----------- .../AbstractTimelineAggregatorTest.java | 48 ++++++------ .../stacks/HDP/2.0.6/services/stack_advisor.py | 2 + 4 files changed, 71 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 2f080e3..a32e206 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -95,37 +95,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); - scheduleAggregatorThread(secondClusterAggregator, metricsConf); + scheduleAggregatorThread(secondClusterAggregator); // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); - scheduleAggregatorThread(minuteClusterAggregator, metricsConf); + scheduleAggregatorThread(minuteClusterAggregator); // Start the hourly cluster aggregator TimelineMetricAggregator hourlyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf); - scheduleAggregatorThread(hourlyClusterAggregator, metricsConf); + scheduleAggregatorThread(hourlyClusterAggregator); // Start the daily cluster aggregator TimelineMetricAggregator dailyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf); - scheduleAggregatorThread(dailyClusterAggregator, metricsConf); + scheduleAggregatorThread(dailyClusterAggregator); // Start the minute host aggregator TimelineMetricAggregator minuteHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); - scheduleAggregatorThread(minuteHostAggregator, metricsConf); + scheduleAggregatorThread(minuteHostAggregator); // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); - scheduleAggregatorThread(hourlyHostAggregator, metricsConf); + scheduleAggregatorThread(hourlyHostAggregator); // Start the daily host aggregator TimelineMetricAggregator dailyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf); - scheduleAggregatorThread(dailyHostAggregator, metricsConf); + scheduleAggregatorThread(dailyHostAggregator); if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); @@ -333,12 +333,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin return metricMetadataManager.getHostedAppsCache(); } - private void scheduleAggregatorThread(TimelineMetricAggregator aggregator, - Configuration metricsConf) { + private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); if (!aggregator.isDisabled()) { executorService.scheduleAtFixedRate(aggregator, - SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)), + 0l, aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS); } http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index f8ec516..c576a40 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -52,7 +52,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String tableName; protected String outputTableName; protected Long nativeTimeRangeDelay; - protected Long lastAggregatedEndTime = -1l; // Explicitly name aggregators for logging needs private final String aggregatorName; @@ -93,18 +92,19 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg LOG.info("Started Timeline aggregator thread @ " + new Date()); Long SLEEP_INTERVAL = getSleepIntervalMillis(); runOnce(SLEEP_INTERVAL); - this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL; } /** * Access relaxed for tests */ public void runOnce(Long SLEEP_INTERVAL) { - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(); + + long currentTime = System.currentTimeMillis(); + long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); if (lastCheckPointTime != -1) { LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((lastAggregatedEndTime - lastCheckPointTime) / 1000) + + ((currentTime - lastCheckPointTime) / 1000) + " seconds."); boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); @@ -120,40 +120,40 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } } - private long readLastCheckpointSavingOnFirstRun() { + private long readLastCheckpointSavingOnFirstRun(long currentTime) { long lastCheckPointTime = -1; try { lastCheckPointTime = readCheckPoint(); - LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime)); - - if (lastAggregatedEndTime == -1l) { - lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()); - } - - if (isLastCheckPointTooOld(lastCheckPointTime)) { - LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + - "lastCheckPointTime = " + new Date(lastCheckPointTime)); - lastCheckPointTime = -1; - } - - if (lastCheckPointTime > 0) { - lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis()); - LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime)); - } - - if (isLastCheckPointTooYoung(lastCheckPointTime)) { - LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle."); - lastCheckPointTime = -1; - } - - if (lastCheckPointTime == -1) { - // Assuming first run, save checkpoint and sleep. - // Set checkpoint to rounded time in the past to allow the - // agents/collectors to catch up - LOG.info("Saving checkpoint time on first run. " + - new Date((lastAggregatedEndTime))); - saveCheckPoint(lastAggregatedEndTime); + if (lastCheckPointTime != -1) { + LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime)); + if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) { + LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + + "lastCheckPointTime = " + new Date(lastCheckPointTime)); + lastCheckPointTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - getSleepIntervalMillis(); + LOG.info("Saving checkpoint time. " + new Date((lastCheckPointTime))); + saveCheckPoint(lastCheckPointTime); + + } else { + + if (lastCheckPointTime > 0) { + lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis()); + LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime)); + } + + if (isLastCheckPointTooYoung(lastCheckPointTime)) { + LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle."); + return -1; //Skip Aggregation this time around + } + } + } else { + /* + No checkpoint. Save current rounded checkpoint and sleep for 1 cycle. + */ + LOG.info("No checkpoint found"); + long firstCheckPoint = getRoundedAggregateTimeMillis(getSleepIntervalMillis()); + LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint))); + saveCheckPoint(firstCheckPoint); } } catch (IOException io) { LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io); @@ -161,16 +161,16 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg return lastCheckPointTime; } - private boolean isLastCheckPointTooOld(long checkpoint) { + private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) { // first checkpoint is saved checkpointDelayMillis in the past, // so here we also need to take it into account return checkpoint != -1 && - ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis()); + ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis()); } private boolean isLastCheckPointTooYoung(long checkpoint) { return checkpoint != -1 && - ((lastAggregatedEndTime <= checkpoint)); + ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= checkpoint)); } protected long readCheckPoint() { @@ -295,14 +295,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg return checkpointLocation; } - protected void setLastAggregatedEndTime(long lastAggregatedEndTime) { - this.lastAggregatedEndTime = lastAggregatedEndTime; - } - - protected long getLastAggregatedEndTime() { - return lastAggregatedEndTime; - } - public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) { return referenceTime - (referenceTime % aggregatorPeriod); } http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java index 21b9839..827f399 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.junit.Before; import org.junit.Test; @@ -114,45 +115,42 @@ public class AbstractTimelineAggregatorTest { long currentTime = System.currentTimeMillis(); long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime, sleepIntervalMillis); - + //Test first run of aggregator with no checkpoint checkPoint.set(-1); - agg.setLastAggregatedEndTime(-1l); agg.runOnce(sleepIntervalMillis); assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); assertEquals(roundedOffAggregatorTime, checkPoint.get()); assertEquals("Do not aggregate on first run", 0, actualRuns); - //Test first run with Too Old checkpoint +// //Test first run with too "recent" checkpoint currentTime = System.currentTimeMillis(); - checkPoint.set(currentTime - 16*60*1000); //Old checkpoint - agg.setLastAggregatedEndTime(-1l); + checkPoint.set(currentTime); + agg.setSleepIntervalMillis(sleepIntervalMillis); agg.runOnce(sleepIntervalMillis); assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals(roundedOffAggregatorTime, checkPoint.get()); assertEquals("Do not aggregate on first run", 0, actualRuns); - //Test first run with too "recent" checkpoint + //Test first run with Too Old checkpoint currentTime = System.currentTimeMillis(); - checkPoint.set(currentTime); - agg.setLastAggregatedEndTime(-1l); - agg.setSleepIntervalMillis(sleepIntervalMillis); + checkPoint.set(currentTime - 16*60*1000); //Old checkpoint agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get()); - assertEquals("Do not aggregate on first run", 0, actualRuns); + long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis); + assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get()); + assertEquals("endTime should be zero", checkPointTime, endTimeInDoWork.get()); + assertEquals(roundedOffAggregatorTime, checkPoint.get()); + assertEquals("Do not aggregate on first run", 1, actualRuns); + - //Test first run with perfect checkpoint (sleepIntervalMillis back) +// //Test first run with perfect checkpoint (sleepIntervalMillis back) currentTime = System.currentTimeMillis(); roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime, sleepIntervalMillis); - long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis; + checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis; long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); checkPoint.set(checkPointTime); - agg.setLastAggregatedEndTime(-1l); agg.runOnce(sleepIntervalMillis); assertEquals("startTime should the lower rounded time of the checkpoint time", expectedCheckPoint, startTimeInDoWork.get()); @@ -160,20 +158,20 @@ public class AbstractTimelineAggregatorTest { expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); assertEquals(expectedCheckPoint + sleepIntervalMillis, checkPoint.get()); - assertEquals("Aggregate on first run", 1, actualRuns); + assertEquals("Aggregate on first run", 2, actualRuns); //Test edge case for checkpoint (2 x sleepIntervalMillis) - checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis; - expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); - checkPoint.set(checkPointTime); + currentTime = System.currentTimeMillis(); + checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000); agg.runOnce(sleepIntervalMillis); + long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis); assertEquals("startTime should the lower rounded time of the checkpoint time", - expectedCheckPoint, startTimeInDoWork.get()); + expectedStartTime, startTimeInDoWork.get()); assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", - expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); - assertEquals(expectedCheckPoint + sleepIntervalMillis, + expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get()); + assertEquals(expectedStartTime + sleepIntervalMillis, checkPoint.get()); - assertEquals("Aggregate on second run", 2, actualRuns); + assertEquals("Aggregate on second run", 3, actualRuns); } http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index b8e2c45..a007817 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -592,12 +592,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor): putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) elif total_sinks_count >= 500: putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) else: putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) pass