YARN-7972. Support inter-app placement constraints for allocation tags by 
application ID. (Weiwei Yang via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1054b48c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1054b48c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1054b48c

Branch: refs/heads/HDFS-7240
Commit: 1054b48c27f3158110bd0512afecded36eecb8ad
Parents: 2e1e049
Author: Arun Suresh <asur...@apache.org>
Authored: Mon Mar 5 11:24:17 2018 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Mon Mar 5 11:24:17 2018 -0800

----------------------------------------------------------------------
 .../api/records/AllocationTagNamespace.java     | 336 +++++++++++++++++++
 .../api/records/AllocationTagNamespaceType.java |  74 ++++
 .../hadoop/yarn/api/records/AllocationTags.java |  50 +++
 .../hadoop/yarn/api/records/Evaluable.java      |  38 +++
 .../yarn/api/records/TargetApplications.java    |  53 +++
 .../yarn/api/resource/PlacementConstraints.java |  26 +-
 .../InvalidAllocationTagException.java          |  34 ++
 .../constraint/AllocationTagsManager.java       |   9 +
 .../InvalidAllocationTagsQueryException.java    |   4 +
 .../constraint/PlacementConstraintsUtil.java    |  71 +++-
 .../SingleConstraintAppPlacementAllocator.java  |  31 +-
 .../constraint/TestAllocationTagsNamespace.java | 147 ++++++++
 .../TestPlacementConstraintsUtil.java           | 151 +++++++++
 13 files changed, 1001 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
new file mode 100644
index 0000000..25f8761
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
+import static 
org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString;
+
+/**
+ * Class to describe the namespace of an allocation tag.
+ * Each namespace can be evaluated against a set of applications.
+ * After evaluation, the namespace should have an implicit set of
+ * applications which defines its scope.
+ */
+public abstract class AllocationTagNamespace implements
+    Evaluable<TargetApplications> {
+
+  public final static String NAMESPACE_DELIMITER = "/";
+
+  private AllocationTagNamespaceType nsType;
+  // Namespace scope value will be delay binding by eval method.
+  private Set<ApplicationId> nsScope;
+
+  public AllocationTagNamespace(AllocationTagNamespaceType
+      allocationTagNamespaceType) {
+    this.nsType = allocationTagNamespaceType;
+  }
+
+  protected void setScopeIfNotNull(Set<ApplicationId> appIds) {
+    if (appIds != null) {
+      this.nsScope = appIds;
+    }
+  }
+
+  /**
+   * Get the type of the namespace.
+   * @return namespace type.
+   */
+  public AllocationTagNamespaceType getNamespaceType() {
+    return nsType;
+  }
+
+  /**
+   * Get the scope of the namespace, in form of a set of applications.
+   * Before calling this method, {@link #evaluate(TargetApplications)}
+   * must be called in prior to ensure the scope is proper evaluated.
+   *
+   * @return a set of applications.
+   */
+  public Set<ApplicationId> getNamespaceScope() {
+    if (this.nsScope == null) {
+      throw new IllegalStateException("Invalid namespace scope,"
+          + " it is not initialized. Evaluate must be called before"
+          + " a namespace can be consumed.");
+    }
+    return this.nsScope;
+  }
+
+  @Override
+  public abstract void evaluate(TargetApplications target)
+      throws InvalidAllocationTagException;
+
+  /**
+   * @return true if the namespace is effective in all applications
+   * in this cluster. Specifically the namespace prefix should be
+   * "all".
+   */
+  public boolean isGlobal() {
+    return AllocationTagNamespaceType.ALL.equals(getNamespaceType());
+  }
+
+  /**
+   * @return true if the namespace is effective within a single application
+   * by its application ID, the namespace prefix should be "app-id";
+   * false otherwise.
+   */
+  public boolean isSingleInterApp() {
+    return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType());
+  }
+
+  /**
+   * @return true if the namespace is effective to the application itself,
+   * the namespace prefix should be "self"; false otherwise.
+   */
+  public boolean isIntraApp() {
+    return AllocationTagNamespaceType.SELF.equals(getNamespaceType());
+  }
+
+  /**
+   * @return true if the namespace is effective to all applications except
+   * itself, the namespace prefix should be "not-self"; false otherwise.
+   */
+  public boolean isNotSelf() {
+    return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType());
+  }
+
+  /**
+   * @return true if the namespace is effective to a group of applications
+   * identified by a application label, the namespace prefix should be
+   * "app-label"; false otherwise.
+   */
+  public boolean isAppLabel() {
+    return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType());
+  }
+
+  @Override
+  public String toString() {
+    return this.nsType.toString();
+  }
+
+  /**
+   * Namespace within application itself.
+   */
+  public static class Self extends AllocationTagNamespace {
+
+    public Self() {
+      super(SELF);
+    }
+
+    @Override
+    public void evaluate(TargetApplications target)
+        throws InvalidAllocationTagException {
+      if (target == null || target.getCurrentApplicationId() == null) {
+        throw new InvalidAllocationTagException("Namespace Self must"
+            + " be evaluated against a single application ID.");
+      }
+      ApplicationId applicationId = target.getCurrentApplicationId();
+      setScopeIfNotNull(ImmutableSet.of(applicationId));
+    }
+  }
+
+  /**
+   * Namespace to all applications except itself.
+   */
+  public static class NotSelf extends AllocationTagNamespace {
+
+    private ApplicationId applicationId;
+
+    public NotSelf() {
+      super(NOT_SELF);
+    }
+
+    /**
+     * The scope of self namespace is to an application itself,
+     * the application ID can be delay binding to the namespace.
+     *
+     * @param appId application ID.
+     */
+    public void setApplicationId(ApplicationId appId) {
+      this.applicationId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return this.applicationId;
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      Set<ApplicationId> otherAppIds = target.getOtherApplicationIds();
+      setScopeIfNotNull(otherAppIds);
+    }
+  }
+
+  /**
+   * Namespace to all applications in the cluster.
+   */
+  public static class All extends AllocationTagNamespace {
+
+    public All() {
+      super(ALL);
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      Set<ApplicationId> allAppIds = target.getAllApplicationIds();
+      setScopeIfNotNull(allAppIds);
+    }
+  }
+
+  /**
+   * Namespace to all applications in the cluster.
+   */
+  public static class AppLabel extends AllocationTagNamespace {
+
+    public AppLabel() {
+      super(APP_LABEL);
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      // TODO Implement app-label namespace evaluation
+    }
+  }
+
+  /**
+   * Namespace defined by a certain application ID.
+   */
+  public static class AppID extends AllocationTagNamespace {
+
+    private ApplicationId targetAppId;
+    // app-id namespace requires an extra value of an application id.
+    public AppID(ApplicationId applicationId) {
+      super(APP_ID);
+      this.targetAppId = applicationId;
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      setScopeIfNotNull(ImmutableSet.of(targetAppId));
+    }
+
+    @Override
+    public String toString() {
+      return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId;
+    }
+  }
+
+  /**
+   * Parse namespace from a string. The string must be in legal format
+   * defined by each {@link AllocationTagNamespaceType}.
+   *
+   * @param namespaceStr namespace string.
+   * @return an instance of {@link AllocationTagNamespace}.
+   * @throws InvalidAllocationTagException
+   * if given string is not in valid format
+   */
+  public static AllocationTagNamespace parse(String namespaceStr)
+      throws InvalidAllocationTagException {
+    // Return the default namespace if no valid string is given.
+    if (Strings.isNullOrEmpty(namespaceStr)) {
+      return new Self();
+    }
+
+    // Normalize the input, escape additional chars.
+    List<String> nsValues = normalize(namespaceStr);
+    // The first string should be the prefix.
+    String nsPrefix = nsValues.get(0);
+    AllocationTagNamespaceType allocationTagNamespaceType =
+        fromString(nsPrefix);
+    switch (allocationTagNamespaceType) {
+    case SELF:
+      return new Self();
+    case NOT_SELF:
+      return new NotSelf();
+    case ALL:
+      return new All();
+    case APP_ID:
+      if (nsValues.size() != 2) {
+        throw new InvalidAllocationTagException(
+            "Missing the application ID in the namespace string: "
+                + namespaceStr);
+      }
+      String appIDStr = nsValues.get(1);
+      return parseAppID(appIDStr);
+    case APP_LABEL:
+      return new AppLabel();
+    default:
+      throw new InvalidAllocationTagException(
+          "Invalid namespace string " + namespaceStr);
+    }
+  }
+
+  private static AllocationTagNamespace parseAppID(String appIDStr)
+      throws InvalidAllocationTagException {
+    try {
+      ApplicationId applicationId = ApplicationId.fromString(appIDStr);
+      return new AppID(applicationId);
+    } catch (IllegalArgumentException e) {
+      throw new InvalidAllocationTagException(
+          "Invalid application ID for "
+              + APP_ID.getTypeKeyword() + ": " + appIDStr);
+    }
+  }
+
+  /**
+   * Valid given namespace string and parse it to a list of sub-strings
+   * that can be consumed by the parser according to the type of the
+   * namespace. Currently the size of return list should be either 1 or 2.
+   * Extra slash is escaped during the normalization.
+   *
+   * @param namespaceStr namespace string.
+   * @return a list of parsed strings.
+   * @throws InvalidAllocationTagException
+   * if namespace format is unexpected.
+   */
+  private static List<String> normalize(String namespaceStr)
+      throws InvalidAllocationTagException {
+    List<String> result = new ArrayList<>();
+    if (namespaceStr == null) {
+      return result;
+    }
+
+    String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
+    for (String str : nsValues) {
+      if (!Strings.isNullOrEmpty(str)) {
+        result.add(str);
+      }
+    }
+
+    // Currently we only allow 1 or 2 values for a namespace string
+    if (result.size() == 0 || result.size() > 2) {
+      throw new InvalidAllocationTagException("Invalid namespace string: "
+          + namespaceStr + ", the syntax is <namespace_prefix> or"
+          + " <namespace_prefix>/<namespace_value>");
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
new file mode 100644
index 0000000..5e46cd0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Class to describe all supported forms of namespaces for an allocation tag.
+ */
+public enum AllocationTagNamespaceType {
+
+  SELF("self"),
+  NOT_SELF("not-self"),
+  APP_ID("app-id"),
+  APP_LABEL("app-label"),
+  ALL("all");
+
+  private String typeKeyword;
+  AllocationTagNamespaceType(String keyword) {
+    this.typeKeyword = keyword;
+  }
+
+  public String getTypeKeyword() {
+    return this.typeKeyword;
+  }
+
+  /**
+   * Parses the namespace type from a given string.
+   * @param prefix namespace prefix.
+   * @return namespace type.
+   * @throws InvalidAllocationTagException
+   */
+  public static AllocationTagNamespaceType fromString(String prefix) throws
+      InvalidAllocationTagException {
+    for (AllocationTagNamespaceType type :
+        AllocationTagNamespaceType.values()) {
+      if(type.getTypeKeyword().equals(prefix)) {
+        return type;
+      }
+    }
+
+    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
+        .map(AllocationTagNamespaceType::toString)
+        .collect(Collectors.toSet());
+    throw new InvalidAllocationTagException(
+        "Invalid namespace prefix: " + prefix
+            + ", valid values are: " + String.join(",", values));
+  }
+
+  @Override
+  public String toString() {
+    return this.getTypeKeyword();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
new file mode 100644
index 0000000..50bffc3
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+
+/**
+ * Allocation tags under same namespace.
+ */
+public class AllocationTags {
+
+  private AllocationTagNamespace ns;
+  private Set<String> tags;
+
+  public AllocationTags(AllocationTagNamespace namespace,
+      Set<String> allocationTags) {
+    this.ns = namespace;
+    this.tags = allocationTags;
+  }
+
+  /**
+   * @return the namespace of these tags.
+   */
+  public AllocationTagNamespace getNamespace() {
+    return this.ns;
+  }
+
+  /**
+   * @return the allocation tags.
+   */
+  public Set<String> getTags() {
+    return this.tags;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
new file mode 100644
index 0000000..7a74002
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A class implements Evaluable interface represents the internal state
+ * of the class can be changed against a given target.
+ * @param <T> a target to evaluate against
+ */
+public interface Evaluable<T> {
+
+  /**
+   * Evaluate against a given target, this process changes the internal state
+   * of current class.
+   *
+   * @param target a generic type target that impacts this evaluation.
+   * @throws YarnException
+   */
+  void evaluate(T target) throws YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
new file mode 100644
index 0000000..de0ea26
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used by
+ * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate
+ * a namespace.
+ */
+public class TargetApplications {
+
+  private ApplicationId currentAppId;
+  private Set<ApplicationId> allAppIds;
+
+  public TargetApplications(ApplicationId currentApplicationId,
+      Set<ApplicationId> allApplicationIds) {
+    this.currentAppId = currentApplicationId;
+    this.allAppIds = allApplicationIds;
+  }
+
+  public Set<ApplicationId> getAllApplicationIds() {
+    return this.allAppIds;
+  }
+
+  public ApplicationId getCurrentApplicationId() {
+    return this.currentAppId;
+  }
+
+  public Set<ApplicationId> getOtherApplicationIds() {
+    return allAppIds == null ? null : allAppIds.stream().filter(appId ->
+        !appId.equals(getCurrentApplicationId()))
+        .collect(Collectors.toSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index c1549c5..af70e2a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.yarn.api.resource;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -50,13 +50,6 @@ public final class PlacementConstraints {
   public static final String RACK = PlacementConstraint.RACK_SCOPE;
   public static final String NODE_PARTITION = "yarn_node_partition/";
 
-  private static final String APPLICATION_LABEL_PREFIX =
-      "yarn_application_label/";
-
-  @InterfaceAudience.Private
-  public static final String APPLICATION_LABEL_INTRA_APPLICATION =
-      APPLICATION_LABEL_PREFIX + "%intra_app%";
-
   /**
    * Creates a constraint that requires allocations to be placed on nodes that
    * satisfy all target expressions within the given scope (e.g., node or 
rack).
@@ -224,6 +217,20 @@ public final class PlacementConstraints {
     }
 
     /**
+     * Constructs a target expression on a set of allocation tags under
+     * a certain namespace.
+     *
+     * @param namespace namespace of the allocation tags
+     * @param allocationTags allocation tags
+     * @return a target expression
+     */
+    public static TargetExpression allocationTagWithNamespace(String namespace,
+        String... allocationTags) {
+      return new TargetExpression(TargetType.ALLOCATION_TAG,
+          namespace, allocationTags);
+    }
+
+    /**
      * Constructs a target expression on an allocation tag. It is satisfied if
      * there are allocations with one of the given tags. Comparing to
      * {@link PlacementTargets#allocationTag(String...)}, this only checks tags
@@ -235,8 +242,9 @@ public final class PlacementConstraints {
      */
     public static TargetExpression allocationTagToIntraApp(
         String... allocationTags) {
+      AllocationTagNamespace selfNs = new AllocationTagNamespace.Self();
       return new TargetExpression(TargetType.ALLOCATION_TAG,
-          APPLICATION_LABEL_INTRA_APPLICATION, allocationTags);
+          selfNs.toString(), allocationTags);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
new file mode 100644
index 0000000..be8d881
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+/**
+ * This exception is thrown by
+ * {@link
+ * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)}
+ * when it fails to parse a namespace.
+ */
+public class InvalidAllocationTagException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidAllocationTagException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 8ef9999..fb2619a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -21,6 +21,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -561,4 +562,12 @@ public class AllocationTagsManager {
   public Map<String, Long> getAllocationTagsWithCount(NodeId nodeId) {
     return globalNodeMapping.getTypeToTagsWithCount().get(nodeId);
   }
+
+  /**
+   * @return all application IDs in a set that currently visible by
+   * the allocation tags manager.
+   */
+  public Set<ApplicationId> getAllApplicationIds() {
+    return ImmutableSet.copyOf(perAppNodeMappings.keySet());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
index 29483a2..d2bc4d8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
@@ -32,4 +32,8 @@ public class InvalidAllocationTagsQueryException extends 
YarnException {
   public InvalidAllocationTagsQueryException(String msg) {
     super(msg);
   }
+
+  public InvalidAllocationTagsQueryException(YarnException e) {
+    super(e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index ab0bbd7..2d0e95a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -24,8 +24,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.TargetApplications;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@@ -35,6 +38,7 @@ import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
 import 
org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
@@ -57,6 +61,53 @@ public final class PlacementConstraintsUtil {
   }
 
   /**
+   * Try to the namespace of the allocation tags from the given target key.
+   *
+   * @param targetKey
+   * @return allocation tag namespace.
+   * @throws InvalidAllocationTagsQueryException
+   * if fail to parse the target key to a valid namespace.
+   */
+  private static AllocationTagNamespace getAllocationTagNamespace(
+      ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
+      throws InvalidAllocationTagException{
+    // Parse to a valid namespace.
+    AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
+
+    // TODO remove such check once we support all forms of namespaces
+    if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) {
+      throw new InvalidAllocationTagException(
+          "Only support " + AllocationTagNamespaceType.SELF.toString()
+              + " and "+ AllocationTagNamespaceType.APP_ID + " now,"
+              + namespace.toString() + " is not supported yet!");
+    }
+
+    // Evaluate the namespace according to the given target
+    // before it can be consumed.
+    TargetApplications ta = new TargetApplications(currentAppId,
+        atm.getAllApplicationIds());
+    namespace.evaluate(ta);
+    return namespace;
+  }
+
+  // We return a single app Id now, because at present,
+  // only self and app-id namespace is supported. But moving on,
+  // this will return a set of application IDs.
+  // TODO support other forms of namespaces
+  private static ApplicationId getNamespaceScope(
+      AllocationTagNamespace namespace)
+      throws InvalidAllocationTagException {
+    if (namespace.getNamespaceScope() == null
+        || namespace.getNamespaceScope().size() != 1) {
+      throw new InvalidAllocationTagException(
+          "Invalid allocation tag namespace " + namespace.toString()
+              + ", expecting it is not null and only 1 application"
+              + " ID in the scope.");
+    }
+    return namespace.getNamespaceScope().iterator().next();
+  }
+
+  /**
    * Returns true if <b>single</b> placement constraint with associated
    * allocationTags and scope is satisfied by a specific scheduler Node.
    *
@@ -74,6 +125,18 @@ public final class PlacementConstraintsUtil {
       ApplicationId targetApplicationId, SingleConstraint sc,
       TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
       throws InvalidAllocationTagsQueryException {
+    // Parse the allocation tag's namespace from the given target key,
+    // then evaluate the namespace and get its scope,
+    // which is represented by one or more application IDs.
+    ApplicationId effectiveAppID;
+    try {
+      AllocationTagNamespace namespace = getAllocationTagNamespace(
+          targetApplicationId, te.getTargetKey(), tm);
+      effectiveAppID = getNamespaceScope(namespace);
+    } catch (InvalidAllocationTagException e) {
+      throw new InvalidAllocationTagsQueryException(e);
+    }
+
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
 
@@ -86,20 +149,20 @@ public final class PlacementConstraintsUtil {
     if (sc.getScope().equals(PlacementConstraints.NODE)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            targetApplicationId, te.getTargetValues(), Long::max);
+            effectiveAppID, te.getTargetValues(), Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            targetApplicationId, te.getTargetValues(), Long::min);
+            effectiveAppID, te.getTargetValues(), Long::min);
       }
     } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            targetApplicationId, te.getTargetValues(), Long::max);
+            effectiveAppID, te.getTargetValues(), Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            targetApplicationId, te.getTargetValues(), Long::min);
+            effectiveAppID, te.getTargetValues(), Long::min);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
index ed07345..7e5506e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -23,6 +23,8 @@ import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import 
org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -53,7 +56,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
 
 /**
@@ -220,7 +222,8 @@ public class SingleConstraintAppPlacementAllocator<N 
extends SchedulerNode>
     throw new SchedulerInvalidResoureRequestException(sb.toString());
   }
 
-  private void validateAndSetSchedulingRequest(SchedulingRequest 
newSchedulingRequest)
+  private void validateAndSetSchedulingRequest(SchedulingRequest
+      newSchedulingRequest)
       throws SchedulerInvalidResoureRequestException {
     // Check sizing exists
     if (newSchedulingRequest.getResourceSizing() == null
@@ -333,15 +336,23 @@ public class SingleConstraintAppPlacementAllocator<N 
extends SchedulerNode>
         targetAllocationTags = new HashSet<>(
             targetExpression.getTargetValues());
 
-        if (targetExpression.getTargetKey() != null && !targetExpression
-            .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) {
+        try {
+          AllocationTagNamespace tagNS =
+              AllocationTagNamespace.parse(targetExpression.getTargetKey());
+          if (!AllocationTagNamespaceType.SELF
+              .equals(tagNS.getNamespaceType())) {
+            throwExceptionWithMetaInfo(
+                "As of now, the only accepted target key for targetKey of "
+                    + "allocation_tag target expression is: ["
+                    + AllocationTagNamespaceType.SELF.toString()
+                    + "]. Please make changes to placement constraints "
+                    + "accordingly. If this is null, it will be set to "
+                    + AllocationTagNamespaceType.SELF.toString()
+                    + " by default.");
+          }
+        } catch (InvalidAllocationTagException e) {
           throwExceptionWithMetaInfo(
-              "As of now, the only accepted target key for targetKey of "
-                  + "allocation_tag target expression is: ["
-                  + APPLICATION_LABEL_INTRA_APPLICATION
-                  + "]. Please make changes to placement constraints "
-                  + "accordingly. If this is null, it will be set to "
-                  + APPLICATION_LABEL_INTRA_APPLICATION + " by default.");
+              "Invalid allocation tag namespace, message: " + e.getMessage());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java
new file mode 100644
index 0000000..67a3901
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.TargetApplications;
+import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for {@link AllocationTagNamespace}.
+ */
+public class TestAllocationTagsNamespace {
+
+  @Test
+  public void testNamespaceParse() throws InvalidAllocationTagException {
+    AllocationTagNamespace namespace;
+
+    String namespaceStr = "self";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    Assert.assertTrue(namespace.isIntraApp());
+
+    namespaceStr = "not-self";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    Assert.assertTrue(namespace.isNotSelf());
+
+    namespaceStr = "all";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    Assert.assertTrue(namespace.isGlobal());
+
+    namespaceStr = "app-label";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    Assert.assertTrue(namespace.isAppLabel());
+
+    ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
+    namespaceStr = "app-id/" + applicationId.toString();
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    Assert.assertTrue(namespace.isSingleInterApp());
+
+    // Invalid app-id namespace syntax, invalid app ID.
+    try {
+      namespaceStr = "app-id/apppppp_12345_99999";
+      AllocationTagNamespace.parse(namespaceStr);
+      Assert.fail("Parsing should fail as the given app ID is invalid");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e.getMessage().startsWith(
+          "Invalid application ID for app-id"));
+    }
+
+    // Invalid app-id namespace syntax, missing app ID.
+    try {
+      namespaceStr = "app-id";
+      AllocationTagNamespace.parse(namespaceStr);
+      Assert.fail("Parsing should fail as the given namespace"
+          + " is missing application ID");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e.getMessage().startsWith(
+          "Missing the application ID in the namespace string"));
+    }
+
+    // Invalid namespace type.
+    try {
+      namespaceStr = "non_exist_ns";
+      AllocationTagNamespace.parse(namespaceStr);
+      Assert.fail("Parsing should fail as the giving type is not supported.");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e.getMessage().startsWith(
+          "Invalid namespace prefix"));
+    }
+  }
+
+  @Test
+  public void testNamespaceEvaluation() throws InvalidAllocationTagException {
+    AllocationTagNamespace namespace;
+    TargetApplications targetApplications;
+    ApplicationId app1 = ApplicationId.newInstance(10000, 1);
+    ApplicationId app2 = ApplicationId.newInstance(10000, 2);
+    ApplicationId app3 = ApplicationId.newInstance(10000, 3);
+    ApplicationId app4 = ApplicationId.newInstance(10000, 4);
+    ApplicationId app5 = ApplicationId.newInstance(10000, 5);
+
+    // Ensure eval is called before using the scope.
+    String namespaceStr = "self";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    try {
+      namespace.getNamespaceScope();
+      Assert.fail("Call getNamespaceScope before evaluate is not allowed.");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalStateException);
+      Assert.assertTrue(e.getMessage().contains(
+          "Evaluate must be called before a namespace can be consumed."));
+    }
+
+    namespaceStr = "self";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
+    namespace.evaluate(targetApplications);
+    Assert.assertEquals(1, namespace.getNamespaceScope().size());
+    Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next());
+
+    namespaceStr = "not-self";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
+    namespace.evaluate(targetApplications);
+    Assert.assertEquals(0, namespace.getNamespaceScope().size());
+
+    targetApplications = new TargetApplications(app1,
+        ImmutableSet.of(app1, app2, app3));
+    namespace.evaluate(targetApplications);
+    Assert.assertEquals(2, namespace.getNamespaceScope().size());
+    Assert.assertFalse(namespace.getNamespaceScope().contains(app1));
+
+    namespaceStr = "all";
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    targetApplications = new TargetApplications(null,
+        ImmutableSet.of(app1, app2));
+    namespace.evaluate(targetApplications);
+    Assert.assertEquals(2, namespace.getNamespaceScope().size());
+
+    namespaceStr = "app-id/" + app2.toString();
+    namespace = AllocationTagNamespace.parse(namespaceStr);
+    targetApplications = new TargetApplications(app1,
+        ImmutableSet.of(app1, app2, app3, app4, app5));
+    namespace.evaluate(targetApplications);
+    Assert.assertEquals(1, namespace.getNamespaceScope().size());
+    Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1054b48c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index 5135f63..5ba8948 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
 import static 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
@@ -30,6 +31,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -39,6 +41,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -508,4 +511,152 @@ public class TestPlacementConstraintsUtil {
     Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
         createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
   }
+
+  @Test
+  public void testInterAppConstraintsByAppID()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    rmContext.setAllocationTagsManager(tm);
+    rmContext.setPlacementConstraintManager(pcm);
+
+    long ts = System.currentTimeMillis();
+    ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
+
+    // Register App1 with anti-affinity constraint map.
+    RMNode n0r1 = rmNodes.get(0);
+    RMNode n1r1 = rmNodes.get(1);
+    RMNode n2r2 = rmNodes.get(2);
+    RMNode n3r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *  n0: app1/hbase-m(1)
+     *  n1: ""
+     *  n2: app1/hbase-m(1)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("hbase-m"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("hbase-m"));
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+        .get("hbase-m").longValue());
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+        .get("hbase-m").longValue());
+
+    SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+    SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+        n1r1.getRackName(), n1r1.getNodeID());
+    SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+        n2r2.getRackName(), n2r2.getNodeID());
+    SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+        n3r2.getRackName(), n3r2.getNodeID());
+
+    AllocationTagNamespace namespace =
+        new AllocationTagNamespace.AppID(application1);
+    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
+    PlacementConstraint constraint2 = PlacementConstraints
+        .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
+            "hbase-m"))
+        .build();
+    Set<String> srcTags2 = new HashSet<>();
+    srcTags2.add("app2");
+    constraintMap.put(srcTags2, constraint2);
+
+    ts = System.currentTimeMillis();
+    ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124);
+    pcm.registerApplication(application2, constraintMap);
+
+    // Anti-affinity with app1/hbase-m so it should not be able to be placed
+    // onto n0 and n2 as they already have hbase-m allocated.
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode3, pcm, tm));
+
+    // Intra-app constraint
+    // Test with default and empty namespace
+    AllocationTagNamespace self = new AllocationTagNamespace.Self();
+    PlacementConstraint constraint3 = PlacementConstraints
+        .targetNotIn(NODE, allocationTagWithNamespace(self.toString(),
+            "hbase-m"))
+        .build();
+    Set<String> srcTags3 = new HashSet<>();
+    srcTags3.add("app3");
+    constraintMap.put(srcTags3, constraint3);
+
+    ts = System.currentTimeMillis();
+    ApplicationId application3 = BuilderUtils.newApplicationId(ts, 124);
+    pcm.registerApplication(application3, constraintMap);
+
+    /**
+     * Place container:
+     *  n0: app1/hbase-m(1), app3/hbase-m
+     *  n1: ""
+     *  n2: app1/hbase-m(1)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("hbase-m"));
+
+    // Anti-affinity to self/hbase-m
+    Assert.assertFalse(PlacementConstraintsUtil
+        .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
+            schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil
+        .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
+            schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil
+        .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
+            schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil
+        .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
+            schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application3);
+  }
+
+  @Test
+  public void testInvalidAllocationTagNamespace() {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    rmContext.setAllocationTagsManager(tm);
+    rmContext.setPlacementConstraintManager(pcm);
+
+    long ts = System.currentTimeMillis();
+    ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
+    RMNode n0r1 = rmNodes.get(0);
+    SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+
+    PlacementConstraint constraint1 = PlacementConstraints
+        .targetNotIn(NODE, allocationTagWithNamespace("unknown_namespace",
+            "hbase-m"))
+        .build();
+    Set<String> srcTags1 = new HashSet<>();
+    srcTags1.add("app1");
+
+    try {
+      PlacementConstraintsUtil.canSatisfyConstraints(application1,
+          createSchedulingRequest(srcTags1, constraint1), schedulerNode0,
+          pcm, tm);
+      Assert.fail("This should fail because we gave an invalid namespace");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
+      Assert.assertTrue(e.getMessage()
+          .contains("Invalid namespace prefix: unknown_namespace"));
+    }
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to