This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ecf665c6facf YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues (#6732)
ecf665c6facf is described below

commit ecf665c6facf89d3b87b6e3cc684274b8155ca60
Author: Tamas Domok <tdo...@cloudera.com>
AuthorDate: Wed Apr 24 14:58:50 2024 +0200

    YARN-11191. Fix potential deadlock in GlobalScheduler refreshQueues (#6732)
---
 .../scheduler/capacity/AbstractLeafQueue.java      |  5 ++
 .../scheduler/capacity/AbstractParentQueue.java    | 13 ++++
 .../scheduler/capacity/CSQueue.java                |  6 ++
 .../capacity/preemption/PreemptionManager.java     |  6 +-
 .../scheduler/capacity/TestCapacityScheduler.java  | 78 ++++++++++++++++++++++
 5 files changed, 106 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index e40b8aaeb0bb..565f89de3208 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -369,6 +369,11 @@ public class AbstractLeafQueue extends AbstractCSQueue {
     return null;
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    return null;
+  }
+
   /**
    * Set user limit.
    * @param userLimit new user limit
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
index 9cb545d37808..87333bf50a14 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
@@ -1347,6 +1348,18 @@ public abstract class AbstractParentQueue extends 
AbstractCSQueue {
 
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    try {
+      while (!readLock.tryLock()){
+        LockSupport.parkNanos(10000);
+      }
+      return new ArrayList<>(childQueues);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public void recoverContainer(Resource clusterResource,
       SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a5672a8bcdf0..df3199220b28 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -175,6 +175,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return child queues
    */
   public List<CSQueue> getChildQueues();
+
+  /**
+   * Get child queues by tryLock.
+   * @return child queues
+   */
+  List<CSQueue> getChildQueuesByTryLock();
   
   /**
    * Check if the <code>user</code> has permission to perform the operation
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index 408198f70461..3aab8e8a5090 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -55,8 +56,9 @@ public class PreemptionManager {
             new PreemptableQueue(parentEntity));
       }
 
-      if (current.getChildQueues() != null) {
-        for (CSQueue child : current.getChildQueues()) {
+      List<CSQueue> childQueues = current.getChildQueuesByTryLock();
+      if (childQueues != null) {
+        for (CSQueue child : childQueues) {
           refreshQueues(current, child);
         }
       }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 3bef7125470f..bddba79f6c66 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -3047,4 +3048,81 @@ public class TestCapacityScheduler {
     Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
     rm1.close();
   }
+
+  /**
+   * (YARN-11191) This test ensures that no deadlock happens while the
+   * refreshQueues is called on the preemptionManager (refresh thread) and the
+   * AbstractCSQueue.getTotalKillableResource is called from the schedule thread.
+   *
+   * @throws Exception TestTimedOutException means deadlock
+   */
+  @Test (timeout = 20000)
+  public void testRefreshQueueWithOpenPreemption() throws Exception {
+    CapacitySchedulerConfiguration csConf = new 
CapacitySchedulerConfiguration();
+    csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT), new 
String[]{"a"});
+    QueuePath a = new QueuePath("root.a");
+    csConf.setCapacity(a, 100);
+    csConf.setQueues(a, new String[]{"b"});
+    QueuePath b = new QueuePath("root.a.b");
+    csConf.setCapacity(b, 100);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    try (MockRM rm = new MockRM(csConf)) {
+      CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
+      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+
+      AbstractParentQueue queue = (AbstractParentQueue) 
scheduler.getQueue("a");
+
+      // The scheduler thread holds the queue's read-lock for 5 seconds
+      // then the preemption's read-lock is used
+      Thread schedulerThread = new Thread(() -> {
+        queue.readLock.lock();
+        try {
+          Thread.sleep(5 * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        preemptionManager.getKillableContainers("a",
+            queue.getDefaultNodeLabelExpression());
+        queue.readLock.unlock();
+      }, "SCHEDULE");
+
+      // The complete thread locks/unlocks the queue's write-lock after 1 second
+      Thread completeThread = new Thread(() -> {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        queue.writeLock.lock();
+        queue.writeLock.unlock();
+      }, "COMPLETE");
+
+
+      // The refresh thread holds the preemption's write-lock after 2 seconds
+      // while it calls the getChildQueues(ByTryLock) that
+      // locks(tryLocks) the queue's read-lock
+      Thread refreshThread = new Thread(() -> {
+        try {
+          Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        preemptionManager.refreshQueues(queue.getParent(), queue);
+      }, "REFRESH");
+      schedulerThread.start();
+      completeThread.start();
+      refreshThread.start();
+
+      schedulerThread.join();
+      completeThread.join();
+      refreshThread.join();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to