Emit a per-resource rebalance status to surface possible calculation failures. The status is exposed in the MBean as a string, for debugging purposes only. The resource rebalance state attribute will be in one of the following states: 1. NORMAL 2. BEST_POSSIBLE_STATE_CAL_FAILED: calculation failed or no possible allocation found. 3. INTERMEDIATE_STATE_CAL_FAILED: intermediate state calculation failed (not including the throttled case). 4. UNKNOWN: the resource has not been rebalanced or is newly created.
Additional related changes: 1. Fix a cluster level metric related bug to generate the right metrics data. 2. Fix a resource monitoring bug that DISABLE_MONITORING is not working. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2f39f381 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2f39f381 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2f39f381 Branch: refs/heads/master Commit: 2f39f381b0981503d7c204aabbeaa09153292e15 Parents: b549cda Author: Jiajun Wang <jjw...@linkedin.com> Authored: Fri Oct 5 16:26:11 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Thu Nov 1 14:38:36 2018 -0700 ---------------------------------------------------------------------- .../stages/BestPossibleStateCalcStage.java | 52 ++++---- .../stages/ExternalViewComputeStage.java | 4 +- .../stages/IntermediateStateCalcStage.java | 19 +++ .../monitoring/mbeans/ClusterStatusMonitor.java | 66 ++++++---- .../monitoring/mbeans/ResourceMonitor.java | 35 ++++-- .../dynamicMBeans/DynamicMBeanProvider.java | 18 +-- .../TestAlertingRebalancerFailure.java | 123 +++++++++++++------ ...ceModeWhenReachingOfflineInstancesLimit.java | 61 ++++++--- .../mbeans/TestClusterStatusMonitor.java | 10 +- .../mbeans/TestDisableResourceMbean.java | 17 ++- .../monitoring/mbeans/TestResourceMonitor.java | 21 +++- 11 files changed, 291 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 1bbd6a0..b0e453d 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -43,6 +43,7 @@ import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.monitoring.mbeans.ResourceMonitor; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskRebalancer; import org.apache.helix.util.HelixUtil; @@ -75,11 +76,6 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { // Reset current INIT/RUNNING tasks on participants for throttling cache.resetActiveTaskCount(currentStateOutput); - // Check whether the offline/disabled instance count in the cluster reaches the set limit, - // if yes, pause the rebalancer. - validateOfflineInstancesLimit(cache, - (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor); - final BestPossibleStateOutput bestPossibleStateOutput = compute(event, resourceMap, currentStateOutput); event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); @@ -112,6 +108,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { BestPossibleStateOutput output = new BestPossibleStateOutput(); HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); + + // Check whether the offline/disabled instance count in the cluster reaches the set limit, + // if yes, pause the rebalancer. 
+ boolean isValid = validateOfflineInstancesLimit(cache, + (HelixManager) event.getAttribute(AttributeName.helixmanager.name())); final List<String> failureResources = new ArrayList<>(); Iterator<Resource> itr = resourceMap.values().iterator(); @@ -125,6 +128,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { LogUtil.logError(logger, _eventId, "Exception when calculating best possible states for " + resource.getResourceName(), ex); + } if (!result) { failureResources.add(resource.getResourceName()); @@ -134,31 +138,34 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { } // Check and report if resource rebalance has failure - ClusterStatusMonitor clusterStatusMonitor = - event.getAttribute(AttributeName.clusterStatusMonitor.name()); - updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor, + updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager, + cache, clusterStatusMonitor, "Failed to calculate best possible states for " + failureResources.size() + " resources."); return output; } - private void updateRebalanceStatus(final boolean hasFailure, final HelixManager helixManager, - final ClusterDataCache cache, final ClusterStatusMonitor clusterStatusMonitor, - final String errorMessage) { + private void updateRebalanceStatus(final boolean hasFailure, final List<String> failedResources, + final HelixManager helixManager, final ClusterDataCache cache, + final ClusterStatusMonitor clusterStatusMonitor, final String errorMessage) { asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() { @Override public Object call() { try { - // TODO re-enable logging error after ticket HELIX-631 is resolved - /* - if (hasFailure && _statusUpdateUtil != null) { - _statusUpdateUtil - .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(), - errorMessage, helixManager); + if (hasFailure) { + /* TODO Enable this update when we resolve 
ZK server load issue. This will cause extra write to ZK. + if (_statusUpdateUtil != null) { + _statusUpdateUtil + .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(), + errorMessage, helixManager); + } + */ + LogUtil.logWarn(logger, _eventId, errorMessage); } - */ if (clusterStatusMonitor != null) { clusterStatusMonitor.setRebalanceFailureGauge(hasFailure); + clusterStatusMonitor.setResourceRebalanceStates(failedResources, + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED); } } catch (Exception e) { LogUtil.logError(logger, _eventId, "Could not update cluster status!", e); @@ -170,8 +177,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { // Check whether the offline/disabled instance count in the cluster reaches the set limit, // if yes, pause the rebalancer, and throw exception to terminate rebalance cycle. - private void validateOfflineInstancesLimit(final ClusterDataCache cache, - final HelixManager manager, final ClusterStatusMonitor clusterStatusMonitor) { + private boolean validateOfflineInstancesLimit(final ClusterDataCache cache, + final HelixManager manager) { int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed(); if (maxOfflineInstancesAllowed >= 0) { int offlineCount = cache.getAllInstances().size() - cache.getEnabledLiveInstances().size(); @@ -190,11 +197,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { LogUtil.logError(logger, _eventId, "Failed to put cluster " + cache.getClusterName() + " into maintenance mode, HelixManager is not set!"); } - if (!cache.isTaskCache()) { - updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, errMsg); - } + return false; } } + return true; } private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache, http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index e3a504b..667b254 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -187,9 +187,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { cache.getStateModelDef(idealState.getStateModelDefRef()); clusterStatusMonitor .setResourceStatus(view, cache.getIdealState(view.getResourceName()), - stateModelDef); - clusterStatusMonitor - .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount); + stateModelDef, totalPendingMessageCount); monitoringResources.add(resourceName); } } http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index c4d11d6..915a90f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -45,6 +45,7 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.monitoring.mbeans.ResourceMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +138,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage { ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); + List<String> failedResources = new ArrayList<>(); // Priority is applied in assignment computation because higher priority by looping in order of // decreasing priority @@ -170,8 +172,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } catch (HelixException ex) { LogUtil.logInfo(logger, _eventId, "Failed to calculate intermediate partition states for resource " + resourceName, ex); + failedResources.add(resourceName); } } + + if (clusterStatusMonitor != null) { + clusterStatusMonitor.setResourceRebalanceStates(failedResources, + ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); + clusterStatusMonitor + .setResourceRebalanceStates(output.resourceSet(), ResourceMonitor.RebalanceStatus.NORMAL); + } + return output; } @@ -237,6 +248,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { + " mode due to an instance being assigned more replicas/partitions than " + "the limit."); } + + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); + if (clusterStatusMonitor != null) { + clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource), + ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); + } + throw new HelixException(errMsg); } instancePartitionCounts.put(instance, partitionCount); http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index f870ddc..803bd3c 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java 
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -19,9 +19,10 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collection; @@ -33,10 +34,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -61,7 +62,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { static final String RESOURCE_STATUS_KEY = "ResourceStatus"; public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus"; public static final String CLUSTER_DN_KEY = "cluster"; - static final String RESOURCE_DN_KEY = "resourceName"; + public static final String RESOURCE_DN_KEY = "resourceName"; static final String INSTANCE_DN_KEY = "instanceName"; static final String MESSAGE_QUEUE_DN_KEY = "messageQueue"; static final String WORKFLOW_TYPE_DN_KEY = "workflowType"; @@ -160,6 +161,16 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { this._rebalanceFailure = isFailure; } + public void setResourceRebalanceStates(Collection<String> resources, + ResourceMonitor.RebalanceStatus state) { + for (String 
resource : resources) { + ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resource); + if (resourceMonitor != null) { + resourceMonitor.setRebalanceState(state); + } + } + } + @Override public long getMaxMessageQueueSizeGauge() { long maxQueueSize = 0; @@ -421,11 +432,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { public void retainResourceMonitor(Set<String> resourceNames) { Set<String> resourcesToRemove = new HashSet<>(); synchronized (this) { + resourceNames.retainAll(_resourceMbeanMap.keySet()); resourcesToRemove.addAll(_resourceMbeanMap.keySet()); } resourcesToRemove.removeAll(resourceNames); try { + registerResources(resourceNames); + } catch (JMException e) { + LOG.error(String.format("Could not register beans for the following resources: %s", + Joiner.on(',').join(resourceNames)), e); + } + + try { unregisterResources(resourcesToRemove); } catch (MalformedObjectNameException e) { LOG.error(String.format("Could not unregister beans for the following resources: %s", @@ -433,12 +452,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } } - public void setResourceStatus(ExternalView externalView, IdealState idealState, StateModelDefinition stateModelDef) { + public void setResourceStatus(ExternalView externalView, IdealState idealState, + StateModelDefinition stateModelDef, int messageCount) { try { ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(externalView.getId()); if (resourceMonitor != null) { - resourceMonitor.updateResource(externalView, idealState, stateModelDef); + resourceMonitor.updateResourceState(externalView, idealState, stateModelDef); + resourceMonitor.updatePendingStateTransitionMessages(messageCount); } } catch (Exception e) { LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e); @@ -461,24 +482,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(resourceName); if (resourceMonitor != null) { - resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions, + resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions, numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions, numLoadRebalanceThrottledPartitions); } } - public synchronized void updatePendingMessages(String resourceName, int messageCount) { - try { - ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName); - - if (resourceMonitor != null) { - resourceMonitor.updatePendingStateTransitionMessages(messageCount); - } - } catch (Exception e) { - LOG.error("Fail to update resource pending messages, resource: " + resourceName, e); - } - } - private ResourceMonitor getOrCreateResourceMonitor(String resourceName) { try { if (!_resourceMbeanMap.containsKey(resourceName)) { @@ -487,7 +496,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { String beanName = getResourceBeanName(resourceName); ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName)); - bean.register(); _resourceMbeanMap.put(resourceName, bean); } } @@ -663,6 +671,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { _instanceMbeanMap.keySet().removeAll(instances); } + private synchronized void registerResources(Collection<String> resources) throws JMException { + for (String resourceName : resources) { + ResourceMonitor monitor = _resourceMbeanMap.get(resourceName); + if (monitor != null) { + monitor.register(); + } + } + } + private synchronized void unregisterResources(Collection<String> resources) throws MalformedObjectNameException { for (String resourceName : resources) { ResourceMonitor monitor = _resourceMbeanMap.get(resourceName); @@ -729,6 +746,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } } + // For test only protected ResourceMonitor getResourceMonitor(String 
resourceName) { return _resourceMbeanMap.get(resourceName); } http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index c3dd242..fb9a779 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -41,6 +41,13 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; public class ResourceMonitor extends DynamicMBeanProvider { + public enum RebalanceStatus { + UNKNOWN, + NORMAL, + BEST_POSSIBLE_STATE_CAL_FAILED, + INTERMEDIATE_STATE_CAL_FAILED + } + // Gauges private SimpleDynamicMetric<Long> _numOfPartitions; private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView; @@ -67,6 +74,8 @@ public class ResourceMonitor extends DynamicMBeanProvider { private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge; private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge; + private SimpleDynamicMetric<String> _rebalanceState; + private String _tag = ClusterStatusMonitor.DEFAULT_TAG; private long _lastResetTime; private final String _resourceName; @@ -96,6 +105,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge); attributeList.add(_totalMessageReceived); attributeList.add(_numPendingStateTransitions); + attributeList.add(_rebalanceState); doRegister(attributeList, _initObjectName); return this; } @@ -146,6 +156,8 @@ public class ResourceMonitor extends DynamicMBeanProvider { _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L); 
_successfulTopStateHandoffDurationCounter = new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L); + + _rebalanceState = new SimpleDynamicMetric<>("RebalanceStatus", RebalanceStatus.UNKNOWN.name()); } @Override @@ -214,7 +226,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { return _clusterName + " " + _resourceName; } - public void updateResource(ExternalView externalView, IdealState idealState, + public void updateResourceState(ExternalView externalView, IdealState idealState, StateModelDefinition stateModelDef) { if (externalView == null) { _logger.warn("External view is null"); @@ -229,7 +241,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { } } - resetGauges(); + resetResourceStateGauges(); if (idealState == null) { _logger.warn("ideal state is null for {}", _resourceName); @@ -319,20 +331,13 @@ public class ResourceMonitor extends DynamicMBeanProvider { } } - private void resetGauges() { + private void resetResourceStateGauges() { _numOfErrorPartitions.updateValue(0L); _numNonTopStatePartitions.updateValue(0L); _externalViewIdealStateDiff.updateValue(0L); _numOfPartitionsInExternalView.updateValue(0L); - - // The following gauges are computed each call to updateResource by way of looping so need to be reset. 
_numLessMinActiveReplicaPartitions.updateValue(0L); _numLessReplicaPartitions.updateValue(0L); - _numPendingRecoveryRebalancePartitions.updateValue(0L); - _numPendingLoadRebalancePartitions.updateValue(0L); - _numRecoveryRebalanceThrottledPartitions.updateValue(0L); - _numLoadRebalanceThrottledPartitions.updateValue(0L); - _numPendingStateTransitions.updateValue(0L); } public void updatePendingStateTransitionMessages(int messageCount) { @@ -367,7 +372,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { } } - public void updateRebalancerStat(long numPendingRecoveryRebalancePartitions, + public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions, long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions, long numLoadRebalanceThrottledPartitions) { _numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions); @@ -376,6 +381,10 @@ public class ResourceMonitor extends DynamicMBeanProvider { _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions); } + public void setRebalanceState(RebalanceStatus state) { + _rebalanceState.updateValue(state.name()); + } + public long getExternalViewPartitionGauge() { return _numOfPartitionsInExternalView.getValue(); } @@ -408,6 +417,10 @@ public class ResourceMonitor extends DynamicMBeanProvider { return _numPendingStateTransitions.getValue(); } + public String getRebalanceState() { + return _rebalanceState.getValue(); + } + public void resetMaxTopStateHandoffGauge() { if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) { _maxSinglePartitionTopStateHandoffDuration.updateValue(0L); http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java index 988ba9b..fbbb9e6 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java @@ -51,14 +51,15 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr * @param domain the MBean domain name * @param keyValuePairs the MBean object name components */ - protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, String description, String domain, String... keyValuePairs) throws JMException { if (_objectName != null) { - throw new HelixException( - "Mbean has been registered before. Please create new object for new registration."); + _logger.warn("Mbean has been registered before. Please create new object for new registration."); + return false; } updateAttributtInfos(dynamicMetrics, description); _objectName = MBeanRegistrar.register(this, domain, keyValuePairs); + return true; } /** @@ -68,19 +69,20 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr * @param description the MBean description * @param objectName the proposed MBean ObjectName */ - protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, String description, ObjectName objectName) throws JMException { if (_objectName != null) { - throw new HelixException( - "Mbean has been registered before. Please create new object for new registration."); + _logger.warn("Mbean has been registered before. 
Please create new object for new registration."); + return false; } updateAttributtInfos(dynamicMetrics, description); _objectName = MBeanRegistrar.register(this, objectName); + return true; } - protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, + protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics, ObjectName objectName) throws JMException { - doRegister(dynamicMetrics, null, objectName); + return doRegister(dynamicMetrics, null, objectName); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java index e732c85..7defef4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java @@ -19,18 +19,19 @@ package org.apache.helix.integration; * under the License. 
*/ +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.lang.management.ManagementFactory; import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.Set; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; + import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; @@ -42,6 +43,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.InstanceConfig; import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.mbeans.ResourceMonitor; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -50,6 +52,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY; +import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.RESOURCE_DN_KEY; import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure; public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { @@ -91,6 +94,9 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); errorNodeKey = accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name()); + + 
_clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); } @BeforeMethod @@ -99,8 +105,8 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { accessor.removeProperty(errorNodeKey); } - @Test (enabled = false) - public void testParticipantUnavailable() { + @Test + public void testParticipantUnavailable() throws Exception { _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name()); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); @@ -119,6 +125,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { // Verify there is no rebalance error logged Assert.assertNull(accessor.getProperty(errorNodeKey)); checkRebalanceFailureGauge(false); + checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb); // kill nodes, so rebalance cannot be done for (int i = 0; i < NODE_NR; i++) { @@ -126,8 +133,10 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { } // Verify the rebalance error caused by no node available - Assert.assertNotNull(pollForError(accessor, errorNodeKey)); + pollForError(accessor, errorNodeKey); checkRebalanceFailureGauge(true); + checkResourceBestPossibleCalFailureState( + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb); // clean up _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); @@ -138,10 +147,20 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { } } - @Test (enabled = false) - public void testTagSetIncorrect() { + @Test (dependsOnMethods = "testParticipantUnavailable") + public void testTagSetIncorrect() throws Exception { _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name()); + ZkHelixClusterVerifier verifier = + new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(new HashSet<>(Collections.singleton(testDb))).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Verify there is no rebalance error logged + Assert.assertNull(accessor.getProperty(errorNodeKey)); + checkRebalanceFailureGauge(false); + checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb); + // set expected instance tag IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb); @@ -150,15 +169,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); // Verify there is rebalance error logged - Assert.assertNotNull(pollForError(accessor, errorNodeKey)); + pollForError(accessor, errorNodeKey); checkRebalanceFailureGauge(true); + checkResourceBestPossibleCalFailureState( + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb); // clean up _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); } - @Test (enabled = false) - public void testWithDomainId() throws InterruptedException { + @Test (dependsOnMethods = "testTagSetIncorrect") + public void testWithDomainId() throws Exception { int replicas = 2; ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); // 1. disable all participants except one node, then set domain Id @@ -192,14 +213,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { // Verify there is no rebalance error logged Assert.assertNull(accessor.getProperty(errorNodeKey)); checkRebalanceFailureGauge(false); + checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb); // 2. 
enable the rest nodes with no domain Id for (int i = replicas; i < NODE_NR; i++) { setInstanceEnable(_participants[i].getInstanceName(), true, configAccessor); } // Verify there is rebalance error logged - Assert.assertNotNull(pollForError(accessor, errorNodeKey)); + pollForError(accessor, errorNodeKey); checkRebalanceFailureGauge(true); + checkResourceBestPossibleCalFailureState( + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb); // 3. reset all nodes domain Id to be correct setting for (int i = replicas; i < NODE_NR; i++) { @@ -211,6 +235,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { // Verify that rebalance error state is removed checkRebalanceFailureGauge(false); + checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb); // clean up _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); @@ -223,6 +248,14 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName)); } + private ObjectName getResourceMbeanName(String clusterName, String resourceName) + throws MalformedObjectNameException { + String resourceBeanName = + String.format("%s=%s,%s=%s", CLUSTER_DN_KEY, clusterName, RESOURCE_DN_KEY, resourceName); + return new ObjectName( + String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName)); + } + private void setDomainId(String instanceName, ConfigAccessor configAccessor) { String domain = String.format("Rack=%s, Instance=%s", instanceName, instanceName); InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName); @@ -237,30 +270,50 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig); } - private void checkRebalanceFailureGauge(boolean expectFailure) { - try { - Long value = (Long) 
_server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge"); - Assert.assertNotNull(value); - Assert.assertEquals(value == 1, expectFailure); - } catch (Exception e) { - Assert.fail("Failed to get attribute!"); - } + private void checkRebalanceFailureGauge(final boolean expectFailure) throws Exception { + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override + public boolean verify() { + try { + Long value = + (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge"); + return value != null && (value == 1) == expectFailure; + } catch (Exception e) { + return false; + } + } + }, 5000); Assert.assertTrue(result); } - private HelixProperty pollForError(HelixDataAccessor accessor, PropertyKey key) { - final int POLL_TIMEOUT = 5000; - final int POLL_INTERVAL = 100; - HelixProperty property = accessor.getProperty(key); - int timeWaited = 0; - while (property == null && timeWaited < POLL_TIMEOUT) { - try { - Thread.sleep(POLL_INTERVAL); - } catch (InterruptedException e) { - return null; + private void checkResourceBestPossibleCalFailureState( + final ResourceMonitor.RebalanceStatus expectedState, final String resourceName) + throws Exception { + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override + public boolean verify() { + try { + String state = (String) _server + .getAttribute(getResourceMbeanName(CLUSTER_NAME, resourceName), "RebalanceStatus"); + return state != null && state.equals(expectedState.name()); + } catch (Exception e) { + return false; + } } - timeWaited += POLL_INTERVAL; - property = accessor.getProperty(key); - } - return property; + }, 5000); + Assert.assertTrue(result); + } + + private void pollForError(final HelixDataAccessor accessor, final PropertyKey key) + throws Exception { + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override + public boolean verify() { + /* TODO re-enable this check when we start recording rebalance error again + return 
accessor.getProperty(key) != null; + */ + return true; + } + }, 5000); + Assert.assertTrue(result); } } http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java index c89505b..0de2fe3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java @@ -19,16 +19,17 @@ package org.apache.helix.integration.rebalancer; * under the License. 
*/ +import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Date; import java.util.List; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; +import javax.management.*; + import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; @@ -42,10 +43,12 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY; +import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure; public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit extends ZkTestBase { @@ -105,12 +108,19 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit Assert.assertTrue(_clusterVerifier.verifyByPolling()); } + @AfterMethod + public void afterMethod() { + cleanupRebalanceError(); + } + @Test public void testWithDisabledInstancesLimit() throws Exception { MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance()); Assert.assertNull(maintenanceSignal); + checkForRebalanceError(false); + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); // disable instance @@ -133,6 +143,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit Assert.assertNotNull(maintenanceSignal); 
Assert.assertNotNull(maintenanceSignal.getReason()); + checkForRebalanceError(true); + for (i = 2; i < 2 + _maxOfflineInstancesAllowed + 1; i++) { instance = _participants.get(i).getInstanceName(); admin.enableInstance(CLUSTER_NAME, instance, true); @@ -146,6 +158,9 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance()); Assert.assertNull(maintenanceSignal); + + checkForRebalanceError(false); + int i; for (i = 2; i < 2 + _maxOfflineInstancesAllowed; i++) { _participants.get(i).syncStop(); @@ -163,19 +178,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit Assert.assertNotNull(maintenanceSignal); Assert.assertNotNull(maintenanceSignal.getReason()); - // TODO re-enable the check after HELIX-631 is fixed - /* - // Verify there is no rebalance error logged - ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); - PropertyKey errorNodeKey = - accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name()); - Assert.assertNotNull(accessor.getProperty(errorNodeKey)); - - Long value = - (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge"); - Assert.assertNotNull(value); - Assert.assertTrue(value.longValue() > 0); - */ + // Verify there is rebalance error logged + checkForRebalanceError(true); } @AfterClass @@ -193,12 +197,33 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); } - private ObjectName getMbeanName(String clusterName) + private void checkForRebalanceError(boolean expectError) + throws MalformedObjectNameException, AttributeNotFoundException, MBeanException, + ReflectionException, InstanceNotFoundException, IOException { + /* TODO re-enable this check when we start recording rebalance error again + ZKHelixDataAccessor 
accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + PropertyKey errorNodeKey = + accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name()); + Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, expectError); + */ + + Long value = + (Long) _server.getAttribute(getClusterMbeanName(CLUSTER_NAME), "RebalanceFailureGauge"); + Assert.assertEquals(value != null && value.longValue() > 0, expectError); + } + + private void cleanupRebalanceError() { + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + PropertyKey errorNodeKey = + accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name()); + accessor.removeProperty(errorNodeKey); + } + + private ObjectName getClusterMbeanName(String clusterName) throws MalformedObjectNameException { String clusterBeanName = String.format("%s=%s", CLUSTER_DN_KEY, clusterName); return new ObjectName( String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName)); } - } http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java index 143d325..b2daba6 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java @@ -205,7 +205,7 @@ public class TestClusterStatusMonitor { StateModelDefinition stateModelDef = BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(); - monitor.setResourceStatus(externalView, idealState, stateModelDef); + monitor.setResourceStatus(externalView, idealState, stateModelDef, 0); 
Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition); Assert.assertEquals(monitor.getTotalResourceGauge(), 1); @@ -238,7 +238,7 @@ public class TestClusterStatusMonitor { externalView.setStateMap(partition, map); } - monitor.setResourceStatus(externalView, idealState, stateModelDef); + monitor.setResourceStatus(externalView, idealState, stateModelDef, 0); Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition); Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica); Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); @@ -266,7 +266,7 @@ public class TestClusterStatusMonitor { externalView.setStateMap(partition, map); } - monitor.setResourceStatus(externalView, idealState, stateModelDef); + monitor.setResourceStatus(externalView, idealState, stateModelDef, 0); Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition); Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState); @@ -291,7 +291,7 @@ public class TestClusterStatusMonitor { externalView.setStateMap(partition, map); } - monitor.setResourceStatus(externalView, idealState, stateModelDef); + monitor.setResourceStatus(externalView, idealState, stateModelDef, 0); Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition); Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); @@ -313,7 +313,7 @@ public class TestClusterStatusMonitor { // test pending state transition message report and read messageCount = new Random().nextInt(numPartition) + 1; - monitor.updatePendingMessages(testDB, messageCount); + monitor.setResourceStatus(externalView, idealState, stateModelDef, messageCount); Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount); } } 
http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java index fbbf4b8..0c8ebe7 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java @@ -88,9 +88,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase { Assert.assertTrue(clusterVerifier.verifyByPolling()); // Verify the bean was created for TestDB0, but not for TestDB1. - Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName))); - Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", clusterName))); - Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", clusterName))); + pollForMBeanExistance(getMbeanName("TestDB0", clusterName), true); + pollForMBeanExistance(getMbeanName("TestDB1", clusterName), false); + pollForMBeanExistance(getMbeanName("TestDB2", clusterName), true); controller.syncStop(); for (MockParticipantManager participant : participants) { @@ -100,6 +100,17 @@ public class TestDisableResourceMbean extends ZkUnitTestBase { System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } + private void pollForMBeanExistance(final ObjectName objectName, boolean expectation) + throws Exception { + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override + public boolean verify() throws Exception { + return _mbeanServer.isRegistered(objectName); + } + }, 3000); + Assert.assertEquals(result, expectation); + } + private ObjectName getMbeanName(String resourceName, String clusterName) throws 
MalformedObjectNameException { String clusterBeanName = http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java index 5310ded..713fd65 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java @@ -62,7 +62,7 @@ public class TestResourceMonitor { StateModelDefinition stateModelDef = BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(); - monitor.updateResource(externalView, idealState, stateModelDef); + monitor.updateResourceState(externalView, idealState, stateModelDef); Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0); Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); @@ -88,7 +88,7 @@ public class TestResourceMonitor { externalView.setStateMap(partition, map); } - monitor.updateResource(externalView, idealState, stateModelDef); + monitor.updateResourceState(externalView, idealState, stateModelDef); Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount); Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount); @@ -119,7 +119,7 @@ public class TestResourceMonitor { externalView.setStateMap(partition, map); } - monitor.updateResource(externalView, idealState, stateModelDef); + monitor.updateResourceState(externalView, idealState, stateModelDef); Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica); Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); @@ -151,7 +151,7 @@ public class TestResourceMonitor { externalView.setStateMap(partition, map); } - monitor.updateResource(externalView, 
idealState, stateModelDef); + monitor.updateResourceState(externalView, idealState, stateModelDef); Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica); Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); @@ -181,7 +181,7 @@ public class TestResourceMonitor { externalView.setStateMap(partition, map); } - monitor.updateResource(externalView, idealState, stateModelDef); + monitor.updateResourceState(externalView, idealState, stateModelDef); Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState); Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); @@ -196,6 +196,17 @@ public class TestResourceMonitor { int messageCount = new Random().nextInt(_partitions) + 1; monitor.updatePendingStateTransitionMessages(messageCount); Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount); + + Assert + .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL); + Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED); + Assert.assertEquals(monitor.getRebalanceState(), + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); + Assert.assertEquals(monitor.getRebalanceState(), + ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name()); } /**