Repository: hadoop
Updated Branches:
  refs/heads/branch-2 dfaac5643 -> 9942ca2bf


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942ca2b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 9e5a807..3555faa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -123,65 +123,72 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return queue.getMetrics();
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
-    Container container = rmContainer.getContainer();
-    ContainerId containerId = container.getId();
-    
-    // Remove from the list of newly allocated containers if found
-    newlyAllocatedContainers.remove(rmContainer);
-    
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerFinishedEvent(
-            containerId,
-            containerStatus,
-            event)
-    );
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed container: " + rmContainer.getContainerId() +
-              " in state: " + rmContainer.getState() + " event:" + event);
-    }
+    try {
+      writeLock.lock();
+      Container container = rmContainer.getContainer();
+      ContainerId containerId = container.getId();
+
+      // Remove from the list of newly allocated containers if found
+      newlyAllocatedContainers.remove(rmContainer);
+
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed container: " + rmContainer.getContainerId()
+            + " in state: " + rmContainer.getState() + " event:" + event);
+      }
+
+      // Remove from the list of containers
+      liveContainers.remove(rmContainer.getContainerId());
 
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+      Resource containerResource = rmContainer.getContainer().getResource();
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId, containerResource);
 
-    Resource containerResource = rmContainer.getContainer().getResource();
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
-        getApplicationId(), containerId, containerResource);
-    
-    // Update usage metrics 
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    this.attemptResourceUsage.decUsed(containerResource);
+      // Update usage metrics
+      queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+      this.attemptResourceUsage.decUsed(containerResource);
 
-    // remove from preemption map if it is completed
-    preemptionMap.remove(rmContainer);
+      // remove from preemption map if it is completed
+      preemptionMap.remove(rmContainer);
 
-    // Clear resource utilization metrics cache.
-    lastMemoryAggregateAllocationUpdateTime = -1;
+      // Clear resource utilization metrics cache.
+      lastMemoryAggregateAllocationUpdateTime = -1;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized void unreserveInternal(
+  private void unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(schedulerKey);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(schedulerKey);
-    }
-    
-    // Reset the re-reservation count
-    resetReReservations(schedulerKey);
+    try {
+      writeLock.lock();
+      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+          schedulerKey);
+      RMContainer reservedContainer = reservedContainers.remove(
+          node.getNodeID());
+      if (reservedContainers.isEmpty()) {
+        this.reservedContainers.remove(schedulerKey);
+      }
+
+      // Reset the re-reservation count
+      resetReReservations(schedulerKey);
 
-    Resource resource = reservedContainer.getContainer().getResource();
-    this.attemptResourceUsage.decReserved(resource);
+      Resource resource = reservedContainer.getContainer().getResource();
+      this.attemptResourceUsage.decReserved(resource);
 
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size()
-        + " at priority " + schedulerKey.getPriority() + "; currentReservation "
-        + this.attemptResourceUsage.getReserved());
+      LOG.info(
+          "Application " + getApplicationId() + " unreserved " + " on node "
+              + node + ", currently has " + reservedContainers.size()
+              + " at priority " + schedulerKey.getPriority()
+              + "; currentReservation " + this.attemptResourceUsage
+              .getReserved());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void subtractResourcesOnBlacklistedNodes(
@@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return headroom;
   }
 
