This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a552b42 [ISSUE-376] Fix concurrency problems that may occur when 
the ApplicationManager registers an app (#382)
9a552b42 is described below

commit 9a552b42f3b5073600b995d0da38204c2fe65e58
Author: jokercurry <[email protected]>
AuthorDate: Sat Dec 3 21:21:06 2022 +0800

    [ISSUE-376] Fix concurrency problems that may occur when the 
ApplicationManager registers an app (#382)
    
    ### What changes were proposed in this pull request?
    Hold a lock while reading and writing the `appAndTimes` map, so that the
    quota check and the application registration cannot interleave.
    
    
    ### Why are the changes needed?
    To resolve #376.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passed the original unit tests.
---
 .../uniffle/coordinator/AccessQuotaChecker.java    | 25 ++++++----------
 .../uniffle/coordinator/ApplicationManager.java    | 11 ++++----
 .../apache/uniffle/coordinator/QuotaManager.java   | 27 ++++++++++++++++++
 .../uniffle/coordinator/QuotaManagerTest.java      | 33 ++++++++++++++++++++++
 4 files changed, 73 insertions(+), 23 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
index edaa6031..78923ab8 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
@@ -18,10 +18,8 @@
 package org.apache.uniffle.coordinator;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.atomic.LongAdder;
 
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,21 +50,14 @@ public class AccessQuotaChecker extends 
AbstractAccessChecker {
     final String uuid = hostIp.hashCode() + "-" + COUNTER.sum();
     final String user = accessInfo.getUser();
     // low version client user attribute is an empty string
-    if (!"".equals(user)) {
-      Map<String, Map<String, Long>> currentUserApps = 
quotaManager.getCurrentUserAndApp();
-      Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x 
-> Maps.newConcurrentMap());
-      Integer defaultAppNum = 
quotaManager.getDefaultUserApps().getOrDefault(user,
-          conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
-      int currentAppNum = appAndTimes.size();
-      if (currentAppNum >= defaultAppNum) {
-        String msg = "Denied by AccessClusterLoadChecker => "
-            + "User: " + user + ", current app num is: " + currentAppNum
-            + ", default app num is: " + defaultAppNum + ". We will reject 
this app[uuid=" + uuid + "].";
-        LOG.error(msg);
-        CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
-        return new AccessCheckResult(false, msg);
-      }
-      appAndTimes.put(uuid, System.currentTimeMillis());
+    if (!"".equals(user) && quotaManager.checkQuota(user, uuid)) {
+      String msg = "Denied by AccessQuotaChecker => "
+          + "User: " + user + ", current app num is: " + 
quotaManager.getCurrentUserAndApp().get(user).size()
+          + ", default app num is: " + 
quotaManager.getDefaultUserApps().get(user)
+          + ". We will reject this app[uuid=" + uuid + "].";
+      LOG.error(msg);
+      CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
+      return new AccessCheckResult(false, msg);
     }
     return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 19c88ac2..73d21683 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -106,12 +106,11 @@ public class ApplicationManager {
       CoordinatorMetrics.counterTotalAppNum.inc();
       LOG.info("New application is registered: {}", appId);
     }
-    long currentTimeMillis = System.currentTimeMillis();
-    String[] appIdAndUuid = appId.split("_");
-    String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
-    // if appId created successfully, we need to remove the uuid
-    appAndTime.remove(uuidFromApp);
-    appAndTime.put(appId, currentTimeMillis);
+    if (quotaManager != null) {
+      quotaManager.registerApplicationInfo(appId, appAndTime);
+    } else {
+      appAndTime.put(appId, System.currentTimeMillis());
+    }
   }
 
   public void refreshAppId(String appId) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index 266aff40..69deefee 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -49,12 +49,14 @@ public class QuotaManager {
   private final Map<String, Map<String, Long>> currentUserAndApp = 
Maps.newConcurrentMap();
   private final Map<String, String> appIdToUser = Maps.newConcurrentMap();
   private final String quotaFilePath;
+  private final Integer quotaAppNum;
   private FileSystem hadoopFileSystem;
   private final AtomicLong quotaFileLastModify = new AtomicLong(0L);
   private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
 
   public QuotaManager(CoordinatorConf conf) {
     this.quotaFilePath = 
conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);
+    this.quotaAppNum = 
conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM);
     if (quotaFilePath == null) {
       LOG.warn("{} is not configured, each user will use the default quota : 
{}",
           CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH.key(),
@@ -110,6 +112,31 @@ public class QuotaManager {
     }
   }
 
+  public boolean checkQuota(String user, String uuid) {
+    Map<String, Long> appAndTimes = currentUserAndApp.computeIfAbsent(user, x 
-> Maps.newConcurrentMap());
+    Integer defaultAppNum = defaultUserApps.getOrDefault(user, quotaAppNum);
+    synchronized (this) {
+      int currentAppNum = appAndTimes.size();
+      if (currentAppNum >= defaultAppNum) {
+        return true;
+      } else {
+        appAndTimes.put(uuid, System.currentTimeMillis());
+        return false;
+      }
+    }
+  }
+
+  public void registerApplicationInfo(String appId, Map<String, Long> 
appAndTime) {
+    long currentTimeMillis = System.currentTimeMillis();
+    String[] appIdAndUuid = appId.split("_");
+    String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
+    // if appId created successfully, we need to remove the uuid
+    synchronized (this) {
+      appAndTime.remove(uuidFromApp);
+      appAndTime.put(appId, currentTimeMillis);
+    }
+  }
+
   @VisibleForTesting
   public Map<String, Integer> getDefaultUserApps() {
     return defaultUserApps;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index 8611d4a6..3ead278e 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.coordinator;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * QuotaManager is a manager for resource restriction.
@@ -100,4 +104,33 @@ public class QuotaManagerTest {
     // it didn't detectUserResource because 
`org.apache.unifle.coordinator.AccessQuotaChecker` is not configured
     assertNull(applicationManager.getQuotaManager());
   }
+
+  @Test
+  public void testCheckQuota() throws Exception {
+    final String quotaFile =
+        new 
Path(remotePath.getAbsolutePath()).getFileSystem(hdfsConf).getName() + 
"/quotaFile.properties";
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
+        quotaFile);
+    final ApplicationManager applicationManager = new ApplicationManager(conf);
+    final AtomicInteger uuid = new AtomicInteger();
+    Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+    final int i1 = uuid.incrementAndGet();
+    uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+    Map<String, Long> appAndTime = 
applicationManager.getQuotaManager().getCurrentUserAndApp()
+        .computeIfAbsent("user1", x -> uuidAndTime);
+    // This thread may remove the uuid and put the appId in.
+    final Thread registerThread = new Thread(() ->
+        
applicationManager.getQuotaManager().registerApplicationInfo("application_test_"
 + i1, appAndTime));
+    registerThread.start();
+    final boolean icCheck = applicationManager.getQuotaManager()
+        .checkQuota("user1", String.valueOf(i1));
+    registerThread.join();
+    assertTrue(icCheck);
+    
assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user1").size(),
 5);
+  }
 }

Reply via email to