Repository: helix Updated Branches: refs/heads/master 11b721350 -> c701a9456
Adding latency metrics for the partition state propagation delay. RoutingTableProvider depends on ZK notifications to trigger a refresh. Non-negligible latency was observed between a ZK data change and the corresponding RoutingTable update. Multiple improvements have been made to minimize this latency. However, in order to evaluate the data propagation latency from a business-logic perspective, we need additional metrics. The metric introduced by this change tracks the latency between the moment that a current state is updated (on a participant) and the moment that the routing table is updated. The metric reports per-partition update latency when the provider is listening on current states. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c701a945 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c701a945 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c701a945 Branch: refs/heads/master Commit: c701a945606e2f3fe7fad86a15907655c9f6c166 Parents: 11b7213 Author: Jiajun Wang <[email protected]> Authored: Wed Aug 1 11:45:49 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Fri Sep 21 14:04:08 2018 -0700 ---------------------------------------------------------------------- .../helix/common/caches/AbstractDataCache.java | 15 ++- .../common/caches/AbstractDataSnapshot.java | 19 +++ .../helix/common/caches/CurrentStateCache.java | 38 ++++-- .../common/caches/CurrentStateSnapshot.java | 57 +++++++++ .../controller/pipeline/AbstractBaseStage.java | 7 +- .../mbeans/RoutingTableProviderMonitor.java | 14 +++ .../helix/spectator/RoutingDataCache.java | 7 ++ .../helix/spectator/RoutingTableProvider.java | 120 +++++++++++++------ ...stRoutingTableProviderFromCurrentStates.java | 41 ++++++- .../mbeans/TestRoutingTableProviderMonitor.java | 21 ++++ 10 files changed, 285 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java index 1c5924e..4bee84d 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java @@ -20,16 +20,17 @@ package org.apache.helix.common.caches; */ import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + public abstract class AbstractDataCache { private static Logger LOG = LoggerFactory.getLogger(AbstractDataCache.class.getName()); private String _eventId = "NO_ID"; @@ -52,7 +53,7 @@ public abstract class AbstractDataCache { * @param <T> the type of metadata * @return updated properties map */ - protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties( + protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties( HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey> cachedKeys, Map<PropertyKey, T> cachedPropertyMap) { // All new entries from zk not cached locally yet should be read from ZK. 
@@ -90,4 +91,8 @@ public abstract class AbstractDataCache { return refreshedPropertyMap; } + public AbstractDataSnapshot getSnapshot() { + throw new HelixException(String.format("DataCache %s does not support generating snapshot.", + getClass().getSimpleName())); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java new file mode 100644 index 0000000..cef090f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataSnapshot.java @@ -0,0 +1,19 @@ +package org.apache.helix.common.caches; + +import org.apache.helix.PropertyKey; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public abstract class AbstractDataSnapshot<T> { + protected final Map<PropertyKey, T> _properties; + + protected AbstractDataSnapshot(Map<PropertyKey, T> cacheData) { + _properties = Collections.unmodifiableMap(new HashMap<>(cacheData)); + } + + public Map<PropertyKey, T> getPropertyMap() { + return _properties; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java index 7fee4b0..40ba036 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -20,13 +20,6 @@ package org.apache.helix.common.caches; */ import com.google.common.collect.Maps; 
-import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.controller.LogUtil; @@ -35,6 +28,14 @@ import org.apache.helix.model.LiveInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * Cache to hold all CurrentStates of a cluster. */ @@ -43,12 +44,15 @@ public class CurrentStateCache extends AbstractDataCache { private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap; private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap(); - private String _clusterName; + // If the cache is already refreshed with current state data. + private boolean _initialized = false; + private CurrentStateSnapshot _snapshot; public CurrentStateCache(String clusterName) { _clusterName = clusterName; _currentStateMap = Collections.emptyMap(); + _snapshot = new CurrentStateSnapshot(_currentStateCache); } /** @@ -107,6 +111,7 @@ public class CurrentStateCache extends AbstractDataCache { // reload current states that has been changed from zk to local cache. 
private void refreshCurrentStatesCache(HelixDataAccessor accessor, Map<String, LiveInstance> liveInstanceMap) { + long start = System.currentTimeMillis(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -127,10 +132,20 @@ public class CurrentStateCache extends AbstractDataCache { Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet()); cachedKeys.retainAll(currentStateKeys); - _currentStateCache = Collections.unmodifiableMap( + Map<PropertyKey, CurrentState> newStateCache = Collections.unmodifiableMap( refreshProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys), _currentStateCache)); + // if the cache was not initialized, the previous state should not be included in the snapshot + if (_initialized) { + _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadKeys); + } else { + _snapshot = new CurrentStateSnapshot(newStateCache); + _initialized = true; + } + + _currentStateCache = newStateCache; + if (LOG.isDebugEnabled()) { LogUtil.logDebug(LOG, getEventId(), "# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + ( @@ -178,4 +193,9 @@ public class CurrentStateCache extends AbstractDataCache { } return Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId)); } + + @Override + public CurrentStateSnapshot getSnapshot() { + return _snapshot; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java new file mode 100644 index 0000000..dff9861 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java @@ -0,0 +1,57 @@ +package org.apache.helix.common.caches; + +import 
org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState> { + private Set<PropertyKey> _updatedStateKeys = null; + private Map<PropertyKey, CurrentState> _prevStateMap = null; + + public CurrentStateSnapshot(final Map<PropertyKey, CurrentState> currentStateMap) { + super(currentStateMap); + } + + public CurrentStateSnapshot(final Map<PropertyKey, CurrentState> currentStateMap, + final Map<PropertyKey, CurrentState> prevStateMap, final Set<PropertyKey> updatedStateKeys) { + this(currentStateMap); + _updatedStateKeys = Collections.unmodifiableSet(new HashSet<>(updatedStateKeys)); + _prevStateMap = Collections.unmodifiableMap(new HashMap<>(prevStateMap)); + } + + /** + * Return the end times of all recent changed current states update. + */ + public Map<PropertyKey, Map<String, Long>> getNewCurrentStateEndTimes() { + Map<PropertyKey, Map<String, Long>> endTimeMap = new HashMap<>(); + if (_updatedStateKeys != null && _prevStateMap != null) { + // Note if the prev state map is empty, this is the first time refresh. + // So the update is not considered as "recent" change. 
+ for (PropertyKey propertyKey : _updatedStateKeys) { + CurrentState prevState = _prevStateMap.get(propertyKey); + CurrentState curState = _properties.get(propertyKey); + + Map<String, Long> partitionUpdateEndTimes = null; + for (String partition : curState.getPartitionStateMap().keySet()) { + long newEndTime = curState.getEndTime(partition); + if (prevState == null || prevState.getEndTime(partition) < newEndTime) { + if (partitionUpdateEndTimes == null) { + partitionUpdateEndTimes = new HashMap<>(); + } + partitionUpdateEndTimes.put(partition, newEndTime); + } + } + + if (partitionUpdateEndTimes != null) { + endTimeMap.put(propertyKey, partitionUpdateEndTimes); + } + } + } + return endTimeMap; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index 37259a8..2bd45c5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; @@ -63,10 +65,11 @@ public class AbstractBaseStage implements Stage { return className; } - public static <T> void asyncExecute(ExecutorService service, Callable<T> task) { + public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) { if (service != null) { - service.submit(task); + return 
service.submit(task); } + return null; } protected DedupEventProcessor<String, Runnable> getAsyncWorkerFromClusterEvent(ClusterEvent event, http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java index 1c64783..325ea1b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java @@ -48,6 +48,7 @@ public class RoutingTableProviderMonitor extends DynamicMBeanProvider { private SimpleDynamicMetric<Long> _eventQueueSizeGauge; private SimpleDynamicMetric<Long> _dataRefreshCounter; private HistogramDynamicMetric _dataRefreshLatencyGauge; + private HistogramDynamicMetric _statePropLatencyGauge; public RoutingTableProviderMonitor(final PropertyType propertyType, String clusterName) { _propertyType = propertyType; @@ -63,6 +64,10 @@ public class RoutingTableProviderMonitor extends DynamicMBeanProvider { _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0l); _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0l); _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0l); + if (propertyType.equals(PropertyType.CURRENTSTATES)) { + _statePropLatencyGauge = new HistogramDynamicMetric("StatePropagationLatencyGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + } } @Override @@ -86,6 +91,12 @@ public class RoutingTableProviderMonitor extends DynamicMBeanProvider { _dataRefreshLatencyGauge.updateValue(System.currentTimeMillis() - startTime); } + public void 
recordStatePropagationLatency(long latency) { + if (_statePropLatencyGauge != null) { + _statePropLatencyGauge.updateValue(latency); + } + } + @Override public RoutingTableProviderMonitor register() throws JMException { List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); @@ -93,6 +104,9 @@ public class RoutingTableProviderMonitor extends DynamicMBeanProvider { attributeList.add(_callbackCounter); attributeList.add(_eventQueueSizeGauge); attributeList.add(_dataRefreshCounter); + if (_statePropLatencyGauge != null) { + attributeList.add(_statePropLatencyGauge); + } doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName()); return this; http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index 9b95d54..68f163c 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -20,11 +20,14 @@ package org.apache.helix.spectator; */ import java.util.Map; +import java.util.Set; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyType; import org.apache.helix.common.caches.BasicClusterDataCache; import org.apache.helix.common.caches.CurrentStateCache; +import org.apache.helix.common.caches.CurrentStateSnapshot; import org.apache.helix.common.caches.TargetExternalViewCache; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; @@ -110,5 +113,9 @@ class RoutingDataCache extends BasicClusterDataCache { public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() { return _currentStateCache.getCurrentStatesMap(); } + + public 
CurrentStateSnapshot getCurrentStateSnapshot() { + return _currentStateCache.getSnapshot(); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index cc373db..3403d12 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -18,35 +18,22 @@ package org.apache.helix.spectator; * specific language governing permissions and limitations * under the License. */ - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.management.JMException; - import org.apache.helix.HelixConstants; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyType; import org.apache.helix.api.listeners.ConfigChangeListener; import org.apache.helix.api.listeners.CurrentStateChangeListener; -import org.apache.helix.api.listeners.InstanceConfigChangeListener; import org.apache.helix.api.listeners.ExternalViewChangeListener; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.NotificationContext; -import org.apache.helix.PropertyKey; +import org.apache.helix.api.listeners.InstanceConfigChangeListener; import 
org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.api.listeners.RoutingTableChangeListener; import org.apache.helix.common.ClusterEventProcessor; +import org.apache.helix.common.caches.CurrentStateSnapshot; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; @@ -58,6 +45,22 @@ import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.JMException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener { @@ -75,6 +78,10 @@ public class RoutingTableProvider private boolean _isPeriodicRefreshEnabled = true; // Default is enabled private long _periodRefreshInterval; private ScheduledThreadPoolExecutor _periodicRefreshExecutor; + // For computing intensive reporting logic + private ExecutorService _reportExecutor; + private Future _reportingTask = null; + public RoutingTableProvider() { this(null); @@ -190,6 +197,8 @@ public class RoutingTableProvider } else { _isPeriodicRefreshEnabled = false; } + + _reportExecutor = Executors.newSingleThreadExecutor(); } /** @@ -495,12 +504,11 @@ public class RoutingTableProvider try { // add current-state 
listeners for new sessions manager.addCurrentStateChangeListener(this, instanceName, session); - logger.info( - "{} added current-state listener for instance: {}, session: {}, listener: {}", + logger.info("{} added current-state listener for instance: {}, session: {}, listener: {}", manager.getInstanceName(), instanceName, session, this); } catch (Exception e) { - logger.error("Fail to add current state listener for instance: {} with session: {}", - instanceName, session, e); + logger.error("Fail to add current state listener for instance: {} with session: {}", instanceName, session, + e); } } } @@ -601,27 +609,65 @@ public class RoutingTableProvider _dataCache.refresh(manager.getHelixDataAccessor()); switch (_sourceDataType) { - case EXTERNALVIEW: - refresh(_dataCache.getExternalViews().values(), - _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); - break; - case TARGETEXTERNALVIEW: - refresh(_dataCache.getTargetExternalViews().values(), - _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); - break; - case CURRENTSTATES: - refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(), - _dataCache.getLiveInstances().values()); - break; - default: - logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", - _sourceDataType); + case EXTERNALVIEW: + refresh(_dataCache.getExternalViews().values(), + _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); + break; + case TARGETEXTERNALVIEW: + refresh(_dataCache.getTargetExternalViews().values(), + _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); + break; + case CURRENTSTATES: + refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(), + _dataCache.getLiveInstances().values()); + + recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot()); + break; + default: + 
logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", + _sourceDataType); } _monitor.increaseDataRefreshCounters(startTime); } } + /** + * Report current state to routing table propagation latency + * This method is not threadsafe. Take care of _reportingTask atomicity if use in multi-threads. + */ + private void recordPropagationLatency(final long currentTime, final CurrentStateSnapshot currentStateSnapshot) { + // Note that due to the extra mem footprint introduced by currentStateSnapshot ref, we restrict running report task count to be 1. + // Any parallel tasks will be skipped. So the reporting metric data is sampled. + if (_reportingTask == null || _reportingTask.isDone()) { + _reportingTask = _reportExecutor.submit(new Callable<Object>() { + @Override public Object call() { + // getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to avoid performance impact. + Map<PropertyKey, Map<String, Long>> currentStateEndTimeMap = + currentStateSnapshot.getNewCurrentStateEndTimes(); + for (PropertyKey key : currentStateEndTimeMap.keySet()) { + Map<String, Long> partitionStateEndTimes = currentStateEndTimeMap.get(key); + for (String partition : partitionStateEndTimes.keySet()) { + long endTime = partitionStateEndTimes.get(partition); + if (currentTime >= endTime) { + _monitor.recordStatePropagationLatency(currentTime - endTime); + logger.debug( + "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}", + key.toString(), partition, endTime, currentTime - endTime); + } else { + // Verbose log in case currentTime < endTime. This could be the case that Router clock is slower than the participant clock. + logger.trace( + "CurrentState updated in the routing table. 
Node Key {}, Partition {}, end time {}, Propagation latency {}", + key.toString(), partition, endTime, currentTime - endTime); + } + } + } + return null; + } + }); + } + } + public void queueEvent(NotificationContext context, ClusterEventType eventType, HelixConstants.ChangeType changeType) { ClusterEvent event = new ClusterEvent(_clusterName, eventType); http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java index 219b753..a598a92 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java @@ -1,5 +1,6 @@ package org.apache.helix.integration.spectator; +import java.lang.management.ManagementFactory; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -9,12 +10,16 @@ import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.monitoring.mbeans.MBeanRegistrar; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import 
org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor; import org.apache.helix.spectator.RoutingTableProvider; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; @@ -23,6 +28,14 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; + public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { private HelixManager _manager; private final int NUM_NODES = 10; @@ -32,6 +45,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); private MockParticipantManager[] _participants; private ClusterControllerManager _controller; + private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer(); @BeforeClass public void beforeClass() throws Exception { @@ -83,7 +97,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { } @Test - public void testRoutingTableWithCurrentStates() throws InterruptedException { + public void testRoutingTableWithCurrentStates() throws Exception { RoutingTableProvider routingTableEV = new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW); RoutingTableProvider routingTableCurrentStates = new RoutingTableProvider(_manager, PropertyType.CURRENTSTATES); @@ -91,12 +105,14 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { try { String db1 = "TestDB-1"; _gSetupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave", 
IdealState.RebalanceMode.FULL_AUTO.name()); + long startTime = System.currentTimeMillis(); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS); Thread.sleep(200); ZkHelixClusterVerifier clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); Assert.assertTrue(clusterVerifier.verifyByPolling()); + validatePropagationLatency(PropertyType.CURRENTSTATES, System.currentTimeMillis() - startTime); IdealState idealState1 = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db1); @@ -105,19 +121,24 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { // add new DB String db2 = "TestDB-2"; _gSetupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave", IdealState.RebalanceMode.FULL_AUTO.name()); + startTime = System.currentTimeMillis(); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS); Thread.sleep(200); Assert.assertTrue(clusterVerifier.verifyByPolling()); + validatePropagationLatency(PropertyType.CURRENTSTATES, System.currentTimeMillis() - startTime); IdealState idealState2 = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db2); validate(idealState2, routingTableEV, routingTableCurrentStates); // shutdown an instance + startTime = System.currentTimeMillis(); _participants[0].syncStop(); Thread.sleep(200); Assert.assertTrue(clusterVerifier.verifyByPolling()); + validatePropagationLatency(PropertyType.CURRENTSTATES, System.currentTimeMillis() - startTime); + validate(idealState1, routingTableEV, routingTableCurrentStates); validate(idealState2, routingTableEV, routingTableCurrentStates); } finally { @@ -126,6 +147,24 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { } } + private ObjectName buildObjectName(PropertyType type) + throws MalformedObjectNameException { + return MBeanRegistrar.buildObjectName(MonitorDomainNames.RoutingTableProvider.name(), + 
RoutingTableProviderMonitor.CLUSTER_KEY, CLUSTER_NAME, RoutingTableProviderMonitor.DATA_TYPE_KEY, + type.name()); + } + + private void validatePropagationLatency(PropertyType type, final long upperBound) + throws Exception { + final ObjectName name = buildObjectName(type); + Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() throws Exception { + long stateLatency = (long) _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"); + return stateLatency > 0 && stateLatency <= upperBound; + } + }, 1000)); + } + @Test (dependsOnMethods = {"testRoutingTableWithCurrentStates"}) public void testWithSupportSourceDataType() { new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown(); http://git-wip-us.apache.org/repos/asf/helix/blob/c701a945/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java index 05240c1..1ab8713 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java @@ -82,6 +82,8 @@ public class TestRoutingTableProviderMonitor { Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 15); Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"), 0); Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 0); + // StatePropagationLatencyGauge only apply for current state + Assert.assertEquals(_beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"), null); long startTime = System.currentTimeMillis(); Thread.sleep(5); @@ -97,4 +99,23 
@@ public class TestRoutingTableProviderMonitor { monitor.unregister(); } + + public void testCurrentStateMetrics() throws JMException, InterruptedException { + PropertyType type = PropertyType.CURRENTSTATES; + RoutingTableProviderMonitor monitor = new RoutingTableProviderMonitor(type, TEST_CLUSTER); + monitor.register(); + ObjectName name = buildObjectName(type, TEST_CLUSTER); + + monitor.increaseCallbackCounters(10); + Assert.assertEquals((long) _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"), 0); + + monitor.recordStatePropagationLatency(5); + long statelatency = (long) _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"); + Assert.assertEquals(statelatency, 5); + monitor.recordStatePropagationLatency(10); + statelatency = (long) _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"); + Assert.assertEquals(statelatency, 10); + + monitor.unregister(); + } }
