Repository: hadoop
Updated Branches:
  refs/heads/YARN-7812 [created] e6d2d26a1


YARN-7822. Constraint satisfaction checker support for composite OR and AND 
constraints. (Weiwei Yang via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6d2d26a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6d2d26a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6d2d26a

Branch: refs/heads/YARN-7812
Commit: e6d2d26a13337adc995efa7bcb77181871930796
Parents: 6ae4cc9
Author: Arun Suresh <asur...@apache.org>
Authored: Tue Jan 30 10:15:33 2018 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Tue Jan 30 10:15:33 2018 -0800

----------------------------------------------------------------------
 .../TestPlacementConstraintTransformations.java |   2 +-
 .../constraint/PlacementConstraintsUtil.java    |  53 +++-
 .../TestPlacementConstraintsUtil.java           | 278 ++++++++++++++++---
 .../constraint/TestPlacementProcessor.java      | 159 ++++++++++-
 4 files changed, 444 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6d2d26a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
index 62da092..aa92d7a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
@@ -156,7 +156,7 @@ public class TestPlacementConstraintTransformations {
     SingleConstraintTransformer singleTransformer =
         new SingleConstraintTransformer(specConstraint);
     PlacementConstraint simConstraint = singleTransformer.transform();
-    Assert.assertTrue(constraintExpr instanceof Or);
+    Assert.assertTrue(simConstraint.getConstraintExpr() instanceof Or);
     Or simOrExpr = (Or) specConstraint.getConstraintExpr();
     for (AbstractConstraint child : simOrExpr.getChildren()) {
       Assert.assertTrue(child instanceof SingleConstraint);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6d2d26a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 199dd62..6396e57 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
@@ -149,6 +151,48 @@ public final class PlacementConstraintsUtil {
     return true;
   }
 
+  /**
+   * Returns true if all child constraints are satisfied.
+   * @param appId application id
+   * @param constraint Or constraint
+   * @param node node
+   * @param atm allocation tags manager
+   * @return true if all child constraints are satisfied, false otherwise
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfyAndConstraint(ApplicationId appId,
+      And constraint, SchedulerNode node, AllocationTagsManager atm)
+      throws InvalidAllocationTagsQueryException {
+    // Iterate over the constraints tree, if found any child constraint
+    // isn't satisfied, return false.
+    for (AbstractConstraint child : constraint.getChildren()) {
+      if(!canSatisfyConstraints(appId, child.build(), node, atm)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns true as long as any of child constraint is satisfied.
+   * @param appId application id
+   * @param constraint Or constraint
+   * @param node node
+   * @param atm allocation tags manager
+   * @return true if any child constraint is satisfied, false otherwise
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfyOrConstraint(ApplicationId appId,
+      Or constraint, SchedulerNode node, AllocationTagsManager atm)
+      throws InvalidAllocationTagsQueryException {
+    for (AbstractConstraint child : constraint.getChildren()) {
+      if (canSatisfyConstraints(appId, child.build(), node, atm)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private static boolean canSatisfyConstraints(ApplicationId appId,
       PlacementConstraint constraint, SchedulerNode node,
       AllocationTagsManager atm)
@@ -167,9 +211,16 @@ public final class PlacementConstraintsUtil {
     if (sConstraintExpr instanceof SingleConstraint) {
       SingleConstraint single = (SingleConstraint) sConstraintExpr;
       return canSatisfySingleConstraint(appId, single, node, atm);
+    } else if (sConstraintExpr instanceof And) {
+      And and = (And) sConstraintExpr;
+      return canSatisfyAndConstraint(appId, and, node, atm);
+    } else if (sConstraintExpr instanceof Or) {
+      Or or = (Or) sConstraintExpr;
+      return canSatisfyOrConstraint(appId, or, node, atm);
     } else {
       throw new InvalidAllocationTagsQueryException(
-          "Unsupported type of constraint.");
+          "Unsupported type of constraint: "
+              + sConstraintExpr.getClass().getSimpleName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6d2d26a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index a5460c2..5135f63 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -21,7 +21,12 @@ import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.AbstractMap;
 import java.util.Arrays;
@@ -34,6 +39,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -44,12 +50,11 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -66,9 +71,10 @@ public class TestPlacementConstraintsUtil {
   private RMContext rmContext;
   private static final int GB = 1024;
   private ApplicationId appId1;
-  private PlacementConstraint c1, c2, c3, c4;
+  private PlacementConstraint c1, c2, c3, c4, c5, c6, c7;
   private Set<String> sourceTag1, sourceTag2;
-  private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+  private Map<Set<String>, PlacementConstraint> constraintMap1,
+      constraintMap2, constraintMap3, constraintMap4;
   private AtomicLong requestID = new AtomicLong(0);
 
   @Before
@@ -92,6 +98,16 @@ public class TestPlacementConstraintsUtil {
         .build(targetNotIn(NODE, allocationTag("hbase-m")));
     c4 = PlacementConstraints
         .build(targetNotIn(RACK, allocationTag("hbase-rs")));
+    c5 = PlacementConstraints
+        .build(and(targetNotIn(NODE, allocationTag("hbase-m")),
+            maxCardinality(NODE, 3, "spark")));
+    c6 = PlacementConstraints
+        .build(or(targetIn(NODE, allocationTag("hbase-m")),
+            targetIn(NODE, allocationTag("hbase-rs"))));
+    c7 = PlacementConstraints
+        .build(or(targetIn(NODE, allocationTag("hbase-m")),
+            and(targetIn(NODE, allocationTag("hbase-rs")),
+                targetIn(NODE, allocationTag("spark")))));
 
     sourceTag1 = new HashSet<>(Arrays.asList("spark"));
     sourceTag2 = new HashSet<>(Arrays.asList("zk"));
@@ -106,6 +122,15 @@ public class TestPlacementConstraintsUtil {
             new AbstractMap.SimpleEntry<>(sourceTag2, c4))
         .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
             AbstractMap.SimpleEntry::getValue));
+    constraintMap3 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c5))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
+    constraintMap4 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c6),
+            new AbstractMap.SimpleEntry<>(sourceTag2, c7))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
   }
 
   private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
@@ -124,6 +149,20 @@ public class TestPlacementConstraintsUtil {
     return createSchedulingRequest(allocationTags, null);
   }
 
+  private ContainerId newContainerId(ApplicationId appId) {
+    return ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(appId, 0), 0);
+  }
+
+  private SchedulerNode newSchedulerNode(String hostname, String rackName,
+      NodeId nodeId) {
+    SchedulerNode node = mock(SchedulerNode.class);
+    when(node.getNodeName()).thenReturn(hostname);
+    when(node.getRackName()).thenReturn(rackName);
+    when(node.getNodeID()).thenReturn(nodeId);
+    return node;
+  }
+
   @Test
   public void testNodeAffinityAssignment()
       throws InvalidAllocationTagsQueryException {
@@ -137,8 +176,9 @@ public class TestPlacementConstraintsUtil {
     Iterator<RMNode> nodeIterator = rmNodes.iterator();
     while (nodeIterator.hasNext()) {
       RMNode currentNode = nodeIterator.next();
-      FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
-          currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
+      SchedulerNode schedulerNode =newSchedulerNode(currentNode.getHostName(),
+          currentNode.getRackName(), currentNode.getNodeID());
+
       Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
           createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm));
       Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@@ -153,14 +193,15 @@ public class TestPlacementConstraintsUtil {
     RMNode n1_r1 = rmNodes.get(1);
     RMNode n2_r2 = rmNodes.get(2);
     RMNode n3_r2 = rmNodes.get(3);
-    FiCaSchedulerNode schedulerNode0 = TestUtils
-        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode1 = TestUtils
-        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode2 = TestUtils
-        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode3 = TestUtils
-        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+        n0_r1.getRackName(), n0_r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+        n1_r1.getRackName(), n1_r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+        n2_r2.getRackName(), n2_r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+        n3_r2.getRackName(), n3_r2.getNodeID());
+
     // 1 Containers on node 0 with allocationTag 'hbase-m'
     ContainerId hbase_m = ContainerId
         .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@@ -200,14 +241,15 @@ public class TestPlacementConstraintsUtil {
         .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
     tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
 
-    FiCaSchedulerNode schedulerNode0 = TestUtils
-        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode1 = TestUtils
-        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode2 = TestUtils
-        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode3 = TestUtils
-        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+        n0_r1.getRackName(), n0_r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+        n1_r1.getRackName(), n1_r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+        n2_r2.getRackName(), n2_r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+        n3_r2.getRackName(), n3_r2.getNodeID());
+
     // 'zk' placement on Rack1 should now SUCCEED
     Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
         createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
@@ -238,14 +280,16 @@ public class TestPlacementConstraintsUtil {
     RMNode n1_r1 = rmNodes.get(1);
     RMNode n2_r2 = rmNodes.get(2);
     RMNode n3_r2 = rmNodes.get(3);
-    FiCaSchedulerNode schedulerNode0 = TestUtils
-        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode1 = TestUtils
-        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode2 = TestUtils
-        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode3 = TestUtils
-        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+        n0_r1.getRackName(), n0_r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+        n1_r1.getRackName(), n1_r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+        n2_r2.getRackName(), n2_r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+        n3_r2.getRackName(), n3_r2.getNodeID());
+
     // 1 Containers on node 0 with allocationTag 'hbase-m'
     ContainerId hbase_m = ContainerId
         .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@@ -285,14 +329,14 @@ public class TestPlacementConstraintsUtil {
         .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
     tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
 
-    FiCaSchedulerNode schedulerNode0 = TestUtils
-        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode1 = TestUtils
-        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode2 = TestUtils
-        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
-    FiCaSchedulerNode schedulerNode3 = TestUtils
-        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+        n0_r1.getRackName(), n0_r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+        n1_r1.getRackName(), n1_r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+        n2_r2.getRackName(), n2_r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+        n3_r2.getRackName(), n3_r2.getNodeID());
 
     // 'zk' placement on Rack1 should FAIL
     Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@@ -306,4 +350,162 @@ public class TestPlacementConstraintsUtil {
     Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
         createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
   }
+
+  @Test
+  public void testORConstraintAssignment()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    // Register App1 with anti-affinity constraint map.
+    pcm.registerApplication(appId1, constraintMap4);
+    RMNode n0r1 = rmNodes.get(0);
+    RMNode n1r1 = rmNodes.get(1);
+    RMNode n2r2 = rmNodes.get(2);
+    RMNode n3r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *  n0: hbase-m(1)
+     *  n1: ""
+     *  n2: hbase-rs(1)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+        .get("hbase-m").longValue());
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+        .get("hbase-rs").longValue());
+
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+        n1r1.getRackName(), n1r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+        n2r2.getRackName(), n2r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+        n3r2.getRackName(), n3r2.getNodeID());
+
+    // n0 and n2 should be qualified for allocation as
+    // they either have hbase-m or hbase-rs tag
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+    /**
+     * Place container:
+     *  n0: hbase-m(1)
+     *  n1: ""
+     *  n2: hbase-rs(1)
+     *  n3: hbase-rs(1)
+     */
+    tm.addContainer(n3r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+    // n3 is qualified now because it is allocated with hbase-rs tag
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+    /**
+     * Place container:
+     *  n0: hbase-m(1)
+     *  n1: ""
+     *  n2: hbase-rs(1), spark(1)
+     *  n3: hbase-rs(1)
+     */
+    // Place
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("spark"));
+    // According to constraint, "zk" is allowed to be placed on a node
+    // has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
+  }
+
+  @Test
+  public void testANDConstraintAssignment()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    // Register App1 with anti-affinity constraint map.
+    pcm.registerApplication(appId1, constraintMap3);
+    RMNode n0r1 = rmNodes.get(0);
+    RMNode n1r1 = rmNodes.get(1);
+    RMNode n2r2 = rmNodes.get(2);
+    RMNode n3r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *  n0: hbase-m(1)
+     *  n1: ""
+     *  n2: hbase-m(1)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+        .get("hbase-m").longValue());
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+        .get("hbase-m").longValue());
+
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+        n1r1.getRackName(), n1r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+        n2r2.getRackName(), n2r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+        n3r2.getRackName(), n3r2.getNodeID());
+
+    // Anti-affinity with hbase-m so it should not be able to be placed
+    // onto n0 and n2 as they already have hbase-m allocated.
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+    /**
+     * Place container:
+     *  n0: hbase-m(1)
+     *  n1: spark(3)
+     *  n2: hbase-m(1)
+     *  n3: ""
+     */
+    for (int i=0; i<4; i++) {
+      tm.addContainer(n1r1.getNodeID(),
+          newContainerId(appId1), ImmutableSet.of("spark"));
+    }
+    Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
+        .get("spark").longValue());
+
+    // Violate cardinality constraint
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6d2d26a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 698c17b..a530230 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -60,6 +60,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
+import static 
org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
@@ -142,7 +143,8 @@ public class TestPlacementProcessor {
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 4);
 
     Assert.assertEquals(4, allocatedContainers.size());
     Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -195,7 +197,8 @@ public class TestPlacementProcessor {
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 5);
 
     Assert.assertEquals(5, allocatedContainers.size());
     Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -244,7 +247,8 @@ public class TestPlacementProcessor {
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 8);
 
     Assert.assertEquals(8, allocatedContainers.size());
     Map<NodeId, Long> nodeIdContainerIdMap =
