This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7a610  Fix MaxCapacityUsageGauge value not updated (#1464)
5d7a610 is described below

commit 5d7a610cdd9b733af8b38158cdfe04d562081713
Author: Huizhi Lu <[email protected]>
AuthorDate: Tue Oct 13 11:21:53 2020 -0700

    Fix MaxCapacityUsageGauge value not updated (#1464)
    
    MaxCapacityUsageGauge value not updated because the resourcesToMonitor map is empty.
    
    This commit fixes the bug and also adds an integration test to protect the metrics reporting logic.
---
 .../rebalancer/util/WagedValidationUtil.java       | 14 ++++
 .../rebalancer/waged/WagedRebalancer.java          | 10 +--
 .../stages/BestPossibleStateCalcStage.java         | 12 ++--
 .../stages/CurrentStateComputationStage.java       |  5 +-
 .../waged/TestWagedRebalancerMetrics.java          | 84 ++++++++++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  4 ++
 6 files changed, 115 insertions(+), 14 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
index 9742cb1..ac62ed5 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 
@@ -88,4 +90,16 @@ public class WagedValidationUtil {
     }
     return partitionCapacity;
   }
+
+  /**
+   * Checks whether or not a resource has enabled WAGED rebalancer.
+   *
+   * @param idealState {@code IdealState} of the resource being checked.
+   * @return {@code true} if WAGED is enabled; otherwise, {@code false}.
+   */
+  public static boolean isWagedEnabled(IdealState idealState) {
+    return idealState != null
+        && 
idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+        && 
WagedRebalancer.class.getName().equals(idealState.getRebalancerClassName());
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 89ecc47..4451c1d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -44,6 +44,7 @@ import 
org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.StatefulRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import 
org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -598,11 +599,10 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
 
   private void validateInput(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap) throws HelixRebalanceException {
-    Set<String> nonCompatibleResources = 
resourceMap.entrySet().stream().filter(resourceEntry -> {
-      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
-      return is == null || 
!is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
-          || 
!WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
-    }).map(Map.Entry::getKey).collect(Collectors.toSet());
+    Set<String> nonCompatibleResources = resourceMap.keySet().stream()
+        .filter(resource -> 
!WagedValidationUtil.isWagedEnabled(clusterData.getIdealState(resource)))
+        .collect(Collectors.toSet());
+
     if (!nonCompatibleResources.isEmpty()) {
       throw new HelixRebalanceException(String.format(
           "Input contains invalid resource(s) that cannot be rebalanced by the 
WAGED rebalancer. %s",
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 029091c..7b346ec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -41,6 +41,7 @@ import 
org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -253,13 +254,10 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     }
 
     // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the 
WAGED rebalancer
-    Map<String, Resource> wagedRebalancedResourceMap =
-        resourceMap.entrySet().stream().filter(resourceEntry -> {
-          IdealState is = cache.getIdealState(resourceEntry.getKey());
-          return is != null && 
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
-              && 
WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
-        }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
-            resourceEntry -> resourceEntry.getValue()));
+    Map<String, Resource> wagedRebalancedResourceMap = 
resourceMap.entrySet().stream()
+        .filter(resourceEntry ->
+            
WagedValidationUtil.isWagedEnabled(cache.getIdealState(resourceEntry.getKey())))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6c9f245..4af7e27 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -34,6 +34,7 @@ import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -283,10 +284,10 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
     asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
       try {
         // ResourceToRebalance map also has resources from current states.
-        // Only use the resources in ideal states to parse all replicas.
+        // Only use the resources in ideal states that enable WAGED to parse 
all replicas.
         Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
         Map<String, Resource> resourceToMonitorMap = 
resourceMap.entrySet().stream()
-            .filter(idealStateMap::containsKey)
+            .filter(entry -> 
WagedValidationUtil.isWagedEnabled(idealStateMap.get(entry.getKey())))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         Map<String, ResourceAssignment> currentStateAssignment =
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index b02c30b..bdc677b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,19 +29,31 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import javax.management.AttributeNotFoundException;
 import javax.management.JMException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.TestHelper;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
 import 
org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import 
org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.mock.MockManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.InstanceMonitor;
 import org.apache.helix.monitoring.metrics.MetricCollector;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
@@ -144,6 +157,77 @@ public class TestWagedRebalancerMetrics extends 
AbstractTestClusterModel {
         RatioMetric.class).getLastEmittedMetricValue() == 0.0d, 
TestHelper.WAIT_DURATION));
   }
 
+  /*
+   * Integration test for WAGED instance capacity metrics.
+   */
+  @Test
+  public void testInstanceCapacityMetrics() throws Exception {
+    final String clusterName = TestHelper.getTestMethodName();
+    final ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+
+    ResourceControllerDataProvider cache = setupClusterDataCache();
+    Map<String, Resource> resourceMap = 
cache.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+
+    event.addAttribute(AttributeName.helixmanager.name(), new MockManager());
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
+
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ReadClusterDataStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    rebalancePipeline.handle(event);
+
+    final MBeanServerConnection mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
+
+    for (String instance : _instances) {
+      String instanceBeanName = String.format("%s=%s,instanceName=%s",
+          ClusterStatusMonitor.CLUSTER_DN_KEY, clusterName, instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertTrue(TestHelper
+          .verify(() -> mBeanServer.isRegistered(instanceObjectName),
+              TestHelper.WAIT_DURATION));
+
+      // Verify capacity gauge metrics
+      for (Map.Entry<String, Integer> capacityEntry : 
_capacityDataMap.entrySet()) {
+        String capacityKey = capacityEntry.getKey();
+        String attributeName = capacityKey + "Gauge";
+        Assert.assertTrue(TestHelper.verify(() -> {
+          try {
+            return (long) mBeanServer.getAttribute(instanceObjectName, 
attributeName)
+                == _capacityDataMap.get(capacityKey);
+          } catch (AttributeNotFoundException e) {
+            return false;
+          }
+        }, TestHelper.WAIT_DURATION), "Instance capacity gauge metric is not 
found or incorrect!");
+        Assert.assertEquals((long) 
mBeanServer.getAttribute(instanceObjectName, attributeName),
+            (long) _capacityDataMap.get(capacityKey));
+      }
+
+      // Verify MaxCapacityUsageGauge
+      Assert.assertTrue(TestHelper.verify(() -> {
+        try {
+          double actualMaxUsage = (double) 
mBeanServer.getAttribute(instanceObjectName,
+              
InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName());
+          // The values are manually calculated from the capacity configs, to 
make the code simple.
+          double expectedMaxUsage = instance.equals(_testInstanceId) ? 0.4 : 
0.0;
+
+          return Math.abs(actualMaxUsage - expectedMaxUsage) < 0.000001d;
+        } catch (AttributeNotFoundException e) {
+          return false;
+        }
+      }, TestHelper.WAIT_DURATION), "MaxCapacityUsageGauge is not found or 
incorrect");
+    }
+  }
+
   @Override
   protected ResourceControllerDataProvider setupClusterDataCache() throws 
IOException {
     ResourceControllerDataProvider testCache = super.setupClusterDataCache();
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 6d8b861..8887e87 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -137,6 +137,8 @@ public abstract class AbstractTestClusterModel {
     
when(testCurrentStateResource1.getStateModelDefRef()).thenReturn("MasterSlave");
     
when(testCurrentStateResource1.getState(_partitionNames.get(0))).thenReturn("MASTER");
     
when(testCurrentStateResource1.getState(_partitionNames.get(1))).thenReturn("SLAVE");
+    when(testCurrentStateResource1.getSessionId()).thenReturn(_sessionId);
+
     CurrentState testCurrentStateResource2 = Mockito.mock(CurrentState.class);
     Map<String, String> partitionStateMap2 = new HashMap<>();
     partitionStateMap2.put(_partitionNames.get(2), "MASTER");
@@ -146,6 +148,8 @@ public abstract class AbstractTestClusterModel {
     
when(testCurrentStateResource2.getStateModelDefRef()).thenReturn("MasterSlave");
     
when(testCurrentStateResource2.getState(_partitionNames.get(2))).thenReturn("MASTER");
     
when(testCurrentStateResource2.getState(_partitionNames.get(3))).thenReturn("SLAVE");
+    when(testCurrentStateResource2.getSessionId()).thenReturn(_sessionId);
+
     Map<String, CurrentState> currentStatemap = new HashMap<>();
     currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
     currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);

Reply via email to