pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307943700
 
 

 ##########
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterDataProvider.java
 ##########
 @@ -0,0 +1,146 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The data provider generates the Cluster Model based on the controller's 
data cache.
+ */
+public class ClusterDataProvider {
+  /**
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be 
rebalanced. Note that any
+   *                               resources that are not in this list will be 
removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in 
the calculation.
+   *                               Note this list can be different from the 
real active node list
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after 
the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that 
was generated in the
+   *                               previous rebalance.
+   * @return The cluster model as the input for the upcoming rebalance.
+   */
+  public static ClusterModel 
generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, IdealState> baselineAssignment, Map<String, IdealState> 
bestPossibleAssignment) {
+    // Initialize all assignable replicas according to the input
+    Set<AssignableReplica> assignableReplicas =
+        generateAssignableReplicas(dataProvider, resourceMap, 
activeInstances.size(),
+            bestPossibleAssignment, clusterChanges);
+
+    // TODO split the to-be-assigned replications to separate list.
+    Map<String, Set<AssignableReplica>> assignedReplicaMap = new HashMap<>();
+
+    // Construct cluster context.
+    ClusterContext context = new ClusterContext(assignableReplicas, 
activeInstances.size());
+
+    // Construct all the assignble nodes and initialize with the confirmed 
assignment.
+    Set<AssignableNode> assignableNodes = activeInstances.stream().map(
+        instanceName -> new AssignableNode(dataProvider, instanceName,
+            assignedReplicaMap.getOrDefault(instanceName, 
Collections.emptySet())))
+        .collect(Collectors.toSet());
+
+    // Initial the cluster context object with the confirmed assignments.
+    Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap = new 
HashMap<>();
+    assignableNodes.stream()
+        .forEach(node -> addNodeAssignmentsToFaultZone(node, 
assignmentForFaultZoneMap));
+    context.setAssignmentForFaultZoneMap(assignmentForFaultZoneMap);
+
+    return new ClusterModel(context, assignableReplicas, assignableNodes, 
baselineAssignment,
+        bestPossibleAssignment);
+  }
+
+  /**
+   * Find all the replications that need to be reallocated.
+   *
+   * @param dataProvider           The cluster status cache that contains the 
current cluster status.
+   * @param resourceMap            All the valid resources that are managed by 
the rebalancer.
+   * @param bestPossibleAssignment The persisted Best Possible State.
+   * @param clusterChanges         All the cluster changes that happened after 
the previous rebalance.
+   * @return A set of assignable replications that need reallocation.
+   */
+  private static Set<AssignableReplica> generateAssignableReplicas(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> 
resourceMap,
+      int instanceCount, Map<String, IdealState> bestPossibleAssignment,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges) {
+    Set<AssignableReplica> totalReplicas = new HashSet<>();
+
+    for (String resourceName : resourceMap.keySet()) {
+      ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+      IdealState is = dataProvider.getIdealState(resourceName);
+      if (is == null) {
+        throw new HelixException(
+            "Cannot find the resource ideal state for resource: " + 
resourceName);
+      }
+      String defName = is.getStateModelDefRef();
+      StateModelDefinition def = dataProvider.getStateModelDef(defName);
+      if (def == null) {
+        throw new IllegalArgumentException(String
+            .format("Cannot fine state model definition %s for resource %s.",
+                is.getStateModelDefRef(), resourceName));
+      }
+
+      Map<String, Integer> stateCountMap =
+          def.getStateCountMap(instanceCount, 
is.getReplicaCount(instanceCount));
+
+      for (String partition : is.getPartitionSet()) {
+        for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
+          String state = entry.getKey();
+          for (int i = 0; i < entry.getValue(); i++) {
+            totalReplicas.add(new AssignableReplica(config, partition, state,
+                def.getStatePriorityMap().get(state)));
+          }
+        }
+      }
+    }
+    return totalReplicas;
+  }
+
+  private static void addNodeAssignmentsToFaultZone(AssignableNode node,
+      Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
+    String faultZoneId = node.getFaultZone();
+    if (!assignmentForFaultZoneMap.containsKey(faultZoneId)) {
+      assignmentForFaultZoneMap.put(faultZoneId, new HashMap<>());
+    }
+    for (Map.Entry<String, Set<String>> entry : 
node.getCurrentAssignmentsMap().entrySet()) {
+      String resourceName = entry.getKey();
+      if 
(!assignmentForFaultZoneMap.get(faultZoneId).containsKey(resourceName)) {
+        assignmentForFaultZoneMap.get(faultZoneId).put(resourceName, new 
HashSet<>());
+      }
+      
assignmentForFaultZoneMap.get(faultZoneId).get(resourceName).addAll(entry.getValue());
 
 Review comment:
   How about using `computeIfAbsent`, since you've used a lot of Java 8? I 
think it is fine if you prefer to do it this way. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to