This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-stickiness-rebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6b520e01d6bf4049fb29c60e1551fafc2fabf831
Author: frankmu <muteng...@gmail.com>
AuthorDate: Thu Jul 25 14:36:56 2024 -0700

    Create condition based rebalancer (#2846)
    
    Create condition based rebalancer
---
 .../ResourceControllerDataProvider.java            |  26 +++
 .../rebalancer/ConditionBasedRebalancer.java       | 215 +++++++++++++++++++++
 .../condition/ConfigChangeBasedCondition.java      |  11 ++
 .../rebalancer/condition/RebalanceCondition.java   |  19 ++
 .../condition/RebalanceConditionsBuilder.java      |  22 +++
 .../condition/TopologyChangeBasedCondition.java    |  11 ++
 6 files changed, 304 insertions(+)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 021aab6a8..cdfcb0f24 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -73,6 +73,9 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   // a map from customized state type to customized view cache
   private final Map<String, CustomizedViewCache> _customizedViewCacheMap;
 
+  // maintain a cache of ideal state (preference list + best possible 
assignment) which will be managed ondemand in rebalancer
+  private final Map<String, ZNRecord> _ondemandIdealStateCache;
+
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with 
_idealMappingCache.
   private Map<String, ResourceAssignment> _resourceAssignmentCache;
@@ -149,6 +152,7 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     _refreshedChangeTypes = ConcurrentHashMap.newKeySet();
     _customizedStateCache = new CustomizedStateCache(this, 
_aggregationEnabledTypes);
     _customizedViewCacheMap = new HashMap<>();
+    _ondemandIdealStateCache = new HashMap<>();
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
@@ -388,6 +392,28 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     return _lastTopStateLocationMap;
   }
 
