Avoid instance name in the HelixZkClient, HelixCallback and ParticipantMessageMonitor metric sensor name.
Instance name is duplicate information in most cases. The monitoring system is already aware of this information when it collects metrics from a host. To make it worse, the instance name makes the metric sensor name hard to consolidate. This change removes the instance-related information from "sensorName". Note that for use cases where multiple applications are deployed on the same host, the instance name is still required to distinguish monitor items. So this info is still kept in the MBean name. Additional change: Refine MessageLatencyMonitor sensor name. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ead6a729 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ead6a729 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ead6a729 Branch: refs/heads/master Commit: ead6a729d71d12020f480eba1620f0c91c36c671 Parents: f464242 Author: Jiajun Wang <jjw...@linkedin.com> Authored: Wed Sep 13 15:04:45 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:08:24 2017 -0800 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 13 +- .../org/apache/helix/manager/zk/ZkClient.java | 36 +++- .../messaging/DefaultMessagingService.java | 2 +- .../messaging/handling/HelixTaskExecutor.java | 2 +- .../monitoring/ParticipantStatusMonitor.java | 190 ------------------ .../monitoring/mbeans/ClusterEventMonitor.java | 7 +- .../monitoring/mbeans/ClusterStatusMonitor.java | 54 ++---- .../monitoring/mbeans/HelixCallbackMonitor.java | 36 ++-- .../monitoring/mbeans/InstanceMonitor.java | 8 - .../mbeans/MessageLatencyMonitor.java | 31 +-- .../monitoring/mbeans/MessageQueueMonitor.java | 2 +- .../mbeans/ParticipantMessageMonitor.java | 8 +- .../mbeans/ParticipantStatusMonitor.java | 192 +++++++++++++++++++ .../mbeans/PerInstanceResourceMonitor.java | 8 +- .../monitoring/mbeans/ResourceMonitor.java | 30 +-- 
.../monitoring/mbeans/ResourceMonitorMBean.java | 74 ------- .../mbeans/StateTransitionStatMonitor.java | 5 - .../monitoring/mbeans/ZkClientMonitor.java | 33 ++-- .../monitoring/mbeans/ZkClientPathMonitor.java | 29 ++- .../dynamicMBeans/DynamicMBeanProvider.java | 39 ++-- .../helix/store/zk/ZkHelixPropertyStore.java | 8 +- .../helix/task/TaskStateModelFactory.java | 3 + .../TestClusterEventStatusMonitor.java | 6 +- .../monitoring/TestParticipantMonitor.java | 4 +- .../mbeans/TestDisableResourceMbean.java | 2 +- .../mbeans/TestDropResourceMetricsReset.java | 2 +- .../mbeans/TestHelixCallbackMonitor.java | 11 +- .../mbeans/TestResetClusterMetrics.java | 2 +- .../monitoring/mbeans/TestResourceMonitor.java | 1 + .../monitoring/mbeans/TestZkClientMonitor.java | 47 +++-- .../store/zk/TestZkHelixPropertyStore.java | 2 +- 31 files changed, 427 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index d048ad1..db660bd 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -209,8 +209,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } _instanceName = instanceName; - _preConnectCallbacks = new ArrayList<PreConnectCallback>(); - _handlers = new ArrayList<CallbackHandler>(); + _preConnectCallbacks = new ArrayList<>(); + _handlers = new ArrayList<>(); _properties = new HelixManagerProperties("cluster-manager-version.properties"); _version = _properties.getVersion(); @@ -219,8 +219,10 @@ public class ZKHelixManager implements 
HelixManager, IZkStateListener { try { _callbackMonitors = new HashMap<>(); for (ChangeType changeType : ChangeType.values()) { - _callbackMonitors.put(changeType, new HelixCallbackMonitor(instanceType, - String.format("%s.%s", clusterName, instanceName), changeType)); + HelixCallbackMonitor callbackMonitor = + new HelixCallbackMonitor(instanceType, clusterName, instanceName, changeType); + callbackMonitor.register(); + _callbackMonitors.put(changeType, callbackMonitor); } } catch (JMException e) { LOG.error("Error in creating callback monitor.", e); @@ -603,7 +605,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { zkClientBuilder.setZkServer(_zkAddress).setSessionTimeout(_sessionTimeout) .setConnectionTimeout(_clientConnectionTimeout).setZkSerializer(zkSerializer) .setMonitorType(_instanceType.name()) - .setMonitorKey(String.format("%s.%s", _clusterName, _instanceName)) + .setMonitorKey(_clusterName) + .setMonitorInstanceName(_instanceName) .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) && !_instanceType .equals(InstanceType.CONTROLLER_PARTICIPANT)); _zkclient = zkClientBuilder.build(); http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java index 7ec693f..d5ca32a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java @@ -64,16 +64,16 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { private ZkClient(IZkConnection connection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, - boolean monitorRootPathOnly) { + String monitorInstanceName, boolean 
monitorRootPathOnly) { super(connection, connectionTimeout, new ByteArraySerializer(), operationRetryTimeout); - init(zkSerializer, monitorType, monitorKey, monitorRootPathOnly); + init(zkSerializer, monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); } public ZkClient(IZkConnection connection, int connectionTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, long operationRetryTimeout) { this(connection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, - monitorKey, true); + monitorKey, null, true); } public ZkClient(IZkConnection connection, int connectionTimeout, @@ -133,16 +133,17 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { } protected void init(PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, - boolean monitorRootPathOnly) { + String monitorInstanceName, boolean monitorRootPathOnly) { _zkSerializer = zkSerializer; if (LOG.isTraceEnabled()) { StackTraceElement[] calls = Thread.currentThread().getStackTrace(); LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls)); } try { - if (monitorKey != null && !monitorKey.isEmpty() && - monitorType != null && !monitorType.isEmpty()) { - _monitor = new ZkClientMonitor(monitorType, monitorKey, monitorRootPathOnly); + if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType + .isEmpty()) { + _monitor = + new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); } else { LOG.info("ZkClient monitor key or type is not provided. Skip monitoring."); } @@ -605,6 +606,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { String _monitorType; String _monitorKey; + String _monitorInstanceName = null; boolean _monitorRootPathOnly = true; public Builder setConnection(IZkConnection connection) { @@ -627,16 +629,34 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { return this; } + /** + * Used as part of the MBean ObjectName. 
This item is required for enabling monitoring. + * @param monitorType + */ public Builder setMonitorType(String monitorType) { this._monitorType = monitorType; return this; } + /** + * Used as part of the MBean ObjectName. This item is required for enabling monitoring. + * @param monitorKey + */ public Builder setMonitorKey(String monitorKey) { this._monitorKey = monitorKey; return this; } + /** + * Used as part of the MBean ObjectName. This item is optional. + * @param instanceName + */ + public Builder setMonitorInstanceName(String instanceName) { + this._monitorInstanceName = instanceName; + return this; + } + + public Builder setMonitorRootPathOnly(Boolean monitorRootPathOnly) { this._monitorRootPathOnly = monitorRootPathOnly; return this; @@ -676,7 +696,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { } return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer, - _monitorType, _monitorKey, _monitorRootPathOnly); + _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java index a1de26a..cd8f573 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -42,7 +42,7 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.builder.ConfigScopeBuilder; -import org.apache.helix.monitoring.ParticipantStatusMonitor; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; 
import org.apache.log4j.Logger; public class DefaultMessagingService implements ClusterMessagingService { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 6bddd16..507f273 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -59,7 +59,7 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.monitoring.ParticipantStatusMonitor; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.monitoring.mbeans.MessageQueueMonitor; import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor; import org.apache.helix.participant.HelixStateMachineEngine; http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java deleted file mode 100644 index ee15aec..0000000 --- a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java +++ /dev/null @@ -1,190 +0,0 @@ -package org.apache.helix.monitoring; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.model.Message; -import org.apache.helix.monitoring.mbeans.*; -import org.apache.log4j.Logger; - -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; - -public class ParticipantStatusMonitor { - private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap = - new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>(); - private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class); - public static final String PARTICIPANT_KEY = "ParticipantName"; - public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus"; - - private MBeanServer _beanServer; - private ParticipantMessageMonitor _messageMonitor; - private MessageLatencyMonitor _messageLatencyMonitor; - private Map<String, ThreadPoolExecutorMonitor> _executorMonitors; - - public ParticipantStatusMonitor(boolean isParticipant, String instanceName) { - try { - _beanServer = 
ManagementFactory.getPlatformMBeanServer(); - if (isParticipant) { - _messageMonitor = new ParticipantMessageMonitor(instanceName); - _messageLatencyMonitor = - new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName); - _executorMonitors = new ConcurrentHashMap<>(); - register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName())); - } - } catch (Exception e) { - LOG.warn(e); - e.printStackTrace(); - _beanServer = null; - } - } - - public synchronized void reportReceivedMessage(Message message) { - if (_messageMonitor != null) { // is participant - _messageMonitor.incrementReceivedMessages(1); - _messageMonitor.incrementPendingMessages(1); - _messageLatencyMonitor.updateLatency(message); - } - } - - public synchronized void reportProcessedMessage(Message message, - ParticipantMessageMonitor.ProcessedMessageState processedMessageState) { - if (_messageMonitor != null) { // is participant - switch (processedMessageState) { - case DISCARDED: - _messageMonitor.incrementDiscardedMessages(1); - _messageMonitor.decrementPendingMessages(1); - break; - case FAILED: - _messageMonitor.incrementFailedMessages(1); - _messageMonitor.decrementPendingMessages(1); - break; - case COMPLETED: - _messageMonitor.incrementCompletedMessages(1); - _messageMonitor.decrementPendingMessages(1); - break; - } - } - } - - public void reportTransitionStat(StateTransitionContext cxt, StateTransitionDataPoint data) { - if (_beanServer == null) { - LOG.warn("bean server is null, skip reporting"); - return; - } - try { - if (!_monitorMap.containsKey(cxt)) { - synchronized (this) { - if (!_monitorMap.containsKey(cxt)) { - StateTransitionStatMonitor bean = - new StateTransitionStatMonitor(cxt); - _monitorMap.put(cxt, bean); - String beanName = cxt.toString(); - register(bean, getObjectName(beanName)); - } - } - } - _monitorMap.get(cxt).addDataPoint(data); - } catch (Exception e) { - LOG.warn(e); - e.printStackTrace(); - } - } - - private ObjectName 
getObjectName(String name) throws MalformedObjectNameException { - LOG.info("Registering bean: " + name); - return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name)); - } - - private void register(Object bean, ObjectName name) { - if (_beanServer == null) { - LOG.warn("bean server is null, skip reporting"); - return; - } - try { - _beanServer.unregisterMBean(name); - } catch (Exception e1) { - // Swallow silently - } - - try { - _beanServer.registerMBean(bean, name); - } catch (Exception e) { - LOG.warn("Could not register MBean", e); - } - } - - public void shutDown() { - if (_messageMonitor != null) { // is participant - try { - ObjectName name = getObjectName(_messageMonitor.getParticipantBeanName()); - if (_beanServer.isRegistered(name)) { - _beanServer.unregisterMBean(name); - } - } catch (Exception e) { - LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e); - } - } - if (_messageLatencyMonitor != null) { - _messageLatencyMonitor.unregister(); - } - for (StateTransitionContext cxt : _monitorMap.keySet()) { - try { - ObjectName name = getObjectName(cxt.toString()); - if (_beanServer.isRegistered(name)) { - _beanServer.unregisterMBean(name); - } - } catch (Exception e) { - LOG.warn("fail to unregister " + cxt.toString(), e); - } - } - _monitorMap.clear(); - } - - public void createExecutorMonitor(String type, ExecutorService executor) { - if (_executorMonitors == null) { - return; - } - if (! 
(executor instanceof ThreadPoolExecutor)) { - return; - } - - try { - _executorMonitors.put(type, - new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor)); - } catch (JMException e) { - LOG.warn(String.format( - "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e); - } - } - - public void removeExecutorMonitor(String type) { - if (_executorMonitors != null && _executorMonitors.containsKey(type)) { - _executorMonitors.get(type).unregister(); - _executorMonitors.remove(type); - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java index b6853cb..a3cd1ba 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java @@ -25,8 +25,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import javax.management.JMException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; import java.util.ArrayList; import java.util.List; @@ -82,12 +80,13 @@ public class ClusterEventMonitor extends DynamicMBeanProvider { "ClusterEvent", PHASE_DN_KEY, _phaseName); } - public void register() throws JMException { + public ClusterEventMonitor register() throws JMException { List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); attributeList.add(_totalDuration); attributeList.add(_maxDuration); attributeList.add(_count); attributeList.add(_duration); - register(attributeList, _clusterStatusMonitor.getObjectName(getBeanName())); + 
doRegister(attributeList, _clusterStatusMonitor.getObjectName(getBeanName())); + return this; } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 536865b..88d9f68 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -21,34 +21,18 @@ package org.apache.helix.monitoring.mbeans; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.lang.management.ManagementFactory; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.controller.stages.BestPossibleStateOutput; +import org.apache.helix.model.*; +import org.apache.helix.task.*; +import org.apache.log4j.Logger; + import javax.management.JMException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.helix.controller.stages.BestPossibleStateOutput; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.task.JobConfig; -import org.apache.helix.task.TaskDriver; -import 
org.apache.helix.task.TaskState; -import org.apache.helix.task.WorkflowConfig; -import org.apache.helix.task.WorkflowContext; -import org.apache.log4j.Logger; +import java.lang.management.ManagementFactory; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { @@ -83,7 +67,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>(); // phaseName -> eventMonitor - private final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap = + new ConcurrentHashMap<>(); /** * PerInstanceResource bean map: beanName->bean @@ -106,13 +91,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } public ObjectName getObjectName(String name) throws MalformedObjectNameException { - return new ObjectName(String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), name)); - } - - // TODO remove getBeanName()? 
- // Used by other external JMX consumers like ingraph - public String getBeanName() { - return MonitorDomainNames.ClusterStatus.name() + " " + _clusterName; + return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), name)); } public String getClusterName() { @@ -287,8 +266,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } monitor.register(); } catch (JMException e) { - LOG.error( - "Failed to register ClusterEventMonitorMbean for cluster " + _clusterName + " and phase type: " + phase, e); + LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + _clusterName + + " and phase type: " + phase, e); return; } } @@ -458,7 +437,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { synchronized (this) { if (!_resourceMbeanMap.containsKey(resourceName)) { String beanName = getResourceBeanName(resourceName); - ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName)); + ResourceMonitor bean = + new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName)); + bean.register(); _resourceMbeanMap.put(resourceName, bean); } } @@ -555,6 +536,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { continue; } WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow); + if (workflowConfig == null) { + continue; + } Set<String> allJobs = workflowConfig.getJobDag().getAllNodes(); WorkflowContext workflowContext = driver.getWorkflowContext(workflow); for (String job : allJobs) { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java index d23c405..ad9c787 100644 --- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java @@ -38,6 +38,9 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider { private static final String MBEAN_DESCRIPTION = "Helix Callback Monitor"; private final String _sensorName; private final HelixConstants.ChangeType _changeType; + private final InstanceType _type; + private final String _clusterName; + private final String _instanceName; private SimpleDynamicMetric<Long> _counter = new SimpleDynamicMetric("Counter", 0l); private SimpleDynamicMetric<Long> _unbatchedCounter = @@ -48,20 +51,17 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider { private HistogramDynamicMetric _latencyGauge = new HistogramDynamicMetric("LatencyGauge", _metricRegistry.histogram(getMetricRegistryNamePrefix() + "LatencyGauge")); - public HelixCallbackMonitor(InstanceType type, String key, HelixConstants.ChangeType changeType) - throws JMException { + public HelixCallbackMonitor(InstanceType type, String clusterName, String instanceName, + HelixConstants.ChangeType changeType) throws JMException { _changeType = changeType; + _type = type; + _clusterName = clusterName; + _instanceName = instanceName; + + // Don't put instanceName into sensor name. This detail information is in the MBean name already. 
_sensorName = String - .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), key, + .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), clusterName, changeType.name()); - - List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); - attributeList.add(_counter); - attributeList.add(_unbatchedCounter); - attributeList.add(_totalLatencyCounter); - attributeList.add(_latencyGauge); - register(attributeList, MBEAN_DESCRIPTION, MonitorDomainNames.HelixCallback.name(), - MONITOR_TYPE, type.name(), MONITOR_KEY, key, MONITOR_CHANGE_TYPE, changeType.name()); } @Override @@ -82,4 +82,18 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider { public void increaseCallbackUnbatchedCounters() { _unbatchedCounter.updateValue(_unbatchedCounter.getValue() + 1); } + + @Override + public HelixCallbackMonitor register() throws JMException { + List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); + attributeList.add(_counter); + attributeList.add(_unbatchedCounter); + attributeList.add(_totalLatencyCounter); + attributeList.add(_latencyGauge); + doRegister(attributeList, MBEAN_DESCRIPTION, MonitorDomainNames.HelixCallback.name(), + MONITOR_TYPE, _type.name(), MONITOR_KEY, + _clusterName + (_instanceName == null ? "" : "." 
+ _instanceName), MONITOR_CHANGE_TYPE, + _changeType.name()); + return this; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java index 8c8c80c..dc43d48 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java @@ -97,14 +97,6 @@ public class InstanceMonitor implements InstanceMonitorMBean { return _participantName; } - /** - * Helper for basic formatted view of this bean - * @return bean name - */ - public String getBeanName() { - return _clusterName + " " + serializedTags() + " " + _participantName; - } - private String serializedTags() { return Joiner.on('|').skipNulls().join(_tags).toString(); } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java index 76ab42e..cc00496 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java @@ -20,7 +20,6 @@ package org.apache.helix.monitoring.mbeans; */ import org.apache.helix.model.Message; -import org.apache.helix.monitoring.ParticipantStatusMonitor; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; import 
org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; @@ -31,10 +30,10 @@ import java.util.ArrayList; import java.util.List; public class MessageLatencyMonitor extends DynamicMBeanProvider { - public static String MONITOR_TYPE_KW = "MonitorType"; - private static final String MBEAN_DESCRIPTION = "Helix Message Latency Monitor"; private final String _sensorName; + private final String _domainName; + private final String _participantName; private SimpleDynamicMetric<Long> _totalMessageCount = new SimpleDynamicMetric("TotalMessageCount", 0l); @@ -45,16 +44,10 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider { _metricRegistry.histogram(getMetricRegistryNamePrefix() + "MessageLatencyGauge")); public MessageLatencyMonitor(String domainName, String participantName) throws JMException { - _sensorName = String - .format("%s.%s.%s.%s", ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName, - MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()); - - List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); - attributeList.add(_totalMessageCount); - attributeList.add(_totalMessageLatency); - attributeList.add(_messageLatencyGauge); - register(attributeList, MBEAN_DESCRIPTION, domainName, ParticipantStatusMonitor.PARTICIPANT_KEY, - participantName, MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()); + _domainName = domainName; + _participantName = participantName; + _sensorName = String.format("%s.%s", ParticipantMessageMonitor.PARTICIPANT_STATUS_KEY, + "MessageLatency"); } @Override @@ -70,4 +63,16 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider { _totalMessageLatency.updateValue(_totalMessageLatency.getValue() + latency); _messageLatencyGauge.updateValue(latency); } + + @Override + public MessageLatencyMonitor register() throws JMException { + List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); + 
attributeList.add(_totalMessageCount); + attributeList.add(_totalMessageLatency); + attributeList.add(_messageLatencyGauge); + doRegister(attributeList, MBEAN_DESCRIPTION, _domainName, ParticipantMessageMonitor.PARTICIPANT_KEY, + _participantName, "MonitorType", MessageLatencyMonitor.class.getSimpleName()); + + return this; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java index 9fcf279..4ae47ed 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java @@ -121,6 +121,6 @@ public class MessageQueueMonitor implements MessageQueueMonitorMBean { } public ObjectName getObjectName(String name) throws MalformedObjectNameException { - return new ObjectName(String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), name)); + return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), name)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java index b8055e5..1b47ce8 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java @@ -1,8 +1,8 @@ package 
org.apache.helix.monitoring.mbeans; -import org.apache.helix.monitoring.ParticipantStatusMonitor; - public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean { + public static final String PARTICIPANT_KEY = "ParticipantName"; + public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus"; /** * The current processed state of the message @@ -25,7 +25,7 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean } public String getParticipantBeanName() { - return String.format("%s=%s", ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName); + return String.format("%s=%s", PARTICIPANT_KEY, _participantName); } public void incrementReceivedMessages(int count) { @@ -79,7 +79,7 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean @Override public String getSensorName() { - return ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _participantName; + return PARTICIPANT_STATUS_KEY; } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java new file mode 100644 index 0000000..66bb3b7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java @@ -0,0 +1,192 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.model.Message; +import org.apache.helix.monitoring.StateTransitionContext; +import org.apache.helix.monitoring.StateTransitionDataPoint; +import org.apache.log4j.Logger; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +public class ParticipantStatusMonitor { + private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap = + new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>(); + private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class); + + private MBeanServer _beanServer; + private ParticipantMessageMonitor _messageMonitor; + private MessageLatencyMonitor _messageLatencyMonitor; + private Map<String, ThreadPoolExecutorMonitor> _executorMonitors; + + public ParticipantStatusMonitor(boolean isParticipant, String instanceName) { + try { + _beanServer = ManagementFactory.getPlatformMBeanServer(); + if (isParticipant) { + _messageMonitor = new ParticipantMessageMonitor(instanceName); + _messageLatencyMonitor = + new 
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName); + _messageLatencyMonitor.register(); + _executorMonitors = new ConcurrentHashMap<>(); + register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName())); + } + } catch (Exception e) { + LOG.warn(e); + e.printStackTrace(); + _beanServer = null; + } + } + + public synchronized void reportReceivedMessage(Message message) { + if (_messageMonitor != null) { // is participant + _messageMonitor.incrementReceivedMessages(1); + _messageMonitor.incrementPendingMessages(1); + _messageLatencyMonitor.updateLatency(message); + } + } + + public synchronized void reportProcessedMessage(Message message, + ParticipantMessageMonitor.ProcessedMessageState processedMessageState) { + if (_messageMonitor != null) { // is participant + switch (processedMessageState) { + case DISCARDED: + _messageMonitor.incrementDiscardedMessages(1); + _messageMonitor.decrementPendingMessages(1); + break; + case FAILED: + _messageMonitor.incrementFailedMessages(1); + _messageMonitor.decrementPendingMessages(1); + break; + case COMPLETED: + _messageMonitor.incrementCompletedMessages(1); + _messageMonitor.decrementPendingMessages(1); + break; + } + } + } + + public void reportTransitionStat(StateTransitionContext cxt, StateTransitionDataPoint data) { + if (_beanServer == null) { + LOG.warn("bean server is null, skip reporting"); + return; + } + try { + if (!_monitorMap.containsKey(cxt)) { + synchronized (this) { + if (!_monitorMap.containsKey(cxt)) { + StateTransitionStatMonitor bean = + new StateTransitionStatMonitor(cxt); + _monitorMap.put(cxt, bean); + String beanName = cxt.toString(); + register(bean, getObjectName(beanName)); + } + } + } + _monitorMap.get(cxt).addDataPoint(data); + } catch (Exception e) { + LOG.warn(e); + e.printStackTrace(); + } + } + + private ObjectName getObjectName(String name) throws MalformedObjectNameException { + LOG.info("Registering bean: " + name); + return new 
ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name)); + } + + private void register(Object bean, ObjectName name) { + if (_beanServer == null) { + LOG.warn("bean server is null, skip reporting"); + return; + } + try { + _beanServer.unregisterMBean(name); + } catch (Exception e1) { + // Swallow silently + } + + try { + _beanServer.registerMBean(bean, name); + } catch (Exception e) { + LOG.warn("Could not register MBean", e); + } + } + + public void shutDown() { + if (_messageMonitor != null) { // is participant + try { + ObjectName name = getObjectName(_messageMonitor.getParticipantBeanName()); + if (_beanServer.isRegistered(name)) { + _beanServer.unregisterMBean(name); + } + } catch (Exception e) { + LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e); + } + } + if (_messageLatencyMonitor != null) { + _messageLatencyMonitor.unregister(); + } + for (StateTransitionContext cxt : _monitorMap.keySet()) { + try { + ObjectName name = getObjectName(cxt.toString()); + if (_beanServer.isRegistered(name)) { + _beanServer.unregisterMBean(name); + } + } catch (Exception e) { + LOG.warn("fail to unregister " + cxt.toString(), e); + } + } + _monitorMap.clear(); + } + + public void createExecutorMonitor(String type, ExecutorService executor) { + if (_executorMonitors == null) { + return; + } + if (! 
(executor instanceof ThreadPoolExecutor)) { + return; + } + + try { + _executorMonitors.put(type, + new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor)); + } catch (JMException e) { + LOG.warn(String.format( + "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e); + } + } + + public void removeExecutorMonitor(String type) { + if (_executorMonitors != null) { + ThreadPoolExecutorMonitor monitor = _executorMonitors.remove(type); + if (monitor != null) { + monitor.unregister(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java index 714767b..76959cf 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java @@ -92,11 +92,9 @@ public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBe @Override public String getSensorName() { - return Joiner - .on('.') - .join( - ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName, - serializedTags(), _participantName, _resourceName)).toString(); + return Joiner.on('.').join(ImmutableList + .of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName, serializedTags(), + _participantName, _resourceName)).toString(); } private String serializedTags() { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index 6382359..080ed62 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -76,18 +76,12 @@ public class ResourceMonitor extends DynamicMBeanProvider { private String _tag = ClusterStatusMonitor.DEFAULT_TAG; private long _lastResetTime; - private String _resourceName; - private String _clusterName; - - public enum MonitorState { - TOP_STATE - } - - public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) - throws JMException { - _clusterName = clusterName; - _resourceName = resourceName; + private final String _resourceName; + private final String _clusterName; + private final ObjectName _initObjectName; + @Override + public ResourceMonitor register() throws JMException { List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); attributeList.add(_numOfPartitions); attributeList.add(_numOfPartitionsInExternalView); @@ -106,8 +100,20 @@ public class ResourceMonitor extends DynamicMBeanProvider { attributeList.add(_maxSinglePartitionTopStateHandoffDuration); attributeList.add(_partitionTopStateHandoffDurationGauge); attributeList.add(_totalMessageReceived); + doRegister(attributeList, _initObjectName); - register(attributeList, objectName); + return this; + } + + public enum MonitorState { + TOP_STATE + } + + public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) + throws JMException { + _clusterName = clusterName; + _resourceName = resourceName; + _initObjectName = objectName; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java ---------------------------------------------------------------------- diff 
--git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java deleted file mode 100644 index 75e33c7..0000000 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.helix.monitoring.mbeans; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import org.apache.helix.monitoring.SensorNameProvider; - -public interface ResourceMonitorMBean extends SensorNameProvider { - public long getPartitionGauge(); - - public long getErrorPartitionGauge(); - - public long getMissingTopStatePartitionGauge(); - - public long getMissingMinActiveReplicaPartitionGauge(); - - public long getMissingReplicaPartitionGauge(); - - public long getPendingRecoveryRebalancePartitionGauge(); - - public long getPendingLoadRebalancePartitionGauge(); - - public long getRecoveryRebalanceThrottledPartitionGauge(); - - public long getLoadRebalanceThrottledPartitionGauge(); - - public long getDifferenceWithIdealStateGauge(); - - public long getExternalViewPartitionGauge(); - - /** - * Get aggregated successful handoff duration - * @return - */ - public long getSuccessfulTopStateHandoffDurationCounter(); - - /** - * Get number of top state successful handoffs - * @return - */ - public long getSucceededTopStateHandoffCounter(); - - /** - * Get number of top state failed handoffs - * @return - */ - public long getFailedTopStateHandoffCounter(); - - /** - * Get maximum single partition successful handoff - */ - public long getMaxSinglePartitionTopStateHandoffDurationGauge(); - - /** - * Get total message received for this resource - * */ - public long getTotalMessageReceived(); -} http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java index c751d5d..4a4c89e 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java @@ -51,11 +51,6 @@ 
public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe return _context; } - public String getBeanName() { - return _context.getClusterName() + " " + _context.getResourceName() + " " - + _context.getTransition(); - } - public String getSensorName() { return String.format("StateTransitionStat.%s.%s.%s", _context.getClusterName(), _context.getResourceName(), _context.getTransition()); http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java index b385068..639fd8a 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java @@ -22,6 +22,7 @@ package org.apache.helix.monitoring.mbeans; import org.apache.helix.HelixException; import javax.management.JMException; +import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,8 +32,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { public static final String MONITOR_KEY = "Key"; private ObjectName _objectName; - private String _monitorType; - private String _monitorKey; + private String _sensorName; private long _stateChangeEventCounter; private long _dataChangeEventCounter; @@ -40,31 +40,35 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> _zkClientPathMonitorMap = new ConcurrentHashMap<>(); - public ZkClientMonitor(String monitorType, String monitorKey, boolean monitorRootPathOnly) - throws JMException { + public ZkClientMonitor(String monitorType, 
String monitorKey, String monitorInstanceName, + boolean monitorRootPathOnly) throws JMException { if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || monitorType .isEmpty()) { throw new HelixException("Cannot create ZkClientMonitor without monitor key and type."); } - _monitorType = monitorType; - _monitorKey = monitorKey; - regitster(monitorType, monitorKey); + _sensorName = + String.format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey); + + _objectName = + MBeanRegistrar.register(this, getObjectName(monitorType, monitorKey, monitorInstanceName)); for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values()) { // If monitor root path only, check if the current path is Root. // Otherwise, add monitors for every path. if (!monitorRootPathOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) { - _zkClientPathMonitorMap - .put(path, new ZkClientPathMonitor(path.name(), monitorType, monitorKey)); + _zkClientPathMonitorMap.put(path, + new ZkClientPathMonitor(path, monitorType, monitorKey, monitorInstanceName).register()); } } } - private void regitster(String monitorType, String monitorKey) throws JMException { - _objectName = MBeanRegistrar - .register(this, MonitorDomainNames.HelixZkClient.name(), MONITOR_TYPE, monitorType, - MONITOR_KEY, monitorKey); + protected static ObjectName getObjectName(String monitorType, String monitorKey, + String monitorInstanceName) throws MalformedObjectNameException { + return MBeanRegistrar + .buildObjectName(MonitorDomainNames.HelixZkClient.name(), MONITOR_TYPE, monitorType, + MONITOR_KEY, + (monitorKey + (monitorInstanceName == null ? "" : "." 
+ monitorInstanceName))); } /** @@ -79,8 +83,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { @Override public String getSensorName() { - return String - .format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), _monitorType, _monitorKey); + return _sensorName; } public void increaseStateChangeEventCounter() { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java index 08eb2bc..ccbbed9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java @@ -25,14 +25,19 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.ArrayList; import java.util.List; public class ZkClientPathMonitor extends DynamicMBeanProvider { public static final String MONITOR_PATH = "PATH"; - private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor"; private final String _sensorName; + private final String _type; + private final String _key; + private final String _instanceName; + private final PredefinedPath _path; protected enum PredefinedPath { IdealStates(".*/IDEALSTATES/.*"), @@ -87,12 +92,18 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider { return _sensorName; } - public ZkClientPathMonitor(String path, String monitorType, String monitorKey) - throws JMException { + public ZkClientPathMonitor(PredefinedPath path, 
String monitorType, String monitorKey, + String monitorInstanceName) { + _type = monitorType; + _key = monitorKey; + _instanceName = monitorInstanceName; + _path = path; _sensorName = String .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey, - path); + path.name()); + } + public ZkClientPathMonitor register() throws JMException { List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); attributeList.add(_readCounter); attributeList.add(_writeCounter); @@ -106,9 +117,13 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider { attributeList.add(_writeLatencyGauge); attributeList.add(_readBytesGauge); attributeList.add(_writeBytesGauge); - register(attributeList, MBEAN_DESCRIPTION, MonitorDomainNames.HelixZkClient.name(), - ZkClientMonitor.MONITOR_TYPE, monitorType, ZkClientMonitor.MONITOR_KEY, monitorKey, - MONITOR_PATH, path); + + ObjectName objectName = new ObjectName(String.format("%s,%s=%s", + ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(), + MONITOR_PATH, _path.name())); + doRegister(attributeList, MBEAN_DESCRIPTION, objectName); + + return this; } protected void record(int bytes, long latencyMilliSec, boolean isFailure, boolean isRead) { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java index a2d2662..346503b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java @@ -53,7 +53,7 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr * @param domain the MBean domain name * @param keyValuePairs the MBean object name components */ - protected synchronized void register(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, String description, String domain, String... keyValuePairs) throws JMException { if (_objectName != null) { throw new HelixException( @@ -70,7 +70,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr * @param description the MBean description * @param objectName the proposed MBean ObjectName */ - protected synchronized void register(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, String description, ObjectName objectName) throws JMException { if (_objectName != null) { throw new HelixException( @@ -80,22 +80,9 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr _objectName = MBeanRegistrar.register(this, objectName); } - protected synchronized void register(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, ObjectName objectName) throws JMException { - register(dynamicMetrics, null, objectName); - } - - /** - * After unregistered, the MBean can't be registered again, a new monitor has be to created. 
- */ - public synchronized void unregister() { - _metricRegistry.removeMatching(new MetricFilter() { - @Override - public boolean matches(String name, Metric metric) { - return name.startsWith(getMetricRegistryNamePrefix()); - } - }); - MBeanRegistrar.unregister(_objectName); + doRegister(dynamicMetrics, null, objectName); } protected String getMetricRegistryNamePrefix() { @@ -147,6 +134,24 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr new MBeanNotificationInfo[0]); } + /** + * Call doRegister() to finish registration MBean and the attributes. + */ + public abstract DynamicMBeanProvider register() throws JMException; + + /** + * After unregistered, the MBean can't be registered again, a new monitor has be to created. + */ + public synchronized void unregister() { + _metricRegistry.removeMatching(new MetricFilter() { + @Override + public boolean matches(String name, Metric metric) { + return name.startsWith(getMetricRegistryNamePrefix()); + } + }); + MBeanRegistrar.unregister(_objectName); + } + @Override public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java index b5f7075..0e91314 100644 --- a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java +++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java @@ -26,7 +26,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor; public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T> { - public static final String MONITOR_TAG = 
"HelixPropertyStore"; + public static final String MONITOR_TYPE = "HelixPropertyStore"; public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor, String root, List<String> subscribedPaths) { @@ -35,12 +35,10 @@ public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T> { public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath, List<String> zkCachePaths) { - super(zkAddress, serializer, chrootPath, null, zkCachePaths, - MONITOR_TAG, chrootPath); + super(zkAddress, serializer, chrootPath, null, zkCachePaths, MONITOR_TYPE, chrootPath); } public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath) { - super(zkAddress, serializer, chrootPath, null, null, - MONITOR_TAG, chrootPath); + super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, chrootPath); } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java index fa414d7..cce8f99 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java @@ -79,6 +79,9 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> { } public void shutdown() { + if (_monitor != null) { + _monitor.unregister(); + } _taskExecutor.shutdown(); _timerTaskExecutor.shutdown(); if (_monitor != null ) { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java index 5e64271..eb4f94b 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java @@ -52,7 +52,7 @@ public class TestClusterEventStatusMonitor { MBeanServer _server = ManagementFactory.getPlatformMBeanServer(); Set<ObjectInstance> mbeans = - _server.queryMBeans(new ObjectName("ClusterStatus" + ":Cluster=TestCluster,eventName=ClusterEvent,*"), null); + _server.queryMBeans(new ObjectName("ClusterStatus:Cluster=TestCluster,eventName=ClusterEvent,*"), null); Assert.assertEquals(mbeans.size(), 0); int count = 5; @@ -69,7 +69,7 @@ public class TestClusterEventStatusMonitor { mbeans = _server.queryMBeans( - new ObjectName("ClusterStatus: cluster=TestCluster,eventName=ClusterEvent,*"), null); + new ObjectName("ClusterStatus:cluster=TestCluster,eventName=ClusterEvent,*"), null); Assert.assertEquals(mbeans.size(), 6); for (ObjectInstance mbean : mbeans) { @@ -86,7 +86,7 @@ public class TestClusterEventStatusMonitor { mbeans = _server.queryMBeans( - new ObjectName("ClusterStatus: cluster=TestCluster,eventName=ClusterEvent,*"), null); + new ObjectName("ClusterStatus:cluster=TestCluster,eventName=ClusterEvent,*"), null); Assert.assertEquals(mbeans.size(), 0); System.out.println("END TestParticipantMonitor"); http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java index a01cd36..da8dce1 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java +++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java @@ -33,9 +33,7 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectInstance; import javax.management.ObjectName; -import org.apache.helix.monitoring.ParticipantStatusMonitor; -import org.apache.helix.monitoring.StateTransitionContext; -import org.apache.helix.monitoring.StateTransitionDataPoint; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; import org.apache.log4j.Logger; import org.testng.AssertJUnit; http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java index 36921e1..8c05626 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java @@ -104,6 +104,6 @@ public class TestDisableResourceMbean extends ZkUnitTestBase { String resourceBeanName = String .format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY, resourceName); return new ObjectName( - String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName)); + String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java index 219a2db..2529360 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java @@ -108,7 +108,7 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase { String resourceBeanName = String.format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY, resourceName); - return new ObjectName(String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), + return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName)); } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java index e183052..8758205 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java @@ -43,19 +43,19 @@ public class TestHelixCallbackMonitor { public void testMBeanRegisteration() throws JMException { Set<HelixCallbackMonitor> monitors = new HashSet<>(); for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) { - monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType)); + monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register()); Assert.assertTrue( _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType))); } for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) { - 
monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType)); + monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register()); Assert.assertTrue( _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType, 1))); } for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) { - monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType)); + monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register()); Assert.assertTrue( _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType, 2))); } @@ -77,8 +77,9 @@ public class TestHelixCallbackMonitor { @Test public void testCounter() throws JMException { - HelixCallbackMonitor monitor = - new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, HelixConstants.ChangeType.CURRENT_STATE); + HelixCallbackMonitor monitor = new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, + HelixConstants.ChangeType.CURRENT_STATE); + monitor.register(); ObjectName name = buildObjectName(TEST_TYPE, TEST_CLUSTER, HelixConstants.ChangeType.CURRENT_STATE); http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java index 8b1ec26..409a10a 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java @@ -102,6 +102,6 @@ public class TestResetClusterMetrics extends ZkUnitTestBase { } private ObjectName objectName(String beanName) throws Exception { - return new ObjectName(MonitorDomainNames.ClusterStatus.name() + ": " + beanName); 
+ return new ObjectName(MonitorDomainNames.ClusterStatus.name() + ":" + beanName); } } http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java index 8b1ebe1..5c52195 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java @@ -47,6 +47,7 @@ public class TestResourceMonitor { @Test() public void testReportData() throws JMException { final int n = 5; ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value")); + monitor.register(); List<String> instances = new ArrayList<String>(); for (int i = 0; i < n; i++) { http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java index c5ee212..8bf136e 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java @@ -32,15 +32,13 @@ public class TestZkClientMonitor { private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer(); - private ObjectName buildObjectName(String tag, String key) throws MalformedObjectNameException { - return MBeanRegistrar - .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, 
tag, - ZkClientMonitor.MONITOR_KEY, key); + private ObjectName buildObjectName(String tag, String key, String instance) throws MalformedObjectNameException { + return ZkClientMonitor.getObjectName(tag, key, instance); } - private ObjectName buildObjectName(String tag, String key, int num) + private ObjectName buildObjectName(String tag, String key, String instance, int num) throws MalformedObjectNameException { - ObjectName objectName = buildObjectName(tag, key); + ObjectName objectName = buildObjectName(tag, key, instance); if (num > 0) { return new ObjectName(String .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE, @@ -50,11 +48,10 @@ public class TestZkClientMonitor { } } - private ObjectName buildPathMonitorObjectName(String tag, String key, String path) + private ObjectName buildPathMonitorObjectName(String tag, String key, String instance, String path) throws MalformedObjectNameException { - return MBeanRegistrar - .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, tag, - ZkClientMonitor.MONITOR_KEY, key, ZkClientPathMonitor.MONITOR_PATH, path); + return new ObjectName(String + .format("%s,%s=%s", buildObjectName(tag, key, instance).toString(), ZkClientPathMonitor.MONITOR_PATH, path)); } @Test @@ -62,37 +59,39 @@ public class TestZkClientMonitor { final String TEST_TAG_1 = "test_tag_1"; final String TEST_KEY_1 = "test_key_1"; - ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true); - Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1))); + ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true); + Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null))); // no per-path monitor items created since "monitorRootPathOnly" = true - Assert.assertFalse(_beanServer.isRegistered(buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1, - ZkClientPathMonitor.PredefinedPath.IdealStates.name()))); + 
Assert.assertFalse(_beanServer.isRegistered( + buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1, null, + ZkClientPathMonitor.PredefinedPath.IdealStates.name()))); - ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true); - Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, 1))); + ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true); + Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null, 1))); monitor.unregister(); monitorDuplicate.unregister(); - Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1))); - Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, 1))); + Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null))); + Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null, 1))); } @Test public void testCounter() throws JMException { final String TEST_TAG = "test_tag_3"; final String TEST_KEY = "test_key_3"; + final String TEST_INSTANCE = "test_instance_3"; - ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, false); - ObjectName name = buildObjectName(TEST_TAG, TEST_KEY); + ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, TEST_INSTANCE, false); + ObjectName name = buildObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE); ObjectName rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, - ZkClientPathMonitor.PredefinedPath.Root.name()); - ObjectName idealStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, + TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.Root.name()); + ObjectName idealStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.IdealStates.name()); - ObjectName instancesName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, + ObjectName instancesName = buildPathMonitorObjectName(TEST_TAG, 
TEST_KEY, TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.Instances.name()); - ObjectName currentStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, + ObjectName currentStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.CurrentStates.name()); monitor.increaseDataChangeEventCounter(); http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java index 8a3ce54..4abada2 100644 --- a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java +++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java @@ -385,7 +385,7 @@ public class TestZkHelixPropertyStore extends ZkUnitTestBase { ObjectName name = MBeanRegistrar .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, - ZkHelixPropertyStore.MONITOR_TAG, ZkClientMonitor.MONITOR_KEY, TEST_ROOT, + ZkHelixPropertyStore.MONITOR_TYPE, ZkClientMonitor.MONITOR_KEY, TEST_ROOT, ZkClientPathMonitor.MONITOR_PATH, "Root"); MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); Assert.assertTrue(beanServer.isRegistered(name));