Repository: ambari
Updated Branches:
  refs/heads/trunk 60c9f8c82 -> 105d073ab


AMBARI-8805. Exception on collector start.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/105d073a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/105d073a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/105d073a

Branch: refs/heads/trunk
Commit: 105d073abc9e08bfd4d7ac749e75d95118d362a9
Parents: 60c9f8c
Author: Siddharth Wagle <[email protected]>
Authored: Thu Dec 18 14:50:25 2014 -0800
Committer: Siddharth Wagle <[email protected]>
Committed: Thu Dec 18 14:54:43 2014 -0800

----------------------------------------------------------------------
 .../timeline/AbstractTimelineAggregator.java    |   8 +-
 .../AbstractTimelineAggregatorTest.java         | 153 +++++++++++--------
 2 files changed, 96 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/105d073a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index f169003..9bffee2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -110,7 +110,13 @@ public abstract class AbstractTimelineAggregator implements Runnable {
 
       if (success) {
         try {
-          saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+          // Bug fix: do not blindly save lastCheckPointTime + SLEEP_INTERVAL.
+          // The checkpoint records how far aggregation has progressed, and
+          // metrics with future timestamps cannot have been aggregated yet,
+          // so cap the saved checkpoint at the current time to keep it from
+          // pointing into the future.
+          saveCheckPoint(Math.min(currentTime, lastCheckPointTime +
+            SLEEP_INTERVAL));
         } catch (IOException io) {
           LOG.warn("Error saving checkpoint, restarting aggregation at " +
             "previous checkpoint.");

http://git-wip-us.apache.org/repos/asf/ambari/blob/105d073a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
index 8aa8436..82d3017 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
@@ -1,21 +1,5 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
 
 import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +12,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static junit.framework.Assert.assertEquals;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
@@ -122,35 +107,35 @@ public class AbstractTimelineAggregatorTest {
     clock.setTime(0);
 
     long sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals("startTime should be zero", 0, 
startTimeInDoWork.get());
-    Assert.assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    Assert.assertEquals(0, checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
-    Assert.assertEquals("Do not aggregate on first run", 0, actualRuns);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    assertEquals(0, checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
 
     // exactly one sleepInterval
     clock.setTime(clock.getTime() + sleepIntervalMillis);
     sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals("startTime", clock.getTime() -
+    assertEquals("startTime", clock.getTime() -
         sleepIntervalMillis,
       startTimeInDoWork.get());
-    Assert.assertEquals("endTime", clock.getTime(),
+    assertEquals("endTime", clock.getTime(),
       endTimeInDoWork.get());
-    Assert.assertEquals(clock.getTime(), checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
-    Assert.assertEquals(1, actualRuns);
+    assertEquals(clock.getTime(), checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(1, actualRuns);
 
     // exactly one sleepInterval
     clock.setTime(clock.getTime() + sleepIntervalMillis);
     sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals("startTime", clock.getTime() -
+    assertEquals("startTime", clock.getTime() -
         sleepIntervalMillis,
       startTimeInDoWork.get());
-    Assert.assertEquals("endTime", clock.getTime(),
+    assertEquals("endTime", clock.getTime(),
       endTimeInDoWork.get());
-    Assert.assertEquals(clock.getTime(), checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
-    Assert.assertEquals(2, actualRuns);
+    assertEquals(clock.getTime(), checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(2, actualRuns);
 
     // checkpointCutOffMultiplier x sleepInterval - should pass,
     // it will aggregate only first part of the whole 2x interval
@@ -161,32 +146,32 @@ public class AbstractTimelineAggregatorTest {
     clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
       sleepIntervalMillis));
     sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals("startTime after 2xinterval", clock.getTime() -
+    assertEquals("startTime after 2xinterval", clock.getTime() -
         (checkpointCutOffMultiplier * sleepIntervalMillis),
       startTimeInDoWork.get());
-    Assert.assertEquals("endTime after 2xinterval", clock.getTime() -
+    assertEquals("endTime after 2xinterval", clock.getTime() -
         sleepIntervalMillis,
       endTimeInDoWork.get());
-    Assert.assertEquals("checkpoint after 2xinterval", clock.getTime() -
+    assertEquals("checkpoint after 2xinterval", clock.getTime() -
       sleepIntervalMillis, checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
-    Assert.assertEquals(3, actualRuns);
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(3, actualRuns);
 
     // exactly one sleepInterval after one that lagged by one whole interval,
     // so it will do the previous one... and sleep as usual
     // no way to keep up
     clock.setTime(clock.getTime() + sleepIntervalMillis);
     sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals("startTime ", clock.getTime() -
+    assertEquals("startTime ", clock.getTime() -
         (checkpointCutOffMultiplier * sleepIntervalMillis),
       startTimeInDoWork.get());
-    Assert.assertEquals("endTime  ", clock.getTime() -
+    assertEquals("endTime  ", clock.getTime() -
         sleepIntervalMillis,
       endTimeInDoWork.get());
-    Assert.assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
+    assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
       checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
-    Assert.assertEquals(4, actualRuns);
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(4, actualRuns);
 
 
     // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
@@ -195,32 +180,72 @@ public class AbstractTimelineAggregatorTest {
     clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
       sleepIntervalMillis));
     sleep = agg.runOnce(sleepIntervalMillis);
-    Assert.assertEquals(4, actualRuns);
-    Assert.assertEquals("checkpoint after too much lag is reset to " +
+    assertEquals(4, actualRuns);
+    assertEquals("checkpoint after too much lag is reset to " +
         "current clock time",
       clock.getTime(), checkPoint.get());
-    Assert.assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(sleep, sleepIntervalMillis);
+  }
 
+  @Test
+  public void testDoWorkOnInterruptedRuns() throws Exception {
+    // start at some non-zero arbitrarily selected time;
+    int startingTime = 10000;
 
-  }
+    // 1.
+    clock.setTime(startingTime);
+    long timeOfFirstStep = clock.getTime();
+    long sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    assertEquals("do not aggregate on first run", 0, actualRuns);
+    assertEquals("first checkpoint set on current time", timeOfFirstStep,
+      checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+
+    // 2.
+    // doWork finished quickly and the sleep was interrupted (e.g. restart).
+    // Open question: should only a partial interval be aggregated, or
+    // should the aggregator instead sleep until the next cycle starts?
+    clock.setTime(timeOfFirstStep + 1);
+    long timeOfSecondStep = clock.getTime();
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be on previous checkpoint since it did not" 
+
+        " run yet",
+      timeOfFirstStep, startTimeInDoWork.get());
+
+    assertEquals("endTime can be start + interval",
+      startingTime + sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("should aggregate", 1, actualRuns);
+    assertEquals("checkpoint here should be set to min(endTime,currentTime), " 
+
+        "it is currentTime in our scenario",
+      timeOfSecondStep, checkPoint.get());
 
-  //testDoWorkOnInterruptedruns
-// 1. On interrupted it can skip some metrics
-//  testOnInterruption:
-//    // if sleep is interrupted.. is it ok?
-//    clock.setTime(10000);
-//  sleep = agg.runOnce(sleepIntervalMillis);
-//  Assert.assertEquals("startTime should be zero", 0,
-//    startTimeInDoWork.get());
-//  Assert.assertEquals("endTime  should be zero", 0, endTimeInDoWork.get()
-//    + sleepIntervalMillis);
-//
-//  //if it is interrupted again:
-//  clock.setTime(30000);
-//  sleep = agg.runOnce(sleepIntervalMillis);
-  //
-  // 2. if it lags it can skip??
-  //
+    assertEquals(sleep, sleepIntervalMillis);
+
+    //3.
+    // and again not a full sleep passed, so only small part was aggregated
+    clock.setTime(startingTime + 2);
+    long timeOfThirdStep = clock.getTime();
+
+    sleep = agg.runOnce(sleepIntervalMillis);
+    // Without capping the checkpoint at currentTime, startTime and endTime
+    // would both lie in the future here and the query would not work.
+    assertEquals("startTime should be previous checkpoint",
+      timeOfSecondStep, startTimeInDoWork.get());
+
+    assertEquals("endTime  can be start + interval",
+      timeOfSecondStep + sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("should aggregate", 2, actualRuns);
+    assertEquals("checkpoint here should be set to min(endTime,currentTime), " 
+
+        "it is currentTime in our scenario",
+      timeOfThirdStep,
+      checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+
+  }
 
   private static class TestClock implements Clock {
 

Reply via email to