+  /**
+   * Get the cached on-demand ideal state (preference list + best possible assignment) for a
+   * resource.
+   *
+   * @param resource the resource name
+   * @return the cached record, or {@code null} if nothing has been cached for the resource
+   */
+  public ZNRecord getCachedOndemandIdealState(String resource) {
+    return _ondemandIdealStateCache.get(resource);
+  }
+
+  /**
+   * Cache the on-demand ideal state (preference list + best possible assignment) for a
+   * resource, replacing any record previously cached for it.
+   *
+   * @param resource   the resource name
+   * @param idealState the record to cache; it is stored by reference, not copied
+   */
+  public void setCachedOndemandIdealState(String resource, ZNRecord idealState) {
+    _ondemandIdealStateCache.put(resource, idealState);
+  }
+
+  /**
+   * Remove all cached on-demand ideal states. Until a resource is cached again,
+   * {@link #getCachedOndemandIdealState(String)} returns {@code null} for it.
+   */
+  public void clearCachedOndemandIdealStates() {
+    _ondemandIdealStateCache.clear();
+  }
+
   /**
    * Get cached resourceAssignment (bestPossible mapping) for a resource
    * @param resource
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
new file mode 100644
index 000000000..158699a97
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -0,0 +1,215 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.condition.RebalanceCondition;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code ConditionBasedRebalancer} class extends the {@link AbstractRebalancer} and
+ * performs the rebalance operation based on a specific list of conditions defined by the
+ * {@link RebalanceCondition} interface.
+ *
+ * <p>A rebalance runs only when every configured condition returns {@code true}. With no
+ * conditions configured, every call rebalances. When a rebalance is skipped, results are served
+ * from the on-demand ideal state cache of {@link ResourceControllerDataProvider}. If the cache
+ * has no usable entry, the result is computed from scratch instead. The conditions are evaluated
+ * separately in {@link #computeNewIdealState} and {@link #computeBestPossiblePartitionState}.
+ */
+public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
+  private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class);
+  private final List<RebalanceCondition> _rebalanceConditions;
+
+  /**
+   * Create a rebalancer without conditions; it rebalances on every call.
+   */
+  public ConditionBasedRebalancer() {
+    this._rebalanceConditions = Collections.emptyList();
+  }
+
+  /**
+   * Create a rebalancer gated by the given conditions.
+   *
+   * @param rebalanceConditions conditions that must all hold for a rebalance to run; the list is
+   *                            copied, so later changes to it do not affect this rebalancer
+   * @throws NullPointerException if {@code rebalanceConditions} is null
+   */
+  public ConditionBasedRebalancer(List<RebalanceCondition> rebalanceConditions) {
+    // Copy defensively so callers (e.g. a builder that keeps adding conditions) cannot change
+    // the gating conditions after construction.
+    this._rebalanceConditions =
+        Collections.unmodifiableList(new ArrayList<>(rebalanceConditions));
+  }
+
+  /**
+   * Check whether every configured condition asks for a rebalance. With no conditions
+   * configured, this is vacuously true.
+   */
+  private boolean shouldRebalance(ResourceControllerDataProvider clusterData) {
+    return _rebalanceConditions.stream()
+        .allMatch(condition -> condition.shouldPerformRebalance(clusterData));
+  }
+
+  /**
+   * Compute a new Ideal State only if all conditions are met. Otherwise return the cached Ideal
+   * State, or compute a new one if nothing is cached.
+   *
+   * @param resourceName        the name of the resource for which to compute the new ideal state.
+   * @param currentIdealState   the current {@link IdealState} of the resource.
+   * @param currentStateOutput  the current state output, containing the actual states of the
+   *                            partitions.
+   * @param clusterData         the {@link ResourceControllerDataProvider} instance providing
+   *                            additional data required for the computation.
+   * @return the cached or newly computed {@link IdealState} for the resource.
+   * @throws HelixException if the resource's state model definition cannot be found.
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
+    if (!shouldRebalance(clusterData)) {
+      ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
+      if (cachedIdealState != null) {
+        return new IdealState(cachedIdealState);
+      }
+      // In theory, the cache should be populated already if no rebalance is needed
+      LOG.warn(
+          "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State",
+          resourceName);
+    }
+
+    LOG.info("Computing IdealState for {}", resourceName);
+
+    List<String> partitions = getStablePartitionList(clusterData, currentIdealState);
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    if (stateModelDef == null) {
+      LOG.error("State Model Definition null for resource: " + resourceName);
+      throw new HelixException("State Model Definition null for resource: " + resourceName);
+    }
+    Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
+    int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());
+
+    LinkedHashMap<String, Integer> stateCountMap =
+        stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas);
+    List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet());
+    List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances());
+    assignableNodes.removeAll(clusterData.getDisabledInstances());
+    assignableLiveNodes.retainAll(assignableNodes);
+
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
+
+    // If there are nodes tagged with resource name, use only those nodes
+    Set<String> taggedNodes = new HashSet<String>();
+    Set<String> taggedLiveNodes = new HashSet<String>();
+    if (currentIdealState.getInstanceGroupTag() != null) {
+      for (String instanceName : assignableNodes) {
+        if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
+            .containsTag(currentIdealState.getInstanceGroupTag())) {
+          taggedNodes.add(instanceName);
+          if (assignableLiveInstance.containsKey(instanceName)) {
+            taggedLiveNodes.add(instanceName);
+          }
+        }
+      }
+      if (!taggedLiveNodes.isEmpty()) {
+        // live nodes exist that have this tag
+        if (LOG.isInfoEnabled()) {
+          LOG.info(
+              "found the following participants with tag " + currentIdealState.getInstanceGroupTag()
+                  + " for " + resourceName + ": " + taggedLiveNodes);
+        }
+      } else if (taggedNodes.isEmpty()) {
+        // no live nodes and no configured nodes have this tag
+        LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+            + " but no configured participants have this tag");
+      } else {
+        // configured nodes have this tag, but no live nodes have this tag
+        LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+            + " but no live participants have this tag");
+      }
+      assignableNodes = new ArrayList<>(taggedNodes);
+      assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
+    }
+
+    // sort node lists to ensure consistent preferred assignments
+    Collections.sort(assignableNodes);
+    Collections.sort(assignableLiveNodes);
+
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+    _rebalanceStrategy =
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+            stateCountMap, maxPartition);
+    ZNRecord newMapping =
+        _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes,
+            currentMapping, clusterData);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: {}", currentMapping);
+      LOG.debug("stateCountMap: {}", stateCountMap);
+      LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
+      LOG.debug("assignableNodes: {}", assignableNodes);
+      LOG.debug("maxPartition: {}", maxPartition);
+      LOG.debug("newMapping: {}", newMapping);
+    }
+
+    IdealState newIdealState = new IdealState(resourceName);
+    // Copy the simple fields rather than sharing currentIdealState's map. ZNRecord#setSimpleFields
+    // presumably stores the map by reference; if so, setRebalanceMode() below would otherwise
+    // write into the current ideal state's record.
+    newIdealState.getRecord()
+        .setSimpleFields(new LinkedHashMap<>(currentIdealState.getRecord().getSimpleFields()));
+    newIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    // Cache a copy so changes made to the returned IdealState do not leak into the cache.
+    clusterData.setCachedOndemandIdealState(resourceName, new ZNRecord(newIdealState.getRecord()));
+
+    return newIdealState;
+  }
+
+  /**
+   * Compute a new assignment only if all conditions are met. Otherwise return the assignment
+   * cached from a previous computation. If the cache holds no assignment for the resource, a new
+   * one is computed.
+   *
+   * @param cache               the {@link ResourceControllerDataProvider} instance providing
+   *                            metadata and state information about the cluster.
+   * @param idealState          the {@link IdealState} representing the current ideal state.
+   * @param resource            the {@link Resource} for which to compute the best possible partition
+   *                            state.
+   * @param currentStateOutput  the {@link CurrentStateOutput} containing the current states of the
+   *                            partitions.
+   * @return the {@link ResourceAssignment} representing the best possible state assignment for the
+   *         partitions of the resource.
+   */
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    String resourceName = resource.getResourceName();
+    ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resourceName);
+    if (!shouldRebalance(cache)) {
+      // computeNewIdealState caches only list fields. Empty map fields therefore mean no
+      // assignment has been cached yet, and must not be served as an empty assignment.
+      if (cachedIdealState != null && cachedIdealState.getMapFields() != null
+          && !cachedIdealState.getMapFields().isEmpty()) {
+        Map<String, Map<String, String>> cachedMapFields = cachedIdealState.getMapFields();
+        ResourceAssignment partitionMapping = new ResourceAssignment(resourceName);
+        for (Partition partition : resource.getPartitions()) {
+          // Map fields are keyed by partition name, not by Partition object.
+          Map<String, String> replicaMap = cachedMapFields.get(partition.getPartitionName());
+          if (replicaMap != null) {
+            partitionMapping.addReplicaMap(partition, replicaMap);
+          }
+        }
+        return partitionMapping;
+      }
+      // In theory, the cache should be populated already if no rebalance is needed
+      LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment",
+          resourceName);
+    }
+
+    LOG.info("Computing BestPossibleMapping for {}", resourceName);
+
+    // TODO: Change the logic to apply different assignment strategy
+    ResourceAssignment assignment =
+        super.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+    // Cache the assignment so no need to recompute the result next time. Build a new record
+    // instead of mutating the cached one in place. If nothing is cached yet, seed the record
+    // from the given ideal state so the preference lists are cached with the assignment.
+    ZNRecord recordToCache =
+        new ZNRecord(cachedIdealState != null ? cachedIdealState : idealState.getRecord());
+    recordToCache.setMapFields(assignment.getRecord().getMapFields());
+    cache.setCachedOndemandIdealState(resourceName, recordToCache);
+
+    return assignment;
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java
new file mode 100644
index 000000000..3089b4342
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java
@@ -0,0 +1,11 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * A {@link RebalanceCondition} meant to request a rebalance when configuration changes.
+ *
+ * <p>Not implemented yet: {@link #shouldPerformRebalance} always returns {@code false}, so it
+ * never requests a rebalance.
+ */
+public class ConfigChangeBasedCondition implements RebalanceCondition {
+  @Override
+  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
+    // TODO: implement the condition check for config change
+    return false;
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java
new file mode 100644
index 000000000..eafff4ef0
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java
@@ -0,0 +1,19 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * The {@code RebalanceCondition} interface defines a condition under which a rebalance operation
+ * should be performed. Implementations of this interface provide specific criteria to determine
+ * whether a rebalance is necessary based on the current state of the system.
+ */
+public interface RebalanceCondition {
+  /**
+   * Determines whether a rebalance should be performed based on the provided
+   * {@link ResourceControllerDataProvider} cache data.
+   *
+   * @param cache the {@code ResourceControllerDataProvider} cached data of the resources being
+   *              managed.
+   * @return {@code true} if the rebalance should be performed, {@code false} otherwise.
+   */
+  boolean shouldPerformRebalance(ResourceControllerDataProvider cache);
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java
new file mode 100644
index 000000000..a5b11e8d6
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java
@@ -0,0 +1,22 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Fluent builder for a list of {@link RebalanceCondition}s, e.g. for passing to a rebalancer
+ * constructor that accepts {@code List<RebalanceCondition>}.
+ */
+public class RebalanceConditionsBuilder {
+  private final List<RebalanceCondition> _rebalanceConditions = new ArrayList<>();
+
+  /**
+   * Add a {@link ConfigChangeBasedCondition}.
+   *
+   * @return this builder, for chaining
+   */
+  public RebalanceConditionsBuilder withConfigChangeBasedCondition() {
+    _rebalanceConditions.add(new ConfigChangeBasedCondition());
+    return this;
+  }
+
+  /**
+   * Add a {@link TopologyChangeBasedCondition}.
+   *
+   * @return this builder, for chaining
+   */
+  public RebalanceConditionsBuilder withTopologyChangeBasedCondition() {
+    _rebalanceConditions.add(new TopologyChangeBasedCondition());
+    return this;
+  }
+
+  /**
+   * Build the list of conditions added so far, in the order they were added.
+   *
+   * @return a new mutable list; later calls on this builder do not change it
+   */
+  public List<RebalanceCondition> build() {
+    // Return a copy so the builder's internal list is never shared with, or changed under, callers.
+    return new ArrayList<>(_rebalanceConditions);
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java
new file mode 100644
index 000000000..99356a6d0
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java
@@ -0,0 +1,11 @@
+package org.apache.helix.controller.rebalancer.condition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * A {@link RebalanceCondition} meant to request a rebalance when the cluster topology changes.
+ *
+ * <p>Not implemented yet: {@link #shouldPerformRebalance} always returns {@code false}, so it
+ * never requests a rebalance.
+ */
+public class TopologyChangeBasedCondition implements RebalanceCondition {
+  @Override
+  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
+    // TODO: implement the condition check for topology change
+    return false;
+  }
+}

Reply via email to