Repository: helix
Updated Branches:
  refs/heads/master 11b721350 -> c701a9456


Adding latency metrics for the partition state propagation delay.

RoutingTableProvider depends on ZK notifications to trigger a refresh. 
Non-negligible latency was noticed between a ZK data change and the 
RoutingTable being updated. Multiple improvements have been made to minimize 
this latency. However, in order to evaluate the data propagation latency from 
a business-logic perspective, we need additional metrics.

The metric introduced by this change tracks the latency between the moment that 
a current state is updated (in participants) and the moment that the routing 
table is updated. The metric reports per-partition update latency when the 
provider is listening on current state.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c701a945
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c701a945
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c701a945

Branch: refs/heads/master
Commit: c701a945606e2f3fe7fad86a15907655c9f6c166
Parents: 11b7213
Author: Jiajun Wang <[email protected]>
Authored: Wed Aug 1 11:45:49 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Fri Sep 21 14:04:08 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/AbstractDataCache.java  |  15 ++-
 .../common/caches/AbstractDataSnapshot.java     |  19 +++
 .../helix/common/caches/CurrentStateCache.java  |  38 ++++--
 .../common/caches/CurrentStateSnapshot.java     |  57 +++++++++
 .../controller/pipeline/AbstractBaseStage.java  |   7 +-
 .../mbeans/RoutingTableProviderMonitor.java     |  14 +++
 .../helix/spectator/RoutingDataCache.java       |   7 ++
 .../helix/spectator/RoutingTableProvider.java   | 120 +++++++++++++------
 ...stRoutingTableProviderFromCurrentStates.java |  41 ++++++-
 .../mbeans/TestRoutingTableProviderMonitor.java |  21 ++++
 10 files changed, 285 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index 1c5924e..4bee84d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -20,16 +20,17 @@ package org.apache.helix.common.caches;
  */
 
 import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 public abstract class AbstractDataCache {
   private static Logger LOG = 
LoggerFactory.getLogger(AbstractDataCache.class.getName());
   private String _eventId = "NO_ID";
@@ -52,7 +53,7 @@ public abstract class AbstractDataCache {
    * @param <T> the type of metadata
    * @return updated properties map
    */
-  protected  <T extends HelixProperty> Map<PropertyKey, T> refreshProperties(
+  protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties(
       HelixDataAccessor accessor, List<PropertyKey> reloadKeys, 
List<PropertyKey> cachedKeys,
       Map<PropertyKey, T> cachedPropertyMap) {
     // All new entries from zk not cached locally yet should be read from ZK.
@@ -90,4 +91,8 @@ public abstract class AbstractDataCache {
     return refreshedPropertyMap;
   }
 
+  public AbstractDataSnapshot getSnapshot() {
+    throw new HelixException(String.format("DataCache %s does not support 
generating snapshot.",
+        getClass().getSimpleName()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java
new file mode 100644
index 0000000..cef090f
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java
@@ -0,0 +1,19 @@
+package org.apache.helix.common.caches;
+
+import org.apache.helix.PropertyKey;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractDataSnapshot<T> {
+  protected final Map<PropertyKey, T> _properties;
+
+  protected AbstractDataSnapshot(Map<PropertyKey, T> cacheData) {
+    _properties = Collections.unmodifiableMap(new HashMap<>(cacheData));
+  }
+
+  public Map<PropertyKey, T> getPropertyMap() {
+    return _properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 7fee4b0..40ba036 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -20,13 +20,6 @@ package org.apache.helix.common.caches;
  */
 
 import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.LogUtil;
@@ -35,6 +28,14 @@ import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Cache to hold all CurrentStates of a cluster.
  */
@@ -43,12 +44,15 @@ public class CurrentStateCache extends AbstractDataCache {
 
   private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   private Map<PropertyKey, CurrentState> _currentStateCache = 
Maps.newHashMap();
-
   private String _clusterName;
+  // If the cache is already refreshed with current state data.
+  private boolean _initialized = false;
+  private CurrentStateSnapshot _snapshot;
 
   public CurrentStateCache(String clusterName) {
     _clusterName = clusterName;
     _currentStateMap = Collections.emptyMap();
+    _snapshot = new CurrentStateSnapshot(_currentStateCache);
   }
 
   /**
@@ -107,6 +111,7 @@ public class CurrentStateCache extends AbstractDataCache {
   // reload current states that has been changed from zk to local cache.
   private void refreshCurrentStatesCache(HelixDataAccessor accessor,
       Map<String, LiveInstance> liveInstanceMap) {
+
     long start = System.currentTimeMillis();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -127,10 +132,20 @@ public class CurrentStateCache extends AbstractDataCache {
     Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet());
     cachedKeys.retainAll(currentStateKeys);
 
-    _currentStateCache = Collections.unmodifiableMap(
+    Map<PropertyKey, CurrentState> newStateCache = Collections.unmodifiableMap(
         refreshProperties(accessor, new ArrayList<>(reloadKeys), new 
ArrayList<>(cachedKeys),
             _currentStateCache));
 
+    // if the cache was not initialized, the previous state should not be 
included in the snapshot
+    if (_initialized) {
+      _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, 
reloadKeys);
+    } else {
+      _snapshot = new CurrentStateSnapshot(newStateCache);
+      _initialized = true;
+    }
+
+    _currentStateCache = newStateCache;
+
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, getEventId(),
           "# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + (
@@ -178,4 +193,9 @@ public class CurrentStateCache extends AbstractDataCache {
     }
     return 
Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId));
   }
+
+  @Override
+  public CurrentStateSnapshot getSnapshot() {
+    return _snapshot;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
new file mode 100644
index 0000000..dff9861
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
@@ -0,0 +1,57 @@
+package org.apache.helix.common.caches;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState> {
+  private Set<PropertyKey> _updatedStateKeys = null;
+  private Map<PropertyKey, CurrentState> _prevStateMap = null;
+
+  public CurrentStateSnapshot(final Map<PropertyKey, CurrentState> 
currentStateMap) {
+    super(currentStateMap);
+  }
+
+  public CurrentStateSnapshot(final Map<PropertyKey, CurrentState> 
currentStateMap,
+      final Map<PropertyKey, CurrentState> prevStateMap, final 
Set<PropertyKey> updatedStateKeys) {
+    this(currentStateMap);
+    _updatedStateKeys = Collections.unmodifiableSet(new 
HashSet<>(updatedStateKeys));
+    _prevStateMap = Collections.unmodifiableMap(new HashMap<>(prevStateMap));
+  }
+
+  /**
+   * Return the end times of all recent changed current states update.
+   */
+  public Map<PropertyKey, Map<String, Long>> getNewCurrentStateEndTimes() {
+    Map<PropertyKey, Map<String, Long>> endTimeMap = new HashMap<>();
+    if (_updatedStateKeys != null && _prevStateMap != null) {
+      // Note if the prev state map is empty, this is the first time refresh.
+      // So the update is not considered as "recent" change.
+      for (PropertyKey propertyKey : _updatedStateKeys) {
+        CurrentState prevState = _prevStateMap.get(propertyKey);
+        CurrentState curState = _properties.get(propertyKey);
+
+        Map<String, Long> partitionUpdateEndTimes = null;
+        for (String partition : curState.getPartitionStateMap().keySet()) {
+          long newEndTime = curState.getEndTime(partition);
+          if (prevState == null || prevState.getEndTime(partition) < 
newEndTime) {
+            if (partitionUpdateEndTimes == null) {
+              partitionUpdateEndTimes = new HashMap<>();
+            }
+            partitionUpdateEndTimes.put(partition, newEndTime);
+          }
+        }
+
+        if (partitionUpdateEndTimes != null) {
+          endTimeMap.put(propertyKey, partitionUpdateEndTimes);
+        }
+      }
+    }
+    return endTimeMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
index 37259a8..2bd45c5 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
@@ -63,10 +65,11 @@ public class AbstractBaseStage implements Stage {
     return className;
   }
 
-  public static <T> void asyncExecute(ExecutorService service, Callable<T> 
task) {
+  public static <T> Future asyncExecute(ExecutorService service, Callable<T> 
task) {
     if (service != null) {
-      service.submit(task);
+      return service.submit(task);
     }
+    return null;
   }
 
   protected DedupEventProcessor<String, Runnable> 
getAsyncWorkerFromClusterEvent(ClusterEvent event,

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
index 1c64783..325ea1b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
@@ -48,6 +48,7 @@ public class RoutingTableProviderMonitor extends 
DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _eventQueueSizeGauge;
   private SimpleDynamicMetric<Long> _dataRefreshCounter;
   private HistogramDynamicMetric _dataRefreshLatencyGauge;
+  private HistogramDynamicMetric _statePropLatencyGauge;
 
   public RoutingTableProviderMonitor(final PropertyType propertyType, String 
clusterName) {
     _propertyType = propertyType;
@@ -63,6 +64,10 @@ public class RoutingTableProviderMonitor extends 
DynamicMBeanProvider {
     _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0l);
     _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0l);
     _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0l);
+    if (propertyType.equals(PropertyType.CURRENTSTATES)) {
+      _statePropLatencyGauge = new 
HistogramDynamicMetric("StatePropagationLatencyGauge", new Histogram(
+          new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, 
TimeUnit.MILLISECONDS)));
+    }
   }
 
   @Override
@@ -86,6 +91,12 @@ public class RoutingTableProviderMonitor extends 
DynamicMBeanProvider {
     _dataRefreshLatencyGauge.updateValue(System.currentTimeMillis() - 
startTime);
   }
 
+  public void recordStatePropagationLatency(long latency) {
+    if (_statePropLatencyGauge != null) {
+      _statePropLatencyGauge.updateValue(latency);
+    }
+  }
+
   @Override
   public RoutingTableProviderMonitor register() throws JMException {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
@@ -93,6 +104,9 @@ public class RoutingTableProviderMonitor extends 
DynamicMBeanProvider {
     attributeList.add(_callbackCounter);
     attributeList.add(_eventQueueSizeGauge);
     attributeList.add(_dataRefreshCounter);
+    if (_statePropLatencyGauge != null) {
+      attributeList.add(_statePropLatencyGauge);
+    }
 
     doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName());
     return this;

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 9b95d54..68f163c 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -20,11 +20,14 @@ package org.apache.helix.spectator;
  */
 
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
 import org.apache.helix.common.caches.BasicClusterDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
+import org.apache.helix.common.caches.CurrentStateSnapshot;
 import org.apache.helix.common.caches.TargetExternalViewCache;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -110,5 +113,9 @@ class RoutingDataCache extends BasicClusterDataCache {
   public Map<String, Map<String, Map<String, CurrentState>>> 
getCurrentStatesMap() {
     return _currentStateCache.getCurrentStatesMap();
   }
+
+  public CurrentStateSnapshot getCurrentStateSnapshot() {
+    return _currentStateCache.getSnapshot();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index cc373db..3403d12 100644
--- 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -18,35 +18,22 @@ package org.apache.helix.spectator;
  * specific language governing permissions and limitations
  * under the License.
  */
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.management.JMException;
-
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.common.caches.CurrentStateSnapshot;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
@@ -58,6 +45,22 @@ import 
org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.JMException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class RoutingTableProvider
     implements ExternalViewChangeListener, InstanceConfigChangeListener, 
ConfigChangeListener,
                LiveInstanceChangeListener, CurrentStateChangeListener {
@@ -75,6 +78,10 @@ public class RoutingTableProvider
   private boolean _isPeriodicRefreshEnabled = true; // Default is enabled
   private long _periodRefreshInterval;
   private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+  // For computing intensive reporting logic
+  private ExecutorService _reportExecutor;
+  private Future _reportingTask = null;
+
 
   public RoutingTableProvider() {
     this(null);
@@ -190,6 +197,8 @@ public class RoutingTableProvider
     } else {
       _isPeriodicRefreshEnabled = false;
     }
+
+    _reportExecutor = Executors.newSingleThreadExecutor();
   }
 
   /**
@@ -495,12 +504,11 @@ public class RoutingTableProvider
           try {
             // add current-state listeners for new sessions
             manager.addCurrentStateChangeListener(this, instanceName, session);
-            logger.info(
-                "{} added current-state listener for instance: {}, session: 
{}, listener: {}",
+            logger.info("{} added current-state listener for instance: {}, 
session: {}, listener: {}",
                 manager.getInstanceName(), instanceName, session, this);
           } catch (Exception e) {
-            logger.error("Fail to add current state listener for instance: {} 
with session: {}",
-                instanceName, session, e);
+            logger.error("Fail to add current state listener for instance: {} 
with session: {}", instanceName, session,
+                e);
           }
         }
       }
@@ -601,27 +609,65 @@ public class RoutingTableProvider
 
         _dataCache.refresh(manager.getHelixDataAccessor());
         switch (_sourceDataType) {
-          case EXTERNALVIEW:
-            refresh(_dataCache.getExternalViews().values(),
-                _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
-            break;
-          case TARGETEXTERNALVIEW:
-            refresh(_dataCache.getTargetExternalViews().values(),
-                _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
-            break;
-          case CURRENTSTATES:
-            refresh(_dataCache.getCurrentStatesMap(), 
_dataCache.getInstanceConfigMap().values(),
-                _dataCache.getLiveInstances().values());
-            break;
-          default:
-            logger.warn("Unsupported source data type: {}, stop refreshing the 
routing table!",
-                _sourceDataType);
+        case EXTERNALVIEW:
+          refresh(_dataCache.getExternalViews().values(),
+              _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
+          break;
+        case TARGETEXTERNALVIEW:
+          refresh(_dataCache.getTargetExternalViews().values(),
+              _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
+          break;
+        case CURRENTSTATES:
+          refresh(_dataCache.getCurrentStatesMap(), 
_dataCache.getInstanceConfigMap().values(),
+              _dataCache.getLiveInstances().values());
+
+          recordPropagationLatency(System.currentTimeMillis(), 
_dataCache.getCurrentStateSnapshot());
+          break;
+        default:
+          logger.warn("Unsupported source data type: {}, stop refreshing the 
routing table!",
+              _sourceDataType);
         }
 
         _monitor.increaseDataRefreshCounters(startTime);
       }
     }
 
+    /**
+     * Report current state to routing table propagation latency
+     * This method is not threadsafe. Take care of _reportingTask atomicity if 
use in multi-threads.
+     */
+    private void recordPropagationLatency(final long currentTime, final 
CurrentStateSnapshot currentStateSnapshot) {
+      // Note that due to the extra mem footprint introduced by 
currentStateSnapshot ref, we restrict running report task count to be 1.
+      // Any parallel tasks will be skipped. So the reporting metric data is 
sampled.
+      if (_reportingTask == null || _reportingTask.isDone()) {
+        _reportingTask = _reportExecutor.submit(new Callable<Object>() {
+          @Override public Object call() {
+            // getNewCurrentStateEndTimes() needs to iterate all current 
states. Make it async to avoid performance impact.
+            Map<PropertyKey, Map<String, Long>> currentStateEndTimeMap =
+                currentStateSnapshot.getNewCurrentStateEndTimes();
+            for (PropertyKey key : currentStateEndTimeMap.keySet()) {
+              Map<String, Long> partitionStateEndTimes = 
currentStateEndTimeMap.get(key);
+              for (String partition : partitionStateEndTimes.keySet()) {
+                long endTime = partitionStateEndTimes.get(partition);
+                if (currentTime >= endTime) {
+                  _monitor.recordStatePropagationLatency(currentTime - 
endTime);
+                  logger.debug(
+                      "CurrentState updated in the routing table. Node Key {}, 
Partition {}, end time {}, Propagation latency {}",
+                      key.toString(), partition, endTime, currentTime - 
endTime);
+                } else {
+                  // Verbose log in case currentTime < endTime. This could be 
the case that Router clock is slower than the participant clock.
+                  logger.trace(
+                      "CurrentState updated in the routing table. Node Key {}, 
Partition {}, end time {}, Propagation latency {}",
+                      key.toString(), partition, endTime, currentTime - 
endTime);
+                }
+              }
+            }
+            return null;
+          }
+        });
+      }
+    }
+
     public void queueEvent(NotificationContext context, ClusterEventType 
eventType,
         HelixConstants.ChangeType changeType) {
       ClusterEvent event = new ClusterEvent(_clusterName, eventType);

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index 219b753..a598a92 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -1,5 +1,6 @@
 package org.apache.helix.integration.spectator;
 
+import java.lang.management.ManagementFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -9,12 +10,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
 import org.apache.helix.spectator.RoutingTableProvider;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -23,6 +28,14 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
 public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
   private HelixManager _manager;
   private final int NUM_NODES = 10;
@@ -32,6 +45,7 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + 
getShortClassName();
   private MockParticipantManager[] _participants;
   private ClusterControllerManager _controller;
+  private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -83,7 +97,7 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
   }
 
   @Test
-  public void testRoutingTableWithCurrentStates() throws InterruptedException {
+  public void testRoutingTableWithCurrentStates() throws Exception {
     RoutingTableProvider routingTableEV =
         new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
     RoutingTableProvider routingTableCurrentStates = new 
RoutingTableProvider(_manager, PropertyType.CURRENTSTATES);
@@ -91,12 +105,14 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
     try {
       String db1 = "TestDB-1";
       _gSetupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, 
"MasterSlave", IdealState.RebalanceMode.FULL_AUTO.name());
+      long startTime = System.currentTimeMillis();
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
 
       Thread.sleep(200);
       ZkHelixClusterVerifier clusterVerifier =
           new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
       Assert.assertTrue(clusterVerifier.verifyByPolling());
+      validatePropagationLatency(PropertyType.CURRENTSTATES, 
System.currentTimeMillis() - startTime);
 
       IdealState idealState1 =
           
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db1);
@@ -105,19 +121,24 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
       // add new DB
       String db2 = "TestDB-2";
       _gSetupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, 
"MasterSlave", IdealState.RebalanceMode.FULL_AUTO.name());
+      startTime = System.currentTimeMillis();
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
 
       Thread.sleep(200);
       Assert.assertTrue(clusterVerifier.verifyByPolling());
+      validatePropagationLatency(PropertyType.CURRENTSTATES, 
System.currentTimeMillis() - startTime);
 
       IdealState idealState2 =
           
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db2);
       validate(idealState2, routingTableEV, routingTableCurrentStates);
 
       // shutdown an instance
+      startTime = System.currentTimeMillis();
       _participants[0].syncStop();
       Thread.sleep(200);
       Assert.assertTrue(clusterVerifier.verifyByPolling());
+      validatePropagationLatency(PropertyType.CURRENTSTATES, 
System.currentTimeMillis() - startTime);
+
       validate(idealState1, routingTableEV, routingTableCurrentStates);
       validate(idealState2, routingTableEV, routingTableCurrentStates);
     } finally {
@@ -126,6 +147,24 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
     }
   }
 
+  private ObjectName buildObjectName(PropertyType type)
+      throws MalformedObjectNameException {
+    return 
MBeanRegistrar.buildObjectName(MonitorDomainNames.RoutingTableProvider.name(),
+        RoutingTableProviderMonitor.CLUSTER_KEY, CLUSTER_NAME, 
RoutingTableProviderMonitor.DATA_TYPE_KEY,
+        type.name());
+  }
+
+  private void validatePropagationLatency(PropertyType type, final long 
upperBound)
+      throws Exception {
+    final ObjectName name = buildObjectName(type);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() throws Exception {
+        long stateLatency = (long) _beanServer.getAttribute(name, 
"StatePropagationLatencyGauge.Max");
+        return stateLatency > 0 && stateLatency <= upperBound;
+      }
+    }, 1000));
+  }
+
   @Test (dependsOnMethods = {"testRoutingTableWithCurrentStates"})
   public void testWithSupportSourceDataType() {
     new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown();

http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
index 05240c1..1ab8713 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
@@ -82,6 +82,8 @@ public class TestRoutingTableProviderMonitor {
     Assert.assertEquals((long) _beanServer.getAttribute(name, 
"EventQueueSizeGauge"), 15);
     Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshLatencyGauge.Max"), 0);
     Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshCounter"), 0);
+    // StatePropagationLatencyGauge only apply for current state
+    Assert.assertEquals(_beanServer.getAttribute(name, 
"StatePropagationLatencyGauge.Max"), null);
 
     long startTime = System.currentTimeMillis();
     Thread.sleep(5);
@@ -97,4 +99,23 @@ public class TestRoutingTableProviderMonitor {
 
     monitor.unregister();
   }
+
+  public void testCurrentStateMetrics() throws JMException, 
InterruptedException {
+    PropertyType type = PropertyType.CURRENTSTATES;
+    RoutingTableProviderMonitor monitor = new 
RoutingTableProviderMonitor(type, TEST_CLUSTER);
+    monitor.register();
+    ObjectName name = buildObjectName(type, TEST_CLUSTER);
+
+    monitor.increaseCallbackCounters(10);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"StatePropagationLatencyGauge.Max"), 0);
+
+    monitor.recordStatePropagationLatency(5);
+    long statelatency = (long) _beanServer.getAttribute(name, 
"StatePropagationLatencyGauge.Max");
+    Assert.assertEquals(statelatency, 5);
+    monitor.recordStatePropagationLatency(10);
+    statelatency = (long) _beanServer.getAttribute(name, 
"StatePropagationLatencyGauge.Max");
+    Assert.assertEquals(statelatency, 10);
+
+    monitor.unregister();
+  }
 }

Reply via email to