This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch helix-stickiness-rebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6b520e01d6bf4049fb29c60e1551fafc2fabf831
Author: frankmu <muteng...@gmail.com>
AuthorDate: Thu Jul 25 14:36:56 2024 -0700

    Create condition based rebalancer (#2846)
    
    Create condition based rebalancer
---
 .../ResourceControllerDataProvider.java          |  32 +++
 .../rebalancer/ConditionBasedRebalancer.java     | 271 +++++++++++++++++++++
 .../condition/ConfigChangeBasedCondition.java    |  15 ++
 .../rebalancer/condition/RebalanceCondition.java |  19 ++
 .../condition/RebalanceConditionsBuilder.java    |  42 ++++
 .../condition/TopologyChangeBasedCondition.java  |  15 ++
 6 files changed, 394 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 021aab6a8..cdfcb0f24 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -73,6 +73,10 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
   // a map from customized state type to customized view cache
   private final Map<String, CustomizedViewCache> _customizedViewCacheMap;
 
+  // Cache of ideal states (preference list + best possible assignment) that is managed on
+  // demand by the rebalancer, e.g. ConditionBasedRebalancer
+  private final Map<String, ZNRecord> _ondemandIdealStateCache;
+
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
   private Map<String, ResourceAssignment> _resourceAssignmentCache;
@@ -149,6 +153,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
     _refreshedChangeTypes = ConcurrentHashMap.newKeySet();
     _customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
     _customizedViewCacheMap = new HashMap<>();
+    _ondemandIdealStateCache = new HashMap<>();
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
@@ -388,6 +393,33 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
     return _lastTopStateLocationMap;
   }
 
