narendly commented on a change in pull request #362: The WAGED rebalancer
cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r309921368
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
##########
@@ -19,9 +19,100 @@
* under the License.
*/
+import org.apache.helix.HelixException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed
cluster.
+ * This class tracks the rebalance-related global cluster status.
*/
-public class ClusterContext { }
+public class ClusterContext {
+ private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+ // This estimation helps to ensure global partition count evenness
+ private final int _estimatedMaxPartitionCount;
+ // This estimation helps to ensure global top state replica count evenness
+ private final int _estimatedMaxTopStateCount;
+ // This estimation helps to ensure per-resource partition count evenness
+ private final Map<String, Integer> _estimatedMaxPartitionByResource = new
HashMap<>();
+
+ // map{zoneName : map{resourceName : set(partitionNames)}}
+ private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap =
new HashMap<>();
+
+ /**
+ * Construct the cluster context based on the current instance status.
+ *
+ * @param replicaSet All the partition replicas that are managed by the
rebalancer
+ * @param instanceCount The count of all the active instances that can be
used to host partitions.
+ */
+ ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+ int totalReplicas = 0;
+ int totalTopStateReplicas = 0;
+
+ for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+
.collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet())
{
+ int replicas = entry.getValue().size();
+ totalReplicas += replicas;
+
+ int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas,
instanceCount));
+ _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+ totalTopStateReplicas +=
+
entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+ }
+
+ _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas,
instanceCount);
+ _estimatedMaxTopStateCount =
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+ }
+
+ public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+ return _assignmentForFaultZoneMap;
+ }
+
+ public int getEstimatedMaxPartitionCount() {
+ return _estimatedMaxPartitionCount;
+ }
+
+ public int getEstimatedMaxPartitionByResource(String resourceName) {
+ return _estimatedMaxPartitionByResource.get(resourceName);
+ }
+
+ public int getEstimatedMaxTopStateCount() {
+ return _estimatedMaxTopStateCount;
+ }
+
+ public Set<String> getPartitionsForResourceAndFaultZone(String faultZoneId,
String resourceName) {
Review comment:
Minor: Keep the order consistent between the method name and its arguments - the
name says resource and then fault zone, but the parameters are ordered
(faultZoneId, resourceName).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services