Repository: helix Updated Branches: refs/heads/master 0e310fa10 -> 70c9d76ad
[HELIX-378] Add instance gauges to ClusterStatusMonitor Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7cb6b86e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7cb6b86e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7cb6b86e Branch: refs/heads/master Commit: 7cb6b86e12c24af7a370c17a8049515337865947 Parents: f1ffa86 Author: Kanak Biscuitwala <[email protected]> Authored: Tue Feb 11 14:07:04 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Tue Feb 11 14:07:04 2014 -0800 ---------------------------------------------------------------------- .../controller/stages/ReadClusterDataStage.java | 40 +++-- .../monitoring/mbeans/ClusterStatusMonitor.java | 167 ++++++++++++++++--- .../monitoring/mbeans/InstanceMonitor.java | 124 ++++++++++++++ .../monitoring/mbeans/InstanceMonitorMBean.java | 39 +++++ .../monitoring/mbeans/MessageQueueMonitor.java | 2 +- .../monitoring/mbeans/ResourceMonitor.java | 16 +- 6 files changed, 352 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java index 2279d76..85252a0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java @@ -20,13 +20,16 @@ package org.apache.helix.controller.stages; */ import java.util.Map; +import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.api.Cluster; +import 
org.apache.helix.api.Participant; import org.apache.helix.api.accessor.ClusterAccessor; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ContextId; +import org.apache.helix.api.id.PartitionId; import org.apache.helix.controller.context.ControllerContext; import org.apache.helix.controller.context.ControllerContextProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -36,6 +39,7 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.log4j.Logger; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; public class ReadClusterDataStage extends AbstractBaseStage { private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName()); @@ -55,22 +59,32 @@ public class ReadClusterDataStage extends AbstractBaseStage { Cluster cluster = clusterAccessor.readCluster(); + // Update the cluster status gauges ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"); if (clusterStatusMonitor != null) { - // TODO fix it - // int disabledInstances = 0; - // int disabledPartitions = 0; - // for (InstanceConfig config : _cache._instanceConfigMap.values()) { - // if (config.getInstanceEnabled() == false) { - // disabledInstances++; - // } - // if (config.getDisabledPartitions() != null) { - // disabledPartitions += config.getDisabledPartitions().size(); - // } - // } - // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(), - // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions); + Set<String> instanceSet = Sets.newHashSet(); + Set<String> liveInstanceSet = Sets.newHashSet(); + Set<String> disabledInstanceSet = Sets.newHashSet(); + Map<String, Set<String>> disabledPartitions = Maps.newHashMap(); + Map<String, Set<String>> tags = Maps.newHashMap(); + for (Participant participant : cluster.getParticipantMap().values()) { + instanceSet.add(participant.getId().toString()); 
+ if (participant.isAlive()) { + liveInstanceSet.add(participant.getId().toString()); + } + if (!participant.isEnabled()) { + disabledInstanceSet.add(participant.getId().toString()); + } + Set<String> partitionNames = Sets.newHashSet(); + for (PartitionId partitionId : participant.getDisabledPartitionIds()) { + partitionNames.add(partitionId.toString()); + } + disabledPartitions.put(participant.getId().toString(), partitionNames); + tags.put(participant.getId().toString(), participant.getTags()); + } + clusterStatusMonitor.setClusterInstanceStatus(liveInstanceSet, instanceSet, + disabledInstanceSet, disabledPartitions, tags); } event.addAttribute("ClusterDataCache", cluster); http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 789bb67..b468856 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -20,7 +20,11 @@ package org.apache.helix.monitoring.mbeans; */ import java.lang.management.ManagementFactory; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -32,23 +36,28 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.log4j.Logger; +import com.google.common.collect.Sets; + public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class); static final String CLUSTER_STATUS_KEY = 
"ClusterStatus"; static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus"; static final String RESOURCE_STATUS_KEY = "ResourceStatus"; + static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus"; static final String CLUSTER_DN_KEY = "cluster"; static final String RESOURCE_DN_KEY = "resourceName"; static final String INSTANCE_DN_KEY = "instanceName"; + static final String DEFAULT_TAG = "DEFAULT"; + private final String _clusterName; private final MBeanServer _beanServer; - private int _numOfLiveInstances = 0; - private int _numOfInstances = 0; - private int _numOfDisabledInstances = 0; - private int _numOfDisabledPartitions = 0; + private Set<String> _liveInstances = Collections.emptySet(); + private Set<String> _instances = Collections.emptySet(); + private Set<String> _disabledInstances = Collections.emptySet(); + private Map<String, Set<String>> _disabledPartitions = Collections.emptyMap(); private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<String, ResourceMonitor>(); @@ -56,6 +65,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap = new ConcurrentHashMap<String, MessageQueueMonitor>(); + private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = + new ConcurrentHashMap<String, InstanceMonitor>(); + public ClusterStatusMonitor(String clusterName) { _clusterName = clusterName; _beanServer = ManagementFactory.getPlatformMBeanServer(); @@ -77,22 +89,26 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { @Override public long getDownInstanceGauge() { - return _numOfInstances - _numOfLiveInstances; + return _instances.size() - _liveInstances.size(); } @Override public long getInstancesGauge() { - return _numOfInstances; + return _instances.size(); } @Override public long getDisabledInstancesGauge() { - return _numOfDisabledInstances; + return 
_disabledInstances.size(); } @Override public long getDisabledPartitionsGauge() { - return _numOfDisabledPartitions; + int numDisabled = 0; + for (String instance : _disabledPartitions.keySet()) { + numDisabled += _disabledPartitions.get(instance).size(); + } + return numDisabled; } @Override @@ -146,12 +162,69 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } } - public void setClusterStatusCounters(int numberLiveInstances, int numberOfInstances, - int disabledInstances, int disabledPartitions) { - _numOfInstances = numberOfInstances; - _numOfLiveInstances = numberLiveInstances; - _numOfDisabledInstances = disabledInstances; - _numOfDisabledPartitions = disabledPartitions; + /** + * Update the gauges for all instances in the cluster + * @param liveInstanceSet the current set of live instances + * @param instanceSet the current set of configured instances (live or otherwise) + * @param disabledInstanceSet the current set of configured instances that are disabled + * @param disabledPartitions a map of instance name to the set of partitions disabled on it + * @param tags a map of instance name to the set of tags on it + */ + public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet, + Set<String> disabledInstanceSet, Map<String, Set<String>> disabledPartitions, + Map<String, Set<String>> tags) { + // Unregister beans for instances that are no longer configured + Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet()); + toUnregister.removeAll(instanceSet); + try { + unregisterInstances(toUnregister); + } catch (MalformedObjectNameException e) { + LOG.error("Could not unregister instances from MBean server: " + toUnregister, e); + } + + // Register beans for instances that are newly configured + Set<String> toRegister = Sets.newHashSet(instanceSet); + toRegister.removeAll(_instanceMbeanMap.keySet()); + Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet(); + for (String instanceName : 
toRegister) { + InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName); + bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName), + liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName)); + monitorsToRegister.add(bean); + } + try { + registerInstances(monitorsToRegister); + } catch (MalformedObjectNameException e) { + LOG.error("Could not register instances with MBean server: " + toRegister, e); + } + + // Update all the sets + _instances = instanceSet; + _liveInstances = liveInstanceSet; + _disabledInstances = disabledInstanceSet; + _disabledPartitions = disabledPartitions; + + // Update the instance MBeans + for (String instanceName : instanceSet) { + if (_instanceMbeanMap.containsKey(instanceName)) { + // Update the bean + InstanceMonitor bean = _instanceMbeanMap.get(instanceName); + String oldSensorName = bean.getSensorName(); + bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName), + liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName)); + + // If the sensor name changed, re-register the bean so that listeners won't miss it + String newSensorName = bean.getSensorName(); + if (!oldSensorName.equals(newSensorName)) { + try { + unregisterInstances(Arrays.asList(instanceName)); + registerInstances(Arrays.asList(bean)); + } catch (MalformedObjectNameException e) { + LOG.error("Could not refresh registration with MBean server: " + instanceName, e); + } + } + } + } } public void onExternalViewChange(ExternalView externalView, IdealState idealState) { @@ -161,14 +234,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { synchronized (this) { if (!_resourceMbeanMap.containsKey(resourceName)) { ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName); - String beanName = - CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName; - register(bean, getObjectName(beanName)); - 
_resourceMbeanMap.put(resourceName, bean); + bean.updateExternalView(externalView, idealState); + registerResources(Arrays.asList(bean)); } } } - _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState); + ResourceMonitor bean = _resourceMbeanMap.get(resourceName); + String oldSensorName = bean.getSensorName(); + bean.updateExternalView(externalView, idealState); + String newSensorName = bean.getSensorName(); + if (!oldSensorName.equals(newSensorName)) { + unregisterResources(Arrays.asList(resourceName)); + registerResources(Arrays.asList(bean)); + } } catch (Exception e) { LOG.warn(e); } @@ -205,15 +283,64 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } _instanceMsgQueueMbeanMap.clear(); + unregisterInstances(_instanceMbeanMap.keySet()); + _instanceMbeanMap.clear(); + unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName)); } catch (Exception e) { LOG.error("fail to reset ClusterStatusMonitor", e); } } + private synchronized void registerInstances(Collection<InstanceMonitor> instances) + throws MalformedObjectNameException { + for (InstanceMonitor monitor : instances) { + String instanceName = monitor.getInstanceName(); + String beanName = getInstanceBeanName(instanceName); + register(monitor, getObjectName(beanName)); + _instanceMbeanMap.put(instanceName, monitor); + } + } + + private synchronized void unregisterInstances(Collection<String> instances) + throws MalformedObjectNameException { + for (String instanceName : instances) { + String beanName = getInstanceBeanName(instanceName); + unregister(getObjectName(beanName)); + } + _instanceMbeanMap.keySet().removeAll(instances); + } + + private synchronized void registerResources(Collection<ResourceMonitor> resources) + throws MalformedObjectNameException { + for (ResourceMonitor monitor : resources) { + String resourceName = monitor.getResourceName(); + String beanName = getResourceBeanName(resourceName); + register(monitor, getObjectName(beanName)); + 
_resourceMbeanMap.put(resourceName, monitor); + } + } + + private synchronized void unregisterResources(Collection<String> resources) + throws MalformedObjectNameException { + for (String resourceName : resources) { + String beanName = getResourceBeanName(resourceName); + unregister(getObjectName(beanName)); + } + _resourceMbeanMap.keySet().removeAll(resources); + } + + private String getInstanceBeanName(String instanceName) { + return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName; + } + + private String getResourceBeanName(String resourceName) { + return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName; + } + @Override public String getSensorName() { - return CLUSTER_STATUS_KEY + "_" + _clusterName; + return CLUSTER_STATUS_KEY + "." + _clusterName; } } http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java new file mode 100644 index 0000000..1385568 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java @@ -0,0 +1,124 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Implementation of the instance status bean + */ +public class InstanceMonitor implements InstanceMonitorMBean { + private final String _clusterName; + private final String _participantName; + private List<String> _tags; + private List<String> _disabledPartitions; + private boolean _isUp; + private boolean _isEnabled; + + /** + * Initialize the bean + * @param clusterName the cluster to monitor + * @param participantName the instance whose statistics this holds + */ + public InstanceMonitor(String clusterName, String participantName) { + _clusterName = clusterName; + _participantName = participantName; + _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG); + _disabledPartitions = Collections.emptyList(); + _isUp = false; + _isEnabled = false; + } + + @Override + public String getSensorName() { + return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "." + + serializedTags() + "." + _participantName; + } + + @Override + public long getOnline() { + return _isUp ? 1 : 0; + } + + @Override + public long getEnabled() { + return _isEnabled ? 
1 : 0; + } + + /** + * Get all the tags currently on this instance + * @return list of tags + */ + public List<String> getTags() { + return _tags; + } + + /** + * Get the name of the monitored instance + * @return instance name as a string + */ + public String getInstanceName() { + return _participantName; + } + + /** + * Helper for basic formatted view of this bean + * @return bean name + */ + public String getBeanName() { + return _clusterName + " " + serializedTags() + " " + _participantName; + } + + private String serializedTags() { + return Joiner.on('|').skipNulls().join(_tags).toString(); + } + + /** + * Update the gauges for this instance + * @param tags current tags + * @param disabledPartitions current disabled partitions + * @param isLive true if running, false otherwise + * @param isEnabled true if enabled, false if disabled + */ + public synchronized void updateInstance(Set<String> tags, Set<String> disabledPartitions, + boolean isLive, boolean isEnabled) { + if (tags == null || tags.isEmpty()) { + _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG); + } else { + _tags = Lists.newArrayList(tags); + Collections.sort(_tags); + } + if (disabledPartitions == null) { + _disabledPartitions = Collections.emptyList(); + } else { + _disabledPartitions = Lists.newArrayList(disabledPartitions); + Collections.sort(_disabledPartitions); + } + _isUp = isLive; + _isEnabled = isEnabled; + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java new file mode 100644 index 0000000..f148700 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java @@ -0,0 +1,39 @@ 
+package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.monitoring.SensorNameProvider; + +/** + * A basic bean describing the status of a single instance + */ +public interface InstanceMonitorMBean extends SensorNameProvider { + /** + * Check if this instance is live + * @return 1 if running, 0 otherwise + */ + public long getOnline(); + + /** + * Check if this instance is enabled + * @return 1 if enabled, 0 if disabled + */ + public long getEnabled(); +} http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java index f2df162..6b8b9e3 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java @@ -55,6 +55,6 @@ public class MessageQueueMonitor implements MessageQueueMonitorMBean { 
@Override public String getSensorName() { - return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "_" + _clusterName + "_" + _instanceName; + return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "." + _clusterName + "." + _instanceName; } } http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index afd2886..d1ba595 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -30,10 +30,11 @@ import org.apache.helix.model.IdealState; import org.apache.log4j.Logger; public class ResourceMonitor implements ResourceMonitorMBean { - int _numOfPartitions; + private int _numOfPartitions; int _numOfPartitionsInExternalView; int _numOfErrorPartitions; int _externalViewIdealStateDiff; + String _tag = ClusterStatusMonitor.DEFAULT_TAG; private static final Logger LOG = Logger.getLogger(ResourceMonitor.class); String _resourceName, _clusterName; @@ -60,7 +61,12 @@ public class ResourceMonitor implements ResourceMonitorMBean { @Override public String getSensorName() { - return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "_" + _clusterName + "_" + _resourceName; + return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "." 
+ + _resourceName; + } + + public String getResourceName() { + return _resourceName; } public void updateExternalView(ExternalView externalView, IdealState idealState) { @@ -114,6 +120,12 @@ public class ResourceMonitor implements ResourceMonitorMBean { _numOfErrorPartitions = numOfErrorPartitions; _externalViewIdealStateDiff = numOfDiff; _numOfPartitionsInExternalView = externalView.getPartitionIdSet().size(); + String tag = idealState.getInstanceGroupTag(); + if (tag == null || tag.equals("") || tag.equals("null")) { + _tag = ClusterStatusMonitor.DEFAULT_TAG; + } else { + _tag = tag; + } } @Override
