Repository: helix
Updated Branches:
  refs/heads/master 9d7364d7a -> 7e49f995e


http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index fac957c..c1cfce0 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -21,37 +21,118 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.google.common.collect.Range;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.TopStateHandoffReportStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Resource;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestTopStateHandoffMetrics extends BaseStageTest {
   public final static String TEST_INPUT_FILE = 
"TestTopStateHandoffMetrics.json";
-  public final static String INITIAL_CURRENT_STATES = "initialCurrentStates";
-  public final static String MISSING_TOP_STATES = "MissingTopStates";
-  public final static String HANDOFF_CURRENT_STATES = "handoffCurrentStates";
-  public final static String EXPECTED_DURATION = "expectedDuration";
   public final static String TEST_RESOURCE = "TestResource";
   public final static String PARTITION = "PARTITION";
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String NON_GRACEFUL_HANDOFF_DURATION = 
"PartitionTopStateNonGracefulHandoffGauge.Max";
+  private static final String GRACEFUL_HANDOFF_DURATION = 
"PartitionTopStateHandoffDurationGauge.Max";
+  private static final String HANDOFF_USER_LATENCY = 
"PartitionTopStateHandoffUserLatencyGauge.Max";
+  private static final Range<Long> DURATION_ZERO = Range.closed(0L, 0L);
+  private TestConfig config;
+
+  @BeforeClass
+  public void beforeClass() {
+    super.beforeClass();
+    try {
+      config = OBJECT_MAPPER
+          
.readValue(getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE), 
TestConfig.class);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static class CurrentStateInfo {
+    String currentState;
+    String previousState;
+    long startTime;
+    long endTime;
+
+    @JsonCreator
+    public CurrentStateInfo(
+        @JsonProperty("CurrentState") String cs,
+        @JsonProperty("PreviousState") String ps,
+        @JsonProperty("StartTime") long start,
+        @JsonProperty("EndTime") long end
+    ) {
+      currentState = cs;
+      previousState = ps;
+      startTime = start;
+      endTime = end;
+    }
+  }
+
+  private static class TestCaseConfig {
+    final Map<String, CurrentStateInfo> initialCurrentStates;
+    final Map<String, CurrentStateInfo> currentStateWithMissingTopState;
+    final Map<String, CurrentStateInfo> finalCurrentState;
+    final long duration;
+    final boolean isGraceful;
+    final long userLatency;
+
+    @JsonCreator
+    public TestCaseConfig(
+        @JsonProperty("InitialCurrentStates") Map<String, CurrentStateInfo> 
initial,
+        @JsonProperty("MissingTopStates") Map<String, CurrentStateInfo> 
missing,
+        @JsonProperty("HandoffCurrentStates") Map<String, CurrentStateInfo> 
handoff,
+        @JsonProperty("Duration") long d,
+        @JsonProperty("UserLatency") long user,
+        @JsonProperty("IsGraceful") boolean graceful
+    ) {
+      initialCurrentStates = initial;
+      currentStateWithMissingTopState = missing;
+      finalCurrentState = handoff;
+      duration = d;
+      userLatency = user;
+      isGraceful = graceful;
+    }
+  }
+
+  private static class TestConfig {
+    final List<TestCaseConfig> succeeded;
+    final List<TestCaseConfig> failed;
+    final List<TestCaseConfig> fast;
+    final List<TestCaseConfig> succeededNonGraceful;
+
+    @JsonCreator
+    public TestConfig(
+        @JsonProperty("succeeded") List<TestCaseConfig> succeededCfg,
+        @JsonProperty("failed") List<TestCaseConfig> failedCfg,
+        @JsonProperty("fast") List<TestCaseConfig> fastCfg,
+        @JsonProperty("succeededNonGraceful") List<TestCaseConfig> nonGraceful
+    ) {
+      succeeded = succeededCfg;
+      failed = failedCfg;
+      fast = fastCfg;
+      succeededNonGraceful = nonGraceful;
+    }
+  }
+
   private void preSetup() {
     setupLiveInstances(3);
     setupStateModel();
@@ -61,165 +142,159 @@ public class TestTopStateHandoffMetrics extends 
BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(),
         Collections.singletonMap(TEST_RESOURCE, resource));
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
-        CurrentStateComputationStage.NOT_RECORDED);
+        TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
     ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
     monitor.active();
     event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
   }
 
   @Test(dataProvider = "successCurrentStateInput")
