This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new bd1d28c52 [apache/helix] -- Fixes #2646 (Part-1), Optimize 
WagedInstanceCapacity Calculation to improve Helix Controller Pipeline (#2649)
bd1d28c52 is described below

commit bd1d28c52126c27bbc3a3c156a18bee13f8079c4
Author: Himanshu Kandwal <[email protected]>
AuthorDate: Mon Oct 23 11:09:14 2023 -0700

    [apache/helix] -- Fixes #2646 (Part-1), Optimize WagedInstanceCapacity 
Calculation to improve Helix Controller Pipeline (#2649)
    
    The WagedInstanceCapacity data structure is recomputed on every pipeline
    run, and in large clusters this computation takes ~80% of the total
    pipeline time. This PR therefore skips the rebuild for cluster events
    that do not require it, improving the overall runtime.
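
    As a quick illustration, the sketch below restates the skip check this
    patch adds to CurrentStateComputationStage (all names are taken from the
    diff; treat it as a summary of the change, not as extra code):

      // Skip the rebuild when a capacity map is already cached and the
      // incoming event cannot invalidate it.
      static boolean skipCapacityCalculation(ResourceControllerDataProvider cache,
          Map<String, Resource> resourceMap, ClusterEvent event) {
        if (resourceMap == null || resourceMap.isEmpty()) {
          return true;                  // nothing to compute against
        }
        if (cache.getWagedInstanceCapacity() == null) {
          return false;                 // first run: always build the cache
        }
        switch (event.getEventType()) { // only these events force a rebuild
          case ClusterConfigChange:
          case InstanceConfigChange:
          case ResourceConfigChange:
          case ControllerChange:
          case LiveInstanceChange:
          case CurrentStateChange:
          case PeriodicalRebalance:
          case MessageChange:
            return false;
          default:
            return true;                // e.g. IdealStateChange reuses the cache
        }
      }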
---
 .../ResourceControllerDataProvider.java            |   8 +
 .../rebalancer/waged/WagedInstanceCapacity.java    |   6 +-
 .../waged/WagedResourceWeightsProvider.java        |   5 +-
 .../stages/CurrentStateComputationStage.java       |  68 ++++-
 .../waged/TestWagedInstanceCapacity.java           | 180 ++++++++++++
 ...ntStateComputationStageForHandlingCapacity.java | 306 +++++++++++++++++++++
 .../controller/TestPipelinePerformance.java        | 146 ++++++++++
 7 files changed, 706 insertions(+), 13 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index efd80ff54..9ca6de4f7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -513,4 +513,12 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     return _wagedInstanceCapacity.checkAndReduceInstanceCapacity(instance, 
resourceName, partition,
         partitionWeightMap);
   }
+
+  /**
+   * Getter for the cached WAGED instance capacity provider.
+   * @return the cached WagedInstanceCapacity, or null if it has not been computed yet.
+   */
+  public WagedInstanceCapacity getWagedInstanceCapacity() {
+    return _wagedInstanceCapacity;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index cd19c301c..d8380a058 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -175,6 +175,10 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
     return _instanceCapacityMap.get(instanceName);
   }
 
+  public Map<String, Map<String, Set<String>>> getAllocatedPartitionsMap() {
+    return _allocatedPartitionsMap;
+  }
+
   @Override
   public boolean isInstanceCapacityAvailable(String instance, Map<String, 
Integer> partitionCapacity) {
     Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
@@ -215,7 +219,7 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
     }
     _allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
         .computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
-    LOG.info("Reduced capacity for instance: " + instance + " for resource: " 
+ resName
+    LOG.debug("Reduced capacity for instance: " + instance + " for resource: " 
+ resName
         + " for partition: " + partitionName);
     return true;
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
index a058da10e..4aa2e90b0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
@@ -40,10 +40,7 @@ public class WagedResourceWeightsProvider {
 
   public Map<String, Integer> getPartitionWeights(String resourceName, String 
partition) {
     @Nullable ResourceConfig resourceConfig = 
_clusterData.getResourceConfig(resourceName);
-    IdealState is = _clusterData.getIdealState(resourceName);
-    ResourceConfig mergedResourceConfig =
-        ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);
 
-    return WagedRebalanceUtil.fetchCapacityUsage(partition, 
mergedResourceConfig, _clusterData.getClusterConfig());
+    return WagedRebalanceUtil.fetchCapacityUsage(partition, resourceConfig, 
_clusterData.getClusterConfig());
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 51abca36b..6fbb2b63e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
@@ -113,14 +114,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
       
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
           clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
 
-      // TODO: we only need to compute when there are resource using Waged. We 
should
-      // do this as perf improvement in future.
-      WagedInstanceCapacity capacityProvider = new 
WagedInstanceCapacity(dataProvider);
-      WagedResourceWeightsProvider weightProvider = new 
WagedResourceWeightsProvider(dataProvider);
-
-      // Process the currentState and update the available instance capacity.
-      capacityProvider.process(dataProvider, currentStateOutput, resourceMap, 
weightProvider);
-      dataProvider.setWagedCapacityProviders(capacityProvider, weightProvider);
+      handleResourceCapacityCalculation(event, 
(ResourceControllerDataProvider) cache, currentStateOutput);
     }
   }
 
@@ -339,4 +333,62 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
       return null;
     });
   }
+
+  void handleResourceCapacityCalculation(ClusterEvent event, 
ResourceControllerDataProvider cache,
+      CurrentStateOutput currentStateOutput) {
+    Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
+    if (skipCapacityCalculation(cache, resourceMap, event)) {
+      return;
+    }
+
+    Map<String, Resource> wagedEnabledResourceMap = resourceMap.entrySet()
+        .parallelStream()
+        .filter(entry -> 
WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    if (wagedEnabledResourceMap.isEmpty()) {
+      return;
+    }
+
+    // Phase 1: Rebuild Always
+    WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache);
+    WagedResourceWeightsProvider weightProvider = new 
WagedResourceWeightsProvider(cache);
+
+    capacityProvider.process(cache, currentStateOutput, 
wagedEnabledResourceMap, weightProvider);
+    cache.setWagedCapacityProviders(capacityProvider, weightProvider);
+  }
+
+  /**
+   * Checks whether the stage can return early without rebuilding the capacity map.
+   *
+   * @param cache the cluster-level cache of resource data.
+   * @param resourceMap the resources in the current pipeline run.
+   * @param event the cluster event being processed.
+   * @return true if no capacity calculation is needed, false otherwise.
+   */
+  static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, 
Map<String, Resource> resourceMap,
+      ClusterEvent event) {
+    if (resourceMap == null || resourceMap.isEmpty()) {
+      return true;
+    }
+
+    if (Objects.isNull(cache.getWagedInstanceCapacity())) {
+      return false;
+    }
+
+    // TODO: We will change this logic to handle each event-type differently 
and depending on the resource type.
+    switch (event.getEventType()) {
+      case ClusterConfigChange:
+      case InstanceConfigChange:
+      case ResourceConfigChange:
+      case ControllerChange:
+      case LiveInstanceChange:
+      case CurrentStateChange:
+      case PeriodicalRebalance:
+      case MessageChange:
+        return false;
+      default:
+        return true;
+    }
+  }
+
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
new file mode 100644
index 000000000..82521787d
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
@@ -0,0 +1,180 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.RandomUtils;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestWagedInstanceCapacity {
+
+  private static final int INSTANCE_COUNT = 3;
+  private static final int RESOURCE_COUNT = 1;
+  private static final int PARTITION_COUNT = 3;
+  private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU", 
"PARTCOUNT", "DISK");
+  private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
+      ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);
+
+  private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
+      ImmutableMap.of("CU", 40, "PARTCOUNT", 1, "DISK", 1);
+
+  private ResourceControllerDataProvider _clusterData;
+  private Map<String, Resource> _resourceMap;
+  private CurrentStateOutput _currentStateOutput;
+  private WagedInstanceCapacity _wagedInstanceCapacity;
+
+  @BeforeMethod
+  public void setUp() {
+    // prepare cluster data
+    _clusterData = new ResourceControllerDataProvider();
+    Map<String, InstanceConfig> instanceConfigMap = 
generateInstanceCapacityConfigs();
+    _clusterData.setInstanceConfigMap(instanceConfigMap);
+    
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
+    _clusterData.setIdealStates(generateIdealStates());
+
+    ClusterConfig clusterConfig = new ClusterConfig("test");
+    clusterConfig.setTopologyAwareEnabled(false);
+    clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
+    _clusterData.setClusterConfig(clusterConfig);
+
+    // prepare current state output
+    _resourceMap = generateResourceMap();
+    _currentStateOutput = populateCurrentStatesForResources(_resourceMap, 
instanceConfigMap.keySet());
+
+    // prepare instance of waged-instance capacity
+    _wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
+  }
+
+  @Test
+  public void testProcessCurrentState() {
+    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 
10, "DISK", 100);
+
+    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+        "instance-0", "resource-0", "partition-0", partCapMap));
+
+    Map<String, Integer> instanceAvailableCapacity = 
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+    Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
+  }
+
+  @Test
+  public void testProcessCurrentStateWithUnableToAssignPart() {
+    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 110, "PARTCOUNT", 
10, "DISK", 100);
+
+    Assert.assertFalse(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+        "instance-0", "resource-0", "partition-0", partCapMap));
+
+    Map<String, Integer> instanceAvailableCapacity = 
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+    Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(100));
+  }
+
+  @Test
+  public void testProcessCurrentStateWithDoubleCharge() {
+    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 
10, "DISK", 100);
+
+    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+        "instance-0", "resource-0", "partition-0", partCapMap));
+
+    // charge again
+    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+        "instance-0", "resource-0", "partition-0", partCapMap));
+
+    Map<String, Integer> instanceAvailableCapacity = 
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+    Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
+  }
+
+  // -- static helpers
+  private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+
+    for (int i = 0; i < INSTANCE_COUNT; i ++) {
+      String instanceName = "instance-" + i;
+      InstanceConfig config = new InstanceConfig(instanceName);
+      config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
+      instanceConfigMap.put(instanceName, config);
+    }
+
+    return instanceConfigMap;
+  }
+
+  private Map<String, ResourceConfig> 
generateResourcePartitionCapacityConfigs() {
+    Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();
+
+    try {
+      Map<String, Map<String, Integer>> partitionsCapacityMap = new 
HashMap<>();
+      partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);
+
+      for (String resourceName : getResourceNames()) {
+        ResourceConfig config = new ResourceConfig(resourceName);
+        config.setPartitionCapacityMap(partitionsCapacityMap);
+        resourceConfigMap.put(resourceName, config);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("error while setting partition capacity map", e);
+    }
+    return resourceConfigMap;
+  }
+
+  private List<IdealState> generateIdealStates() {
+    return getResourceNames().stream()
+        .map(resourceName -> {
+          IdealState idealState = new IdealState(resourceName);
+          idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+          idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+          return idealState;
+        })
+        .collect(Collectors.toList());
+  }
+
+  private static CurrentStateOutput populateCurrentStatesForResources(
+      Map<String, Resource> resourceMap, Set<String> instanceNames) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+    resourceMap.forEach((resourceName, resource) ->
+      resource.getPartitions().forEach(partition -> {
+        int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
+        int idx = 0;
+        for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); idx 
++) {
+          currentStateOutput.setCurrentState(
+              resourceName, partition, it.next(), (idx == masterPartIdx) ? 
"MASTER" : "SLAVE");
+        }
+      }));
+
+    return currentStateOutput;
+  }
+
+  private static Map<String, Resource> generateResourceMap() {
+    return getResourceNames().stream()
+        .map(resourceName -> {
+          Resource resource = new Resource(resourceName);
+          IntStream.range(0, PARTITION_COUNT)
+              .mapToObj(i -> "partition-" + i)
+              .forEach(resource::addPartition);
+          return resource;
+        })
+        .collect(Collectors.toMap(Resource::getResourceName, 
Function.identity()));
+  }
+
+  private static List<String> getResourceNames() {
+    return IntStream.range(0, RESOURCE_COUNT)
+        .mapToObj(i -> "resource-" + i)
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
new file mode 100644
index 000000000..cc0c3196f
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
@@ -0,0 +1,306 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.helix.PropertyKey.Builder;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestCurrentStateComputationStageForHandlingCapacity {
+
+  private static final int INSTANCE_COUNT = 3;
+  private static final int RESOURCE_COUNT = 2;
+  private static final int PARTITION_COUNT = 3;
+  private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU", 
"PARTCOUNT", "DISK");
+  private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
+      ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);
+
+  private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
+      ImmutableMap.of("CU", 10, "PARTCOUNT", 1, "DISK", 1);
+
+  private ResourceControllerDataProvider _clusterData;
+  private Map<String, Resource> _resourceMap;
+  private CurrentStateOutput _currentStateOutput;
+  private WagedInstanceCapacity _wagedInstanceCapacity;
+  private CurrentStateComputationStage _currentStateComputationStage;
+
+  @BeforeMethod
+  public void setUp() {
+    // prepare cluster data
+    _clusterData = Mockito.spy(new ResourceControllerDataProvider());
+    Map<String, InstanceConfig> instanceConfigMap = 
generateInstanceCapacityConfigs();
+    _clusterData.setInstanceConfigMap(instanceConfigMap);
+    
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
+    _clusterData.setIdealStates(generateIdealStates());
+    
Mockito.doReturn(ImmutableMap.of()).when(_clusterData).getAllInstancesMessages();
+
+    ClusterConfig clusterConfig = new ClusterConfig("test");
+    clusterConfig.setTopologyAwareEnabled(false);
+    clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
+    _clusterData.setClusterConfig(clusterConfig);
+
+    // prepare current state output
+    _resourceMap = generateResourceMap();
+    _currentStateOutput = populateCurrentStatesForResources(_resourceMap, 
instanceConfigMap.keySet());
+
+    // prepare instance of waged-instance capacity
+    _wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
+    _currentStateComputationStage = new CurrentStateComputationStage();
+  }
+
+  @Test
+  public void testProcessEventWithNoWagedResources() {
+    // We create ideal states with all WAGED enabled.
+    Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+    // remove WAGED from all resources.
+    idealStates.forEach((resourceName, idealState) -> {
+      idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+      idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    });
+
+    ClusterEvent clusterEvent = new ClusterEvent("test", 
ClusterEventType.CurrentStateChange);
+    clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), 
_clusterData);
+    clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+    
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, 
_clusterData, _currentStateOutput);
+
+    // validate that we did not compute and set the capacity map.
+    Assert.assertNull(_clusterData.getWagedInstanceCapacity());
+  }
+
+  @Test
+  public void testProcessEventWithSomeWagedResources() {
+    // We create ideal states with all WAGED enabled.
+    Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+    // remove WAGED from one resource.
+    IdealState firstIdealState = 
idealStates.entrySet().iterator().next().getValue();
+    firstIdealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    
firstIdealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    int totalIdealStates = idealStates.size();
+
+    ClusterEvent clusterEvent = new ClusterEvent("test", 
ClusterEventType.CurrentStateChange);
+    clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), 
_clusterData);
+    clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+    
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, 
_clusterData, _currentStateOutput);
+
+    // validate that we computed and set the capacity map.
+    WagedInstanceCapacity wagedInstanceCapacity = 
_clusterData.getWagedInstanceCapacity();
+    Assert.assertNotNull(wagedInstanceCapacity);
+
+    Map<String, Map<String, Set<String>>> allocatedPartitionsMap = 
wagedInstanceCapacity.getAllocatedPartitionsMap();
+    Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
+        .map(Map::keySet)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+
+    Assert.assertEquals(resourcesAllocated.size(), totalIdealStates - 1);
+  }
+
+  @Test
+  public void testProcessEventWithAllWagedResources() {
+    // We create ideal states with all WAGED enabled.
+    Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+    ClusterEvent clusterEvent = new ClusterEvent("test", 
ClusterEventType.CurrentStateChange);
+    clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), 
_clusterData);
+    clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+    
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent, 
_clusterData, _currentStateOutput);
+
+    // validate that we computed and set the capacity map.
+    WagedInstanceCapacity wagedInstanceCapacity = 
_clusterData.getWagedInstanceCapacity();
+    Assert.assertNotNull(wagedInstanceCapacity);
+
+    Map<String, Map<String, Set<String>>> allocatedPartitionsMap = 
wagedInstanceCapacity.getAllocatedPartitionsMap();
+    Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
+        .map(Map::keySet)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+
+    Assert.assertEquals(resourcesAllocated.size(), idealStates.size());
+  }
+
+  @Test
+  public void testSkipCapacityCalculation() {
+    // case: when resource-map is null
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        new ResourceControllerDataProvider(), null, new 
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+    // case: when resource-map is empty
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        new ResourceControllerDataProvider(), ImmutableMap.of(), new 
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+    // case: when instance capacity is null
+    Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+        new ResourceControllerDataProvider(), _resourceMap, new 
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+    // case: when the event type is a no-op for capacity calculation
+    ResourceControllerDataProvider dataProvider = 
Mockito.mock(ResourceControllerDataProvider.class);
+    WagedInstanceCapacity instanceCapacity = 
Mockito.mock(WagedInstanceCapacity.class);
+    
Mockito.when(dataProvider.getWagedInstanceCapacity()).thenReturn(instanceCapacity);
+
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.CustomizedStateChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.CustomizedViewChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.CustomizeStateConfigChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.ExternalViewChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.IdealStateChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.OnDemandRebalance)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.Resume)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.RetryRebalance)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.StateVerifier)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.TargetExternalViewChange)));
+    Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.TaskCurrentStateChange)));
+
+    Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+    Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.CurrentStateChange)));
+    Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.MessageChange)));
+    Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+        dataProvider, _resourceMap, new 
ClusterEvent(ClusterEventType.PeriodicalRebalance)));
+  }
+
+  // -- static helpers
+  private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+
+    for (int i = 0; i < INSTANCE_COUNT; i ++) {
+      String instanceName = "instance-" + i;
+      InstanceConfig config = new InstanceConfig(instanceName);
+      config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
+      instanceConfigMap.put(instanceName, config);
+    }
+
+    return instanceConfigMap;
+  }
+
+  private Map<String, ResourceConfig> 
generateResourcePartitionCapacityConfigs() {
+    Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();
+
+    try {
+      Map<String, Map<String, Integer>> partitionsCapacityMap = new 
HashMap<>();
+      partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);
+
+      for (String resourceName : getResourceNames()) {
+        ResourceConfig config = new ResourceConfig(resourceName);
+        config.setPartitionCapacityMap(partitionsCapacityMap);
+        resourceConfigMap.put(resourceName, config);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("error while setting partition capacity map", e);
+    }
+    return resourceConfigMap;
+  }
+
+  private List<IdealState> generateIdealStates() {
+    return getResourceNames().stream()
+        .map(resourceName -> {
+          IdealState idealState = new IdealState(resourceName);
+          idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+          idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+          return idealState;
+        })
+        .collect(Collectors.toList());
+  }
+
+  private static CurrentStateOutput populateCurrentStatesForResources(
+      Map<String, Resource> resourceMap, Set<String> instanceNames) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+    resourceMap.forEach((resourceName, resource) ->
+        resource.getPartitions().forEach(partition -> {
+          int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
+          int idx = 0;
+          for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); 
idx ++) {
+            currentStateOutput.setCurrentState(
+                resourceName, partition, it.next(), (idx == masterPartIdx) ? 
"MASTER" : "SLAVE");
+          }
+        }));
+
+    return currentStateOutput;
+  }
+
+  private static Map<String, Resource> generateResourceMap() {
+    return getResourceNames().stream()
+        .map(resourceName -> {
+          Resource resource = new Resource(resourceName);
+          IntStream.range(0, PARTITION_COUNT)
+              .mapToObj(i -> "partition-" + i)
+              .forEach(resource::addPartition);
+          return resource;
+        })
+        .collect(Collectors.toMap(Resource::getResourceName, 
Function.identity()));
+  }
+
+  private static List<String> getResourceNames() {
+    return IntStream.range(0, RESOURCE_COUNT)
+        .mapToObj(i -> "resource-" + i)
+        .collect(Collectors.toList());
+  }
+
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
new file mode 100644
index 000000000..d5932d7b6
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
@@ -0,0 +1,146 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.management.ObjectName;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.model.BuiltInStateModelDefinitions.*;
+
+
+public class TestPipelinePerformance extends ZkTestBase {
+
+  private static final int NUM_NODE = 6;
+  private static final int START_PORT = 12918;
+  private static final int REPLICA = 3;
+  private static final int PARTITIONS = 20;
+  private static final String RESOURCE_NAME = "Test_WAGED_Resource";
+
+  private String _clusterName;
+  private ClusterControllerManager _controller;
+  private ZkHelixClusterVerifier _clusterVerifier;
+  private List<MockParticipantManager> _participants = new ArrayList<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _clusterName = String.format("CLUSTER_%s_%s", 
RandomStringUtils.randomAlphabetic(5), getShortClassName());
+    _gSetupTool.addCluster(_clusterName, true);
+
+    createResourceWithDelayedRebalance(_clusterName, RESOURCE_NAME, 
MasterSlave.name(), PARTITIONS, REPLICA, REPLICA, -1);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(_clusterName, instanceName);
+
+      // start participants
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
_clusterName, instanceName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, 
"controller_0");
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, _clusterName, true);
+
+    _clusterVerifier = new 
StrictMatchExternalViewVerifier.Builder(_clusterName)
+        .setZkClient(_gZkClient)
+        .setDeactivatedNodeAwareness(true)
+        .setResources(Sets.newHashSet(RESOURCE_NAME))
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+
+    // Set test instance capacity and partition weights
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(_clusterName, 
_baseAccessor);
+    ClusterConfig clusterConfig = 
dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    String testCapacityKey = "TestCapacityKey";
+    
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
 100));
+    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
 1));
+    dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), 
clusterConfig);
+  }
+
+  @AfterClass
+  private void windDownTest() {
+    _controller.syncStop();
+    _participants.forEach(MockParticipantManager::syncStop);
+    deleteCluster(_clusterName);
+  }
+
+  @Test(enabled = false)
+  public void testWagedInstanceCapacityCalculationPerformance() throws 
Exception {
+    ObjectName currentStateMbeanObjectName = new ObjectName(
+        
String.format("ClusterStatus:cluster=%s,eventName=ClusterEvent,phaseName=CurrentStateComputationStage",
+            _clusterName));
+
+    Assert.assertTrue(_server.isRegistered(currentStateMbeanObjectName));
+    long initialValue = (Long) 
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+
+    
/************************************************************************************************************
+     * Round 1:
+     * Enable WAGED on the existing resource (this triggers the first computation of WagedInstanceCapacity).
+     
************************************************************************************************************/
+    IdealState currentIdealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(_clusterName, 
RESOURCE_NAME);
+    currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(_clusterName, 
RESOURCE_NAME, currentIdealState);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    long withComputationValue = (Long) 
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+    long durationWithComputation = withComputationValue - initialValue;
+
+    
/************************************************************************************************************
+     * Round 2:
+     * Perform an ideal-state change; this will not cause re-computation of WagedInstanceCapacity.
+     
************************************************************************************************************/
+    currentIdealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(_clusterName, 
RESOURCE_NAME);
+    currentIdealState.setInstanceGroupTag("Test");
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(_clusterName, 
RESOURCE_NAME, currentIdealState);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    long withoutComputationValue = (Long) 
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+    long durationWithoutComputation = withoutComputationValue - withComputationValue;
+    double pctDecrease = (durationWithComputation - durationWithoutComputation) * 100.0 / durationWithComputation;
+    System.out.println(String.format("durationWithComputation: %s, 
durationWithoutComputation: %s, pctDecrease: %s",
+        durationWithComputation, durationWithoutComputation, pctDecrease));
+
+    Assert.assertTrue(durationWithComputation > durationWithoutComputation);
+    Assert.assertTrue(pctDecrease > 75.0);
+  }
+
+}