@@ -294,7 +298,8 @@ public class TestPlacementProcessor {
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 5);
 
     Assert.assertEquals(5, allocatedContainers.size());
     Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -347,7 +352,8 @@ public class TestPlacementProcessor {
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 6);
 
     Assert.assertEquals(6, allocatedContainers.size());
     Map<NodeId, Long> nodeIdContainerIdMap =
@@ -584,7 +590,7 @@ public class TestPlacementProcessor {
     // Ensure unique nodes
     Assert.assertEquals(4, nodeIds.size());
     RejectedSchedulingRequest rej = rejectedReqs.get(0);
-    Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
+    Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
         rej.getReason());
 
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@@ -592,9 +598,145 @@ public class TestPlacementProcessor {
     verifyMetrics(metrics, 11264, 11, 5120, 5, 5);
   }
 
+  @Test(timeout = 300000)
+  public void testAndOrPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 40960, 100,
+        rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 40960, 100,
+        rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 40960, 100,
+        rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 40960, 100,
+        rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+
+    // Register app1 with following constraints
+    // 1) foo anti-affinity with foo on node
+    // 2) bar anti-affinity with foo on node AND maxCardinality = 2
+    // 3) moo affinity with foo OR bar
+    Map<Set<String>, PlacementConstraint> app1Constraints = new HashMap<>();
+    app1Constraints.put(Collections.singleton("foo"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    app1Constraints.put(Collections.singleton("bar"),
+        PlacementConstraints.build(
+            PlacementConstraints.and(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo")),
+            PlacementConstraints.maxCardinality(NODE, 2, "bar"))));
+    app1Constraints.put(Collections.singleton("moo"),
+        PlacementConstraints.build(
+            PlacementConstraints.or(
+                PlacementConstraints.targetIn(NODE, allocationTag("foo")),
+                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, app1Constraints);
+
+    // Allocates 3 foo containers on 3 different nodes,
+    // in anti-affinity fashion.
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo")
+    ));
+    List<Container> allocatedContainers = new ArrayList<>();
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 3);
+    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+    Assert.assertEquals(3, allocatedContainers.size());
+
+    /** Testing AND placement constraint**/
+    // Now allocates a bar container, as restricted by the AND constraint,
+    // bar could be only allocated to the node without foo
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "bar")
+        ));
+    allocatedContainers.clear();
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 1);
+    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+    Assert.assertEquals(1, allocatedContainers.size());
+    NodeId barNode = allocatedContainers.get(0).getNodeId();
+
+    // Sends another 3 bar request, 2 of them can be allocated
+    // as maxCardinality is 2, for placed containers, they should be all
+    // on the node where the last bar was placed.
+    allocatedContainers.clear();
+    List<RejectedSchedulingRequest> rejectedContainers = new ArrayList<>();
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 2, 1, 512, "bar"),
+            schedulingRequest(1, 3, 1, 512, "bar"),
+            schedulingRequest(1, 4, 1, 512, "bar")
+        ));
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, rejectedContainers, 2);
+    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+    Assert.assertEquals(2, allocatedContainers.size());
+    Assert.assertTrue(allocatedContainers.stream().allMatch(
+        container -> container.getNodeId().equals(barNode)));
+
+    // The third request could not be satisfied because it violates
+    // the cardinality constraint. Validate rejected request correctly
+    // capture this.
+    Assert.assertEquals(1, rejectedContainers.size());
+    Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
+        rejectedContainers.get(0).getReason());
+
+    /** Testing OR placement constraint**/
+    // Register one more NM for testing
+    MockNM nm5 = new MockNM("h5:1234", 4096, 100,
+        rm.getResourceTrackerService());
+    nodes.put(nm5.getNodeId(), nm5);
+    nm5.registerNode();
+    nm5.nodeHeartbeat(true);
+
+    List<SchedulingRequest> mooRequests = new ArrayList<>();
+    for (int i=5; i<25; i++) {
+      mooRequests.add(schedulingRequest(1, i, 1, 100, "moo"));
+    }
+    am1.addSchedulingRequest(mooRequests);
+    allocatedContainers.clear();
+    waitForContainerAllocation(nodes.values(), am1,
+        allocatedContainers, new ArrayList<>(), 20);
+
+    // All 20 containers should be allocated onto nodes besides nm5,
+    // because moo affinity to foo or bar which only exists on rest of nodes.
+    Assert.assertEquals(20, allocatedContainers.size());
+    for (Container mooContainer : allocatedContainers) {
+      // nm5 has no moo allocated containers.
+      Assert.assertFalse(mooContainer.getNodeId().equals(nm5.getNodeId()));
+    }
+  }
+
+  private static void printTags(Collection<MockNM> nodes,
+      AllocationTagsManager atm){
+    for (MockNM nm : nodes) {
+      Map<String, Long> nmTags = atm
+          .getAllocationTagsWithCount(nm.getNodeId());
+      StringBuffer sb = new StringBuffer();
+      if (nmTags != null) {
+        nmTags.forEach((tag, count) ->
+            sb.append(tag + "(" + count + "),"));
+        LOG.info("nm_" + nm.getNodeId() + ": " + sb.toString());
+      }
+    }
+  }
+
   private static void waitForContainerAllocation(Collection<MockNM> nodes,
-      MockAM am, List<Container> allocatedContainers, int containerNum)
-      throws Exception {
+      MockAM am, List<Container> allocatedContainers,
+      List<RejectedSchedulingRequest> rejectedRequests,
+      int containerNum) throws Exception {
     int attemptCount = 10;
     while (allocatedContainers.size() < containerNum && attemptCount > 0) {
       for (MockNM node : nodes) {
@@ -605,6 +747,7 @@ public class TestPlacementProcessor {
       sleep(1000);
       AllocateResponse allocResponse = am.schedule();
       allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests());
       attemptCount--;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to