This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new c40f15b  HardConstraints Implementation and unit tests (#433)
c40f15b is described below

commit c40f15b25a21fa8bbf2c9fddf89fd245c33d7cd9
Author: Yi Wang <i3.wan...@gmail.com>
AuthorDate: Mon Sep 9 13:57:28 2019 -0700

    HardConstraints Implementation and unit tests (#433)
    
    * Implement all of basic Hard Constraints
    1. Partitions count cannot exceed instance's upper limit
    2. Fault zone aware (no same partitions on the same zone)
    3. Partitions weight cannot exceed instance's capacity
    4. Cannot assign inactived partitions
    5. Same partition of different states cannot co-exist in one instance
    6. Instance doesn't have the tag of the replica
---
 .../constraints/FaultZoneAwareConstraint.java      | 43 ++++++++++++
 .../waged/constraints/NodeCapacityConstraint.java  | 50 ++++++++++++++
 .../NodeMaxPartitionLimitConstraint.java           | 40 +++++++++++
 .../constraints/ReplicaActivateConstraint.java     | 41 +++++++++++
 .../SamePartitionOnInstanceConstraint.java         | 39 +++++++++++
 .../waged/constraints/ValidGroupTagConstraint.java | 41 +++++++++++
 .../rebalancer/waged/model/AssignableNode.java     | 75 +++++++++++---------
 .../rebalancer/waged/model/AssignableReplica.java  |  4 ++
 .../constraints/TestFaultZoneAwareConstraint.java  | 79 ++++++++++++++++++++++
 .../constraints/TestNodeCapacityConstraint.java    | 54 +++++++++++++++
 .../TestNodeMaxPartitionLimitConstraint.java       | 56 +++++++++++++++
 .../TestPartitionActivateConstraint.java           | 64 ++++++++++++++++++
 .../TestSamePartitionOnInstanceConstraint.java     | 59 ++++++++++++++++
 .../constraints/TestValidGroupTagConstraint.java   | 66 ++++++++++++++++++
 14 files changed, 679 insertions(+), 32 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java
new file mode 100644
index 0000000..c33419e
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java
@@ -0,0 +1,43 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class FaultZoneAwareConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    if (!node.hasFaultZone()) {
+      return true;
+    }
+    return !clusterContext
+        .getPartitionsForResourceAndFaultZone(replica.getResourceName(), 
node.getFaultZone())
+        .contains(replica.getPartitionName());
+  }
+
+  @Override
+  String getDescription() {
+    return "A fault zone cannot contain more than 1 replica of same partition";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
new file mode 100644
index 0000000..5fc2faf
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
@@ -0,0 +1,50 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class NodeCapacityConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    Map<String, Integer> nodeCapacity = node.getCurrentCapacity();
+    Map<String, Integer> replicaCapacity = replica.getCapacity();
+
+    for (String key : replicaCapacity.keySet()) {
+      if (nodeCapacity.containsKey(key)) {
+        if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  String getDescription() {
+    return "Node has insufficient capacity";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
new file mode 100644
index 0000000..9d0752b
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class NodeMaxPartitionLimitConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    return node.getAssignedReplicaCount() < node.getMaxPartition()
+        && 
node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
+            .getResourceMaxPartitionsPerInstance();
+  }
+
+  @Override
+  String getDescription() {
+    return "Cannot exceed the maximum number of partitions limitation on node";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java
new file mode 100644
index 0000000..9152efe
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java
@@ -0,0 +1,41 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class ReplicaActivateConstraint extends HardConstraint {
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    List<String> disabledPartitions =
+        node.getDisabledPartitionsMap().get(replica.getResourceName());
+    return disabledPartitions == null || 
!disabledPartitions.contains(replica.getPartitionName());
+  }
+
+  @Override
+  String getDescription() {
+    return "Cannot assign the inactive replica";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java
new file mode 100644
index 0000000..202e49a
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java
@@ -0,0 +1,39 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class SamePartitionOnInstanceConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    return !node.getAssignedPartitionsByResource(replica.getResourceName())
+        .contains(replica.getPartitionName());
+  }
+
+  @Override
+  String getDescription() {
+    return "Same partition of different states cannot co-exist in one 
instance";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java
new file mode 100644
index 0000000..e31864f
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java
@@ -0,0 +1,41 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class ValidGroupTagConstraint extends HardConstraint {
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    if (!replica.hasResourceInstanceGroupTag()) {
+      return true;
+    }
+
+    return 
node.getInstanceTags().contains(replica.getResourceInstanceGroupTag());
+  }
+
+  @Override
+  String getDescription() {
+    return "Instance doesn't have the tag of the replica";
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 4141d20..f25c289 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import static java.lang.Math.max;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,8 +37,6 @@ import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.Math.max;
-
 /**
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
@@ -52,7 +52,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   private Map<String, Integer> _maxCapacity;
   private int _maxPartition; // maximum number of the partitions that can be 
assigned to the node.
 
-  // A map of <resource name, <partition name, replica>> that tracks the 
replicas assigned to the node.
+  // A map of <resource name, <partition name, replica>> that tracks the 
replicas assigned to the
+  // node.
   private Map<String, Map<String, AssignableReplica>> 
_currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available 
node capacity
   private Map<String, Integer> _currentCapacityMap;
@@ -78,13 +79,15 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   }
 
   /**
-   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculates currentCapacity.
-   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
-   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculates
+   * currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the
+   * clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and
+   * ResourceConfig could
    * subject to change. If the assumption is no longer true, this function 
should become private.
-   *
-   * @param clusterConfig      - the Cluster Config of the cluster where the 
node is located
-   * @param instanceConfig     - the Instance Config of the node
+   * @param clusterConfig - the Cluster Config of the cluster where the node 
is located
+   * @param instanceConfig - the Instance Config of the node
    * @param existingAssignment - all the existing replicas that are current 
assigned to the node
    */
   private void refresh(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig,
@@ -104,7 +107,6 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
 
   /**
    * Assign a replica to the node.
-   *
    * @param assignableReplica - the replica to be assigned
    */
   void assign(AssignableReplica assignableReplica) {
@@ -116,7 +118,6 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   /**
    * Release a replica from the node.
    * If the replication is not on this node, the assignable node is not 
updated.
-   *
    * @param replica - the replica to be released
    */
   void release(AssignableReplica replica) throws IllegalArgumentException {
@@ -131,8 +132,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     }
 
     Map<String, AssignableReplica> partitionMap = 
_currentAssignedReplicaMap.get(resourceName);
-    if (!partitionMap.containsKey(partitionName) || 
!partitionMap.get(partitionName)
-        .equals(replica)) {
+    if (!partitionMap.containsKey(partitionName)
+        || !partitionMap.get(partitionName).equals(replica)) {
       LOG.warn("Replica {} is not assigned to node {}. Ignore the release 
call.",
           replica.toString(), getInstanceName());
       return;
@@ -174,7 +175,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
 
   /**
    * @param resource Resource name
-   * @return A set of the current assigned replicas' partition names with the 
top state in the specified resource.
+   * @return A set of the current assigned replicas' partition names with the 
top state in the
+   *         specified resource.
    */
   public Set<String> getAssignedTopStatePartitionsByResource(String resource) {
     return _currentAssignedReplicaMap.getOrDefault(resource, 
Collections.emptyMap()).entrySet()
@@ -194,7 +196,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   /**
    * @return The total count of assigned replicas.
    */
-  public long getAssignedReplicaCount() {
+  public int getAssignedReplicaCount() {
     return 
_currentAssignedReplicaMap.values().stream().mapToInt(Map::size).sum();
   }
 
@@ -207,7 +209,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
 
   /**
    * Return the most concerning capacity utilization number for evenly 
partition assignment.
-   * The method dynamically returns the highest utilization number among all 
the capacity categories.
+   * The method dynamically returns the highest utilization number among all 
the capacity
+   * categories.
    * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 
0.6}. Then this call shall
    * return 0.9.
    *
@@ -229,15 +232,21 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     return _faultZone;
   }
 
+  public boolean hasFaultZone() {
+    return _faultZone != null;
+  }
+
   /**
-   * @return A map of <resource name, set of partition names> contains all the 
partitions that are disabled on the node.
+   * @return A map of <resource name, set of partition names> contains all the 
partitions that are
+   *         disabled on the node.
    */
   public Map<String, List<String>> getDisabledPartitionsMap() {
     return _disabledPartitionsMap;
   }
 
   /**
-   * @return A map of <capacity category, capacity number> that describes the 
max capacity of the node.
+   * @return A map of <capacity category, capacity number> that describes the 
max capacity of the
+   *         node.
    */
   public Map<String, Integer> getMaxCapacity() {
     return _maxCapacity;
@@ -251,8 +260,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   }
 
   /**
-   * Computes the fault zone id based on the domain and fault zone type when 
topology is enabled. For example, when
-   * the domain is "zone=2, instance=testInstance" and the fault zone type is 
"zone", this function returns "2".
+   * Computes the fault zone id based on the domain and fault zone type when 
topology is enabled.
+   * For example, when
+   * the domain is "zone=2, instance=testInstance" and the fault zone type is 
"zone", this function
+   * returns "2".
    * If cannot find the fault zone id, this function leaves the fault zone id 
as the instance name.
    * TODO merge this logic with Topology.java tree building logic.
    * For now, the WAGED rebalancer has a more strict topology def requirement.
@@ -267,8 +278,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
       }
 
       String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0 || Arrays.stream(topologyDef)
-          .noneMatch(type -> type.equals(faultZoneType))) {
+      if (topologyDef.length == 0
+          || Arrays.stream(topologyDef).noneMatch(type -> 
type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain 
the fault zone type.");
       }
@@ -304,7 +315,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   }
 
   /**
-   * This function should only be used to assign a set of new partitions that 
are not allocated on this node.
+   * This function should only be used to assign a set of new partitions that 
are not allocated on
+   * this node.
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
   private void assignNewBatch(Collection<AssignableReplica> replicas) {
@@ -314,9 +326,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
       // increment the capacity requirement according to partition's capacity 
configuration.
       for (Map.Entry<String, Integer> capacity : 
replica.getCapacity().entrySet()) {
         totalPartitionCapacity.compute(capacity.getKey(),
-            (key, totalValue) -> (totalValue == null) ?
-                capacity.getValue() :
-                totalValue + capacity.getValue());
+            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                : totalValue + capacity.getValue());
       }
     }
 
@@ -332,12 +343,12 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   private void addToAssignmentRecord(AssignableReplica replica) {
     String resourceName = replica.getResourceName();
     String partitionName = replica.getPartitionName();
-    if (_currentAssignedReplicaMap.containsKey(resourceName) && 
_currentAssignedReplicaMap
-        .get(resourceName).containsKey(partitionName)) {
-      throw new HelixException(String
-          .format("Resource %s already has a replica with state %s from 
partition %s on node %s",
-              replica.getResourceName(), replica.getReplicaState(), 
replica.getPartitionName(),
-              getInstanceName()));
+    if (_currentAssignedReplicaMap.containsKey(resourceName)
+        && 
_currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
+      throw new HelixException(String.format(
+          "Resource %s already has a replica with state %s from partition %s 
on node %s",
+          replica.getResourceName(), replica.getReplicaState(), 
replica.getPartitionName(),
+          getInstanceName()));
     } else {
       _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new 
HashMap<>())
           .put(partitionName, replica);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 537bf70..66bd7b7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -87,6 +87,10 @@ public class AssignableReplica implements 
Comparable<AssignableReplica> {
     return _resourceInstanceGroupTag;
   }
 
+  public boolean hasResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag != null && 
!_resourceInstanceGroupTag.isEmpty();
+  }
+
   public int getResourceMaxPartitionsPerInstance() {
     return _resourceMaxPartitionsPerInstance;
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java
new file mode 100644
index 0000000..9d2cb14
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class TestFaultZoneAwareConstraint {
+  private static final String TEST_PARTITION = "testPartition";
+  private static final String TEST_ZONE = "testZone";
+  private static final String TEST_RESOURCE = "testResource";
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+
+  private final HardConstraint _faultZoneAwareConstraint = new 
FaultZoneAwareConstraint();
+
+  @BeforeMethod
+  public void init() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(_testNode.getFaultZone()).thenReturn(TEST_ZONE);
+  }
+
+  @Test
+  public void inValidWhenFaultZoneAlreadyAssigned() {
+    when(_testNode.hasFaultZone()).thenReturn(true);
+    when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, 
TEST_ZONE)).thenReturn(
+            ImmutableSet.of(TEST_PARTITION));
+
+    Assert.assertFalse(
+        _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void validWhenEmptyAssignment() {
+    when(_testNode.hasFaultZone()).thenReturn(true);
+    when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, 
TEST_ZONE)).thenReturn(Collections.emptySet());
+
+    Assert.assertTrue(
+        _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void validWhenNoFaultZone() {
+    when(_testNode.hasFaultZone()).thenReturn(false);
+
+    Assert.assertTrue(
+        _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
new file mode 100644
index 0000000..511f881
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
@@ -0,0 +1,54 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestNodeCapacityConstraint {
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new NodeCapacityConstraint();
+
+  @Test
+  public void testConstraintValidWhenNodeHasEnoughSpace() {
+    String key = "testKey";
+    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key,  10));
+    when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintInValidWhenNodeHasInsufficientSpace() {
+    String key = "testKey";
+    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key,  1));
+    when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java
new file mode 100644
index 0000000..4cb7466
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java
@@ -0,0 +1,56 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestNodeMaxPartitionLimitConstraint {
+  private static final String TEST_RESOURCE = "TestResource";
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new 
NodeMaxPartitionLimitConstraint();
+
+  @Test
+  public void testConstraintValid() {
+    when(_testNode.getAssignedReplicaCount()).thenReturn(0);
+    when(_testNode.getMaxPartition()).thenReturn(10);
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(Collections.emptySet());
+    when(_testReplica.getResourceMaxPartitionsPerInstance()).thenReturn(5);
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintInvalid() {
+    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+    when(_testNode.getMaxPartition()).thenReturn(5);
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java
new file mode 100644
index 0000000..ecfdaa2
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class TestPartitionActivateConstraint {
+  private static final String TEST_PARTITION = "TestPartition";
+  private static final String TEST_RESOURCE = "TestResource";
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new ReplicaActivateConstraint();
+
+  @Test
+  public void testConstraintValid() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_PARTITION, Collections.emptyList()));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_PARTITION, 
ImmutableList.of("dummy")));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintInvalidWhenReplicaIsDisabled() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_PARTITION, 
ImmutableList.of(TEST_PARTITION)));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java
new file mode 100644
index 0000000..50b0c03
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java
@@ -0,0 +1,59 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class TestSamePartitionOnInstanceConstraint {
+  private static final String TEST_RESOURCE = "TestResource";
+  private static final String TEST_PARTITIOIN = TEST_RESOURCE + "0";
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new 
SamePartitionOnInstanceConstraint();
+
+  @Test
+  public void testConstraintValid() {
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(ImmutableSet.of("dummy"));
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITIOIN);
+
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintInValid() {
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(ImmutableSet.of(TEST_PARTITIOIN));
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITIOIN);
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java
new file mode 100644
index 0000000..8d02b3d
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java
@@ -0,0 +1,66 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class TestValidGroupTagConstraint {
+  private static final String TEST_TAG = "testTag";
+  private final AssignableReplica _testReplica = 
Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = 
Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new ValidGroupTagConstraint();
+
+  @Test
+  public void testConstraintValid() {
+    when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(true);
+    when(_testReplica.getResourceInstanceGroupTag()).thenReturn(TEST_TAG);
+    when(_testNode.getInstanceTags()).thenReturn(ImmutableSet.of(TEST_TAG));
+
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintInValid() {
+    when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(true);
+    when(_testReplica.getResourceInstanceGroupTag()).thenReturn(TEST_TAG);
+    when(_testNode.getInstanceTags()).thenReturn(Collections.emptySet());
+
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+
+  @Test
+  public void testConstraintWhenReplicaHasNoTag() {
+    when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(false);
+
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, 
_clusterContext));
+  }
+}

Reply via email to