This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new bd1d28c52 [apache/helix] -- Fixes #2646 (Part-1), Optimize
WagedInstanceCapacity Calculation to improve Helix Controller Pipeline (#2649)
bd1d28c52 is described below
commit bd1d28c52126c27bbc3a3c156a18bee13f8079c4
Author: Himanshu Kandwal <[email protected]>
AuthorDate: Mon Oct 23 11:09:14 2023 -0700
[apache/helix] -- Fixes #2646 (Part-1), Optimize WagedInstanceCapacity
Calculation to improve Helix Controller Pipeline (#2649)
WagedInstanceCapacity data-structure is computed every time during a
pipeline run and in case of large clusters, this computation takes ~80% of
total time. Hence, in this PR we are skipping certain cluster events when this
cache should not be rebuilt, thereby improving the overall runtime.
---
.../ResourceControllerDataProvider.java | 8 +
.../rebalancer/waged/WagedInstanceCapacity.java | 6 +-
.../waged/WagedResourceWeightsProvider.java | 5 +-
.../stages/CurrentStateComputationStage.java | 68 ++++-
.../waged/TestWagedInstanceCapacity.java | 180 ++++++++++++
...ntStateComputationStageForHandlingCapacity.java | 306 +++++++++++++++++++++
.../controller/TestPipelinePerformance.java | 146 ++++++++++
7 files changed, 706 insertions(+), 13 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index efd80ff54..9ca6de4f7 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -513,4 +513,12 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
return _wagedInstanceCapacity.checkAndReduceInstanceCapacity(instance,
resourceName, partition,
partitionWeightMap);
}
+
+ /**
+ * Getter for the cached waged instance capacity map.
+ * @return the cached {@code WagedInstanceCapacity}, or {@code null} if it has not been computed yet.
+ */
+ public WagedInstanceCapacity getWagedInstanceCapacity() {
+ return _wagedInstanceCapacity;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index cd19c301c..d8380a058 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -175,6 +175,10 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
return _instanceCapacityMap.get(instanceName);
}
+ public Map<String, Map<String, Set<String>>> getAllocatedPartitionsMap() {
+ return _allocatedPartitionsMap;
+ }
+
@Override
public boolean isInstanceCapacityAvailable(String instance, Map<String,
Integer> partitionCapacity) {
Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
@@ -215,7 +219,7 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
}
_allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
.computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
- LOG.info("Reduced capacity for instance: " + instance + " for resource: "
+ resName
+ LOG.debug("Reduced capacity for instance: " + instance + " for resource: "
+ resName
+ " for partition: " + partitionName);
return true;
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
index a058da10e..4aa2e90b0 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
@@ -40,10 +40,7 @@ public class WagedResourceWeightsProvider {
public Map<String, Integer> getPartitionWeights(String resourceName, String
partition) {
@Nullable ResourceConfig resourceConfig =
_clusterData.getResourceConfig(resourceName);
- IdealState is = _clusterData.getIdealState(resourceName);
- ResourceConfig mergedResourceConfig =
- ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);
- return WagedRebalanceUtil.fetchCapacityUsage(partition,
mergedResourceConfig, _clusterData.getClusterConfig());
+ return WagedRebalanceUtil.fetchCapacityUsage(partition, resourceConfig,
_clusterData.getClusterConfig());
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 51abca36b..6fbb2b63e 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -113,14 +114,7 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
- // TODO: we only need to compute when there are resource using Waged. We
should
- // do this as perf improvement in future.
- WagedInstanceCapacity capacityProvider = new
WagedInstanceCapacity(dataProvider);
- WagedResourceWeightsProvider weightProvider = new
WagedResourceWeightsProvider(dataProvider);
-
- // Process the currentState and update the available instance capacity.
- capacityProvider.process(dataProvider, currentStateOutput, resourceMap,
weightProvider);
- dataProvider.setWagedCapacityProviders(capacityProvider, weightProvider);
+ handleResourceCapacityCalculation(event,
(ResourceControllerDataProvider) cache, currentStateOutput);
}
}
@@ -339,4 +333,62 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
return null;
});
}
+
+ void handleResourceCapacityCalculation(ClusterEvent event,
ResourceControllerDataProvider cache,
+ CurrentStateOutput currentStateOutput) {
+ Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES.name());
+ if (skipCapacityCalculation(cache, resourceMap, event)) {
+ return;
+ }
+
+ Map<String, Resource> wagedEnabledResourceMap = resourceMap.entrySet()
+ .parallelStream()
+ .filter(entry ->
WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (wagedEnabledResourceMap.isEmpty()) {
+ return;
+ }
+
+ // Phase 1: Rebuild Always
+ WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache);
+ WagedResourceWeightsProvider weightProvider = new
WagedResourceWeightsProvider(cache);
+
+ capacityProvider.process(cache, currentStateOutput,
wagedEnabledResourceMap, weightProvider);
+ cache.setWagedCapacityProviders(capacityProvider, weightProvider);
+ }
+
+ /**
+ * Checks whether the capacity calculation should be skipped, i.e. whether we
can return early without any action on the capacity map.
+ *
+ * @param cache the cluster-level cache for the resources.
+ * @param resourceMap the map of resources being processed in this pipeline run.
+ * @param event the cluster event that is undergoing processing.
+ * @return true if the capacity calculation can be skipped and no action is
needed; false otherwise.
+ */
+ static boolean skipCapacityCalculation(ResourceControllerDataProvider cache,
Map<String, Resource> resourceMap,
+ ClusterEvent event) {
+ if (resourceMap == null || resourceMap.isEmpty()) {
+ return true;
+ }
+
+ if (Objects.isNull(cache.getWagedInstanceCapacity())) {
+ return false;
+ }
+
+ // TODO: We will change this logic to handle each event-type differently
and depending on the resource type.
+ switch (event.getEventType()) {
+ case ClusterConfigChange:
+ case InstanceConfigChange:
+ case ResourceConfigChange:
+ case ControllerChange:
+ case LiveInstanceChange:
+ case CurrentStateChange:
+ case PeriodicalRebalance:
+ case MessageChange:
+ return false;
+ default:
+ return true;
+ }
+ }
+
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
new file mode 100644
index 000000000..82521787d
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedInstanceCapacity.java
@@ -0,0 +1,180 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.RandomUtils;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestWagedInstanceCapacity {
+
+ private static final int INSTANCE_COUNT = 3;
+ private static final int RESOURCE_COUNT = 1;
+ private static final int PARTITION_COUNT = 3;
+ private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU",
"PARTCOUNT", "DISK");
+ private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
+ ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);
+
+ private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
+ ImmutableMap.of("CU", 40, "PARTCOUNT", 1, "DISK", 1);
+
+ private ResourceControllerDataProvider _clusterData;
+ private Map<String, Resource> _resourceMap;
+ private CurrentStateOutput _currentStateOutput;
+ private WagedInstanceCapacity _wagedInstanceCapacity;
+
+ @BeforeMethod
+ public void setUp() {
+ // prepare cluster data
+ _clusterData = new ResourceControllerDataProvider();
+ Map<String, InstanceConfig> instanceConfigMap =
generateInstanceCapacityConfigs();
+ _clusterData.setInstanceConfigMap(instanceConfigMap);
+
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
+ _clusterData.setIdealStates(generateIdealStates());
+
+ ClusterConfig clusterConfig = new ClusterConfig("test");
+ clusterConfig.setTopologyAwareEnabled(false);
+ clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
+ _clusterData.setClusterConfig(clusterConfig);
+
+ // prepare current state output
+ _resourceMap = generateResourceMap();
+ _currentStateOutput = populateCurrentStatesForResources(_resourceMap,
instanceConfigMap.keySet());
+
+ // prepare instance of waged-instance capacity
+ _wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
+ }
+
+ @Test
+ public void testProcessCurrentState() {
+ Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT",
10, "DISK", 100);
+
+ Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+ "instance-0", "resource-0", "partition-0", partCapMap));
+
+ Map<String, Integer> instanceAvailableCapacity =
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+ Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
+ }
+
+ @Test
+ public void testProcessCurrentStateWithUnableToAssignPart() {
+ Map<String, Integer> partCapMap = ImmutableMap.of("CU", 110, "PARTCOUNT",
10, "DISK", 100);
+
+ Assert.assertFalse(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+ "instance-0", "resource-0", "partition-0", partCapMap));
+
+ Map<String, Integer> instanceAvailableCapacity =
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+ Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(100));
+ }
+
+ @Test
+ public void testProcessCurrentStateWithDoubleCharge() {
+ Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT",
10, "DISK", 100);
+
+ Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+ "instance-0", "resource-0", "partition-0", partCapMap));
+
+ // charge again
+ Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
+ "instance-0", "resource-0", "partition-0", partCapMap));
+
+ Map<String, Integer> instanceAvailableCapacity =
_wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
+ Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
+ }
+
+ // -- static helpers
+ private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+
+ for (int i = 0; i < INSTANCE_COUNT; i ++) {
+ String instanceName = "instance-" + i;
+ InstanceConfig config = new InstanceConfig(instanceName);
+ config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
+ instanceConfigMap.put(instanceName, config);
+ }
+
+ return instanceConfigMap;
+ }
+
+ private Map<String, ResourceConfig>
generateResourcePartitionCapacityConfigs() {
+ Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();
+
+ try {
+ Map<String, Map<String, Integer>> partitionsCapacityMap = new
HashMap<>();
+ partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);
+
+ for (String resourceName : getResourceNames()) {
+ ResourceConfig config = new ResourceConfig(resourceName);
+ config.setPartitionCapacityMap(partitionsCapacityMap);
+ resourceConfigMap.put(resourceName, config);
+ }
+ } catch(IOException e) {
+ throw new RuntimeException("error while setting partition capacity map");
+ }
+ return resourceConfigMap;
+ }
+
+ private List<IdealState> generateIdealStates() {
+ return getResourceNames().stream()
+ .map(resourceName -> {
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+ return idealState;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static CurrentStateOutput populateCurrentStatesForResources(
+ Map<String, Resource> resourceMap, Set<String> instanceNames) {
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+ resourceMap.forEach((resourceName, resource) ->
+ resource.getPartitions().forEach(partition -> {
+ int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
+ int idx = 0;
+ for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); idx
++) {
+ currentStateOutput.setCurrentState(
+ resourceName, partition, it.next(), (idx == masterPartIdx) ?
"MASTER" : "SLAVE");
+ }
+ }));
+
+ return currentStateOutput;
+ }
+
+ private static Map<String, Resource> generateResourceMap() {
+ return getResourceNames().stream()
+ .map(resourceName -> {
+ Resource resource = new Resource(resourceName);
+ IntStream.range(0, PARTITION_COUNT)
+ .mapToObj(i -> "partition-" + i)
+ .forEach(resource::addPartition);
+ return resource;
+ })
+ .collect(Collectors.toMap(Resource::getResourceName,
Function.identity()));
+ }
+
+ private static List<String> getResourceNames() {
+ return IntStream.range(0, RESOURCE_COUNT)
+ .mapToObj(i -> "resource-" + i)
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
new file mode 100644
index 000000000..cc0c3196f
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStageForHandlingCapacity.java
@@ -0,0 +1,306 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.helix.PropertyKey.Builder;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestCurrentStateComputationStageForHandlingCapacity {
+
+ private static final int INSTANCE_COUNT = 3;
+ private static final int RESOURCE_COUNT = 2;
+ private static final int PARTITION_COUNT = 3;
+ private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU",
"PARTCOUNT", "DISK");
+ private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
+ ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);
+
+ private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
+ ImmutableMap.of("CU", 10, "PARTCOUNT", 1, "DISK", 1);
+
+ private ResourceControllerDataProvider _clusterData;
+ private Map<String, Resource> _resourceMap;
+ private CurrentStateOutput _currentStateOutput;
+ private WagedInstanceCapacity _wagedInstanceCapacity;
+ private CurrentStateComputationStage _currentStateComputationStage;
+
+ @BeforeMethod
+ public void setUp() {
+ // prepare cluster data
+ _clusterData = Mockito.spy(new ResourceControllerDataProvider());
+ Map<String, InstanceConfig> instanceConfigMap =
generateInstanceCapacityConfigs();
+ _clusterData.setInstanceConfigMap(instanceConfigMap);
+
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
+ _clusterData.setIdealStates(generateIdealStates());
+
Mockito.doReturn(ImmutableMap.of()).when(_clusterData).getAllInstancesMessages();
+
+ ClusterConfig clusterConfig = new ClusterConfig("test");
+ clusterConfig.setTopologyAwareEnabled(false);
+ clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
+ _clusterData.setClusterConfig(clusterConfig);
+
+ // prepare current state output
+ _resourceMap = generateResourceMap();
+ _currentStateOutput = populateCurrentStatesForResources(_resourceMap,
instanceConfigMap.keySet());
+
+ // prepare instance of waged-instance capacity
+ _wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
+ _currentStateComputationStage = new CurrentStateComputationStage();
+ }
+
+ @Test
+ public void testProcessEventWithNoWagedResources() {
+ // We create ideal states with all WAGED enabled.
+ Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+ // convert all resources to non-WAGED rebalancing.
+ idealStates.forEach((resourceName, idealState) -> {
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+ });
+
+ ClusterEvent clusterEvent = new ClusterEvent("test",
ClusterEventType.CurrentStateChange);
+ clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
_clusterData);
+ clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent,
_clusterData, _currentStateOutput);
+
+ // validate that we did not compute and set the capacity map.
+ Assert.assertNull(_clusterData.getWagedInstanceCapacity());
+ }
+
+ @Test
+ public void testProcessEventWithSomeWagedResources() {
+ // We create ideal states with all WAGED enabled.
+ Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+ // remove WAGED from one resource.
+ IdealState firstIdealState =
idealStates.entrySet().iterator().next().getValue();
+ firstIdealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
firstIdealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+ int totalIdealStates = idealStates.size();
+
+ ClusterEvent clusterEvent = new ClusterEvent("test",
ClusterEventType.CurrentStateChange);
+ clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
_clusterData);
+ clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent,
_clusterData, _currentStateOutput);
+
+ // validate that we computed and set the capacity map.
+ WagedInstanceCapacity wagedInstanceCapacity =
_clusterData.getWagedInstanceCapacity();
+ Assert.assertNotNull(wagedInstanceCapacity);
+
+ Map<String, Map<String, Set<String>>> allocatedPartitionsMap =
wagedInstanceCapacity.getAllocatedPartitionsMap();
+ Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
+ .map(Map::keySet)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+
+ Assert.assertEquals(resourcesAllocated.size(), totalIdealStates - 1);
+ }
+
+ @Test
+ public void testProcessEventWithAllWagedResources() {
+ // We create ideal states with all WAGED enabled.
+ Map<String, IdealState> idealStates = _clusterData.getIdealStates();
+
+ ClusterEvent clusterEvent = new ClusterEvent("test",
ClusterEventType.CurrentStateChange);
+ clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
_clusterData);
+ clusterEvent.addAttribute(AttributeName.RESOURCES.name(), _resourceMap);
+
+
_currentStateComputationStage.handleResourceCapacityCalculation(clusterEvent,
_clusterData, _currentStateOutput);
+
+ // validate that we computed and set the capacity map.
+ WagedInstanceCapacity wagedInstanceCapacity =
_clusterData.getWagedInstanceCapacity();
+ Assert.assertNotNull(wagedInstanceCapacity);
+
+ Map<String, Map<String, Set<String>>> allocatedPartitionsMap =
wagedInstanceCapacity.getAllocatedPartitionsMap();
+ Set<String> resourcesAllocated = allocatedPartitionsMap.values().stream()
+ .map(Map::keySet)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+
+ Assert.assertEquals(resourcesAllocated.size(), idealStates.size());
+ }
+
+ @Test
+ public void testSkipCapacityCalculation() {
+ // case: when resource-map is null
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ new ResourceControllerDataProvider(), null, new
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+ // case: when resource-map is empty
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ new ResourceControllerDataProvider(), ImmutableMap.of(), new
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+ // case: when instance capacity is null
+ Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+ new ResourceControllerDataProvider(), _resourceMap, new
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+
+ // case: when event is of no-op
+ ResourceControllerDataProvider dataProvider =
Mockito.mock(ResourceControllerDataProvider.class);
+ WagedInstanceCapacity instanceCapacity =
Mockito.mock(WagedInstanceCapacity.class);
+
Mockito.when(dataProvider.getWagedInstanceCapacity()).thenReturn(instanceCapacity);
+
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.CustomizedStateChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.CustomizedViewChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.CustomizeStateConfigChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.ExternalViewChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.IdealStateChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.OnDemandRebalance)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.Resume)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.RetryRebalance)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.StateVerifier)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.TargetExternalViewChange)));
+ Assert.assertTrue(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.TaskCurrentStateChange)));
+
+ Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.LiveInstanceChange)));
+ Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.CurrentStateChange)));
+ Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.MessageChange)));
+ Assert.assertFalse(CurrentStateComputationStage.skipCapacityCalculation(
+ dataProvider, _resourceMap, new
ClusterEvent(ClusterEventType.PeriodicalRebalance)));
+ }
+
+ // -- static helpers
+ private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+
+ for (int i = 0; i < INSTANCE_COUNT; i ++) {
+ String instanceName = "instance-" + i;
+ InstanceConfig config = new InstanceConfig(instanceName);
+ config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
+ instanceConfigMap.put(instanceName, config);
+ }
+
+ return instanceConfigMap;
+ }
+
+ private Map<String, ResourceConfig>
generateResourcePartitionCapacityConfigs() {
+ Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();
+
+ try {
+ Map<String, Map<String, Integer>> partitionsCapacityMap = new
HashMap<>();
+ partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);
+
+ for (String resourceName : getResourceNames()) {
+ ResourceConfig config = new ResourceConfig(resourceName);
+ config.setPartitionCapacityMap(partitionsCapacityMap);
+ resourceConfigMap.put(resourceName, config);
+ }
+ } catch(IOException e) {
+ throw new RuntimeException("error while setting partition capacity map");
+ }
+ return resourceConfigMap;
+ }
+
+ private List<IdealState> generateIdealStates() {
+ return getResourceNames().stream()
+ .map(resourceName -> {
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+ return idealState;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static CurrentStateOutput populateCurrentStatesForResources(
+ Map<String, Resource> resourceMap, Set<String> instanceNames) {
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+ resourceMap.forEach((resourceName, resource) ->
+ resource.getPartitions().forEach(partition -> {
+ int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
+ int idx = 0;
+ for (Iterator<String> it = instanceNames.iterator(); it.hasNext();
idx ++) {
+ currentStateOutput.setCurrentState(
+ resourceName, partition, it.next(), (idx == masterPartIdx) ?
"MASTER" : "SLAVE");
+ }
+ }));
+
+ return currentStateOutput;
+ }
+
+ private static Map<String, Resource> generateResourceMap() {
+ return getResourceNames().stream()
+ .map(resourceName -> {
+ Resource resource = new Resource(resourceName);
+ IntStream.range(0, PARTITION_COUNT)
+ .mapToObj(i -> "partition-" + i)
+ .forEach(resource::addPartition);
+ return resource;
+ })
+ .collect(Collectors.toMap(Resource::getResourceName,
Function.identity()));
+ }
+
+ private static List<String> getResourceNames() {
+ return IntStream.range(0, RESOURCE_COUNT)
+ .mapToObj(i -> "resource-" + i)
+ .collect(Collectors.toList());
+ }
+
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
new file mode 100644
index 000000000..d5932d7b6
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestPipelinePerformance.java
@@ -0,0 +1,146 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.management.ObjectName;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.model.BuiltInStateModelDefinitions.*;
+
+
+public class TestPipelinePerformance extends ZkTestBase {
+
+ private static final int NUM_NODE = 6;
+ private static final int START_PORT = 12918;
+ private static final int REPLICA = 3;
+ private static final int PARTITIONS = 20;
+ private static final String RESOURCE_NAME = "Test_WAGED_Resource";
+
+ private String _clusterName;
+ private ClusterControllerManager _controller;
+ private ZkHelixClusterVerifier _clusterVerifier;
+ private List<MockParticipantManager> _participants = new ArrayList<>();
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _clusterName = String.format("CLUSTER_%s_%s",
RandomStringUtils.randomAlphabetic(5), getShortClassName());
+ _gSetupTool.addCluster(_clusterName, true);
+
+ createResourceWithDelayedRebalance(_clusterName, RESOURCE_NAME,
MasterSlave.name(), PARTITIONS, REPLICA, REPLICA, -1);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _gSetupTool.addInstanceToCluster(_clusterName, instanceName);
+
+ // start participants
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
_clusterName, instanceName);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ _controller = new ClusterControllerManager(ZK_ADDR, _clusterName,
"controller_0");
+ _controller.syncStart();
+
+ enablePersistBestPossibleAssignment(_gZkClient, _clusterName, true);
+
+ _clusterVerifier = new
StrictMatchExternalViewVerifier.Builder(_clusterName)
+ .setZkClient(_gZkClient)
+ .setDeactivatedNodeAwareness(true)
+ .setResources(Sets.newHashSet(RESOURCE_NAME))
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
+
+ // Set test instance capacity and partition weights
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(_clusterName,
_baseAccessor);
+ ClusterConfig clusterConfig =
dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+ String testCapacityKey = "TestCapacityKey";
+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
100));
+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
1));
+ dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(),
clusterConfig);
+ }
+
+ @AfterClass
+ private void windDownTest() {
+ _controller.syncStop();
+ _participants.forEach(MockParticipantManager::syncStop);
+ deleteCluster(_clusterName);
+ }
+
+ @Test(enabled = false)
+ public void testWagedInstanceCapacityCalculationPerformance() throws
Exception {
+ ObjectName currentStateMbeanObjectName = new ObjectName(
+
String.format("ClusterStatus:cluster=%s,eventName=ClusterEvent,phaseName=CurrentStateComputationStage",
+ _clusterName));
+
+ Assert.assertTrue(_server.isRegistered(currentStateMbeanObjectName));
+ long initialValue = (Long)
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+
+
/************************************************************************************************************
+ * Round 1:
+ * Enable WAGED on existing resource (this will trigger computation of
WagedInstanceCapacity for the first time)
+
************************************************************************************************************/
+ IdealState currentIdealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(_clusterName,
RESOURCE_NAME);
+ currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(_clusterName,
RESOURCE_NAME, currentIdealState);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ long withComputationValue = (Long)
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+ long durationWithComputation = withComputationValue - initialValue;
+
+
/************************************************************************************************************
+ * Round 2:
+ * Perform ideal state change, this will not cause re-computation of
WagedInstanceCapacity
+
************************************************************************************************************/
+ currentIdealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(_clusterName,
RESOURCE_NAME);
+ currentIdealState.setInstanceGroupTag("Test");
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(_clusterName,
RESOURCE_NAME, currentIdealState);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ long withoutComputationValue = (Long)
_server.getAttribute(currentStateMbeanObjectName, "TotalDurationCounter");
+ long durationWithoutComputation = withoutComputationValue -
durationWithComputation;
+ double pctDecrease = (durationWithComputation -
durationWithoutComputation) * 100 / durationWithComputation;
+ System.out.println(String.format("durationWithComputation: %s,
durationWithoutComputation: %s, pctDecrease: %s",
+ durationWithComputation, durationWithoutComputation, pctDecrease));
+
+ Assert.assertTrue(durationWithComputation > durationWithoutComputation);
+ Assert.assertTrue(pctDecrease > 75.0);
+ }
+
+}