Repository: ambari Updated Branches: refs/heads/trunk 60c9f8c82 -> 105d073ab
AMBARI-8805. Exception on collector start. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/105d073a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/105d073a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/105d073a Branch: refs/heads/trunk Commit: 105d073abc9e08bfd4d7ac749e75d95118d362a9 Parents: 60c9f8c Author: Siddharth Wagle <[email protected]> Authored: Thu Dec 18 14:50:25 2014 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Thu Dec 18 14:54:43 2014 -0800 ---------------------------------------------------------------------- .../timeline/AbstractTimelineAggregator.java | 8 +- .../AbstractTimelineAggregatorTest.java | 153 +++++++++++-------- 2 files changed, 96 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/105d073a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java index f169003..9bffee2 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java @@ -110,7 +110,13 @@ public abstract class AbstractTimelineAggregator implements Runnable { if (success) 
{ try { - saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); + // Comment to bug fix: + // cannot just save lastCheckPointTime + SLEEP_INTERVAL, + // it has to be verified so it is not a time in the future + // checkpoint says what was aggregated, and there is no way + // the future metrics were aggregated! + saveCheckPoint(Math.min(currentTime, lastCheckPointTime + + SLEEP_INTERVAL)); } catch (IOException io) { LOG.warn("Error saving checkpoint, restarting aggregation at " + "previous checkpoint."); http://git-wip-us.apache.org/repos/asf/ambari/blob/105d073a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java index 8aa8436..82d3017 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java @@ -1,21 +1,5 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; @@ -28,6 +12,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.atomic.AtomicLong; +import static junit.framework.Assert.assertEquals; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics .timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics @@ -122,35 +107,35 @@ public class AbstractTimelineAggregatorTest { clock.setTime(0); long sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - Assert.assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - Assert.assertEquals(0, checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); - Assert.assertEquals("Do not aggregate on first run", 0, actualRuns); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals(0, checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals("Do not aggregate on first run", 0, actualRuns); // exactly one sleepInterval clock.setTime(clock.getTime() + sleepIntervalMillis); sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals("startTime", clock.getTime() - + 
assertEquals("startTime", clock.getTime() - sleepIntervalMillis, startTimeInDoWork.get()); - Assert.assertEquals("endTime", clock.getTime(), + assertEquals("endTime", clock.getTime(), endTimeInDoWork.get()); - Assert.assertEquals(clock.getTime(), checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); - Assert.assertEquals(1, actualRuns); + assertEquals(clock.getTime(), checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(1, actualRuns); // exactly one sleepInterval clock.setTime(clock.getTime() + sleepIntervalMillis); sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals("startTime", clock.getTime() - + assertEquals("startTime", clock.getTime() - sleepIntervalMillis, startTimeInDoWork.get()); - Assert.assertEquals("endTime", clock.getTime(), + assertEquals("endTime", clock.getTime(), endTimeInDoWork.get()); - Assert.assertEquals(clock.getTime(), checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); - Assert.assertEquals(2, actualRuns); + assertEquals(clock.getTime(), checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(2, actualRuns); // checkpointCutOffMultiplier x sleepInterval - should pass, // it will aggregate only first part of the whole 2x interval @@ -161,32 +146,32 @@ public class AbstractTimelineAggregatorTest { clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * sleepIntervalMillis)); sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals("startTime after 2xinterval", clock.getTime() - + assertEquals("startTime after 2xinterval", clock.getTime() - (checkpointCutOffMultiplier * sleepIntervalMillis), startTimeInDoWork.get()); - Assert.assertEquals("endTime after 2xinterval", clock.getTime() - + assertEquals("endTime after 2xinterval", clock.getTime() - sleepIntervalMillis, endTimeInDoWork.get()); - Assert.assertEquals("checkpoint after 2xinterval", clock.getTime() - + assertEquals("checkpoint after 2xinterval", clock.getTime() - 
sleepIntervalMillis, checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); - Assert.assertEquals(3, actualRuns); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(3, actualRuns); // exactly one sleepInterval after one that lagged by one whole interval, // so it will do the previous one... and sleep as usual // no way to keep up clock.setTime(clock.getTime() + sleepIntervalMillis); sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals("startTime ", clock.getTime() - + assertEquals("startTime ", clock.getTime() - (checkpointCutOffMultiplier * sleepIntervalMillis), startTimeInDoWork.get()); - Assert.assertEquals("endTime ", clock.getTime() - + assertEquals("endTime ", clock.getTime() - sleepIntervalMillis, endTimeInDoWork.get()); - Assert.assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis, + assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis, checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); - Assert.assertEquals(4, actualRuns); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(4, actualRuns); // checkpointCutOffMultiplier x sleepInterval - in normal state should pass, @@ -195,32 +180,72 @@ public class AbstractTimelineAggregatorTest { clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * sleepIntervalMillis)); sleep = agg.runOnce(sleepIntervalMillis); - Assert.assertEquals(4, actualRuns); - Assert.assertEquals("checkpoint after too much lag is reset to " + + assertEquals(4, actualRuns); + assertEquals("checkpoint after too much lag is reset to " + "current clock time", clock.getTime(), checkPoint.get()); - Assert.assertEquals(sleep, sleepIntervalMillis); + assertEquals(sleep, sleepIntervalMillis); + } + @Test + public void testDoWorkOnInterruptedRuns() throws Exception { + // start at some non-zero arbitrarily selected time; + int startingTime = 10000; - } + // 1. 
+ clock.setTime(startingTime); + long timeOfFirstStep = clock.getTime(); + long sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals("do not aggregate on first run", 0, actualRuns); + assertEquals("first checkpoint set on current time", timeOfFirstStep, + checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + + // 2. + // the doWork was fast, and sleep was interrupted (e.g. restart) + // Q: do we want to aggregate just part of the system? maybe we should + // sleep up to next cycle start!! + clock.setTime(timeOfFirstStep + 1); + long timeOfSecondStep = clock.getTime(); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be on previous checkpoint since it did not" + + " run yet", + timeOfFirstStep, startTimeInDoWork.get()); + + assertEquals("endTime can be start + interval", + startingTime + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("should aggregate", 1, actualRuns); + assertEquals("checkpoint here should be set to min(endTime,currentTime), " + + "it is currentTime in our scenario", + timeOfSecondStep, checkPoint.get()); - //testDoWorkOnInterruptedruns -// 1. On interrupted it can skip some metrics -// testOnInterruption: -// // if sleep is interrupted.. is it ok? -// clock.setTime(10000); -// sleep = agg.runOnce(sleepIntervalMillis); -// Assert.assertEquals("startTime should be zero", 0, -// startTimeInDoWork.get()); -// Assert.assertEquals("endTime should be zero", 0, endTimeInDoWork.get() -// + sleepIntervalMillis); -// -// //if it is interrupted again: -// clock.setTime(30000); -// sleep = agg.runOnce(sleepIntervalMillis); - // - // 2. if it lags it can skip?? - // + assertEquals(sleep, sleepIntervalMillis); + + //3. 
+ // again, less than a full sleep interval passed, so only a small part was aggregated + clock.setTime(startingTime + 2); + long timeOfThirdStep = clock.getTime(); + + sleep = agg.runOnce(sleepIntervalMillis); + // endTime is in the future (past the current clock time), which makes no sense: + // the query will not work!! + assertEquals("startTime should be previous checkpoint", + timeOfSecondStep, startTimeInDoWork.get()); + + assertEquals("endTime can be start + interval", + timeOfSecondStep + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("should aggregate", 2, actualRuns); + assertEquals("checkpoint here should be set to min(endTime,currentTime), " + + "it is currentTime in our scenario", + timeOfThirdStep, + checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + + } private static class TestClock implements Clock {
