YARN-3139. Improve locks in 
AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/caafa980
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/caafa980
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/caafa980

Branch: refs/heads/branch-2
Commit: caafa980af9a19427855df1d4b1d5b7681c3944e
Parents: 69c1ab4
Author: Jian He <jia...@apache.org>
Authored: Thu Oct 6 07:54:22 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Thu Oct 6 07:55:14 2016 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/RMServerUtils.java   |    5 +-
 .../scheduler/AbstractYarnScheduler.java        |  416 +++--
 .../scheduler/SchedulerApplicationAttempt.java  |    8 +-
 .../scheduler/capacity/CapacityScheduler.java   | 1731 ++++++++++--------
 .../scheduler/capacity/LeafQueue.java           |   17 +
 .../scheduler/common/fica/FiCaSchedulerApp.java |   16 +-
 .../scheduler/fair/FairScheduler.java           | 1048 ++++++-----
 7 files changed, 1755 insertions(+), 1486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index b90e499..b2a085a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -211,10 +211,7 @@ public class RMServerUtils {
   }
 
   /**
-   * Validate increase/decrease request. This function must be called under
-   * the queue lock to make sure that the access to container resource is
-   * atomic. Refer to LeafQueue.decreaseContainer() and
-   * CapacityScheduelr.updateIncreaseRequests()
+   * Validate increase/decrease request.
    * <pre>
    * - Throw exception when any other error happens
    * </pre>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 45415de..645e06d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,8 +73,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .LeafQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
@@ -94,7 +93,7 @@ public abstract class AbstractYarnScheduler
 
   protected Resource minimumAllocation;
 
-  protected RMContext rmContext;
+  protected volatile RMContext rmContext;
   
   private volatile Priority maxClusterLevelAppPriority;
 
@@ -112,6 +111,18 @@ public abstract class AbstractYarnScheduler
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
     EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
+  protected final ReentrantReadWriteLock.ReadLock readLock;
+
+  /*
+   * Use writeLock for any of operations below:
+   * - queue change (hierarchy / configuration / container allocation)
+   * - application(add/remove/allocate-container, but not include container
+   *   finish)
+   * - node (add/remove/change-resource/container-allocation, but not include
+   *   container finish)
+   */
+  protected final ReentrantReadWriteLock.WriteLock writeLock;
+
   /**
    * Construct the service.
    *
@@ -119,6 +130,9 @@ public abstract class AbstractYarnScheduler
    */
   public AbstractYarnScheduler(String name) {
     super(name);
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
   }
 
   @Override
@@ -141,6 +155,10 @@ public abstract class AbstractYarnScheduler
     return nodeTracker;
   }
 
+  /*
+   * YARN-3136 removed synchronized lock for this method for performance
+   * purposes
+   */
   public List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
@@ -155,9 +173,8 @@ public abstract class AbstractYarnScheduler
     }
     Collection<RMContainer> liveContainers =
         app.getCurrentAppAttempt().getLiveContainers();
-    ContainerId amContainerId =
-        rmContext.getRMApps().get(appId).getCurrentAppAttempt()
-          .getMasterContainer().getId();
+    ContainerId amContainerId = rmContext.getRMApps().get(appId)
+        .getCurrentAppAttempt().getMasterContainer().getId();
     for (RMContainer rmContainer : liveContainers) {
       if (!rmContainer.getContainerId().equals(amContainerId)) {
         containerList.add(rmContainer.getContainer());
@@ -211,54 +228,59 @@ public abstract class AbstractYarnScheduler
     nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
   }
 
-  protected synchronized void containerLaunchedOnNode(
+  protected void containerLaunchedOnNode(
       ContainerId containerId, SchedulerNode node) {
-    // Get the application for the finished container
-    SchedulerApplicationAttempt application =
-        getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application " + containerId.getApplicationAttemptId()
-          .getApplicationId() + " launched container " + containerId
-          + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
+    try {
+      readLock.lock();
+      // Get the application for the finished container
+      SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
+          containerId);
+      if (application == null) {
+        LOG.info("Unknown application " + containerId.getApplicationAttemptId()
+            .getApplicationId() + " launched container " + containerId
+            + " on node: " + node);
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+        return;
+      }
 
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
+      application.containerLaunchedOnNode(containerId, node.getNodeID());
+    } finally {
+      readLock.unlock();
+    }
   }
   
   protected void containerIncreasedOnNode(ContainerId containerId,
       SchedulerNode node, Container increasedContainerReportedByNM) {
+    /*
+     * No lock is required, as this method is protected by scheduler's 
writeLock
+     */
     // Get the application for the finished container
-    SchedulerApplicationAttempt application =
-        getCurrentAttemptForContainer(containerId);
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
+        containerId);
     if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " increased container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMNodeCleanContainerEvent(node.getNodeID(), 
containerId));
+      LOG.info("Unknown application " + containerId.getApplicationAttemptId()
+          .getApplicationId() + " increased container " + containerId
+          + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
     }
