Add a RoutingTableProvider monitor to track refresh actions and routing table update callbacks.
The monitor contains the following metrics: DataRefreshLatencyGauge, CallbackCounter, EventQueueSizeGauge, and DataRefreshCounter. Also add tests for this monitor. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/993beb38 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/993beb38 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/993beb38 Branch: refs/heads/master Commit: 993beb3834f4013de8d6d8221bd71ccdced93632 Parents: 3deeeab Author: Jiajun Wang <[email protected]> Authored: Thu Jul 12 10:33:21 2018 -0700 Committer: jiajunwang <[email protected]> Committed: Tue Jul 17 11:57:12 2018 -0700 ---------------------------------------------------------------------- .../monitoring/mbeans/MonitorDomainNames.java | 1 + .../mbeans/RoutingTableProviderMonitor.java | 100 +++++++++++++++++++ .../helix/spectator/RoutingTableProvider.java | 28 +++++- ...TestRoutingTableProviderPeriodicRefresh.java | 4 +- .../mbeans/TestRoutingTableProviderMonitor.java | 100 +++++++++++++++++++ 5 files changed, 228 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java index c28570d..73bf057 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java @@ -27,5 +27,6 @@ public enum MonitorDomainNames { HelixZkClient, HelixThreadPoolExecutor, HelixCallback, + RoutingTableProvider, CLMParticipantReport } 
http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java new file mode 100644 index 0000000..1c64783 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java @@ -0,0 +1,100 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import org.apache.helix.PropertyType; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; + +import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class RoutingTableProviderMonitor extends DynamicMBeanProvider { + public static final String DATA_TYPE_KEY = "DataType"; + public static final String CLUSTER_KEY = "Cluster"; + public static final String DEFAULT = "DEFAULT"; + + private static final String MBEAN_DESCRIPTION = "Helix RoutingTableProvider Monitor"; + private final String _sensorName; + private final PropertyType _propertyType; + private final String _clusterName; + + private SimpleDynamicMetric<Long> _callbackCounter; + private SimpleDynamicMetric<Long> _eventQueueSizeGauge; + private SimpleDynamicMetric<Long> _dataRefreshCounter; + private HistogramDynamicMetric _dataRefreshLatencyGauge; + + public RoutingTableProviderMonitor(final PropertyType propertyType, String clusterName) { + _propertyType = propertyType; + _clusterName = clusterName == null ? DEFAULT : clusterName; + + // Don't put instanceName into sensor name. This detail information is in the MBean name already. 
+ _sensorName = String + .format("%s.%s.%s", MonitorDomainNames.RoutingTableProvider.name(), _clusterName, + _propertyType.name()); + + _dataRefreshLatencyGauge = new HistogramDynamicMetric("DataRefreshLatencyGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0l); + _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0l); + _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0l); + } + + @Override + public String getSensorName() { + return _sensorName; + } + + private ObjectName getMBeanName() throws MalformedObjectNameException { + return new ObjectName(String + .format("%s:%s=%s,%s=%s", MonitorDomainNames.RoutingTableProvider.name(), CLUSTER_KEY, + _clusterName, DATA_TYPE_KEY, _propertyType.name())); + } + + public void increaseCallbackCounters(long currentQueueSize) { + _callbackCounter.updateValue(_callbackCounter.getValue() + 1); + _eventQueueSizeGauge.updateValue(currentQueueSize); + } + + public void increaseDataRefreshCounters(long startTime) { + _dataRefreshCounter.updateValue(_dataRefreshCounter.getValue() + 1); + _dataRefreshLatencyGauge.updateValue(System.currentTimeMillis() - startTime); + } + + @Override + public RoutingTableProviderMonitor register() throws JMException { + List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); + attributeList.add(_dataRefreshLatencyGauge); + attributeList.add(_callbackCounter); + attributeList.add(_eventQueueSizeGauge); + attributeList.add(_dataRefreshCounter); + + doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName()); + return this; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 57b2fad..cc373db 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.management.JMException; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; @@ -52,6 +54,7 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +68,7 @@ public class RoutingTableProvider private final RouterUpdater _routerUpdater; private final PropertyType _sourceDataType; private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap; + private final RoutingTableProviderMonitor _monitor; // For periodic refresh private long _lastRefreshTimestamp; @@ -101,6 +105,14 @@ public class RoutingTableProvider _sourceDataType = sourceDataType; _routingTableChangeListenerMap = new ConcurrentHashMap<>(); String clusterName = _helixManager != null ? 
_helixManager.getClusterName() : null; + + _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName); + try { + _monitor.register(); + } catch (JMException e) { + logger.error("Failed to register RoutingTableProvider monitor MBean.", e); + } + _routerUpdater = new RouterUpdater(clusterName, _sourceDataType); _routerUpdater.start(); @@ -189,6 +201,9 @@ public class RoutingTableProvider _periodicRefreshExecutor.shutdown(); } _routerUpdater.shutdown(); + + _monitor.unregister(); + if (_helixManager != null) { PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder(); switch (_sourceDataType) { @@ -511,7 +526,7 @@ public class RoutingTableProvider _routingTableRef.set(newRoutingTable); } - public void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) { + protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) { HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -520,7 +535,7 @@ public class RoutingTableProvider refresh(externalViewList, configList, liveInstances); } - public void refresh(Collection<ExternalView> externalViews, + protected void refresh(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { long startTime = System.currentTimeMillis(); RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances); @@ -581,6 +596,9 @@ public class RoutingTableProvider logger.error(String.format("HelixManager is not connected for router update event: %s", event)); throw new HelixException("HelixManager is not connected for router update event."); } + + long startTime = System.currentTimeMillis(); + _dataCache.refresh(manager.getHelixDataAccessor()); switch (_sourceDataType) { case EXTERNALVIEW: @@ -599,6 +617,8 @@ public class RoutingTableProvider logger.warn("Unsupported source 
data type: {}, stop refreshing the routing table!", _sourceDataType); } + + _monitor.increaseDataRefreshCounters(startTime); } } @@ -616,6 +636,8 @@ public class RoutingTableProvider event.addAttribute(AttributeName.helixmanager.name(), context.getManager()); event.addAttribute(AttributeName.changeContext.name(), context); queueEvent(event); + + _monitor.increaseCallbackCounters(_eventQueue.size()); } } @@ -630,4 +652,4 @@ public class RoutingTableProvider return _context; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java index c78b8e6..e2b6df1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java @@ -142,7 +142,7 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase { } @Override - public synchronized void refresh(List<ExternalView> externalViewList, + protected synchronized void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) { super.refresh(externalViewList, changeContext); _refreshCount++; @@ -152,7 +152,7 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase { } @Override - public synchronized void refresh(Collection<ExternalView> externalViews, + protected synchronized void refresh(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { super.refresh(externalViews, instanceConfigs, 
liveInstances); _refreshCount++; http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java new file mode 100644 index 0000000..05240c1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java @@ -0,0 +1,100 @@ +package org.apache.helix.monitoring.mbeans; + +import org.apache.helix.PropertyType; +import org.testng.Assert; +import org.testng.annotations.Test; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.HashSet; +import java.util.Set; + +public class TestRoutingTableProviderMonitor { + + private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer(); + + private final String TEST_CLUSTER = "test_cluster"; + + private ObjectName buildObjectName(PropertyType type, String cluster) + throws MalformedObjectNameException { + return MBeanRegistrar.buildObjectName(MonitorDomainNames.RoutingTableProvider.name(), + RoutingTableProviderMonitor.CLUSTER_KEY, cluster, RoutingTableProviderMonitor.DATA_TYPE_KEY, + type.name()); + } + + private ObjectName buildObjectName(PropertyType type, String cluster, int num) + throws MalformedObjectNameException { + ObjectName objectName = buildObjectName(type, cluster); + if (num > 0) { + return new ObjectName(String + .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE, + String.valueOf(num))); + } else { + return objectName; + } + } + + @Test + public void testMBeanRegisteration() throws 
JMException { + Set<RoutingTableProviderMonitor> monitors = new HashSet<>(); + for (PropertyType type : PropertyType.values()) { + monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register()); + Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER))); + } + + for (PropertyType type : PropertyType.values()) { + monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register()); + Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 1))); + } + + for (PropertyType type : PropertyType.values()) { + monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register()); + Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 2))); + } + + // Un-register all monitors + for (RoutingTableProviderMonitor monitor : monitors) { + monitor.unregister(); + } + + for (PropertyType type : PropertyType.values()) { + Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER))); + Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 1))); + Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 2))); + } + } + + @Test + public void testMetrics() throws JMException, InterruptedException { + PropertyType type = PropertyType.EXTERNALVIEW; + RoutingTableProviderMonitor monitor = new RoutingTableProviderMonitor(type, TEST_CLUSTER); + monitor.register(); + ObjectName name = buildObjectName(type, TEST_CLUSTER); + + monitor.increaseCallbackCounters(10); + Assert.assertEquals((long) _beanServer.getAttribute(name, "CallbackCounter"), 1); + Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 10); + monitor.increaseCallbackCounters(15); + Assert.assertEquals((long) _beanServer.getAttribute(name, "CallbackCounter"), 2); + Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 15); + Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshLatencyGauge.Max"), 0); + Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 0); + + long startTime = System.currentTimeMillis(); + Thread.sleep(5); + monitor.increaseDataRefreshCounters(startTime); + long latency = (long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"); + Assert.assertTrue(latency >= 5 && latency <= System.currentTimeMillis() - startTime); + Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 1); + + monitor.increaseDataRefreshCounters(startTime); + long newLatency = (long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"); + Assert.assertTrue(newLatency >= latency); + Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 2); + + monitor.unregister(); + } +}
