YARN-7142. Support placement policy in yarn native services. (Gour Saha via wangda)

Change-Id: I166c67a7a34430627c17365f60bac75b6da1b434
(cherry picked from commit a0bde7d525911680f9e5fb0a939604865eb8e164)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b886836
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b886836
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b886836

Branch: refs/heads/branch-3.1
Commit: 0b88683627dbe2de56a81f119f314afc1e408517
Parents: eb1026d
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Apr 2 07:26:01 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Sat Apr 14 11:01:59 2018 -0700

----------------------------------------------------------------------
 .../yarn/api/resource/PlacementConstraint.java  |   5 +
 ...RN-Simplified-V1-API-Layer-For-Services.yaml |  91 +++++-
 .../hadoop/yarn/service/ServiceScheduler.java   |  28 +-
 .../yarn/service/UpgradeComponentsFinder.java   |   5 -
 .../yarn/service/api/records/Component.java     |   7 +-
 .../yarn/service/api/records/ConfigFile.java    |   2 +-
 .../api/records/PlacementConstraint.java        | 283 +++++++++++++++++++
 .../service/api/records/PlacementPolicy.java    |  52 ++--
 .../service/api/records/PlacementScope.java     |  56 ++++
 .../yarn/service/api/records/PlacementType.java |  38 +++
 .../yarn/service/api/records/Resource.java      |   3 +-
 .../yarn/service/api/records/Service.java       |  27 --
 .../yarn/service/api/records/ServiceStatus.java |   4 +-
 .../yarn/service/component/Component.java       | 129 +++++++--
 .../exceptions/RestApiErrorMessages.java        |  12 +
 .../ServiceTimelinePublisher.java               |   4 -
 .../yarn/service/utils/ServiceApiUtil.java      |  28 +-
 .../hadoop/yarn/service/TestServiceApiUtil.java |  35 +++
 .../yarn/service/TestYarnNativeServices.java    | 129 ++++++++-
 .../TestServiceTimelinePublisher.java           |  10 +-
 .../impl/pb/SchedulingRequestPBImpl.java        |   1 +
 .../markdown/yarn-service/YarnServiceAPI.md     | 268 +++++++++++++++---
 22 files changed, 1071 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
index 9bb17f4..0fe8273 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
@@ -130,6 +130,11 @@ public class PlacementConstraint {
     public PlacementConstraint build() {
       return new PlacementConstraint(this);
     }
+
+    @Override
+    public String toString() {
+      return super.toString();
+    }
   }
 
   static final String NODE_SCOPE = "node";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
index 17723bc..45b1bc7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
@@ -229,9 +229,6 @@ definitions:
         type: integer
         format: int64
         description: Life time (in seconds) of the service from the time it 
reaches the STARTED state (after which it is automatically destroyed by YARN). 
For unlimited lifetime do not set a lifetime value.
-      placement_policy:
-        description: (TBD) Advanced scheduling and placement policies. If not 
specified, it defaults to the default placement policy of the service owner. 
The design of placement policies are in the works. It is not very clear at this 
point, how policies in conjunction with labels be exposed to service owners. 
This is a placeholder for now. The advanced structure of this attribute will be 
determined by YARN-4902.
-        $ref: '#/definitions/PlacementPolicy'
       components:
         description: Components of a service.
         type: array
@@ -256,7 +253,7 @@ definitions:
         $ref: '#/definitions/KerberosPrincipal'
   ResourceInformation:
     description:
-      ResourceInformation determines unit/value of resource types in addition 
to memory and vcores. It will be part of Resource object
+      ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
     properties:
       value:
         type: integer
@@ -264,8 +261,7 @@ definitions:
         description: Integer value of the resource.
       unit:
         type: string
-        description:
-          Unit of the resource, acceptable values are: 
p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit
+        description: Unit of the resource, acceptable values are - p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit.
   Resource:
     description:
       Resource determines the amount of resources (vcores, memory, network, 
etc.) usable by a container. This field determines the resource to be applied 
for all the containers of a component or service. The resource specified at the 
service (or global) level can be overriden at the component level. Only one of 
profile OR cpu & memory are expected. It raises a validation exception 
otherwise.
@@ -286,11 +282,75 @@ definitions:
           $ref: '#/definitions/ResourceInformation'
         description: Map of resource name to ResourceInformation
   PlacementPolicy:
-    description: Placement policy of an instance of a service. This feature is 
in the works in YARN-6592.
+    description: Advanced placement policy of the components of a service.
+    required:
+      - constraints
     properties:
-      label:
+      constraints:
+        description: Placement constraint details.
+        type: array
+        items:
+          $ref: '#/definitions/PlacementConstraint'
+  PlacementConstraint:
+    description: Placement constraint details.
+    required:
+      - type
+      - scope
+    properties:
+      name:
+        description: An optional name associated to this constraint.
         type: string
-        description: Assigns a service to a named partition of the cluster 
where the service desires to run (optional). If not specified all services are 
submitted to a default label of the service owner. One or more labels can be 
setup for each service owner account with required constraints like 
no-preemption, sla-99999, preemption-ok, etc.
+        example: C1
+      type:
+        description: The type of placement.
+        $ref: '#/definitions/PlacementType'
+      scope:
+        description: The scope of placement.
+        $ref: '#/definitions/PlacementScope'
+      target_tags:
+        description: The name of the components that this component's placement policy is depending upon are added as target tags. So for affinity say, this component's containers are requesting to be placed on hosts where containers of the target tag component(s) are running on. Target tags can also contain the name of this component, in which case it implies that for anti-affinity say, no more than one container of this component can be placed on a host. Similarly, for cardinality, it would mean that containers of this component is requesting to be placed on hosts where at least minCardinality but no more than maxCardinality containers of the target tag component(s) are running.
+        type: array
+        items:
+          type: string
+      node_attributes:
+        description: Node attributes are a set of key:value(s) pairs associated with nodes.
+        type: object
+        additionalProperties:
+          type: array
+          items:
+            type: string
+      node_partitions:
+        description: Node partitions where the containers of this component can run.
+        type: array
+        items:
+          type: string
+      min_cardinality:
+        type: integer
+        format: int64
+        description: When placement type is cardinality, the minimum number of containers of the depending component that a host should have, where containers of this component can be allocated on.
+        example: 2
+      max_cardinality:
+        type: integer
+        format: int64
+        description: When placement type is cardinality, the maximum number of containers of the depending component that a host should have, where containers of this component can be allocated on.
+        example: 3
+  PlacementType:
+    description: The type of placement - affinity/anti-affinity/affinity-with-cardinality with containers of another component or containers of the same component (self).
+    properties:
+      type:
+        type: string
+        enum:
+          - AFFINITY
+          - ANTI_AFFINITY
+          - AFFINITY_WITH_CARDINALITY
+  PlacementScope:
+    description: The scope of placement for the containers of a component.
+    properties:
+      type:
+        type: string
+        enum:
+          - NODE
+          - RACK
   Artifact:
     description: Artifact of a service component. If not specified, component 
