http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
new file mode 100644
index 0000000..bfb12ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
+
+
+
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
+ * <code>AMRMProxy</code> request interceptor. It is responsible for the
+ * following:</p>
+ * <ul>
+ *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
+ *   response objects to extract instructions from the
+ *   <code>ClusterMonitor</code> running on the ResourceManager to aid in making
+ *   distributed scheduling decisions.</li>
+ *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
+ *   containers for the outstanding OPPORTUNISTIC container requests.</li>
+ * </ul>
+ */
+public final class DistributedScheduler extends AbstractRequestInterceptor {
+
+  static class PartitionedResourceRequests {
+    private List<ResourceRequest> guaranteed = new ArrayList<>();
+    private List<ResourceRequest> opportunistic = new ArrayList<>();
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  static class DistributedSchedulerParams {
+    Resource maxResource;
+    Resource minResource;
+    Resource incrementResource;
+    int containerTokenExpiryInterval;
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DistributedScheduler.class);
+
+  private final static RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // Currently just used to keep track of allocated containers.
+  // Can be used for reporting stats later.
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+
+  private DistributedSchedulerParams appParams =
+      new DistributedSchedulerParams();
+  private final OpportunisticContainerAllocator.ContainerIdCounter
+      containerIdCounter =
+          new OpportunisticContainerAllocator.ContainerIdCounter();
+  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequest (ask).
+  final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  private ApplicationAttemptId applicationAttemptId;
+  private OpportunisticContainerAllocator containerAllocator;
+  private NMTokenSecretManagerInNM nmSecretManager;
+  private String appSubmitter;
+
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    initLocal(appContext.getApplicationAttemptId(),
+        appContext.getNMCotext().getContainerAllocator(),
+        appContext.getNMCotext().getNMTokenSecretManager(),
+        appContext.getUser());
+  }
+
+  @VisibleForTesting
+  void initLocal(ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerAllocator containerAllocator,
+      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
+    this.applicationAttemptId = applicationAttemptId;
+    this.containerAllocator = containerAllocator;
+    this.nmSecretManager = nmSecretManager;
+    this.appSubmitter = appSubmitter;
+  }
+
+  /**
+   * Route register call to the corresponding distributed scheduling method viz.
+   * registerApplicationMasterForDistributedScheduling, and return response to
+   * the caller after stripping away Distributed Scheduling information.
+   *
+   * @param request
+   *          registration request
+   * @return Register Response
+   * @throws YarnException YarnException
+   * @throws IOException IOException
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return registerApplicationMasterForDistributedScheduling(request)
+        .getRegisterResponse();
+  }
+
+  /**
+   * Route allocate call to the allocateForDistributedScheduling method and
+   * return response to the caller after stripping away Distributed Scheduling
+   * information.
+   *
+   * @param request
+   *          allocation request
+   * @return Allocate Response
+   * @throws YarnException YarnException
+   * @throws IOException IOException
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
+        .newRecordInstance(DistributedSchedulingAllocateRequest.class);
+    distRequest.setAllocateRequest(request);
+    return allocateForDistributedScheduling(distRequest).getAllocateResponse();
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().finishApplicationMaster(request);
+  }
+
+  /**
+   * Check if we already have an NMToken. If not, generate the token and
+   * add it to the response.
+   */
+  private void updateResponseWithNMTokens(AllocateResponse response,
+      List<NMToken> nmTokens, List<Container> allocatedContainers) {
+    List<NMToken> newTokens = new ArrayList<>();
+    if (allocatedContainers.size() > 0) {
+      response.getAllocatedContainers().addAll(allocatedContainers);
+      for (Container alloc : allocatedContainers) {
+        if (!nodeTokens.containsKey(alloc.getNodeId())) {
+          newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
+        }
+      }
+      List<NMToken> retTokens = new ArrayList<>(nmTokens);
+      retTokens.addAll(newTokens);
+      response.setNMTokens(retTokens);
+    }
+  }
+
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests partitionedRequests =
+        new PartitionedResourceRequests();
+    for (ResourceRequest rr : askList) {
+      if (rr.getExecutionTypeRequest().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        partitionedRequests.getOpportunistic().add(rr);
+      } else {
+        partitionedRequests.getGuaranteed().add(rr);
+      }
+    }
+    return partitionedRequests;
+  }
+
+  private void updateParameters(
+      RegisterDistributedSchedulingAMResponse registerResponse) {
+    appParams.minResource = registerResponse.getMinContainerResource();
+    appParams.maxResource = registerResponse.getMaxContainerResource();
+    appParams.incrementResource =
+        registerResponse.getIncrContainerResource();
+    if (appParams.incrementResource == null) {
+      appParams.incrementResource = appParams.minResource;
+    }
+    appParams.containerTokenExpiryInterval = registerResponse
+        .getContainerTokenExpiryInterval();
+
+    containerIdCounter
+        .resetContainerIdCounter(registerResponse.getContainerIdStart());
+    setNodeList(registerResponse.getNodesForScheduling());
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, ResourceName, Capability) and adds them to the outstanding
+   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+   * the current YARN constraint that only a single ResourceRequest can exist at
+   * a given Priority and Capability.
+   *
+   * @param resourceAsks the list with the {@link ResourceRequest}s
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      Priority priority = request.getPriority();
+
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Map<Resource, ResourceRequest> reqMap =
+          this.outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        this.outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+            + ", with capability = " + request.getCapability() + " ) : "
+            + resourceRequest.getNumContainers());
+      }
+    }
+  }
+
+  /**
+   * This method matches a returned list of Container Allocations to any
+   * outstanding OPPORTUNISTIC ResourceRequest.
+   */
+  private void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container c : allocatedContainers) {
+      containersAllocated.add(c.getId());
+      Map<Resource, ResourceRequest> asks =
+          outstandingOpReqs.get(c.getPriority());
+
+      if (asks == null)
+        continue;
+
+      ResourceRequest rr = asks.get(capability);
+      if (rr != null) {
+        rr.setNumContainers(rr.getNumContainers() - 1);
+        if (rr.getNumContainers() == 0) {
+          asks.remove(capability);
+        }
+      }
+    }
+  }
+
+  private void setNodeList(List<NodeId> nodeList) {
+    this.nodeList.clear();
+    addToNodeList(nodeList);
+  }
+
+  private void addToNodeList(List<NodeId> nodes) {
+    for (NodeId n : nodes) {
+      this.nodeList.put(n.getHost(), n);
+    }
+  }
+
+  @Override
+  public RegisterDistributedSchedulingAMResponse
+      registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    LOG.info("Forwarding registration request to the" +
+        "Distributed Scheduler Service on YARN RM");
+    RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
+        .registerApplicationMasterForDistributedScheduling(request);
+    updateParameters(dsResp);
+    return dsResp;
+  }
+
+  @Override
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocate request to the" +
+          "Distributed Scheduler Service on YARN RM");
+    }
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks =
+        partitionAskList(request.getAllocateRequest().getAskList());
+
+    List<ContainerId> releasedContainers =
+        request.getAllocateRequest().getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      containersAllocated.removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr =
+        request.getAllocateRequest().getResourceBlacklistRequest();
+    if (rbr != null) {
+      blacklist.removeAll(rbr.getBlacklistRemovals());
+      blacklist.addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
+      // Allocated containers :
+      //  Key = Requested Capability,
+      //  Value = List of Containers of given Cap (The actual container size
+      //          might be different than what is requested.. which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      Map<Resource, List<Container>> allocated =
+          containerAllocator.allocate(this.appParams, containerIdCounter,
+              outstandingOpReqs.get(priority).values(), blacklist,
+              applicationAttemptId, nodeList, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    request.setAllocatedContainers(allocatedContainers);
+
+    // Send all the GUARANTEED Reqs to RM
+    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
+    DistributedSchedulingAllocateResponse dsResp =
+        getNextInterceptor().allocateForDistributedScheduling(request);
+
+    // Update host to nodeId mapping
+    setNodeList(dsResp.getNodesForScheduling());
+    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
+    for (NMToken nmToken : nmTokens) {
+      nodeTokens.put(nmToken.getNodeId(), nmToken);
+    }
+
+    List<ContainerStatus> completedContainers =
+        dsResp.getAllocateResponse().getCompletedContainersStatuses();
+
+    // Only account for opportunistic containers
+    for (ContainerStatus cs : completedContainers) {
+      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        containersAllocated.remove(cs.getContainerId());
+      }
+    }
+
+    // Check if we have NM tokens for all the allocated containers. If not
+    // generate one and update the response.
+    updateResponseWithNMTokens(
+        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Number of opportunistic containers currently allocated by " +
+              "application: " + containersAllocated.size());
+    }
+    return dsResp;
+  }
+}

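A minimal standalone sketch (not part of the patch) of the ask-partitioning step that DistributedScheduler#allocate performs above: OPPORTUNISTIC asks are kept on the NodeManager for local allocation, while all other asks are forwarded to the ResourceManager. Only the YARN record types are real; the holder class below is illustrative.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;

    // Illustrative helper, mirroring DistributedScheduler#partitionAskList.
    class AskPartitionSketch {
      static final class Partition {
        final List<ResourceRequest> guaranteed = new ArrayList<>();
        final List<ResourceRequest> opportunistic = new ArrayList<>();
      }

      static Partition partition(List<ResourceRequest> askList) {
        Partition p = new Partition();
        for (ResourceRequest rr : askList) {
          if (rr.getExecutionTypeRequest().getExecutionType()
              == ExecutionType.OPPORTUNISTIC) {
            p.opportunistic.add(rr);  // allocated locally by the NodeManager
          } else {
            p.guaranteed.add(rr);     // forwarded to the ResourceManager
          }
        }
        return p;
      }
    }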
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
deleted file mode 100644
index ec0e8a4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
-    .AMRMProxyApplicationContext;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-
-
-
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMTokenSecretManagerInNM;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * <p>The LocalScheduler runs on the NodeManager and is modelled as an
- * <code>AMRMProxy</code> request interceptor. It is responsible for the
- * following :</p>
- * <ul>
- *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
- *   response objects to extract instructions from the
- *   <code>ClusterManager</code> running on the ResourceManager to aid in making
- *   Scheduling scheduling decisions</li>
- *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
- *   containers for the opportunistic resource outstandingOpReqs</li>
- * </ul>
- */
-public final class LocalScheduler extends AbstractRequestInterceptor {
-
-  static class PartitionedResourceRequests {
-    private List<ResourceRequest> guaranteed = new ArrayList<>();
-    private List<ResourceRequest> opportunistic = new ArrayList<>();
-    public List<ResourceRequest> getGuaranteed() {
-      return guaranteed;
-    }
-    public List<ResourceRequest> getOpportunistic() {
-      return opportunistic;
-    }
-  }
-
-  static class DistSchedulerParams {
-    Resource maxResource;
-    Resource minResource;
-    Resource incrementResource;
-    int containerTokenExpiryInterval;
-  }
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(LocalScheduler.class);
-
-  private final static RecordFactory RECORD_FACTORY =
-      RecordFactoryProvider.getRecordFactory(null);
-
-  // Currently just used to keep track of allocated Containers
-  // Can be used for reporting stats later
-  private Set<ContainerId> containersAllocated = new HashSet<>();
-
-  private DistSchedulerParams appParams = new DistSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
-      new OpportunisticContainerAllocator.ContainerIdCounter();
-  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
-
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
-  final Set<String> blacklist = new HashSet<>();
-
-  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
-  // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequests (ask)
-  final TreeMap<Priority, Map<Resource, ResourceRequest>>
-      outstandingOpReqs = new TreeMap<>();
-
-  private ApplicationAttemptId applicationAttemptId;
-  private OpportunisticContainerAllocator containerAllocator;
-  private NMTokenSecretManagerInNM nmSecretManager;
-  private String appSubmitter;
-
-  public void init(AMRMProxyApplicationContext appContext) {
-    super.init(appContext);
-    initLocal(appContext.getApplicationAttemptId(),
-        appContext.getNMCotext().getContainerAllocator(),
-        appContext.getNMCotext().getNMTokenSecretManager(),
-        appContext.getUser());
-  }
-
-  @VisibleForTesting
-  void initLocal(ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerAllocator containerAllocator,
-      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
-    this.applicationAttemptId = applicationAttemptId;
-    this.containerAllocator = containerAllocator;
-    this.nmSecretManager = nmSecretManager;
-    this.appSubmitter = appSubmitter;
-  }
-
-  /**
-   * Route register call to the corresponding distributed scheduling method viz.
-   * registerApplicationMasterForDistributedScheduling, and return response to
-   * the caller after stripping away Distributed Scheduling information.
-   *
-   * @param request
-   *          registration request
-   * @return Allocate Response
-   * @throws YarnException
-   * @throws IOException
-   */
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return registerApplicationMasterForDistributedScheduling(request)
-        .getRegisterResponse();
-  }
-
-  /**
-   * Route allocate call to the allocateForDistributedScheduling method and
-   * return response to the caller after stripping away Distributed Scheduling
-   * information.
-   *
-   * @param request
-   *          allocation request
-   * @return Allocate Response
-   * @throws YarnException
-   * @throws IOException
-   */
-  @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
-    DistSchedAllocateRequest distRequest =
-        RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
-    distRequest.setAllocateRequest(request);
-    return allocateForDistributedScheduling(distRequest).getAllocateResponse();
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return getNextInterceptor().finishApplicationMaster(request);
-  }
-
-  /**
-   * Check if we already have a NMToken. if Not, generate the Token and
-   * add it to the response
-   * @param response
-   * @param nmTokens
-   * @param allocatedContainers
-   */
-  private void updateResponseWithNMTokens(AllocateResponse response,
-      List<NMToken> nmTokens, List<Container> allocatedContainers) {
-    List<NMToken> newTokens = new ArrayList<>();
-    if (allocatedContainers.size() > 0) {
-      response.getAllocatedContainers().addAll(allocatedContainers);
-      for (Container alloc : allocatedContainers) {
-        if (!nodeTokens.containsKey(alloc.getNodeId())) {
-          newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
-        }
-      }
-      List<NMToken> retTokens = new ArrayList<>(nmTokens);
-      retTokens.addAll(newTokens);
-      response.setNMTokens(retTokens);
-    }
-  }
-
-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
-    PartitionedResourceRequests partitionedRequests =
-        new PartitionedResourceRequests();
-    for (ResourceRequest rr : askList) {
-      if (rr.getExecutionTypeRequest().getExecutionType() ==
-          ExecutionType.OPPORTUNISTIC) {
-        partitionedRequests.getOpportunistic().add(rr);
-      } else {
-        partitionedRequests.getGuaranteed().add(rr);
-      }
-    }
-    return partitionedRequests;
-  }
-
-  private void updateParameters(
-      DistSchedRegisterResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinAllocatableCapabilty();
-    appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
-    appParams.incrementResource =
-        registerResponse.getIncrAllocatableCapabilty();
-    if (appParams.incrementResource == null) {
-      appParams.incrementResource = appParams.minResource;
-    }
-    appParams.containerTokenExpiryInterval = registerResponse
-        .getContainerTokenExpiryInterval();
-
-    containerIdCounter
-        .resetContainerIdCounter(registerResponse.getContainerIdStart());
-    setNodeList(registerResponse.getNodesForScheduling());
-  }
-
-  /**
-   * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds it the outstanding
-   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
-   * the current YARN constraint that only a single ResourceRequest can exist at
-   * a give Priority and Capability
-   * @param resourceAsks
-   */
-  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
-    for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
-
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
-          this.outstandingOpReqs.get(priority);
-      if (reqMap == null) {
-        reqMap = new HashMap<>();
-        this.outstandingOpReqs.put(priority, reqMap);
-      }
-
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
-      } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
-      }
-      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
-            + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
-      }
-    }
-  }
-
-  /**
-   * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest
-   * @param capability
-   * @param allocatedContainers
-   */
-  public void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
-      Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
-
-      if (asks == null)
-        continue;
-
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
-          asks.remove(capability);
-        }
-      }
-    }
-  }
-
-  private void setNodeList(List<NodeId> nodeList) {
-    this.nodeList.clear();
-    addToNodeList(nodeList);
-  }
-
-  private void addToNodeList(List<NodeId> nodes) {
-    for (NodeId n : nodes) {
-      this.nodeList.put(n.getHost(), n);
-    }
-  }
-
-  @Override
-  public DistSchedRegisterResponse
-      registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request)
-              throws YarnException, IOException {
-    LOG.info("Forwarding registration request to the" +
-        "Distributed Scheduler Service on YARN RM");
-    DistSchedRegisterResponse dsResp = getNextInterceptor()
-        .registerApplicationMasterForDistributedScheduling(request);
-    updateParameters(dsResp);
-    return dsResp;
-  }
-
-  @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Forwarding allocate request to the" +
-          "Distributed Scheduler Service on YARN RM");
-    }
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks = partitionAskList(
-        request.getAllocateRequest().getAskList());
-
-    List<ContainerId> releasedContainers =
-        request.getAllocateRequest().getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      containersAllocated.removeAll(releasedContainers);
-    }
-
-    // Also, update black list
-    ResourceBlacklistRequest rbr =
-        request.getAllocateRequest().getResourceBlacklistRequest();
-    if (rbr != null) {
-      blacklist.removeAll(rbr.getBlacklistRemovals());
-      blacklist.addAll(rbr.getBlacklistAdditions());
-    }
-
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
-    List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated =
-          containerAllocator.allocate(this.appParams, containerIdCounter,
-              outstandingOpReqs.get(priority).values(), blacklist,
-              applicationAttemptId, nodeList, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
-      }
-    }
-    request.setAllocatedContainers(allocatedContainers);
-
-    // Send all the GUARANTEED Reqs to RM
-    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
-    DistSchedAllocateResponse dsResp =
-        getNextInterceptor().allocateForDistributedScheduling(request);
-
-    // Update host to nodeId mapping
-    setNodeList(dsResp.getNodesForScheduling());
-    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
-    for (NMToken nmToken : nmTokens) {
-      nodeTokens.put(nmToken.getNodeId(), nmToken);
-    }
-
-    List<ContainerStatus> completedContainers =
-        dsResp.getAllocateResponse().getCompletedContainersStatuses();
-
-    // Only account for opportunistic containers
-    for (ContainerStatus cs : completedContainers) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
-      }
-    }
-
-    // Check if we have NM tokens for all the allocated containers. If not
-    // generate one and update the response.
-    updateResponseWithNMTokens(
-        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Number of opportunistic containers currently allocated by" +
-              "application: " + containersAllocated.size());
-    }
-    return dsResp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 22a6a24..ce5bda0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -37,15 +37,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * <p>The OpportunisticContainerAllocator allocates containers on a given list
- * of Nodes after it modifies the container sizes to within allowable limits
- * specified by the <code>ClusterManager</code> running on the RM. It tries to
- * distribute the containers as evenly as possible. It also uses the
- * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
- * the allocated containers</p>
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
+ * required NM tokens for the allocated containers.
+ * </p>
  */
 public class OpportunisticContainerAllocator {
 
@@ -78,15 +80,15 @@ public class OpportunisticContainerAllocator {
     this.webpagePort = webpagePort;
   }
 
-  public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
-      ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
-      Set<String> blacklist, ApplicationAttemptId appAttId,
-      Map<String, NodeId> allNodes, String userName) throws YarnException {
+  public Map<Resource, List<Container>> allocate(
+      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
+      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
+      String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
-    Set<String> nodesAllocated = new HashSet<>();
     for (ResourceRequest anyAsk : resourceAsks) {
       allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
-          allNodes, userName, containers, nodesAllocated, anyAsk);
+          allNodes, userName, containers, anyAsk);
       LOG.info("Opportunistic allocation requested for ["
           + "priority=" + anyAsk.getPriority()
           + ", num_containers=" + anyAsk.getNumContainers()
@@ -96,30 +98,30 @@ public class OpportunisticContainerAllocator {
     return containers;
   }
 
-  private void allocateOpportunisticContainers(DistSchedulerParams appParams,
-      ContainerIdCounter idCounter, Set<String> blacklist,
-      ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
-      Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
-      ResourceRequest anyAsk) throws YarnException {
+  private void allocateOpportunisticContainers(
+      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+      Set<String> blacklist, ApplicationAttemptId id,
+      Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      throws YarnException {
     int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ?
-        0 : containers.get(anyAsk.getCapability()).size());
+        - (containers.isEmpty() ? 0 :
+            containers.get(anyAsk.getCapability()).size());
 
-    List<String> topKNodesLeft = new ArrayList<>();
-    for (String s : allNodes.keySet()) {
-      // Bias away from whatever we have already allocated and respect blacklist
-      if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+    List<NodeId> nodesForScheduling = new ArrayList<>();
+    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+      // Do not use blacklisted nodes for scheduling.
+      if (blacklist.contains(nodeEntry.getKey())) {
         continue;
       }
-      topKNodesLeft.add(s);
+      nodesForScheduling.add(nodeEntry.getValue());
     }
     int numAllocated = 0;
-    int nextNodeToAllocate = 0;
+    int nextNodeToSchedule = 0;
     for (int numCont = 0; numCont < toAllocate; numCont++) {
-      String topNode = topKNodesLeft.get(nextNodeToAllocate);
-      nextNodeToAllocate++;
-      nextNodeToAllocate %= topKNodesLeft.size();
-      NodeId nodeId = allNodes.get(topNode);
+      nextNodeToSchedule++;
+      nextNodeToSchedule %= nodesForScheduling.size();
+      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
       Container container = buildContainer(appParams, idCounter, anyAsk, id,
           userName, nodeId);
       List<Container> cList = containers.get(anyAsk.getCapability());
@@ -134,7 +136,7 @@ public class OpportunisticContainerAllocator {
     LOG.info("Allocated " + numAllocated + " opportunistic containers.");
   }
 
-  private Container buildContainer(DistSchedulerParams appParams,
+  private Container buildContainer(DistributedSchedulerParams appParams,
       ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
       String userName, NodeId nodeId) throws YarnException {
     ContainerId cId =
@@ -165,7 +167,7 @@ public class OpportunisticContainerAllocator {
     return container;
   }
 
-  private Resource normalizeCapability(DistSchedulerParams appParams,
+  private Resource normalizeCapability(DistributedSchedulerParams appParams,
       ResourceRequest ask) {
     return Resources.normalize(RESOURCE_CALCULATOR,
         ask.getCapability(), appParams.minResource, appParams.maxResource,

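A sketch (not part of the patch) of the node-selection change above: the revised allocateOpportunisticContainers no longer biases away from previously used nodes; it filters out blacklisted hosts and round-robins over the RM-provided node list, using the same increment-then-index order as the patched loop. Container-token generation and capability normalization are omitted; the class and method names below are illustrative.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.yarn.api.records.NodeId;

    class RoundRobinPlacementSketch {
      // Returns a target node for each of the toAllocate containers.
      static List<NodeId> place(Map<String, NodeId> allNodes,
          Set<String> blacklist, int toAllocate) {
        List<NodeId> candidates = new ArrayList<>();
        for (Map.Entry<String, NodeId> e : allNodes.entrySet()) {
          if (!blacklist.contains(e.getKey())) {  // skip blacklisted hosts
            candidates.add(e.getValue());
          }
        }
        List<NodeId> targets = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < toAllocate && !candidates.isEmpty(); i++) {
          next = (next + 1) % candidates.size();  // increment, then index
          targets.add(candidates.get(next));
        }
        return targets;
      }
    }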
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
new file mode 100644
index 0000000..b093b3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test cases for {@link DistributedScheduler}.
+ */
+public class TestDistributedScheduler {
+
+  @Test
+  public void testDistributedScheduler() throws Exception {
+
+    Configuration conf = new Configuration();
+    DistributedScheduler distributedScheduler = new DistributedScheduler();
+
+    RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
+
+    registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
+        NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
+
+    final AtomicBoolean flipFlag = new AtomicBoolean(true);
+    Mockito.when(
+        finalReqIntcptr.allocateForDistributedScheduling(
+            Mockito.any(DistributedSchedulingAllocateRequest.class)))
+        .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
+          @Override
+          public DistributedSchedulingAllocateResponse answer(
+              InvocationOnMock invocationOnMock) throws Throwable {
+            flipFlag.set(!flipFlag.get());
+            if (flipFlag.get()) {
+              return createAllocateResponse(Arrays.asList(
+                  NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
+            } else {
+              return createAllocateResponse(Arrays.asList(
+                  NodeId.newInstance("d", 4), NodeId.newInstance("c", 3)));
+            }
+          }
+        });
+
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    ResourceRequest guaranteedReq =
+        createResourceRequest(ExecutionType.GUARANTEED, 5, "*");
+
+    ResourceRequest opportunisticReq =
+        createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
+
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 4 containers were allocated
+    AllocateResponse allocateResponse =
+        distributedScheduler.allocate(allocateRequest);
+    Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
+
+    // Verify equal distribution on hosts a and b, and none on c or d
+    Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
+
+    // New Allocate request
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq =
+        createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 6 containers were allocated
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
+    Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
+
+    // Verify new containers are equally distributed on hosts c and d,
+    // and none on a or b
+    allocs = mapAllocs(allocateResponse, 6);
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
+
+    // Ensure the DistributedScheduler respects the list order..
+    // The first request should be allocated to "d" since it is ranked higher
+    // The second request should be allocated to "c" since the ranking is
+    // flipped on every allocate response.
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq =
+        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
+    allocs = mapAllocs(allocateResponse, 1);
+    Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
+
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq =
+        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
+    allocs = mapAllocs(allocateResponse, 1);
+    Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
+
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq =
+        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
+    allocs = mapAllocs(allocateResponse, 1);
+    Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
+  }
+
+  private void registerAM(DistributedScheduler distributedScheduler,
+      RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+      throws Exception {
+    RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
+        Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
+    distSchedRegisterResponse.setRegisterResponse(
+        Records.newRecord(RegisterApplicationMasterResponse.class));
+    distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
+    distSchedRegisterResponse.setContainerIdStart(0);
+    distSchedRegisterResponse.setMaxContainerResource(
+        Resource.newInstance(1024, 4));
+    distSchedRegisterResponse.setMinContainerResource(
+        Resource.newInstance(512, 2));
+    distSchedRegisterResponse.setNodesForScheduling(nodeList);
+    Mockito.when(
+        finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
+            Mockito.any(RegisterApplicationMasterRequest.class)))
+        .thenReturn(distSchedRegisterResponse);
+
+    distributedScheduler.registerApplicationMaster(
+        Records.newRecord(RegisterApplicationMasterRequest.class));
+  }
+
+  private RequestInterceptor setup(Configuration conf,
+      DistributedScheduler distributedScheduler) {
+    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
+    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
+    Context context = Mockito.mock(Context.class);
+    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
+        NMContainerTokenSecretManager(conf);
+    MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+    nmContainerTokenSecretManager.setMasterKey(mKey);
+    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
+        (nmContainerTokenSecretManager);
+    OpportunisticContainerAllocator containerAllocator =
+        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+
+    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+        new NMTokenSecretManagerInNM();
+    nmTokenSecretManagerInNM.setMasterKey(mKey);
+    distributedScheduler.initLocal(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
+        containerAllocator, nmTokenSecretManagerInNM, "test");
+
+    RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
+    distributedScheduler.setNextInterceptor(finalReqIntcptr);
+    return finalReqIntcptr;
+  }
+
+  private ResourceRequest createResourceRequest(ExecutionType execType,
+      int numContainers, String resourceName) {
+    ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
+    opportunisticReq.setExecutionTypeRequest(
+        ExecutionTypeRequest.newInstance(execType, true));
+    opportunisticReq.setNumContainers(numContainers);
+    opportunisticReq.setCapability(Resource.newInstance(1024, 4));
+    opportunisticReq.setPriority(Priority.newInstance(100));
+    opportunisticReq.setRelaxLocality(true);
+    opportunisticReq.setResourceName(resourceName);
+    return opportunisticReq;
+  }
+
+  private DistributedSchedulingAllocateResponse createAllocateResponse(
+      List<NodeId> nodes) {
+    DistributedSchedulingAllocateResponse distSchedAllocateResponse =
+        Records.newRecord(DistributedSchedulingAllocateResponse.class);
+    distSchedAllocateResponse
+        .setAllocateResponse(Records.newRecord(AllocateResponse.class));
+    distSchedAllocateResponse.setNodesForScheduling(nodes);
+    return distSchedAllocateResponse;
+  }
+
+  private Map<NodeId, List<ContainerId>> mapAllocs(
+      AllocateResponse allocateResponse, int expectedSize) throws Exception {
+    Assert.assertEquals(expectedSize,
+        allocateResponse.getAllocatedContainers().size());
+    Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
+    for (Container c : allocateResponse.getAllocatedContainers()) {
+      ContainerTokenIdentifier cTokId = BuilderUtils
+          .newContainerTokenIdentifier(c.getContainerToken());
+      Assert.assertEquals(
+          c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
+          cTokId.getNmHostAddress());
+      List<ContainerId> cIds = allocs.get(c.getNodeId());
+      if (cIds == null) {
+        cIds = new ArrayList<>();
+        allocs.put(c.getNodeId(), cIds);
+      }
+      cIds.add(c.getId());
+    }
+    return allocs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
deleted file mode 100644
index 8de849b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMTokenSecretManagerInNM;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class TestLocalScheduler {
-
-  @Test
-  public void testLocalScheduler() throws Exception {
-
-    Configuration conf = new Configuration();
-    LocalScheduler localScheduler = new LocalScheduler();
-
-    RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
-
-    registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
-        NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
-
-    final AtomicBoolean flipFlag = new AtomicBoolean(false);
-    Mockito.when(
-        finalReqIntcptr.allocateForDistributedScheduling(
-            Mockito.any(DistSchedAllocateRequest.class)))
-        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
-          @Override
-          public DistSchedAllocateResponse answer(InvocationOnMock
-              invocationOnMock) throws Throwable {
-            flipFlag.set(!flipFlag.get());
-            if (flipFlag.get()) {
-              return createAllocateResponse(Arrays.asList(
-                  NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
-            } else {
-              return createAllocateResponse(Arrays.asList(
-                  NodeId.newInstance("d", 4), NodeId.newInstance("c", 3)));
-            }
-          }
-        });
-
-    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
-    ResourceRequest guaranteedReq =
-        createResourceRequest(ExecutionType.GUARANTEED, 5, "*");
-
-    ResourceRequest opportunisticReq =
-        createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
-    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-
-    // Verify 4 containers were allocated
-    AllocateResponse allocateResponse =
-        localScheduler.allocate(allocateRequest);
-    Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
-
-    // Verify equal distribution on hosts a and b
-    // And None on c and d
-    Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
-    Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
-    Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
-    Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
-    Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
-
-    // New Allocate request
-    allocateRequest = Records.newRecord(AllocateRequest.class);
-    opportunisticReq =
-        createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*");
-    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-
-    // Verify 6 containers were allocated
-    allocateResponse = localScheduler.allocate(allocateRequest);
-    Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
-
-    // Verify New containers are equally distribution on hosts c and d
-    // And None on a and b
-    allocs = mapAllocs(allocateResponse, 6);
-    Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
-    Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
-    Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
-    Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
-
-    // Ensure the LocalScheduler respects the list order..
-    // The first request should be allocated to "d" since it is ranked higher
-    // The second request should be allocated to "c" since the ranking is
-    // flipped on every allocate response.
-    allocateRequest = Records.newRecord(AllocateRequest.class);
-    opportunisticReq =
-        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
-    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
-    allocs = mapAllocs(allocateResponse, 1);
-    Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
-
-    allocateRequest = Records.newRecord(AllocateRequest.class);
-    opportunisticReq =
-        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
-    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
-    allocs = mapAllocs(allocateResponse, 1);
-    Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
-
-    allocateRequest = Records.newRecord(AllocateRequest.class);
-    opportunisticReq =
-        createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
-    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
-    allocs = mapAllocs(allocateResponse, 1);
-    Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
-  }
-
-  private void registerAM(LocalScheduler localScheduler, RequestInterceptor
-      finalReqIntcptr, List<NodeId> nodeList) throws Exception {
-    DistSchedRegisterResponse distSchedRegisterResponse =
-        Records.newRecord(DistSchedRegisterResponse.class);
-    distSchedRegisterResponse.setRegisterResponse(
-        Records.newRecord(RegisterApplicationMasterResponse.class));
-    distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
-    distSchedRegisterResponse.setContainerIdStart(0);
-    distSchedRegisterResponse.setMaxAllocatableCapabilty(
-        Resource.newInstance(1024, 4));
-    distSchedRegisterResponse.setMinAllocatableCapabilty(
-        Resource.newInstance(512, 2));
-    distSchedRegisterResponse.setNodesForScheduling(nodeList);
-    Mockito.when(
-        finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
-            Mockito.any(RegisterApplicationMasterRequest.class)))
-        .thenReturn(distSchedRegisterResponse);
-
-    localScheduler.registerApplicationMaster(
-        Records.newRecord(RegisterApplicationMasterRequest.class));
-  }
-
-  private RequestInterceptor setup(Configuration conf, LocalScheduler
-      localScheduler) {
-    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
-    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
-    Context context = Mockito.mock(Context.class);
-    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
-        NMContainerTokenSecretManager(conf);
-    MasterKey mKey = new MasterKey() {
-      @Override
-      public int getKeyId() {
-        return 1;
-      }
-      @Override
-      public void setKeyId(int keyId) {}
-      @Override
-      public ByteBuffer getBytes() {
-        return ByteBuffer.allocate(8);
-      }
-      @Override
-      public void setBytes(ByteBuffer bytes) {}
-    };
-    nmContainerTokenSecretManager.setMasterKey(mKey);
-    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
-        (nmContainerTokenSecretManager);
-    OpportunisticContainerAllocator containerAllocator =
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
-
-    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
-        new NMTokenSecretManagerInNM();
-    nmTokenSecretManagerInNM.setMasterKey(mKey);
-    localScheduler.initLocal(
-        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
-        containerAllocator, nmTokenSecretManagerInNM, "test");
-
-    RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
-    localScheduler.setNextInterceptor(finalReqIntcptr);
-    return finalReqIntcptr;
-  }
-
-  private ResourceRequest createResourceRequest(ExecutionType execType,
-      int numContainers, String resourceName) {
-    ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
-    opportunisticReq.setExecutionTypeRequest(
-        ExecutionTypeRequest.newInstance(execType, true));
-    opportunisticReq.setNumContainers(numContainers);
-    opportunisticReq.setCapability(Resource.newInstance(1024, 4));
-    opportunisticReq.setPriority(Priority.newInstance(100));
-    opportunisticReq.setRelaxLocality(true);
-    opportunisticReq.setResourceName(resourceName);
-    return opportunisticReq;
-  }
-
-  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
-    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
-        (DistSchedAllocateResponse.class);
-    distSchedAllocateResponse.setAllocateResponse(
-        Records.newRecord(AllocateResponse.class));
-    distSchedAllocateResponse.setNodesForScheduling(nodes);
-    return distSchedAllocateResponse;
-  }
-
-  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
-      allocateResponse, int expectedSize) throws Exception {
-    Assert.assertEquals(expectedSize,
-        allocateResponse.getAllocatedContainers().size());
-    Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
-    for (Container c : allocateResponse.getAllocatedContainers()) {
-      ContainerTokenIdentifier cTokId = BuilderUtils
-          .newContainerTokenIdentifier(c.getContainerToken());
-      Assert.assertEquals(
-          c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
-          cTokId.getNmHostAddress());
-      List<ContainerId> cIds = allocs.get(c.getNodeId());
-      if (cIds == null) {
-        cIds = new ArrayList<>();
-        allocs.put(c.getNodeId(), cIds);
-      }
-      cIds.add(c.getId());
-    }
-    return allocs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
new file mode 100644
index 0000000..843ac09
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The DistributedSchedulingAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
+public class DistributedSchedulingAMService extends ApplicationMasterService
+    implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(DistributedSchedulingAMService.class);
+
+  private final NodeQueueLoadMonitor nodeMonitor;
+
+  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+      new ConcurrentHashMap<>();
+  private final int k;
+
+  public DistributedSchedulingAMService(RMContext rmContext,
+                                      YarnScheduler scheduler) {
+    super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
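+    // Number of least-loaded nodes handed back to distributed schedulers
+    // on every register/allocate call.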
+    this.k = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
+        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
+    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
+        YarnConfiguration.
+            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+    NodeQueueLoadMonitor.LoadComparator comparator =
+        NodeQueueLoadMonitor.LoadComparator.valueOf(
+            rmContext.getYarnConfiguration().get(
+                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
+                YarnConfiguration.
+                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
+
+    NodeQueueLoadMonitor topKSelector =
+        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+
+    float sigma = rmContext.getYarnConfiguration()
+        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
+            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
+
+    int limitMin, limitMax;
+
+    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
+    } else {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
+    }
+
+    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
+    this.nodeMonitor = topKSelector;
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
+        addr, serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    // To support applications running on NMs that DO NOT support
+    // Distributed Scheduling, the server multiplexes both the
+    // ApplicationMasterProtocol and the DistributedSchedulingProtocol.
+    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public RegisterDistributedSchedulingAMResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+    dsResp.setRegisterResponse(response);
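+    // Advertise the min/max/incr container sizes that NM-side schedulers
+    // should respect when allocating OPPORTUNISTIC containers.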
+    dsResp.setMinContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+                YarnConfiguration.
+                    DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+                YarnConfiguration.
+                    DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
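+    // Container ids created on the NMs start from the current RM epoch
+    // shifted into the high-order bits, keeping them distinct across
+    // RM restarts.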
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    dsResp.setNodesForScheduling(
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
+    return dsResp;
+  }
+
+  @Override
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
+    List<Container> distAllocContainers = request.getAllocatedContainers();
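+    // Register each container allocated out-of-band by the NM-side scheduler
+    // with the RM, so it is tracked like a normally scheduled container.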
+    for (Container container : distAllocContainers) {
+      // Create RMContainer
+      SchedulerApplicationAttempt appAttempt =
+          ((AbstractYarnScheduler) rmContext.getScheduler())
+              .getCurrentAttemptForContainer(container.getId());
+      RMContainer rmContainer = new RMContainerImpl(container,
+          appAttempt.getApplicationAttemptId(), container.getNodeId(),
+          appAttempt.getUser(), rmContext, true);
+      appAttempt.addRMContainer(container.getId(), rmContainer);
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(),
+              RMContainerEventType.LAUNCHED));
+    }
+    AllocateResponse response = allocate(request.getAllocateRequest());
+    DistributedSchedulingAllocateResponse dsResp = recordFactory
+        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
+    return dsResp;
+  }
+
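+  // Track cluster membership per rack and per host as nodes join and leave.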
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+                            String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.add(nodeId);
+      }
+    }
+  }
+
+  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+                                 String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.remove(nodeId);
+      }
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+    case NODE_ADDED:
+      if (!(event instanceof NodeAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
+      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+          nodeAddedEvent.getAddedRMNode());
+      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      break;
+    case NODE_REMOVED:
+      if (!(event instanceof NodeRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeRemovedSchedulerEvent nodeRemovedEvent =
+          (NodeRemovedSchedulerEvent) event;
+      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+      removeFromMapping(rackToNode,
+          nodeRemovedEvent.getRemovedRMNode().getRackName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      removeFromMapping(hostToNode,
+          nodeRemovedEvent.getRemovedRMNode().getHostName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      break;
+    case NODE_UPDATE:
+      if (!(event instanceof NodeUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
+          event;
+      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+      break;
+    case NODE_RESOURCE_UPDATE:
+      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+          (NodeResourceUpdateSchedulerEvent) event;
+      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+          nodeResourceUpdatedEvent.getResourceOption());
+      break;
+
+    // <-- IGNORED EVENTS : START -->
+    case APP_ADDED:
+      break;
+    case APP_REMOVED:
+      break;
+    case APP_ATTEMPT_ADDED:
+      break;
+    case APP_ATTEMPT_REMOVED:
+      break;
+    case CONTAINER_EXPIRED:
+      break;
+    case NODE_LABELS_UPDATE:
+      break;
+    // <-- IGNORED EVENTS : END -->
+    default:
+      LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
+          + event.toString());
+    }
+
+  }
+
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return nodeMonitor.getThresholdCalculator();
+  }
+}