-  public synchronized float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
-    
-    // waitFactor can't be more than '1' 
-    // i.e. no point skipping more than clustersize opportunities
-    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
-  }
-
   /**
    * Return the level at which we are allowed to schedule containers, given the
    * current size of the cluster and thresholds indicating how many nodes to
@@ -261,44 +257,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param rackLocalityThreshold rackLocalityThreshold
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevel(
+  NodeType getAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, int numNodes,
       double nodeLocalityThreshold, double rackLocalityThreshold) {
     // upper limit on threshold
-    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
-    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
+    if (nodeLocalityThreshold > 1.0) {
+      nodeLocalityThreshold = 1.0;
+    }
+    if (rackLocalityThreshold > 1.0) {
+      rackLocalityThreshold = 1.0;
+    }
 
     // If delay scheduling is not being used, can schedule anywhere
     if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // Default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
-
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+    try {
+      writeLock.lock();
 
-    // If level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
+      // Default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
+      }
 
-    double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
-      rackLocalityThreshold;
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // Relax locality constraints once we've surpassed threshold.
-    if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey);
+      // If level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey);
+
+      double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityThreshold :
+          rackLocalityThreshold;
+
+      // Relax locality constraints once we've surpassed threshold.
+      if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
 
   /**
@@ -311,119 +319,131 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param currentTimeMs currentTimeMs
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevelByTime(
+  NodeType getAllowedLocalityLevelByTime(
       SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
       long rackLocalityDelayMs, long currentTimeMs) {
-
     // if not being used, can schedule anywhere
     if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      // add the initial time of priority to prevent comparing with FsApp
-      // startTime and allowedLocalityLevel degrade
-      lastScheduledContainer.put(schedulerKey, currentTimeMs);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Init the lastScheduledContainer time, priority: "
-            + schedulerKey.getPriority() + ", time: " + currentTimeMs);
+    try {
+      writeLock.lock();
+
+      // default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        // add the initial time of priority to prevent comparing with FsApp
+        // startTime and allowedLocalityLevel degrade
+        lastScheduledContainer.put(schedulerKey, currentTimeMs);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Init the lastScheduledContainer time, priority: " + schedulerKey
+                  .getPriority() + ", time: " + currentTimeMs);
+        }
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
       }
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
 
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // if level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) {
-      return NodeType.OFF_SWITCH;
-    }
-
-    // check waiting time
-    long waitTime = currentTimeMs;
-    if (lastScheduledContainer.containsKey(schedulerKey)) {
-      waitTime -= lastScheduledContainer.get(schedulerKey);
-    } else {
-      waitTime -= getStartTime();
-    }
+      // if level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
+      }
 
-    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
-            nodeLocalityDelayMs : rackLocalityDelayMs;
+      // check waiting time
+      long waitTime = currentTimeMs;
+      if (lastScheduledContainer.containsKey(schedulerKey)) {
+        waitTime -= lastScheduledContainer.get(schedulerKey);
+      } else{
+        waitTime -= getStartTime();
+      }
 
-    if (waitTime > thresholdTime) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
-      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+      long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityDelayMs :
+          rackLocalityDelayMs;
+
+      if (waitTime > thresholdTime) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
 
-  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
+  public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container reservedContainer) {
-    // Update allowed locality level
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
-    if (allowed != null) {
-      if (allowed.equals(NodeType.OFF_SWITCH) &&
-          (type.equals(NodeType.NODE_LOCAL) ||
-              type.equals(NodeType.RACK_LOCAL))) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+    RMContainer rmContainer;
+    Container container;
+
+    try {
+      writeLock.lock();
+      // Update allowed locality level
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      if (allowed != null) {
+        if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
+            NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
+            NodeType.NODE_LOCAL)) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        }
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL) &&
-          type.equals(NodeType.NODE_LOCAL)) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
       }
-    }
 
-    // Required sanity check - AM can call 'allocate' to update resource 
-    // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(schedulerKey) <= 0) {
-      return null;
-    }
+      container = reservedContainer;
+      if (container == null) {
+        container = createContainer(node, request.getCapability(),
+            schedulerKey);
+      }
 
-    Container container = reservedContainer;
-    if (container == null) {
-      container =
-          createContainer(node, request.getCapability(), schedulerKey);
-    }
-    
-    // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container,
-        getApplicationAttemptId(), node.getNodeID(),
-        appSchedulingInfo.getUser(), rmContext);
-    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
+      // Create RMContainer
+      rmContainer = new RMContainerImpl(container,
+          getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), rmContext);
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
-    liveContainers.put(container.getId(), rmContainer);    
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
+      liveContainers.put(container.getId(), rmContainer);
 
-    // Update consumption and track allocations
-    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, schedulerKey, request, container);
-    this.attemptResourceUsage.incUsed(container.getResource());
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
+      this.attemptResourceUsage.incUsed(container.getResource());
 
-    // Update resource requests related to "request" and store in RMContainer
-    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + container.getId()
+            .getApplicationAttemptId() + " container=" + container.getId()
+            + " host=" + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+          "SchedulerApp", getApplicationId(), container.getId(),
+          container.getResource());
+    } finally {
+      writeLock.unlock();
+    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationAttemptId=" 
-          + container.getId().getApplicationAttemptId() 
-          + " container=" + container.getId() + " host="
-          + container.getNodeId().getHost() + " type=" + type);
-    }
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp", 
-        getApplicationId(), container.getId(), container.getResource());
-    
     return rmContainer;
   }
 
@@ -434,19 +454,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param schedulerKey Scheduler Key
    * @param level NodeType
    */