will just run the bare launch command and no artifact will be localized.
     required:
@@ -342,11 +402,16 @@ definitions:
         type: integer
         format: int64
         description: Number of containers for this component (optional). If 
not specified, the service level global number_of_containers takes effect.
+      containers:
+        type: array
+        description: Containers of a started component. Specifying a value for this attribute for the POST payload raises a validation error. This blob is available only in the GET response of a started service.
+        items:
+          $ref: '#/definitions/Container'
       run_privileged_container:
         type: boolean
         description: Run all containers of this component in privileged mode 
(YARN-4262).
       placement_policy:
-        description: Advanced scheduling and placement policies for all 
containers of this component (optional). If not specified, the service level 
placement_policy takes effect. Refer to the description at the global level for 
more details.
+        description: Advanced scheduling and placement policies for all containers of this component.
         $ref: '#/definitions/PlacementPolicy'
       configuration:
         description: Config properties for this component.
@@ -380,7 +445,7 @@ definitions:
     properties:
       properties:
         type: object
-        description: A blob of key-value pairs for configuring the YARN 
service AM
+        description: A blob of key-value pairs for configuring the YARN service AM.
         additionalProperties:
           type: string
       env:
@@ -405,7 +470,6 @@ definitions:
           - JSON
           - YAML
           - TEMPLATE
-          - ENV
           - HADOOP_XML
       dest_file:
         type: string
@@ -416,6 +480,8 @@ definitions:
       properties:
         type: object
         description: A blob of key value pairs that will be dumped in the 
dest_file in the format as specified in type. If src_file is specified, 
src_file content are dumped in the dest_file and these properties will 
overwrite, if any, existing properties in src_file or be added as new 
properties in src_file.
+        additionalProperties:
+          type: string
   Container:
     description: An instance of a running service container.
     properties:
@@ -464,6 +530,7 @@ definitions:
           - STABLE
           - STOPPED
           - FAILED