-  public void testTopStateSuccessHandoff(Map<String, Map<String, String>> 
initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration) {
-    preSetup();
-    runStageAndVerify(initialCurrentStates, missingTopStates, 
handOffCurrentStates, null, 1, 0,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
+  public void testTopStateSuccessHandoff(TestCaseConfig cfg) {
+    runTestWithNoInjection(cfg, false);
   }
 
   @Test(dataProvider = "fastCurrentStateInput")
-  public void testFastTopStateHandoffWithNoMissingTopState(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration
-  ) {
-    preSetup();
-    runStageAndVerify(initialCurrentStates, missingTopStates, 
handOffCurrentStates, null, 1, 0,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
+  public void testFastTopStateHandoffWithNoMissingTopState(TestCaseConfig cfg) 
{
+    runTestWithNoInjection(cfg, false);
   }
 
   @Test(dataProvider = "fastCurrentStateInput")
-  public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration
-  ) {
+  public void 
testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(TestCaseConfig 
cfg) {
     preSetup();
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 
7500L);
     // By simulating last master instance crash, we now have:
     //  - M->S from 6000 to 7000
     //  - lastPipelineFinishTimestamp is 7500
     //  - S->M from 8000 to 9000
-    // Therefore the recorded latency should be 9000 - 7500 = 1500
+    // Therefore the recorded latency should be 9000 - 7500 = 1500, though 
original master crashed,
+    // since this is a single top state handoff observed within 1 pipeline, we 
treat it as graceful,
+    // and only record user latency for transiting to master
+    Range<Long> expectedDuration = Range.closed(1500L, 1500L);
+    Range<Long> expectedUserLatency = Range.closed(1000L, 1000L);
     runStageAndVerify(
-        initialCurrentStates, missingTopStates, handOffCurrentStates,
+        cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, 
cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
             cache.getLiveInstances().remove("localhost_1");
           }
         }, 1, 0,
-        Range.closed(1500L, 1500L),
-        Range.closed(1500L, 1500L)
+        expectedDuration,
+        DURATION_ZERO,
+        expectedDuration,
+        expectedUserLatency
     );
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
-        CurrentStateComputationStage.NOT_RECORDED);
+        TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
   }
 
-  @Test(dataProvider = "failedCurrentStateInput")
-  public void testTopStateFailedHandoff(Map<String, Map<String, String>> 
initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration) {
+  @Test(dataProvider = "succeededNonGraceful")
+  public void testTopStateSuccessfulYetNonGracefulHandoff(TestCaseConfig cfg) {
+    // localhost_0 crashed at 15000
+    // localhost_1 slave -> master started 20000, ended 22000, top state 
handoff = 7000
     preSetup();
-    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
-    clusterConfig.setMissTopStateDurationThreshold(5000L);
-    setClusterConfig(clusterConfig);
-
-    runStageAndVerify(initialCurrentStates, missingTopStates, 
handOffCurrentStates, null, 0, 1,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
-  }
-
-  // Test handoff that are triggered by an offline master instance
-  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = 
"testTopStateSuccessHandoff")
-  public void testTopStateSuccessHandoffWithOfflineNode(
-      final Map<String, Map<String, String>> initialCurrentStates,
-      final Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration) {
-    final long offlineTimeBeforeMasterless = 125;
-
-    preSetup();
-    long durationToVerify = expectedDuration + offlineTimeBeforeMasterless;
-    runStageAndVerify(initialCurrentStates, missingTopStates, 
handOffCurrentStates,
+    final String downInstance = "localhost_0";
+    final Long lastOfflineTime = 15000L;
+    Range<Long> expectedDuration = Range.closed(7000L, 7000L);
+    runStageAndVerify(
+        cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, 
cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
-            Set<String> topStateNodes = new HashSet<>();
-            for (String instance : initialCurrentStates.keySet()) {
-              if 
(initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
-                topStateNodes.add(instance);
-              }
-            }
-            // Simulate the previous top state instance goes offline
-            if (!topStateNodes.isEmpty()) {
-              cache.getLiveInstances().keySet().removeAll(topStateNodes);
-              for (String topStateNode : topStateNodes) {
-                long originalStartTime =
-                    
Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
-                cache.getInstanceOfflineTimeMap()
-                    .put(topStateNode, originalStartTime - 
offlineTimeBeforeMasterless);
-              }
-            }
+            
accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance));
+            cache.getLiveInstances().remove("localhost_0");
+            cache.getInstanceOfflineTimeMap().put("localhost_0", 
lastOfflineTime);
+            cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
           }
-        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
-        Range.closed(durationToVerify, durationToVerify));
+        }, 1, 0,
+        DURATION_ZERO, // graceful handoff duration should be 0
+        expectedDuration, // we should have an record for non-graceful handoff
+        expectedDuration, // max handoff should be same as non-graceful handoff
+        DURATION_ZERO // we don't record user latency for non-graceful 
transition
+    );
+  }
+
+  @Test(dataProvider = "failedCurrentStateInput")
+  public void testTopStateFailedHandoff(TestCaseConfig cfg) {
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.setMissTopStateDurationThreshold(5000L);
+    setClusterConfig(clusterConfig);
+    runTestWithNoInjection(cfg, true);
   }
 
   // Test success with no available clue about previous master.
   // For example, controller is just changed to a new node.