-  public synchronized void resetAllowedLocalityLevel(
+  public void resetAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, NodeType level) {
-    NodeType old = allowedLocalityLevel.get(schedulerKey);
-    LOG.info("Raising locality level from " + old + " to " + level + " at " +
-        " priority " + schedulerKey.getPriority());
-    allowedLocalityLevel.put(schedulerKey, level);
+    NodeType old;
+    try {
+      writeLock.lock();
+      old = allowedLocalityLevel.put(schedulerKey, level);
+    } finally {
+      writeLock.unlock();
+    }
+
+    LOG.info("Raising locality level from " + old + " to " + level + " at "
+        + " priority " + schedulerKey.getPriority());
   }
 
   // related methods
   public void addPreemption(RMContainer container, long time) {
     assert preemptionMap.get(container) == null;
-    preemptionMap.put(container, time);
-    Resources.addTo(preemptedResources, container.getAllocatedResource());
+    try {
+      writeLock.lock();
+      preemptionMap.put(container, time);
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public Long getContainerPreemptionTime(RMContainer container) {
@@ -584,21 +615,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getUser(), rmContainer.getContainer().getResource());
   }
 
-  private synchronized void setReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations == null) {
-      rackReservations = new HashSet<>();
-      reservations.put(rackName, rackReservations);
+  private void setReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations == null) {
+        rackReservations = new HashSet<>();
+        reservations.put(rackName, rackReservations);
+      }
+      rackReservations.add(node.getNodeName());
+    } finally {
+      writeLock.unlock();
     }
-    rackReservations.add(node.getNodeName());
   }
 
-  private synchronized void clearReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations != null) {
-      rackReservations.remove(node.getNodeName());
+  private void clearReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations != null) {
+        rackReservations.remove(node.getNodeName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack of off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
@@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
-          return assignContainer(node, localRequest,
-              NodeType.NODE_LOCAL, reserved, schedulerKey);
+          return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
+              reserved, schedulerKey);
         }
 
         if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
-            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
-            allowedLocality.equals(NodeType.OFF_SWITCH))) {
-          return assignContainer(node, rackLocalRequest,
-              NodeType.RACK_LOCAL, reserved, schedulerKey);
+            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
+            .equals(NodeType.OFF_SWITCH))) {
+          return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
+              reserved, schedulerKey);
         }
 
-        ResourceRequest offSwitchRequest =
-            getResourceRequest(schedulerKey, ResourceRequest.ANY);
+        ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
+            ResourceRequest.ANY);
         if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
           continue;
         }
 
-        if (offSwitchRequest != null &&
-            offSwitchRequest.getNumContainers() != 0) {
-          if (!hasNodeOrRackLocalRequests(schedulerKey) ||
-              allowedLocality.equals(NodeType.OFF_SWITCH)) {
-            return assignContainer(
-                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved,
-                schedulerKey);
+        if (offSwitchRequest != null
+            && offSwitchRequest.getNumContainers() != 0) {
+          if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
+              .equals(NodeType.OFF_SWITCH)) {
+            return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
+                reserved, schedulerKey);
           }
         }
       }
+    } finally {
+      writeLock.unlock();
     }
+
     return Resources.none();
   }
 
@@ -963,14 +1011,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     Resources.addTo(demand, getCurrentConsumption());
 
     // Add up outstanding resource requests
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey k : getSchedulerKeys()) {
         ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
         if (r != null) {
-          Resources.multiplyAndAddTo(demand,
-              r.getCapability(), r.getNumContainers());
+          Resources.multiplyAndAddTo(demand, r.getCapability(),
+              r.getNumContainers());
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to