+          - FLEX
   ContainerState:
     description: The current state of the container of a service.
     properties:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 79eef49..0fcca16 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -149,6 +150,12 @@ public class ServiceScheduler extends CompositeService {
       new ConcurrentHashMap<>();
   private long containerRecoveryTimeout;
 
+  // If even one component of a service uses placement constraints, then use
+  // placement scheduler to schedule containers for all components (including
+  // the ones with no constraints). Mixing of container requests and scheduling
+  // requests for a single service is not recommended.
+  private boolean hasAtLeastOnePlacementConstraint;
+
   public ServiceScheduler(ServiceContext context) {
     super(context.service.getName());
     this.context = context;
@@ -286,6 +293,9 @@ public class ServiceScheduler extends CompositeService {
   public void serviceStart() throws Exception {
     super.serviceStart();
     InetSocketAddress bindAddress = context.clientAMService.getBindAddress();
+    // When yarn.resourcemanager.placement-constraints.handler is set to
+    // placement-processor then constraints need to be added during
+    // registerApplicationMaster.
     RegisterApplicationMasterResponse response = amRMClient
         .registerApplicationMaster(bindAddress.getHostName(),
             bindAddress.getPort(), "N/A");
@@ -512,6 +522,12 @@ public class ServiceScheduler extends CompositeService {
       componentsById.put(allocateId, component);
       componentsByName.put(component.getName(), component);
       allocateId++;
+      if (!hasAtLeastOnePlacementConstraint
+          && compSpec.getPlacementPolicy() != null
+          && compSpec.getPlacementPolicy().getConstraints() != null
+          && !compSpec.getPlacementPolicy().getConstraints().isEmpty()) {
+        hasAtLeastOnePlacementConstraint = true;
+      }
     }
   }
 
@@ -681,8 +697,14 @@ public class ServiceScheduler extends CompositeService {
     @Override public void onError(Throwable e) {
       LOG.error("Error in AMRMClient callback handler ", e);
     }
-  }
 
+    @Override
+    public void onRequestsRejected(
+        List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
+      LOG.error("Error in AMRMClient callback handler. Following scheduling "
+          + "requests were rejected: {}", rejectedSchedulingRequests);
+    }
+  }
 
   private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler 
{
 
@@ -810,4 +832,8 @@ public class ServiceScheduler extends CompositeService {
   public BoundedAppender getDiagnostics() {
     return diagnostics;
   }
+
+  public boolean hasAtLeastOnePlacementConstraint() {
+    return hasAtLeastOnePlacementConstraint;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
index e18697b..d6663ec 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
@@ -56,11 +56,6 @@ public interface UpgradeComponentsFinder {
         throw new UnsupportedOperationException("changes to queue " +
             "not supported by upgrade");
       }
-      if (!Objects.equals(currentDef.getPlacementPolicy(),
-          targetDef.getPlacementPolicy())) {
-        throw new UnsupportedOperationException("changes to placement policy " 
+
-            "not supported by upgrade");
-      }
 
       if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) {
         throw new UnsupportedOperationException("changes to resource " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
index ce0e0cf..667b1aa 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
@@ -269,16 +269,15 @@ public class Component implements Serializable {
 
   /**
    * Advanced scheduling and placement policies for all containers of this
-   * component (optional). If not specified, the service level placement_policy
-   * takes effect. Refer to the description at the global level for more
-   * details.
+   * component.
    **/
   public Component placementPolicy(PlacementPolicy placementPolicy) {
     this.placementPolicy = placementPolicy;
     return this;
   }
 
-  @ApiModelProperty(example = "null", value = "Advanced scheduling and 
placement policies for all containers of this component (optional). If not 
specified, the service level placement_policy takes effect. Refer to the 
description at the global level for more details.")
+  @ApiModelProperty(example = "null", value = "Advanced scheduling and "
+      + "placement policies for all containers of this component.")
   public PlacementPolicy getPlacementPolicy() {
     return placementPolicy;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFile.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFile.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFile.java
index 984e6f7..d3b18bc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFile.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFile.java
@@ -55,7 +55,7 @@ public class ConfigFile implements Serializable {
   @XmlEnum
   public enum TypeEnum {
     XML("XML"), PROPERTIES("PROPERTIES"), JSON("JSON"), YAML("YAML"), TEMPLATE(
-        "TEMPLATE"), HADOOP_XML("HADOOP_XML"),;
+        "TEMPLATE"), HADOOP_XML("HADOOP_XML");
 
     private String value;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
new file mode 100644
index 0000000..5eaf5e8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * Placement constraint details.
+ **/
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "Placement constraint details.")
+@javax.annotation.Generated(
+    value = "class io.swagger.codegen.languages.JavaClientCodegen",
+    date = "2018-02-16T10:20:12.927-07:00")
+public class PlacementConstraint implements Serializable {
+  private static final long serialVersionUID = 1518017165676511762L;
+
+  private String name = null;
+  private PlacementType type = null;
+  private PlacementScope scope = null;
+  @JsonProperty("target_tags")
+  @XmlElement(name = "target_tags")
+  private List<String> targetTags = new ArrayList<>();
+  @JsonProperty("node_attributes")
+  @XmlElement(name = "node_attributes")
+  private Map<String, List<String>> nodeAttributes = new HashMap<>();
+  @JsonProperty("node_partitions")
+  @XmlElement(name = "node_partitions")
+  private List<String> nodePartitions = new ArrayList<>();
+  @JsonProperty("min_cardinality")
+  @XmlElement(name = "min_cardinality")
+  private Long minCardinality = null;
+  @JsonProperty("max_cardinality")
+  @XmlElement(name = "max_cardinality")
+  private Long maxCardinality = null;
+
+  /**
+   * An optional name associated to this constraint.
+   **/
+  public PlacementConstraint name(String name) {
+    this.name = name;
+    return this;
+  }
+
+  @ApiModelProperty(example = "C1") // optional: spec requires only type/scope
+  @JsonProperty("name")
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * The type of placement.
+   **/
+  public PlacementConstraint type(PlacementType type) {
+    this.type = type;
+    return this;
+  }
+
+  @ApiModelProperty(example = "null", required = true)
+  @JsonProperty("type")
+  public PlacementType getType() {
+    return type;
+  }
+
+  public void setType(PlacementType type) {
+    this.type = type;
+  }
+
+  /**
+   * The scope of placement.
+   **/
+  public PlacementConstraint scope(PlacementScope scope) {
+    this.scope = scope;
+    return this;
+  }
+
+  @ApiModelProperty(example = "null", required = true)
+  @JsonProperty("scope")
+  public PlacementScope getScope() {
+    return scope;
+  }
+
+  public void setScope(PlacementScope scope) {
+    this.scope = scope;
+  }
+
+  /**
+   * The name of the components that this component's placement policy is
+   * depending upon are added as target tags. So for affinity say, this
+   * component's containers are requesting to be placed on hosts where
+   * containers of the target tag component(s) are running on. Target tags can
+   * also contain the name of this component, in which case it implies that for
+   * anti-affinity say, no more than one container of this component can be
+   * placed on a host. Similarly, for cardinality, it would mean that containers
+   * of this component is requesting to be placed on hosts where at least
+   * minCardinality but no more than maxCardinality containers of the target tag
+   * component(s) are running.
+   **/
+  public PlacementConstraint targetTags(List<String> targetTags) {
+    this.targetTags = targetTags;
+    return this;
+  }
+
+  @ApiModelProperty(example = "[\"hbase-regionserver\"]")
+  public List<String> getTargetTags() {
+    return targetTags;
+  }
+
+  public void setTargetTags(List<String> targetTags) {
+    this.targetTags = targetTags;
+  }
+
+  /**
+   * Node attributes are a set of key:value(s) pairs associated with nodes.
+   */
+  public PlacementConstraint nodeAttributes(
+      Map<String, List<String>> nodeAttributes) {
+    this.nodeAttributes = nodeAttributes;
+    return this;
+  }
+
+  @ApiModelProperty(example = "\"JavaVersion\":[\"1.7\", \"1.8\"]")
+  public Map<String, List<String>> getNodeAttributes() {
+    return nodeAttributes;
+  }
+
+  public void setNodeAttributes(Map<String, List<String>> nodeAttributes) {
+    this.nodeAttributes = nodeAttributes;
+  }
+
+  /**
+   * Node partitions where the containers of this component can run.
+   */
+  public PlacementConstraint nodePartitions(
+      List<String> nodePartitions) {
+    this.nodePartitions = nodePartitions;
+    return this;
+  }
+
+  @ApiModelProperty(example = "[\"gpu\", \"fast_disk\"]")
+  public List<String> getNodePartitions() {
+    return nodePartitions;
+  }
+
+  public void setNodePartitions(List<String> nodePartitions) {
+    this.nodePartitions = nodePartitions;
+  }
+
+  /**
+   * When placement type is cardinality, the minimum number of containers of the
+   * depending component that a host should have, where containers of this
+   * component can be allocated on.
+   **/
+  public PlacementConstraint minCardinality(Long minCardinality) {
+    this.minCardinality = minCardinality;
+    return this;
+  }
+
+  @ApiModelProperty(example = "2")
+  public Long getMinCardinality() {
+    return minCardinality;
+  }
+
+  public void setMinCardinality(Long minCardinality) {
+    this.minCardinality = minCardinality;
+  }
+
+  /**
+   * When placement type is cardinality, the maximum number of containers of the
+   * depending component that a host should have, where containers of this
+   * component can be allocated on.
+   **/
+  public PlacementConstraint maxCardinality(Long maxCardinality) {
+    this.maxCardinality = maxCardinality;
+    return this;
+  }
+
+  @ApiModelProperty(example = "3")
+  public Long getMaxCardinality() {
+    return maxCardinality;
+  }
+
+  public void setMaxCardinality(Long maxCardinality) {
+    this.maxCardinality = maxCardinality;
+  }
+
+  @Override
+  public boolean equals(java.lang.Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PlacementConstraint placementConstraint = (PlacementConstraint) o;
+    return Objects.equals(this.name, placementConstraint.name)
+        && Objects.equals(this.type, placementConstraint.type)
+        && Objects.equals(this.scope, placementConstraint.scope)
+        && Objects.equals(this.targetTags, placementConstraint.targetTags)
+        && Objects.equals(this.nodeAttributes,
+            placementConstraint.nodeAttributes)
+        && Objects.equals(this.nodePartitions,
+            placementConstraint.nodePartitions)
+        && Objects.equals(this.minCardinality,
+            placementConstraint.minCardinality)
+        && Objects.equals(this.maxCardinality,
+            placementConstraint.maxCardinality);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, type, scope, targetTags, nodeAttributes,
+        nodePartitions, minCardinality, maxCardinality);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class PlacementConstraint {\n");
+
+    sb.append("    name: ").append(toIndentedString(name)).append("\n");
+    sb.append("    type: ").append(toIndentedString(type)).append("\n");
+    sb.append("    scope: ").append(toIndentedString(scope)).append("\n");
+    sb.append("    targetTags: ").append(toIndentedString(targetTags))
+        .append("\n");
+    sb.append("    nodeAttributes: ").append(toIndentedString(nodeAttributes))
+        .append("\n");
+    sb.append("    nodePartitions: ").append(toIndentedString(nodePartitions))
+    .append("\n");
+    sb.append("    minCardinality: ").append(toIndentedString(minCardinality))
+        .append("\n");
+    sb.append("    maxCardinality: ").append(toIndentedString(maxCardinality))
+        .append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(java.lang.Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
index 6f6fe6f..a9824bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
@@ -17,49 +17,50 @@
 
 package org.apache.hadoop.yarn.service.api.records;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
 /**
- * Placement policy of an instance of an service. This feature is in the
- * works in YARN-4902.
+ * Advanced placement policy of the components of a service.
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-@ApiModel(description = "Placement policy of an instance of an service. This 
feature is in the works in YARN-4902.")
-@javax.annotation.Generated(value = "class 
io.swagger.codegen.languages.JavaClientCodegen", date = 
"2016-06-02T08:15:05.615-07:00")
+@ApiModel(description = "Advanced placement policy of the components of a "
+    + "service.")
+@javax.annotation.Generated(
+    value = "class io.swagger.codegen.languages.JavaClientCodegen",
+    date = "2018-02-16T10:20:12.927-07:00")
 public class PlacementPolicy implements Serializable {
   private static final long serialVersionUID = 4341110649551172231L;
 
-  private String label = null;
+  private List<PlacementConstraint> constraints = new ArrayList<>();
 
   /**
-   * Assigns a service to a named partition of the cluster where the service
-   * desires to run (optional). If not specified all services are submitted to
-   * a default label of the service owner. One or more labels can be setup for
-   * each service owner account with required constraints like no-preemption,
-   * sla-99999, preemption-ok, etc.
+   * Placement constraint details.
    **/
-  public PlacementPolicy label(String label) {
-    this.label = label;
+  public PlacementPolicy constraints(List<PlacementConstraint> constraints) {
+    this.constraints = constraints;
     return this;
   }
 
-  @ApiModelProperty(example = "null", value = "Assigns a service to a named 
partition of the cluster where the service desires to run (optional). If not 
specified all services are submitted to a default label of the service owner. 
One or more labels can be setup for each service owner account with required 
constraints like no-preemption, sla-99999, preemption-ok, etc.")
-  @JsonProperty("label")
-  public String getLabel() {
-    return label;
+  @ApiModelProperty(example = "null", required = true)
+  @JsonProperty("constraints")
+  public List<PlacementConstraint> getConstraints() {
+    return constraints;
   }
 
-  public void setLabel(String label) {
-    this.label = label;
+  public void setConstraints(List<PlacementConstraint> constraints) {
+    this.constraints = constraints;
   }
 
   @Override
@@ -71,12 +72,12 @@ public class PlacementPolicy implements Serializable {
       return false;
     }
     PlacementPolicy placementPolicy = (PlacementPolicy) o;
-    return Objects.equals(this.label, placementPolicy.label);
+    return Objects.equals(this.constraints, placementPolicy.constraints);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(label);
+    return Objects.hash(constraints);
   }
 
   @Override
@@ -84,7 +85,8 @@ public class PlacementPolicy implements Serializable {
     StringBuilder sb = new StringBuilder();
     sb.append("class PlacementPolicy {\n");
 
-    sb.append("    label: ").append(toIndentedString(label)).append("\n");
+    sb.append("    constraints: ").append(toIndentedString(constraints))
+        .append("\n");
     sb.append("}");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
new file mode 100644
index 0000000..0da19b7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * The scope of placement for the containers of a component.
+ **/
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "The scope of placement for the containers of a "
+    + "component.")
+@javax.annotation.Generated(
+    value = "class io.swagger.codegen.languages.JavaClientCodegen",
+    date = "2018-02-16T10:20:12.927-07:00")
+public enum PlacementScope {
+  NODE(PlacementConstraints.NODE), RACK(PlacementConstraints.RACK);
+
+  /**
+   * Scope string taken from the matching PlacementConstraints constant.
+   * Final because it is assigned once in the constructor and never changed.
+   */
+  private final String value;
+
+  PlacementScope(String value) {
+    this.value = value;
+  }
+
+  /**
+   * @return the scope string this constant was constructed with
+   */
+  public String getValue() {
+    return value;
+  }
+
+  /**
+   * Returns the scope string rather than the constant name. The
+   * {@code @JsonValue} annotation marks this as the JSON form of the enum.
+   *
+   * @return the scope string this constant was constructed with
+   */
+  @Override
+  @JsonValue
+  public String toString() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
new file mode 100644
index 0000000..1b155ab
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import io.swagger.annotations.ApiModel;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The type of placement - affinity/anti-affinity/affinity-with-cardinality with
+ * containers of another component or containers of the same component (self).
+ * The service Component#requestContainers maps AFFINITY, ANTI_AFFINITY and
+ * AFFINITY_WITH_CARDINALITY to PlacementConstraints.targetIn, targetNotIn and
+ * targetCardinality respectively.
+ **/
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "The type of placement - affinity/anti-affinity/"
+    + "affinity-with-cardinality with containers of another component or "
+    + "containers of the same component (self).")
+@javax.annotation.Generated(
+    value = "class io.swagger.codegen.languages.JavaClientCodegen",
+    date = "2018-02-16T10:20:12.927-07:00")
+public enum PlacementType {
+  AFFINITY, ANTI_AFFINITY, AFFINITY_WITH_CARDINALITY;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
index c417ec0..f1c0852 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import javax.xml.bind.annotation.XmlElement;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -49,7 +50,7 @@ public class Resource extends BaseResource implements 
Cloneable {
 
   @JsonProperty("additional")
   @XmlElement(name = "additional")
-  private Map<String, ResourceInformation> additional = null;
+  private Map<String, ResourceInformation> additional = new HashMap<>();
 
   /**
    * Each resource profile has a unique id which is associated with a

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
index 7b5c5b3..9475bf6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
@@ -63,9 +63,6 @@ public class Service extends BaseResource {
   @XmlElement(name = "number_of_running_containers")
   private Long numberOfRunningContainers = null;
   private Long lifetime = null;
-  @JsonProperty("placement_policy")
-  @XmlElement(name = "placement_policy")
-  private PlacementPolicy placementPolicy = null;
   private List<Component> components = new ArrayList<>();
   private Configuration configuration = new Configuration();
   private ServiceState state = null;
@@ -249,28 +246,6 @@ public class Service extends BaseResource {
   }
 
   /**
-   * Advanced scheduling and placement policies (optional). If not specified, 
it
-   * defaults to the default placement policy of the service owner. The design 
of
-   * placement policies are in the works. It is not very clear at this point,
-   * how policies in conjunction with labels be exposed to service owners.
-   * This is a placeholder for now. The advanced structure of this attribute
-   * will be determined by YARN-4902.
-   **/
-  public Service placementPolicy(PlacementPolicy placementPolicy) {
-    this.placementPolicy = placementPolicy;
-    return this;
-  }
-
-  @ApiModelProperty(example = "null", value = "Advanced scheduling and 
placement policies (optional). If not specified, it defaults to the default 
placement policy of the service owner. The design of placement policies are in 
the works. It is not very clear at this point, how policies in conjunction with 
labels be exposed to service owners. This is a placeholder for now. The 
advanced structure of this attribute will be determined by YARN-4902.")
-  public PlacementPolicy getPlacementPolicy() {
-    return placementPolicy;
-  }
-
-  public void setPlacementPolicy(PlacementPolicy placementPolicy) {
-    this.placementPolicy = placementPolicy;
-  }
-
-  /**
    * Components of an service.
    **/
   public Service components(List<Component> components) {
@@ -429,8 +404,6 @@ public class Service extends BaseResource {
     sb.append("    numberOfRunningContainers: ")
         .append(toIndentedString(numberOfRunningContainers)).append("\n");
     sb.append("    lifetime: 
").append(toIndentedString(lifetime)).append("\n");
-    sb.append("    placementPolicy: 
").append(toIndentedString(placementPolicy))
-        .append("\n");
     sb.append("    components: ").append(toIndentedString(components))
         .append("\n");
     sb.append("    configuration: ").append(toIndentedString(configuration))

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
index 2cee23c..f9c8190 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
@@ -85,8 +85,8 @@ public class ServiceStatus extends BaseResource {
   }
 
   /**
-   * An error code specific to a scenario which service owners should be able 
to use
-   * to understand the failure in addition to the diagnostic information.
+   * An error code specific to a scenario which service owners should be able 
to
+   * use to understand the failure in addition to the diagnostic information.
    **/
   public ServiceStatus code(Integer code) {
     this.code = code;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index b521504..39897f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -20,32 +20,41 @@ package org.apache.hadoop.yarn.service.component;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import 
org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.ServiceMaster;
+import org.apache.hadoop.yarn.service.ServiceMetrics;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
+import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
-import org.apache.hadoop.yarn.service.ServiceMaster;
-import org.apache.hadoop.yarn.service.ServiceMetrics;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
+import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
+import org.apache.hadoop.yarn.service.monitor.probe.Probe;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
+import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.service.utils.ServiceUtils;
-import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
-import org.apache.hadoop.yarn.service.monitor.probe.Probe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +64,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -66,9 +76,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
 import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
-import static 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
-import static 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
 import static org.apache.hadoop.yarn.service.component.ComponentState.*;
+import static 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
 import static 
org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
 
 public class Component implements EventHandler<ComponentEvent> {
@@ -408,6 +417,8 @@ public class Component implements 
EventHandler<ComponentEvent> {
 
   @SuppressWarnings({ "unchecked" })
   public void requestContainers(long count) {
+    LOG.info("[COMPONENT {}] Requesting for {} container(s)",
+        componentSpec.getName(), count);
     org.apache.hadoop.yarn.service.api.records.Resource componentResource =
         componentSpec.getResource();
 
@@ -440,12 +451,98 @@ public class Component implements 
EventHandler<ComponentEvent> {
       }
     }
 
-    for (int i = 0; i < count; i++) {
-      //TODO Once YARN-5468 is done, use that for anti-affinity
-      ContainerRequest request =
-          ContainerRequest.newBuilder().capability(resource).priority(priority)
-              .allocationRequestId(allocateId).relaxLocality(true).build();
-      amrmClient.addContainerRequest(request);
+    if (!scheduler.hasAtLeastOnePlacementConstraint()) {
+      // No placement constraints reported by the scheduler: submit one plain
+      // container request per container.
+      for (int i = 0; i < count; i++) {
+        ContainerRequest request = ContainerRequest.newBuilder()
+            .capability(resource).priority(priority)
+            .allocationRequestId(allocateId).relaxLocality(true).build();
+        LOG.info("[COMPONENT {}] Submitting container request : {}",
+            componentSpec.getName(), request);
+        amrmClient.addContainerRequest(request);
+      }
+    } else {
+      // Schedule placement requests. Validation of non-null target tags and
+      // that they refer to existing component names are already done. So, no
+      // need to validate here.
+      PlacementPolicy placementPolicy = componentSpec.getPlacementPolicy();
+      Collection<SchedulingRequest> schedulingRequests = new HashSet<>();
+      // We prepare an AND-ed composite constraint to be the final composite
+      // constraint. If placement expressions are specified to create advanced
+      // composite constraints then this AND-ed composite constraint is not
+      // used.
+      PlacementConstraint finalConstraint = null;
+      // The check above is made on the scheduler, not on this component, so
+      // this component may have no placement policy of its own. In that case
+      // the scheduling request is submitted without a constraint expression
+      // instead of failing with a NullPointerException.
+      if (placementPolicy != null) {
+        for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
+            yarnServiceConstraint : placementPolicy.getConstraints()) {
+          List<TargetExpression> targetExpressions = new ArrayList<>();
+          // Currently only intra-application allocation tags are supported.
+          if (!yarnServiceConstraint.getTargetTags().isEmpty()) {
+            targetExpressions.add(PlacementTargets.allocationTagToIntraApp(
+                yarnServiceConstraint.getTargetTags().toArray(new String[0])));
+          }
+          // Add all node attributes
+          for (Map.Entry<String, List<String>> attribute
+              : yarnServiceConstraint.getNodeAttributes().entrySet()) {
+            targetExpressions.add(PlacementTargets.nodeAttribute(
+                attribute.getKey(),
+                attribute.getValue().toArray(new String[0])));
+          }
+          // Add all node partitions
+          if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
+            targetExpressions.add(PlacementTargets.nodePartition(
+                yarnServiceConstraint.getNodePartitions()
+                    .toArray(new String[0])));
+          }
+          TargetExpression[] targets =
+              targetExpressions.toArray(new TargetExpression[0]);
+          PlacementConstraint constraint = null;
+          // Every type takes its scope string from PlacementScope#getValue()
+          // so the three cases stay consistent and no default-locale
+          // lower-casing of the enum name is involved.
+          switch (yarnServiceConstraint.getType()) {
+          case AFFINITY:
+            constraint = PlacementConstraints
+                .targetIn(yarnServiceConstraint.getScope().getValue(),
+                    targets)
+                .build();
+            break;
+          case ANTI_AFFINITY:
+            constraint = PlacementConstraints
+                .targetNotIn(yarnServiceConstraint.getScope().getValue(),
+                    targets)
+                .build();
+            break;
+          case AFFINITY_WITH_CARDINALITY:
+            // Missing bounds default to 0 and Integer.MAX_VALUE.
+            constraint = PlacementConstraints.targetCardinality(
+                yarnServiceConstraint.getScope().getValue(),
+                yarnServiceConstraint.getMinCardinality() == null ? 0
+                    : yarnServiceConstraint.getMinCardinality().intValue(),
+                yarnServiceConstraint.getMaxCardinality() == null
+                    ? Integer.MAX_VALUE
+                    : yarnServiceConstraint.getMaxCardinality().intValue(),
+                targets).build();
+            break;
+          }
+          // The default AND-ed final composite constraint
+          if (finalConstraint != null) {
+            finalConstraint = PlacementConstraints
+                .and(constraint.getConstraintExpr(),
+                    finalConstraint.getConstraintExpr())
+                .build();
+          } else {
+            finalConstraint = constraint;
+          }
+          // Pass the expression itself; the logger formats it only when
+          // debug logging is enabled.
+          LOG.debug("[COMPONENT {}] Placement constraint: {}",
+              componentSpec.getName(), constraint.getConstraintExpr());
+        }
+      }
+      // A single scheduling request covers all 'count' containers.
+      ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
+          resource);
+      LOG.debug("[COMPONENT {}] Resource sizing: {}", componentSpec.getName(),
+          resourceSizing);
+      SchedulingRequest request = SchedulingRequest.newBuilder()
+          .priority(priority).allocationRequestId(allocateId)
+          .allocationTags(Collections.singleton(componentSpec.getName()))
+          .executionType(
+              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true))
+          .placementConstraintExpression(finalConstraint)
+          .resourceSizing(resourceSizing).build();
+      LOG.info("[COMPONENT {}] Submitting scheduling request: {}",
+          componentSpec.getName(), request);
+      schedulingRequests.add(request);
+      amrmClient.addSchedulingRequests(schedulingRequests);
+    }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
index 12f6455..6b2b8af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
@@ -91,4 +91,16 @@ public interface RestApiErrorMessages {
 
   String ERROR_QUICKLINKS_FOR_COMP_INVALID = "Quicklinks specified at"
       + " component level, needs corresponding values set at service level";
+  String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
+      + "specified in placement policy of component %s. For now, target tags "
+      + "support self reference only. Specifying anything other than its "
+      + "component name is not supported. Set target tag of component %s to "
+      + "%s.";
+  String ERROR_PLACEMENT_POLICY_TAG_NAME_INVALID = "Invalid target tag %s "
+      + "specified in placement policy of component %s. Target tags should be "
+      + "a valid component name in the service.";
+  String ERROR_PLACEMENT_POLICY_EXPRESSION_ELEMENT_NAME_INVALID = "Invalid "
+      + "expression element name %s specified in placement policy of component 
"
+      + "%s. Expression element names should be a valid constraint name or an "
+      + "expression name defined for this component only.";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
index 949ce19..6c73ebb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -268,10 +268,6 @@ public class ServiceTimelinePublisher extends 
CompositeService {
       }
       entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
           component.getRunPrivilegedContainer().toString());
-      if (component.getPlacementPolicy() != null) {
-        entityInfos.put(ServiceTimelineMetricsConstants.PLACEMENT_POLICY,
-            component.getPlacementPolicy().getLabel());
-      }
       entity.addInfo(entityInfos);
 
       putEntity(entity);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 13d9a37..fc1b45b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -27,17 +27,18 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
 import org.apache.hadoop.yarn.service.api.records.Resource;
+import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
-import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
-import org.apache.hadoop.yarn.service.provider.ProviderFactory;
-import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
 import org.apache.hadoop.yarn.service.conf.RestApiConstants;
 import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
+import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
+import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
+import org.apache.hadoop.yarn.service.provider.ProviderFactory;
 import org.codehaus.jackson.map.PropertyNamingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +203,7 @@ public class ServiceApiUtil {
       }
       validateComponent(comp, fs.getFileSystem(), conf);
     }
+    validatePlacementPolicy(service.getComponents(), componentNames);
 
     // validate dependency tree
     sortByDependencies(service.getComponents());
@@ -263,6 +265,24 @@ public class ServiceApiUtil {
     namePattern.validate(name);
   }
 
+  private static void validatePlacementPolicy(List<Component> components,
+      Set<String> componentNames) {
+    for (Component comp : components) {
+      if (comp.getPlacementPolicy() != null) {
+        for (PlacementConstraint constraint : comp.getPlacementPolicy()
+            .getConstraints()) {
+          for (String targetTag : constraint.getTargetTags()) {
+            if (!comp.getName().equals(targetTag)) {
+              throw new IllegalArgumentException(String.format(
+                  RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
+                  targetTag, comp.getName(), comp.getName(), comp.getName()));
+            }
+          }
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   public static List<Component> getComponents(SliderFileSystem
       fs, String serviceName) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
index c2f8f3e..d195b2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
+import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@@ -490,4 +492,37 @@ public class TestServiceApiUtil {
       Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
     }
   }
+
+  @Test
+  public void testPlacementPolicy() throws IOException {
+    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
+    Service app = createValidApplication("comp-a");
+    Component comp = app.getComponents().get(0);
+    PlacementPolicy pp = new PlacementPolicy();
+    PlacementConstraint pc = new PlacementConstraint();
+    pc.setName("CA1");
+    pc.setTargetTags(Collections.singletonList("comp-invalid"));
+    pp.setConstraints(Collections.singletonList(pc));
+    comp.setPlacementPolicy(pp);
+
+    try {
+      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
+      Assert.fail(EXCEPTION_PREFIX + "service with empty placement");
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+          String.format(
+              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
+              "comp-invalid", "comp-a", "comp-a", "comp-a"),
+          e.getMessage());
+    }
+
+    pc.setTargetTags(Collections.singletonList("comp-a"));
+
+    // now it should succeed
+    try {
+      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
+    } catch (IllegalArgumentException e) {
+      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index fac282b..5e267bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -26,14 +26,21 @@ import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
+import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
+import org.apache.hadoop.yarn.service.api.records.PlacementScope;
+import org.apache.hadoop.yarn.service.api.records.PlacementType;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
@@ -385,6 +392,103 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     Assert.assertEquals(service.getVersion(), fromFs.getVersion());
   }
 
+  // Test to verify ANTI_AFFINITY placement policy
+  // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
+  // 2. Create an example service with 3 containers
+  // 3. Verify no more than 1 container comes up in each of the 3 NMs
+  // 4. Flex the component to 4 containers
+  // 5. Verify that the 4th container does not even get allocated since there
+  //    are only 3 NMs
+  @Test (timeout = 200000)
+  public void testCreateServiceWithPlacementPolicy() throws Exception {
+    // We need to enable scheduler placement-constraint at the cluster level to
+    // let apps use placement policies.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    setConf(conf);
+    setupInternal(3);
+    ServiceClient client = createClient(getConf());
+    Service exampleApp = new Service();
+    exampleApp.setName("example-app");
+    exampleApp.setVersion("v1");
+    Component comp = createComponent("compa", 3L, "sleep 1000");
+    PlacementPolicy pp = new PlacementPolicy();
+    PlacementConstraint pc = new PlacementConstraint();
+    pc.setName("CA1");
+    pc.setTargetTags(Collections.singletonList("compa"));
+    pc.setScope(PlacementScope.NODE);
+    pc.setType(PlacementType.ANTI_AFFINITY);
+    pp.setConstraints(Collections.singletonList(pc));
+    comp.setPlacementPolicy(pp);
+    exampleApp.addComponent(comp);
+    client.actionCreate(exampleApp);
+    waitForServiceToBeStable(client, exampleApp);
+
+    // Check service is stable and all 3 containers are running
+    Service service = client.getStatus(exampleApp.getName());
+    Component component = service.getComponent("compa");
+    Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
+        service.getState());
+    Assert.assertEquals("3 containers are expected to be running", 3,
+        component.getContainers().size());
+    // Prepare a map of non-AM containers for later lookup
+    Set<String> nonAMContainerIdSet = new HashSet<>();
+    for (Container cont : component.getContainers()) {
+      nonAMContainerIdSet.add(cont.getId());
+    }
+
+    // Verify that no more than 1 non-AM container came up on each of the 3 NMs
+    Set<String> hosts = new HashSet<>();
+    ApplicationReport report = client.getYarnClient()
+        .getApplicationReport(ApplicationId.fromString(exampleApp.getId()));
+    GetContainersRequest req = GetContainersRequest
+        .newInstance(report.getCurrentApplicationAttemptId());
+    ResourceManager rm = getYarnCluster().getResourceManager();
+    for (ContainerReport contReport : rm.getClientRMService().getContainers(req)
+        .getContainerList()) {
+      if (!nonAMContainerIdSet
+          .contains(contReport.getContainerId().toString())) {
+        continue;
+      }
+      if (hosts.contains(contReport.getNodeHttpAddress())) {
+        Assert.fail("Container " + contReport.getContainerId()
+            + " came up in the same host as another container.");
+      } else {
+        hosts.add(contReport.getNodeHttpAddress());
+      }
+    }
+
+    // Flex compa up to 4, which is more containers than the no of NMs
+    Map<String, Long> compCounts = new HashMap<>();
+    compCounts.put("compa", 4L);
+    exampleApp.getComponent("compa").setNumberOfContainers(4L);
+    client.flexByRestService(exampleApp.getName(), compCounts);
+    try {
+      // 10 secs is enough for the container to be started. The down side of
+      // this test is that it has to wait that long. Setting a higher wait time
+      // will add to the total time taken by tests to run.
+      waitForServiceToBeStable(client, exampleApp, 10000);
+      Assert.fail("Service should not be in a stable state. It should throw "
+          + "a timeout exception.");
+    } catch (Exception e) {
+      // Check that service state is not STABLE and only 3 containers are
+      // running and the fourth one should not get allocated.
+      service = client.getStatus(exampleApp.getName());
+      component = service.getComponent("compa");
+      Assert.assertNotEquals("Service state should not be STABLE",
+          ServiceState.STABLE, service.getState());
+      Assert.assertEquals("Component state should be FLEXING",
+          ComponentState.FLEXING, component.getState());
+      Assert.assertEquals("3 containers are expected to be running", 3,
+          component.getContainers().size());
+    }
+
+    LOG.info("Stop/destroy service {}", exampleApp);
+    client.actionStop(exampleApp.getName(), true);
+    client.actionDestroy(exampleApp.getName());
+  }
+
   // Check containers launched are in dependency order
   // Get all containers into a list and sort based on container launch time e.g.
   // compa-c1, compa-c2, compb-c1, compb-c2;
@@ -534,7 +638,14 @@ public class TestYarnNativeServices extends ServiceTestUtils {
    */
   private void waitForServiceToBeStable(ServiceClient client,
       Service exampleApp) throws TimeoutException, InterruptedException {
-    waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE);
+    waitForServiceToBeStable(client, exampleApp, 200000);
+  }
+
+  private void waitForServiceToBeStable(ServiceClient client,
+      Service exampleApp, int waitForMillis)
+      throws TimeoutException, InterruptedException {
+    waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE,
+        waitForMillis);
   }
 
   /**
@@ -550,6 +661,12 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
   }
 
+  private void waitForServiceToBeInState(ServiceClient client,
+      Service exampleApp, ServiceState desiredState) throws TimeoutException,
+      InterruptedException {
+    waitForServiceToBeInState(client, exampleApp, desiredState, 200000);
+  }
+
   /**
    * Wait until service is started. It does not have to reach a stable state.
    *
@@ -559,8 +676,8 @@ public class TestYarnNativeServices extends ServiceTestUtils {
    * @throws InterruptedException
    */
   private void waitForServiceToBeInState(ServiceClient client,
-      Service exampleApp, ServiceState desiredState) throws TimeoutException,
-      InterruptedException {
+      Service exampleApp, ServiceState desiredState, int waitForMillis) throws
+      TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       try {
         Service retrievedApp = client.getStatus(exampleApp.getName());
@@ -570,7 +687,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
         e.printStackTrace();
         return false;
       }
-    }, 2000, 200000);
+    }, 2000, waitForMillis);
   }
 
   private int countTotalContainers(Service service) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
index 80b4f51..cff7229 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
 import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
+import org.apache.hadoop.yarn.service.api.records.PlacementType;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
@@ -45,6 +47,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -208,8 +211,6 @@ public class TestServiceTimelinePublisher {
         info.get(ServiceTimelineMetricsConstants.LAUNCH_COMMAND));
     assertEquals("false",
         info.get(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER));
-    assertEquals("label",
-        info.get(ServiceTimelineMetricsConstants.PLACEMENT_POLICY));
   }
 
   private static Service createMockApplication() {
@@ -234,7 +235,10 @@ public class TestServiceTimelinePublisher {
     when(component.getResource()).thenReturn(resource);
     when(component.getLaunchCommand()).thenReturn("sleep 1");
     PlacementPolicy placementPolicy = new PlacementPolicy();
-    placementPolicy.setLabel("label");
+    PlacementConstraint placementConstraint = new PlacementConstraint();
+    placementConstraint.setType(PlacementType.ANTI_AFFINITY);
+    placementPolicy
+        .setConstraints(Collections.singletonList(placementConstraint));
     when(component.getPlacementPolicy()).thenReturn(placementPolicy);
     when(component.getConfiguration()).thenReturn(
         new org.apache.hadoop.yarn.service.api.records.Configuration());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b886836/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
index 11f75bb..a53dca1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
@@ -291,6 +291,7 @@ public class SchedulingRequestPBImpl extends SchedulingRequest {
         ", executionType=" + getExecutionType() +
         ", allocationTags=" + getAllocationTags() +
         ", resourceSizing=" + getResourceSizing() +
+        ", placementConstraint=" + getPlacementConstraint() +
         '}';
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to