+  /**
+   * Get the cached on-demand ideal state (preference list + best possible assignment) for a
+   * resource.
+   * @param resource the resource name
+   * @return the cached {@link ZNRecord}, or {@code null} if nothing is cached for the resource
+   */
+  public ZNRecord getCachedOndemandIdealState(String resource) {
+    return _ondemandIdealStateCache.get(resource);
+  }
+
+  /**
+   * Cache the on-demand ideal state (preference list + best possible assignment) for a resource,
+   * replacing any previously cached value.
+   * @param resource the resource name
+   * @param idealState the ideal state record to cache
+   */
+  public void setCachedOndemandIdealState(String resource, ZNRecord idealState) {
+    _ondemandIdealStateCache.put(resource, idealState);
+  }
+
+  /**
+   * Remove all cached on-demand ideal states.
+   */
+  public void clearCachedOndemandIdealStates() {
+    _ondemandIdealStateCache.clear();
+  }
+
   /**
    * Get cached resourceAssignment (bestPossible mapping) for a resource
    * @param resource
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
new file mode 100644
index 000000000..158699a97
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -0,0 +1,271 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.condition.RebalanceCondition;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code ConditionBasedRebalancer} class extends the {@link AbstractRebalancer} and
+ * performs the rebalance operation based on a specific list of conditions defined by the
+ * {@link RebalanceCondition} interface. A new ideal state or assignment is computed only when
+ * every configured condition requests a rebalance; otherwise the result cached in the
+ * {@link ResourceControllerDataProvider} by a previous computation is reused, if available.
+ */
+public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
+  private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class);
+  private final List<RebalanceCondition> _rebalanceConditions;
+
+  /**
+   * Create a rebalancer with no conditions, which therefore recomputes on every invocation.
+   */
+  public ConditionBasedRebalancer() {
+    this._rebalanceConditions = new ArrayList<>();
+  }
+
+  /**
+   * Create a rebalancer that recomputes only when all of the given conditions are met.
+   *
+   * @param rebalanceConditions the conditions to check; the list is copied, so later changes to
+   *                            it do not affect this rebalancer
+   */
+  public ConditionBasedRebalancer(List<RebalanceCondition> rebalanceConditions) {
+    this._rebalanceConditions = new ArrayList<>(rebalanceConditions);
+  }
+
+  /**
+   * Compute a new Ideal State iff all conditions are met, otherwise return the cached Ideal
+   * State. If nothing is cached for the resource, the Ideal State is recomputed regardless of
+   * the conditions. A newly computed Ideal State is cached for later reuse.
+   *
+   * @param resourceName the name of the resource for which to compute the new ideal state.
+   * @param currentIdealState the current {@link IdealState} of the resource.
+   * @param currentStateOutput the current state output, containing the actual states of the
+   *                           partitions.
+   * @param clusterData the {@link ResourceControllerDataProvider} instance providing
+   *                    additional data required for the computation.
+   * @return the newly computed (or cached) {@link IdealState} for the resource.
+   * @throws HelixException if the state model definition of the resource cannot be found
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
+    if (!allConditionsMet(clusterData)) {
+      ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
+      if (cachedIdealState != null) {
+        return new IdealState(cachedIdealState);
+      }
+      // In theory, the cache should be populated already if no rebalance is needed
+      LOG.warn(
+          "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State",
+          resourceName);
+    }
+
+    LOG.info("Computing IdealState for {}", resourceName);
+
+    List<String> partitions = getStablePartitionList(clusterData, currentIdealState);
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    if (stateModelDef == null) {
+      LOG.error("State Model Definition null for resource: " + resourceName);
+      throw new HelixException("State Model Definition null for resource: " + resourceName);
+    }
+    Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
+    int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());
+
+    LinkedHashMap<String, Integer> stateCountMap =
+        stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas);
+    List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet());
+    List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances());
+    assignableNodes.removeAll(clusterData.getDisabledInstances());
+    assignableLiveNodes.retainAll(assignableNodes);
+
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
+
+    // If there are nodes tagged with resource name, use only those nodes
+    Set<String> taggedNodes = new HashSet<>();
+    Set<String> taggedLiveNodes = new HashSet<>();
+    if (currentIdealState.getInstanceGroupTag() != null) {
+      for (String instanceName : assignableNodes) {
+        if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
+            .containsTag(currentIdealState.getInstanceGroupTag())) {
+          taggedNodes.add(instanceName);
+          if (assignableLiveInstance.containsKey(instanceName)) {
+            taggedLiveNodes.add(instanceName);
+          }
+        }
+      }
+      if (!taggedLiveNodes.isEmpty()) {
+        // live nodes exist that have this tag
+        if (LOG.isInfoEnabled()) {
+          LOG.info(
+              "found the following participants with tag " + currentIdealState.getInstanceGroupTag()
+                  + " for " + resourceName + ": " + taggedLiveNodes);
+        }
+      } else if (taggedNodes.isEmpty()) {
+        // no live nodes and no configured nodes have this tag
+        LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+            + " but no configured participants have this tag");
+      } else {
+        // configured nodes have this tag, but no live nodes have this tag
+        LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+            + " but no live participants have this tag");
+      }
+      assignableNodes = new ArrayList<>(taggedNodes);
+      assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
+    }
+
+    // sort node lists to ensure consistent preferred assignments
+    Collections.sort(assignableNodes);
+    Collections.sort(assignableLiveNodes);
+
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+    _rebalanceStrategy =
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+            stateCountMap, maxPartition);
+    ZNRecord newMapping =
+        _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes,
+            currentMapping, clusterData);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: {}", currentMapping);
+      LOG.debug("stateCountMap: {}", stateCountMap);
+      LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
+      LOG.debug("assignableNodes: {}", assignableNodes);
+      LOG.debug("maxPartition: {}", maxPartition);
+      LOG.debug("newMapping: {}", newMapping);
+    }
+
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    // Cache the preference lists; computeBestPossiblePartitionState adds the map fields
+    clusterData.setCachedOndemandIdealState(resourceName, newIdealState.getRecord());
+
+    return newIdealState;
+  }
+
+  /**
+   * Compute a new assignment iff all conditions are met, otherwise return the assignment cached
+   * by a previous computation. If nothing usable is cached (no cached ideal state, or no replica
+   * map for some partition of the resource), the assignment is recomputed regardless of the
+   * conditions. A recomputed assignment is stored as the map fields of the cached ideal state.
+   *
+   * @param cache              the {@link ResourceControllerDataProvider} instance providing
+   *                           metadata and state information about the cluster.
+   * @param idealState         the {@link IdealState} representing the current ideal state.
+   * @param resource           the {@link Resource} for which to compute the best possible partition
+   *                           state.
+   * @param currentStateOutput the {@link CurrentStateOutput} containing the current states of the
+   *                           partitions.
+   * @return the {@link ResourceAssignment} representing the best possible state assignment for the
+   *     partitions of the resource.
+   */
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    String resourceName = resource.getResourceName();
+    ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resourceName);
+    if (!allConditionsMet(cache)) {
+      ResourceAssignment cachedAssignment = buildAssignmentFromCache(cachedIdealState, resource);
+      if (cachedAssignment != null) {
+        return cachedAssignment;
+      }
+      // In theory, the cache should be populated already if no rebalance is needed
+      LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment",
+          resourceName);
+    }
+
+    LOG.info("Computing BestPossibleMapping for {}", resourceName);
+
+    // TODO: Change the logic to apply different assignment strategy
+    ResourceAssignment assignment =
+        super.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+    // Cache the assignment so no need to recompute the result next time. The cached ideal state
+    // may be absent (e.g. after clearCachedOndemandIdealStates()), so start a fresh record
+    // rather than dereferencing null.
+    if (cachedIdealState == null) {
+      cachedIdealState = new ZNRecord(resourceName);
+    }
+    cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
+    cache.setCachedOndemandIdealState(resourceName, cachedIdealState);
+
+    return assignment;
+  }
+
+  /**
+   * Check whether every configured {@link RebalanceCondition} requests a rebalance.
+   *
+   * @param cache the cluster data passed to each condition
+   * @return {@code true} if all conditions are met, including when none are configured
+   */
+  private boolean allConditionsMet(ResourceControllerDataProvider cache) {
+    return _rebalanceConditions.stream()
+        .allMatch(condition -> condition.shouldPerformRebalance(cache));
+  }
+
+  /**
+   * Rebuild the assignment of {@code resource} from the map fields of a cached ideal state.
+   *
+   * @param cachedIdealState the cached on-demand ideal state; may be {@code null}
+   * @param resource the resource whose partitions are looked up in the cache
+   * @return the assignment, or {@code null} if nothing is cached or the cache has no replica map
+   *     for some partition of the resource (e.g. only preference lists were cached so far)
+   */
+  private static ResourceAssignment buildAssignmentFromCache(ZNRecord cachedIdealState,
+      Resource resource) {
+    if (cachedIdealState == null || cachedIdealState.getMapFields() == null) {
+      return null;
+    }
+    ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
+    for (Partition partition : resource.getPartitions()) {
+      // Map fields are keyed by partition name, not by the Partition object itself
+      Map<String, String> replicaMap =
+          cachedIdealState.getMapFields().get(partition.getPartitionName());
+      if (replicaMap == null) {
+        return null;
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+    }
+    return assignment;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java
new file mode 100644
index 000000000..3089b4342
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java
@@ -0,0 +1,15 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * A {@link RebalanceCondition} intended to request a rebalance when relevant configuration
+ * changes. It is not implemented yet and currently never requests a rebalance.
+ */
+public class ConfigChangeBasedCondition implements RebalanceCondition {
+  @Override
+  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
+    // TODO: implement the condition check for config change
+    return false;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java
new file mode 100644
index 000000000..eafff4ef0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java
@@ -0,0 +1,19 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * The {@code RebalanceCondition} interface defines a condition under which a rebalance operation
+ * should be performed. Implementations of this interface provide specific criteria to determine
+ * whether a rebalance is necessary based on the current state of the system.
+ */
+public interface RebalanceCondition {
+  /**
+   * Determines whether a rebalance should be performed based on the provided
+   * {@link ResourceControllerDataProvider} cache data.
+   *
+   * @param cache the {@code ResourceControllerDataProvider} cached data of the managed resources.
+   * @return {@code true} if the rebalance should be performed, {@code false} otherwise.
+   */
+  boolean shouldPerformRebalance(ResourceControllerDataProvider cache);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java
new file mode 100644
index 000000000..a5b11e8d6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java
@@ -0,0 +1,42 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder that assembles the list of {@link RebalanceCondition}s used to decide when a
+ * condition-based rebalancer should recompute assignments.
+ */
+public class RebalanceConditionsBuilder {
+  private final List<RebalanceCondition> _rebalanceConditions = new ArrayList<>();
+
+  /**
+   * Add a {@link ConfigChangeBasedCondition}.
+   *
+   * @return this builder, for chaining
+   */
+  public RebalanceConditionsBuilder withConfigChangeBasedCondition() {
+    _rebalanceConditions.add(new ConfigChangeBasedCondition());
+    return this;
+  }
+
+  /**
+   * Add a {@link TopologyChangeBasedCondition}.
+   *
+   * @return this builder, for chaining
+   */
+  public RebalanceConditionsBuilder withTopologyChangeBasedCondition() {
+    _rebalanceConditions.add(new TopologyChangeBasedCondition());
+    return this;
+  }
+
+  /**
+   * Build the list of conditions added so far.
+   *
+   * @return a new list containing the conditions in the order they were added; later calls on
+   *     this builder do not affect a list that was already returned
+   */
+  public List<RebalanceCondition> build() {
+    return new ArrayList<>(_rebalanceConditions);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java
new file mode 100644
index 000000000..99356a6d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java
@@ -0,0 +1,15 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * A {@link RebalanceCondition} intended to request a rebalance when the cluster topology
+ * changes. It is not implemented yet and currently never requests a rebalance.
+ */
+public class TopologyChangeBasedCondition implements RebalanceCondition {
+  @Override
+  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
+    // TODO: implement the condition check for topology change
+    return false;
+  }
+}