-    LeafQueue leafQueue = (LeafQueue) application.getQueue();
-    synchronized (leafQueue) {
-      RMContainer rmContainer = getRMContainer(containerId);
-      if (rmContainer == null) {
-        // Some unknown container sneaked into the system. Kill it.
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMNodeCleanContainerEvent(
-                node.getNodeID(), containerId));
-        return;
-      }
-      rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
-          containerId, increasedContainerReportedByNM.getResource()));
+
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
     }
+    rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
+        increasedContainerReportedByNM.getResource()));
   }
 
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication<T> app =
-        applications.get(applicationAttemptId.getApplicationId());
+    SchedulerApplication<T> app = applications.get(
+        applicationAttemptId.getApplicationId());
     return app == null ? null : app.getCurrentAppAttempt();
   }
 
@@ -338,96 +360,101 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  public synchronized void recoverContainersOnNode(
+  public void recoverContainersOnNode(
       List<NMContainerStatus> containerReports, RMNode nm) {
-    if (!rmContext.isWorkPreservingRecoveryEnabled()
-        || containerReports == null
-        || (containerReports != null && containerReports.isEmpty())) {
-      return;
-    }
-
-    for (NMContainerStatus container : containerReports) {
-      ApplicationId appId =
-          
container.getContainerId().getApplicationAttemptId().getApplicationId();
-      RMApp rmApp = rmContext.getRMApps().get(appId);
-      if (rmApp == null) {
-        LOG.error("Skip recovering container " + container
-            + " for unknown application.");
-        killOrphanContainerOnNode(nm, container);
-        continue;
+    try {
+      writeLock.lock();
+      if (!rmContext.isWorkPreservingRecoveryEnabled()
+          || containerReports == null || (containerReports != null
+          && containerReports.isEmpty())) {
+        return;
       }
 
-      SchedulerApplication<T> schedulerApp = applications.get(appId);
-      if (schedulerApp == null) {
-        LOG.info("Skip recovering container  " + container
-            + " for unknown SchedulerApplication. Application current state is 
"
-            + rmApp.getState());
-        killOrphanContainerOnNode(nm, container);
-        continue;
-      }
+      for (NMContainerStatus container : containerReports) {
+        ApplicationId appId =
+            container.getContainerId().getApplicationAttemptId()
+                .getApplicationId();
+        RMApp rmApp = rmContext.getRMApps().get(appId);
+        if (rmApp == null) {
+          LOG.error("Skip recovering container " + container
+              + " for unknown application.");
+          killOrphanContainerOnNode(nm, container);
+          continue;
+        }
 
-      LOG.info("Recovering container " + container);
-      SchedulerApplicationAttempt schedulerAttempt =
-          schedulerApp.getCurrentAppAttempt();
-
-      if (!rmApp.getApplicationSubmissionContext()
-        .getKeepContainersAcrossApplicationAttempts()) {
-        // Do not recover containers for stopped attempt or previous attempt.
-        if (schedulerAttempt.isStopped()
-            || !schedulerAttempt.getApplicationAttemptId().equals(
-              container.getContainerId().getApplicationAttemptId())) {
-          LOG.info("Skip recovering container " + container
-              + " for already stopped attempt.");
+        SchedulerApplication<T> schedulerApp = applications.get(appId);
+        if (schedulerApp == null) {
+          LOG.info("Skip recovering container  " + container
+              + " for unknown SchedulerApplication. "
+              + "Application current state is " + rmApp.getState());
           killOrphanContainerOnNode(nm, container);
           continue;
         }
-      }
 
-      // create container
-      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
-
-      // recover RMContainer
-      rmContainer.handle(new 
RMContainerRecoverEvent(container.getContainerId(),
-        container));
-
-      // recover scheduler node
-      SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
-      schedulerNode.recoverContainer(rmContainer);
-
-      // recover queue: update headroom etc.
-      Queue queue = schedulerAttempt.getQueue();
-      queue.recoverContainer(
-          getClusterResource(), schedulerAttempt, rmContainer);
-
-      // recover scheduler attempt
-      schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
-            
-      // set master container for the current running AMContainer for this
-      // attempt.
-      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
-      if (appAttempt != null) {
-        Container masterContainer = appAttempt.getMasterContainer();
-
-        // Mark current running AMContainer's RMContainer based on the master
-        // container ID stored in AppAttempt.
-        if (masterContainer != null
-            && masterContainer.getId().equals(rmContainer.getContainerId())) {
-          ((RMContainerImpl)rmContainer).setAMContainer(true);
+        LOG.info("Recovering container " + container);
+        SchedulerApplicationAttempt schedulerAttempt =
+            schedulerApp.getCurrentAppAttempt();
+
+        if (!rmApp.getApplicationSubmissionContext()
+            .getKeepContainersAcrossApplicationAttempts()) {
+          // Do not recover containers for stopped attempt or previous attempt.
+          if (schedulerAttempt.isStopped() || !schedulerAttempt
+              .getApplicationAttemptId().equals(
+                  container.getContainerId().getApplicationAttemptId())) {
+            LOG.info("Skip recovering container " + container
+                + " for already stopped attempt.");
+            killOrphanContainerOnNode(nm, container);
+            continue;
+          }
         }
-      }
 
-      synchronized (schedulerAttempt) {
-        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
-        if (releases.contains(container.getContainerId())) {
+        // create container
+        RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+        // recover RMContainer
+        rmContainer.handle(
+            new RMContainerRecoverEvent(container.getContainerId(), 
container));
+
+        // recover scheduler node
+        SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
+        schedulerNode.recoverContainer(rmContainer);
+
+        // recover queue: update headroom etc.
+        Queue queue = schedulerAttempt.getQueue();
+        queue.recoverContainer(getClusterResource(), schedulerAttempt,
+            rmContainer);
+
+        // recover scheduler attempt
+        schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
+
+        // set master container for the current running AMContainer for this
+        // attempt.
+        RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+        if (appAttempt != null) {
+          Container masterContainer = appAttempt.getMasterContainer();
+
+          // Mark current running AMContainer's RMContainer based on the master
+          // container ID stored in AppAttempt.
+          if (masterContainer != null && masterContainer.getId().equals(
+              rmContainer.getContainerId())) {
+            ((RMContainerImpl) rmContainer).setAMContainer(true);
+          }
+        }
+
+        if (schedulerAttempt.getPendingRelease().remove(
+            container.getContainerId())) {
           // release the container
-          rmContainer.handle(new RMContainerFinishedEvent(container
-            .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
-            container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
-            RMContainerEventType.RELEASED));
-          releases.remove(container.getContainerId());
+          rmContainer.handle(
+              new RMContainerFinishedEvent(container.getContainerId(),
+                  SchedulerUtils
+                      
.createAbnormalContainerStatus(container.getContainerId(),
+                          SchedulerUtils.RELEASED_CONTAINER),
+                  RMContainerEventType.RELEASED));
           LOG.info(container.getContainerId() + " is released by 
application.");
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -492,17 +519,15 @@ public abstract class AbstractYarnScheduler
     for (SchedulerApplication<T> app : applications.values()) {
       T attempt = app.getCurrentAppAttempt();
       if (attempt != null) {
-        synchronized (attempt) {
-          for (ContainerId containerId : attempt.getPendingRelease()) {
-            RMAuditLogger.logFailure(app.getUser(),
-                AuditConstants.RELEASE_CONTAINER,
-                "Unauthorized access or invalid container", "Scheduler",
-                "Trying to release container not owned by app "
-                    + "or with invalid id.", attempt.getApplicationId(),
-                containerId, null);
-          }
-          attempt.getPendingRelease().clear();
+        for (ContainerId containerId : attempt.getPendingRelease()) {
+          RMAuditLogger.logFailure(app.getUser(),
+              AuditConstants.RELEASE_CONTAINER,
+              "Unauthorized access or invalid container", "Scheduler",
+              "Trying to release container not owned by app "
+                  + "or with invalid id.", attempt.getApplicationId(),
+              containerId, null);
         }
+        attempt.getPendingRelease().clear();
       }
     }
   }
@@ -558,9 +583,7 @@ public abstract class AbstractYarnScheduler
             < nmExpireInterval) {
           LOG.info(containerId + " doesn't exist. Add the container"
               + " to the release request cache as it maybe on recovery.");
-          synchronized (attempt) {
-            attempt.getPendingRelease().add(containerId);
-          }
+          attempt.getPendingRelease().add(containerId);
         } else {
           RMAuditLogger.logFailure(attempt.getUser(),
             AuditConstants.RELEASE_CONTAINER,
@@ -603,81 +626,92 @@ public abstract class AbstractYarnScheduler
   }
 
   @Override
-  public synchronized void moveAllApps(String sourceQueue, String destQueue)
+  public void moveAllApps(String sourceQueue, String destQueue)
       throws YarnException {
-    // check if destination queue is a valid leaf queue
     try {
-      getQueueInfo(destQueue, false, false);
-    } catch (IOException e) {
-      LOG.warn(e);
-      throw new YarnException(e);
-    }
-    // check if source queue is a valid
-    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
-    if (apps == null) {
-      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
-      LOG.warn(errMsg);
-      throw new YarnException(errMsg);
-    }
-    // generate move events for each pending/running app
-    for (ApplicationAttemptId app : apps) {
-      SettableFuture<Object> future = SettableFuture.create();
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, 
future));
+      writeLock.lock();
+      // check if destination queue is a valid leaf queue
+      try {
+        getQueueInfo(destQueue, false, false);
+      } catch (IOException e) {
+        LOG.warn(e);
+        throw new YarnException(e);
+      }
+      // check if source queue is a valid
+      List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+      if (apps == null) {
+        String errMsg =
+            "The specified Queue: " + sourceQueue + " doesn't exist";
+        LOG.warn(errMsg);
+        throw new YarnException(errMsg);
+      }
+      // generate move events for each pending/running app
+      for (ApplicationAttemptId app : apps) {
+        SettableFuture<Object> future = SettableFuture.create();
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
   @Override
-  public synchronized void killAllAppsInQueue(String queueName)
+  public void killAllAppsInQueue(String queueName)
       throws YarnException {
-    // check if queue is a valid
-    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
-    if (apps == null) {
-      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
-      LOG.warn(errMsg);
-      throw new YarnException(errMsg);
-    }
-    // generate kill events for each pending/running app
-    for (ApplicationAttemptId app : apps) {
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
-          "Application killed due to expiry of reservation queue " +
-          queueName + "."));
+    try {
+      writeLock.lock();
+      // check if queue is a valid
+      List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+      if (apps == null) {
+        String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+        LOG.warn(errMsg);
+        throw new YarnException(errMsg);
+      }
+      // generate kill events for each pending/running app
+      for (ApplicationAttemptId app : apps) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
+                "Application killed due to expiry of reservation queue "
+                    + queueName + "."));
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
   /**
    * Process resource update on a node.
    */
-  public synchronized void updateNodeResource(RMNode nm, 
+  public void updateNodeResource(RMNode nm,
       ResourceOption resourceOption) {
-    SchedulerNode node = getSchedulerNode(nm.getNodeID());
-    Resource newResource = resourceOption.getResource();
-    Resource oldResource = node.getTotalResource();
-    if(!oldResource.equals(newResource)) {
-      // Notify NodeLabelsManager about this change
-      rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
-          newResource);
-      
-      // Log resource change
-      LOG.info("Update resource on node: " + node.getNodeName()
-          + " from: " + oldResource + ", to: "
-          + newResource);
-
-      nodeTracker.removeNode(nm.getNodeID());
-
-      // update resource to node
-      node.updateTotalResource(newResource);
-
-      nodeTracker.addNode((N) node);
-    } else {
-      // Log resource change
-      LOG.warn("Update resource on node: " + node.getNodeName() 
-          + " with the same resource: " + newResource);
+    try {
+      writeLock.lock();
+      SchedulerNode node = getSchedulerNode(nm.getNodeID());
+      Resource newResource = resourceOption.getResource();
+      Resource oldResource = node.getTotalResource();
+      if (!oldResource.equals(newResource)) {
+        // Notify NodeLabelsManager about this change
+        rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
+            newResource);
+
+        // Log resource change
+        LOG.info("Update resource on node: " + node.getNodeName() + " from: "
+            + oldResource + ", to: " + newResource);
+
+        nodeTracker.removeNode(nm.getNodeID());
+
+        // update resource to node
+        node.updateTotalResource(newResource);
+
+        nodeTracker.addNode((N) node);
+      } else{
+        // Log resource change
+        LOG.warn("Update resource on node: " + node.getNodeName()
+            + " with the same resource: " + newResource);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -735,7 +769,7 @@ public abstract class AbstractYarnScheduler
   }
 
   @Override
-  public synchronized void setClusterMaxPriority(Configuration conf)
+  public void setClusterMaxPriority(Configuration conf)
       throws YarnException {
     try {
       maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index adc3a97..0f4ad11 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -178,7 +179,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
     this.queue = queue;
-    this.pendingRelease = new HashSet<ContainerId>();
+    this.pendingRelease = Collections.newSetFromMap(
+        new ConcurrentHashMap<ContainerId, Boolean>());
     this.attemptId = applicationAttemptId;
     if (rmContext.getRMApps() != null &&
         rmContext.getRMApps()
@@ -1153,6 +1155,10 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     // queue's resource usage for specific partition
   }
 
+  public ReentrantReadWriteLock.WriteLock getWriteLock() {
+    return writeLock;
+  }
+
   @Override
   public boolean isRecovering() {
     return isAttemptRecovering;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to