-  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = 
"testTopStateSuccessHandoff")
-  public void testHandoffDurationWithDefaultStartTime(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration) {
+  @Test(
+      dataProvider = "successCurrentStateInput",
+      dependsOnMethods = "testHandoffDurationWithPendingMessage"
+  )
+  public void testHandoffDurationWithDefaultStartTime(final TestCaseConfig 
cfg) {
     preSetup();
 
-    // reset expectedDuration since current system time would be used
-    for (Map<String, String> states : handOffCurrentStates.values()) {
-      if (states.get("CurrentState").equals("MASTER")) {
-        expectedDuration = Long.parseLong(states.get("EndTime")) - 
System.currentTimeMillis();
+    // No initialCurrentStates means no input can be used as the clue of the 
previous master.
+    // in such case, reportTopStateMissing will use current system time as 
missing top state
+    // start time, and we assume it is a graceful handoff, and only "to top 
state" user latency
+    // will be recorded
+    long userLatency = 0;
+    for (CurrentStateInfo info : cfg.currentStateWithMissingTopState.values()) 
{
+      if (info.previousState.equals("MASTER")) {
+        userLatency = cfg.userLatency - (info.endTime - info.startTime);
+      }
+      info.previousState = "OFFLINE";
+    }
+    for (CurrentStateInfo states : cfg.finalCurrentState.values()) {
+      if (states.currentState.equals("MASTER")) {
+        states.endTime = System.currentTimeMillis();
+        states.startTime = System.currentTimeMillis() - userLatency;
         break;
       }
     }
-
-    // No initialCurrentStates means no input can be used as the clue of the 
previous master.
-    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, 
handOffCurrentStates, null, 1, 0,
-        Range.atMost(1500L), Range.atMost(1500L));
+    runStageAndVerify(Collections.EMPTY_MAP, 
cfg.currentStateWithMissingTopState,
+        cfg.finalCurrentState, null, 1, 0,
+        Range.closed(userLatency, userLatency),
+        DURATION_ZERO,
+        Range.closed(userLatency, userLatency),
+        Range.closed(userLatency, userLatency)
+    );
   }
 
   /**
    * Test success with only a pending message as the clue.
    * For instance, if the master was dropped, there is no way to track the 
dropping time.
    * So either use current system time.
-   *
    * @see 
org.apache.helix.monitoring.mbeans.TestTopStateHandoffMetrics#testHandoffDurationWithDefaultStartTime
    * Or we can check if any pending message to be used as the start time.
    */
   @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = 
"testTopStateSuccessHandoff")
-  public void testHandoffDurationWithPendingMessage(
-      final Map<String, Map<String, String>> initialCurrentStates,
-      final Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long 
expectedDuration) {
+  public void testHandoffDurationWithPendingMessage(final TestCaseConfig cfg) {
     final long messageTimeBeforeMasterless = 145;
     preSetup();
 
-    long durationToVerify = expectedDuration + messageTimeBeforeMasterless;
+    long durationToVerify = cfg.duration + messageTimeBeforeMasterless;
+    long userLatency = 0;
+    for (CurrentStateInfo info : cfg.finalCurrentState.values()) {
+      if (info.currentState.equals("MASTER")) {
+        userLatency = info.endTime - info.startTime;
+      }
+    }
+
     // No initialCurrentStates means no input can be used as the clue of the 
previous master.
-    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, 
handOffCurrentStates,
+    // in this case, we will treat the handoff as graceful and only to-master 
user latency
+    // will be recorded
+    runStageAndVerify(
+        Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState, 
cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override public void doInject(ClusterDataCache cache) {
             String topStateNode = null;
-            for (String instance : initialCurrentStates.keySet()) {
-              if 
(initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
+            for (String instance : cfg.initialCurrentStates.keySet()) {
+              if 
(cfg.initialCurrentStates.get(instance).currentState.equals("MASTER")) {
                 topStateNode = instance;
                 break;
               }
             }
             // Simulate the previous top state instance goes offline
             if (topStateNode != null) {
-              long originalStartTime =
-                  
Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
+              long originalStartTime = 
cfg.currentStateWithMissingTopState.get(topStateNode).startTime;
               // Inject a message that fit expectedDuration
               Message message =
                   new Message(Message.MessageType.STATE_TRANSITION, 
"thisisafakemessage");
@@ -232,112 +307,112 @@ public class TestTopStateHandoffMetrics extends 
BaseStageTest {
               cache.cacheMessages(Collections.singletonList(message));
             }
           }
-        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
-        Range.closed(durationToVerify, durationToVerify));
+        }, 1, 0,
+        Range.closed(durationToVerify, durationToVerify),
+        DURATION_ZERO,
+        Range.closed(durationToVerify, durationToVerify),
+        Range.closed(userLatency, userLatency));
   }
 
-  private final String CURRENT_STATE = "CurrentState";
-  private final String PREVIOUS_STATE = "PreviousState";
-  private final String START_TIME = "StartTime";
-  private final String END_TIME = "EndTime";
-
   @DataProvider(name = "successCurrentStateInput")
   public Object[][] successCurrentState() {
-    return loadInputData("succeeded");
+    return testCaseConfigListToObjectArray(config.succeeded);
   }
 
   @DataProvider(name = "failedCurrentStateInput")
   public Object[][] failedCurrentState() {
-    return loadInputData("failed");
+    return testCaseConfigListToObjectArray(config.failed);
   }
 
   @DataProvider(name = "fastCurrentStateInput")
   public Object[][] fastCurrentState() {
-    return loadInputData("fast");
+    return testCaseConfigListToObjectArray(config.fast);
   }
 
-  private Object[][] loadInputData(String inputEntry) {
-    Object[][] inputData = null;
-    InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE);
+  @DataProvider(name = "succeededNonGraceful")
+  public Object[][] nonGracefulCurrentState() {
+    return testCaseConfigListToObjectArray(config.succeededNonGraceful);
+  }
 
-    try {
-      ObjectReader mapReader = new ObjectMapper().reader(Map.class);
-      Map<String, Object> inputMaps = mapReader.readValue(inputStream);
-
-      List<Map<String, Object>> inputs = (List<Map<String, Object>>) 
inputMaps.get(inputEntry);
-      inputData = new Object[inputs.size()][];
-      for (int i = 0; i < inputData.length; i++) {
-        Map<String, Map<String, String>> intialCurrentStates =
-            (Map<String, Map<String, String>>) 
inputs.get(i).get(INITIAL_CURRENT_STATES);
-        Map<String, Map<String, String>> missingTopStates =
-            (Map<String, Map<String, String>>) 
inputs.get(i).get(MISSING_TOP_STATES);
-        Map<String, Map<String, String>> handoffCurrentStates =
-            (Map<String, Map<String, String>>) 
inputs.get(i).get(HANDOFF_CURRENT_STATES);
-        Long expectedDuration = Long.parseLong((String) 
inputs.get(i).get(EXPECTED_DURATION));
-
-        inputData[i] = new Object[] { intialCurrentStates, missingTopStates, 
handoffCurrentStates,
-            expectedDuration
-        };
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
+  private Object[][] testCaseConfigListToObjectArray(List<TestCaseConfig> 
configs) {
+    Object[][] result = new Object[configs.size()][];
+    for (int i = 0; i < configs.size(); i++) {
+      result[i] = new Object[] {configs.get(i)};
     }
+    return result;
+  }
 
-    return inputData;
+  private void runTestWithNoInjection(TestCaseConfig cfg, boolean expectFail) {
+    preSetup();
+    Range<Long> duration = Range.closed(cfg.duration, cfg.duration);
+    Range<Long> expectedDuration = cfg.isGraceful ? duration : DURATION_ZERO;
+    Range<Long> expectedNonGracefulDuration = cfg.isGraceful ? DURATION_ZERO : 
duration;
+    Range<Long> expectedUserLatency =
+        cfg.isGraceful ? Range.closed(cfg.userLatency, cfg.userLatency) : 
DURATION_ZERO;
+    runStageAndVerify(cfg.initialCurrentStates, 
cfg.currentStateWithMissingTopState,
+        cfg.finalCurrentState, null, expectFail ? 0 : 1, expectFail ? 1 : 0, 
expectedDuration, expectedNonGracefulDuration,
+        expectedDuration, expectedUserLatency);
   }
 
   private Map<String, CurrentState> generateCurrentStateMap(
-      Map<String, Map<String, String>> currentStateRawData) {
+      Map<String, CurrentStateInfo> currentStateRawData) {
     Map<String, CurrentState> currentStateMap = new HashMap<String, 
CurrentState>();
     for (String instanceName : currentStateRawData.keySet()) {
-      Map<String, String> propertyMap = currentStateRawData.get(instanceName);
+      CurrentStateInfo info = currentStateRawData.get(instanceName);
       CurrentState currentState = new CurrentState(TEST_RESOURCE);
       currentState.setSessionId(SESSION_PREFIX + instanceName.split("_")[1]);
-      currentState.setState(PARTITION, propertyMap.get(CURRENT_STATE));
-      currentState.setPreviousState(PARTITION, 
propertyMap.get(PREVIOUS_STATE));
-      currentState.setStartTime(PARTITION, 
Long.parseLong(propertyMap.get(START_TIME)));
-      currentState.setEndTime(PARTITION, 
Long.parseLong(propertyMap.get(END_TIME)));
+      currentState.setState(PARTITION, info.currentState);
+      currentState.setPreviousState(PARTITION, info.previousState);
+      currentState.setStartTime(PARTITION, info.startTime);
+      currentState.setEndTime(PARTITION, info.endTime);
       currentStateMap.put(instanceName, currentState);
     }
     return currentStateMap;
   }
 
-  private void runCurrentStage(Map<String, Map<String, String>> 
initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates,
+  private void runPipeLine(Map<String, CurrentStateInfo> initialCurrentStates,
+      Map<String, CurrentStateInfo> missingTopStates,
+      Map<String, CurrentStateInfo> handOffCurrentStates,
       MissingStatesDataCacheInject testInjection) {
 
     if (initialCurrentStates != null && !initialCurrentStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(initialCurrentStates));
-      runStage(event, new ReadClusterDataStage());
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(initialCurrentStates, null);
     }
 
     if (missingTopStates != null && !missingTopStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(missingTopStates));
-      runStage(event, new ReadClusterDataStage());
-      if (testInjection != null) {
-        ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
-        testInjection.doInject(cache);
-      }
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(missingTopStates, testInjection);
     }
 
     if (handOffCurrentStates != null && !handOffCurrentStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
-      runStage(event, new ReadClusterDataStage());
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(handOffCurrentStates, null);
+    }
+  }
+
+  private void doRunStages(Map<String, CurrentStateInfo> currentStates,
+      MissingStatesDataCacheInject clusterDataInjection) {
+    setupCurrentStates(generateCurrentStateMap(currentStates));
+    runStage(event, new ReadClusterDataStage());
+    if (clusterDataInjection != null) {
+      ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
+      clusterDataInjection.doInject(cache);
     }
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new TopStateHandoffReportStage());
   }
 
   private void runStageAndVerify(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, 
MissingStatesDataCacheInject inject,
-      int successCnt, int failCnt,
-      Range<Long> expectedDuration, Range<Long> expectedMaxDuration
+      Map<String, CurrentStateInfo> initialCurrentStates,
+      Map<String, CurrentStateInfo> missingTopStates,
+      Map<String, CurrentStateInfo> handOffCurrentStates,
+      MissingStatesDataCacheInject inject,
+      int successCnt,
+      int failCnt,
+      Range<Long> expectedDuration,
+      Range<Long> expectedNonGracefulDuration,
+      Range<Long> expectedMaxDuration,
+      Range<Long> expectedUserLatency
   ) {
-    runCurrentStage(initialCurrentStates, missingTopStates, 
handOffCurrentStates, inject);
+    runPipeLine(initialCurrentStates, missingTopStates, handOffCurrentStates, 
inject);
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor monitor = 
clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
@@ -345,10 +420,17 @@ public class TestTopStateHandoffMetrics extends 
BaseStageTest {
     Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 
successCnt);
     Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);
 
-    Assert.assertTrue(
-        
expectedDuration.contains(monitor.getSuccessfulTopStateHandoffDurationCounter()));
-    Assert.assertTrue(
-        
expectedMaxDuration.contains(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge()));
+    long graceful = monitor.getPartitionTopStateHandoffDurationGauge()
+        .getAttributeValue(GRACEFUL_HANDOFF_DURATION).longValue();
+    long nonGraceful = 
monitor.getPartitionTopStateNonGracefulHandoffDurationGauge()
+        .getAttributeValue(NON_GRACEFUL_HANDOFF_DURATION).longValue();
+    long user = monitor.getPartitionTopStateHandoffUserLatencyGauge()
+        .getAttributeValue(HANDOFF_USER_LATENCY).longValue();
+    long max = monitor.getMaxSinglePartitionTopStateHandoffDurationGauge();
+    Assert.assertTrue(expectedDuration.contains(graceful));
+    Assert.assertTrue(expectedNonGracefulDuration.contains(nonGraceful));
+    Assert.assertTrue(expectedUserLatency.contains(user));
+    Assert.assertTrue(expectedMaxDuration.contains(max));
   }
 
   interface MissingStatesDataCacheInject {

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json 
b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index d296e6c..0b39a1d 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -1,306 +1,371 @@
 {
   "succeeded": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "7000"
+      "Duration": 7000,
+      "UserLatency": 5000,
+      "IsGraceful": true
     },
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "2500",
-          "EndTime": "5000"
+          "StartTime": 2500,
+          "EndTime": 5000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "ERROR",
           "PreviousState": "SLAVE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "14000"
+      "Duration": 14000,
+      "UserLatency": 4000,
+      "IsGraceful": true
+    }
+  ],
+  "succeededNonGraceful": [
+    {
+      "InitialCurrentStates": {
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": 10000,
+          "EndTime": 11000
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "MissingTopStates": {
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "HandoffCurrentStates": {
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": 20000,
+          "EndTime": 22000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "Duration": 7000,
+      "UserLatency": 5000,
+      "IsGraceful": false
     }
   ],
   "failed": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "0"
+      "Duration": 0,
+      "UserLatency": 0,
+      "IsGraceful": false
     },
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "0"
+      "Duration": 0,
+      "UserLatency": 0,
+      "IsGraceful": false
     }
   ],
   "fast": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "2500",
-          "EndTime": "5000"
+          "StartTime": 2500,
+          "EndTime": 5000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "8000",
-          "EndTime": "9000"
+          "StartTime": 8000,
+          "EndTime": 9000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "6000",
-          "EndTime": "7000"
+          "StartTime": 6000,
+          "EndTime": 7000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
 
       },
-      "expectedDuration": "3000"
+      "Duration": 3000,
+      "UserLatency": 2000,
+      "IsGraceful": true
     }
   ]
 }

Reply via email to