This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b49d7ae6eadd403a1dc8276917ac0e3e8f8eca92
Author: Neal Sun <[email protected]>
AuthorDate: Mon Nov 23 18:27:26 2020 -0800

    POC
---
 .../ConstraintBasedAlgorithmFactory.java           |  2 +-
 .../ResourceTopStateUsageConstraint.java           | 46 ++++++++++++++++++++++
 .../rebalancer/waged/model/ClusterContext.java     | 17 ++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..237d16c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
+      put(ResourceTopStateUsageConstraint.class.getSimpleName(), 3f);
       put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
     }
   };
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
new file mode 100644
index 0000000..8ba9cdc
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -0,0 +1,46 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the top state resource usage 
on the instance.
+ * The higher the maximum usage value for the capacity key, the lower the 
score will be, implying
+ * that it is that much less desirable to assign anything on the given node.
+ * It is a greedy approach since it evaluates only on the most used capacity 
key.
+ */
+class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
+  @Override
+  protected double getAssignmentScore(AssignableNode node, AssignableReplica 
replica,
+      ClusterContext clusterContext) {
+    if (!replica.isReplicaTopState()) {
+      // For non top state replica, this constraint is not applicable.
+      // So return zero on any assignable node candidate.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = 
clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization = 
node.getProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, 
projectedHighestUtilization);
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..2f6650b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new 
HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = 
new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> 
totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? 
capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> 
totalUsage
             .compute(capacityEntry.getKey(),
@@ -90,15 +96,22 @@ public class ClusterContext {
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
       float estimatedMaxUsage = 0;
+      float estimatedTopStateMaxUsage = 0;
       for (String capacityKey : totalCapacity.keySet()) {
         int maxCapacity = totalCapacity.get(capacityKey);
         int usage = totalUsage.getOrDefault(capacityKey, 0);
         float utilization = (maxCapacity == 0) ? 1 : (float) usage / 
maxCapacity;
         estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+
+        int topStateUsage = totalTopStateUsage.getOrDefault(capacityKey, 0);
+        float topStateUtilization = (maxCapacity == 0) ? 1 : (float) 
topStateUsage / maxCapacity;
+        estimatedTopStateMaxUsage = Math.max(estimatedTopStateMaxUsage, 
topStateUtilization);
       }
       _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedTopStateMaxUtilization = estimatedTopStateMaxUsage;
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
     _estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +148,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, 
String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, 
Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());

Reply via email to