This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f05974ba0a4 YARN-11785. Race condition in QueueMetrics due to non-thread-safe HashMap causes MetricsException. (#7459) Contributed by Tao Yang.
f05974ba0a4 is described below

commit f05974ba0a4d0fba11060c11b99d75131e13dfb6
Author: Tao Yang <taoy...@apache.org>
AuthorDate: Wed Mar 5 12:28:45 2025 +0800

    YARN-11785. Race condition in QueueMetrics due to non-thread-safe HashMap causes MetricsException. (#7459) Contributed by Tao Yang.
    
    * YARN-11785. Race condition in QueueMetrics due to non-thread-safe HashMap causes MetricsException.
    
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../resourcemanager/scheduler/QueueMetrics.java    |  3 +-
 .../scheduler/TestQueueMetrics.java                | 61 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 11672439c61..53d2f261f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -253,7 +254,7 @@ public synchronized static void clearQueueMetrics() {
    * Simple metrics cache to help prevent re-registrations.
    */
   private static final Map<String, QueueMetrics> QUEUE_METRICS =
-      new HashMap<String, QueueMetrics>();
+      new ConcurrentHashMap<>();
 
   /**
    * Returns the metrics cache to help prevent re-registrations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index 2137285bac0..68a3b0f39da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -37,6 +37,9 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
@@ -61,6 +64,7 @@
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -794,6 +798,63 @@ public void testCollectAllMetrics() {
         .checkAgainst(queueSource, true);
   }
 
+  @Test
+  public void testQueueMetricsRaceCondition() throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(2);
+    final int numIterations = 100000;
+    final AtomicInteger exceptionCount = new AtomicInteger(0);
+    final AtomicInteger getCount = new AtomicInteger(0);
+
+    // init a queue metrics for testing
+    String queueName = "test";
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(ms, queueName, null, false, conf);
+    QueueMetrics.getQueueMetrics().put(queueName, metrics);
+
+    /*
+     * simulate the concurrent calls for QueueMetrics#getQueueMetrics
+     */
+    // thread A will keep querying the same queue metrics for a specified number of iterations
+    Thread threadA = new Thread(() -> {
+      try {
+        for (int i = 0; i < numIterations; i++) {
+          QueueMetrics qm = QueueMetrics.getQueueMetrics().get(queueName);
+          if (qm != null) {
+            getCount.incrementAndGet();
+          }
+        }
+      } catch (Exception e) {
+        System.out.println("Exception: " + e.getMessage());
+        exceptionCount.incrementAndGet();
+      } finally {
+        latch.countDown();
+      }
+    });
+    // thread B will keep adding new queue metrics for a specified number of iterations
+    Thread threadB = new Thread(() -> {
+      try {
+        for (int i = 0; i < numIterations; i++) {
+          QueueMetrics.getQueueMetrics().put("q" + i, metrics);
+        }
+      } catch (Exception e) {
+        exceptionCount.incrementAndGet();
+      } finally {
+        latch.countDown();
+      }
+    });
+
+    // start threads and wait for them to finish
+    threadA.start();
+    threadB.start();
+    latch.await();
+
+    // check if all get operations are successful to
+    // make sure there is no race condition
+    assertEquals(numIterations, getCount.get());
+    // check if there is any exception
+    assertEquals(0, exceptionCount.get());
+  }
+
   private static void checkAggregatedNodeTypes(MetricsSource source,
       long nodeLocal, long rackLocal, long offSwitch) {
     MetricsRecordBuilder rb = getMetrics(source);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to