This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a97ca0aa [CELEBORN-1577][PHASE2] QuotaManager should support interrupt shuffle
0a97ca0aa is described below

commit 0a97ca0aa98645121cc8fabc0ede3a4e05408249
Author: Xianming Lei <[email protected]>
AuthorDate: Mon Mar 24 22:05:45 2025 +0800

    [CELEBORN-1577][PHASE2] QuotaManager should support interrupt shuffle
    
    ### What changes were proposed in this pull request?
    1. Workers report their resourceConsumption to the master.
    2. QuotaManager computes the resourceConsumption of each app and marks the apps that exceed the quota (a simplified sketch of the selection follows this list).
        2.1 When a tenant's resourceConsumption exceeds the tenant's quota, the apps with the largest consumption are marked interrupted.
        2.2 When the cluster's resourceConsumption exceeds the cluster quota, the apps with the largest consumption are marked interrupted.
    3. The master reports back to the driver through the heartbeat response whether the app is marked interrupted.
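    
    A minimal sketch of the greedy selection idea, assuming only diskBytesWritten matters for brevity (the actual QuotaManager.expireApplication in this patch orders apps by all four consumption dimensions; `selectAppsToInterrupt` is a hypothetical helper name, not part of the patch):
    
    ```scala
    import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
    
    // Interrupt the largest consumers first, until the remaining usage
    // drops back under the quota.
    def selectAppsToInterrupt(
        used: ResourceConsumption,
        quota: StorageQuota,
        apps: Seq[(String, ResourceConsumption)]): Seq[String] = {
      var remaining = used
      val toInterrupt = Seq.newBuilder[String]
      // Sort descending by written disk bytes so that as few apps as
      // possible need to be interrupted.
      for ((appId, consumption) <- apps.sortBy(_._2.diskBytesWritten).reverse
           if remaining.diskBytesWritten >= quota.diskBytesWritten) {
        toInterrupt += appId
        remaining = remaining.subtract(consumption)
      }
      toInterrupt.result()
    }
    ```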
    
    ### Why are the changes needed?
    The current storage quota logic can only limit new shuffles; it cannot limit writes to shuffles that already exist. Our production environment hits the following scenario: the cluster is small, but a single shuffle of a user's app is large and occupies most of the disk resources. We want to interrupt such shuffles.
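    
    A hypothetical configuration sketch for enabling this behavior (the key names come from this patch; the threshold value is illustrative only):
    
    ```scala
    import org.apache.celeborn.common.CelebornConf
    
    // Enable quota checking plus shuffle interruption, and cap cluster-level
    // written disk bytes. diskBytesWritten is a bytesConf, so size suffixes
    // such as "10t" are accepted.
    val conf = new CelebornConf()
      .set("celeborn.quota.enabled", "true")
      .set("celeborn.quota.interruptShuffle.enabled", "true")
      .set("celeborn.quota.cluster.enabled", "true")
      .set("celeborn.quota.cluster.diskBytesWritten", "10t")
    ```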
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UTs.
    
    Closes #2819 from leixm/CELEBORN-1577-2.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 129 +++-
 .../common/quota/ResourceConsumption.scala         |  21 +
 .../quota/{Quota.scala => StorageQuota.scala}      |   6 +-
 docs/configuration/master.md                       |   4 +
 docs/configuration/quota.md                        |  19 +-
 docs/migration.md                                  |   8 +
 .../master/clustermeta/AbstractMetaManager.java    |  25 +-
 .../clustermeta/SingleMasterMetaManager.java       |  25 +-
 .../master/clustermeta/ha/HAMasterMetaManager.java |   8 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  20 +-
 .../celeborn/service/deploy/master/Master.scala    |  83 +--
 .../service/deploy/master/MasterSource.scala       |   2 +
 .../service/deploy/master/quota/QuotaManager.scala | 429 ++++++++++++--
 .../service/deploy/master/quota/QuotaStatus.scala  |  28 +-
 ...onfig-quota.yaml => dynamicConfig-quota-2.yaml} |  20 +-
 ...onfig-quota.yaml => dynamicConfig-quota-3.yaml} |  18 +-
 master/src/test/resources/dynamicConfig-quota.yaml |  17 +-
 .../deploy/master/quota/QuotaManagerSuite.scala    | 659 ++++++++++++++++++++-
 .../common/service/config/DynamicConfig.java       | 115 +++-
 .../deploy/worker/storage/StorageManager.scala     |  19 +-
 20 files changed, 1358 insertions(+), 297 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 01e7b0d65..f132cd189 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -674,6 +674,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def estimatedPartitionSizeForEstimationUpdateInterval: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
   def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
+  def masterResourceConsumptionMetricsEnabled: Boolean =
+    get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED)
   def clusterName: String = get(CLUSTER_NAME)
 
   // //////////////////////////////////////////////////////
@@ -893,6 +895,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   //                      Quota                         //
   // //////////////////////////////////////////////////////
   def quotaEnabled: Boolean = get(QUOTA_ENABLED)
+  def clusterQuotaEnabled: Boolean = get(CLUSTER_QUOTA_ENABLED)
+  def tenantQuotaEnabled: Boolean = get(TENANT_QUOTA_ENABLED)
+  def userQuotaEnabled: Boolean = get(USER_QUOTA_ENABLED)
   def quotaInterruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)
 
   // //////////////////////////////////////////////////////
@@ -5511,6 +5516,30 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLUSTER_QUOTA_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.quota.cluster.enabled")
+      .categories("quota", "master")
+      .doc("Whether to enable cluster-level quota.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val TENANT_QUOTA_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.quota.tenant.enabled")
+      .categories("quota", "master")
+      .doc("Whether to enable tenant-level quota.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val USER_QUOTA_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.quota.user.enabled")
+      .categories("quota", "master")
+      .doc("Whether to enable user-level quota.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONTAINER_INFO_PROVIDER: ConfigEntry[String] =
     buildConf("celeborn.container.info.provider")
       .categories("master", "worker")
@@ -5520,38 +5549,38 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault("org.apache.celeborn.server.common.container.DefaultContainerInfoProvider")
 
-  val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskBytesWritten")
       .categories("quota")
       .dynamic
-      .doc("Quota dynamic configuration for written disk bytes.")
+      .doc("Tenant level quota dynamic configuration for written disk bytes.")
       .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
-  val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskFileCount")
       .categories("quota")
       .dynamic
-      .doc("Quota dynamic configuration for written disk file count.")
+      .doc("Tenant level quota dynamic configuration for written disk file 
count.")
       .version("0.5.0")
       .longConf
       .createWithDefault(Long.MaxValue)
 
-  val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsBytesWritten")
       .categories("quota")
       .dynamic
-      .doc("Quota dynamic configuration for written hdfs bytes.")
+      .doc("Tenant level quota dynamic configuration for written hdfs bytes.")
       .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
-  val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsFileCount")
       .categories("quota")
       .dynamic
-      .doc("Quota dynamic configuration for written hdfs file count.")
+      .doc("Tenant level quota dynamic configuration for written hdfs file 
count.")
       .version("0.5.0")
       .longConf
       .createWithDefault(Long.MaxValue)
@@ -6182,4 +6211,84 @@ object CelebornConf extends Logging {
       .doubleConf
       .checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
       .createWithDefault(1)
+
+  val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.master.userResourceConsumption.metrics.enabled")
+      .categories("master")
+      .doc("Whether to enable resource consumption metrics.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefaultString("false")
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala
index 10d1114b9..d454bb894 100644
--- a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala
@@ -30,6 +30,12 @@ case class ResourceConsumption(
     hdfsFileCount: Long,
     var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {
 
+  def withSubResourceConsumptions(
+      resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
+    subResourceConsumptions = resourceConsumptions
+    this
+  }
+
   def add(other: ResourceConsumption): ResourceConsumption = {
     ResourceConsumption(
       diskBytesWritten + other.diskBytesWritten,
@@ -38,6 +44,14 @@ case class ResourceConsumption(
       hdfsFileCount + other.hdfsFileCount)
   }
 
+  def subtract(other: ResourceConsumption): ResourceConsumption = {
+    ResourceConsumption(
+      diskBytesWritten - other.diskBytesWritten,
+      diskFileCount - other.diskFileCount,
+      hdfsBytesWritten - other.hdfsBytesWritten,
+      hdfsFileCount - other.hdfsFileCount)
+  }
+
   def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
     String,
     ResourceConsumption]): Map[String, ResourceConsumption] = {
@@ -77,4 +91,11 @@ case class ResourceConsumption(
       s" hdfsFileCount: $hdfsFileCount," +
       s" subResourceConsumptions: $subResourceConsumptionString)"
   }
+
+  def simpleString: String = {
+    s"ResourceConsumption(diskBytesWritten: 
${Utils.bytesToString(diskBytesWritten)}," +
+      s" diskFileCount: $diskFileCount," +
+      s" hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}," +
+      s" hdfsFileCount: $hdfsFileCount)"
+  }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala
similarity index 90%
copy from common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
copy to common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala
index 8a8452258..1a7a8e52a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.util.Utils
 
-case class Quota(
+case class StorageQuota(
     diskBytesWritten: Long,
     diskFileCount: Long,
     hdfsBytesWritten: Long,
@@ -34,3 +34,7 @@ case class Quota(
       s"]"
   }
 }
+
+object StorageQuota {
+  val DEFAULT_QUOTA = StorageQuota(Long.MaxValue, Long.MaxValue, Long.MaxValue, Long.MaxValue)
+}
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 894acced7..6441fd912 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -79,9 +79,13 @@ license: |
 | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups | 
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 |  | 
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy | 
+| celeborn.master.userResourceConsumption.metrics.enabled | false | false | Whether to enable resource consumption metrics. | 0.6.0 |  | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 |  | 
 | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 |  | 
+| celeborn.quota.cluster.enabled | true | false | Whether to enable cluster-level quota. | 0.6.0 |  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 |  | 
+| celeborn.quota.tenant.enabled | true | false | Whether to enable tenant-level quota. | 0.6.0 |  | 
+| celeborn.quota.user.enabled | true | false | Whether to enable user-level quota. | 0.6.0 |  | 
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 |  | 
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |  | 
diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md
index b1fa4f940..4174940e2 100644
--- a/docs/configuration/quota.md
+++ b/docs/configuration/quota.md
@@ -19,10 +19,21 @@ license: |
 <!--begin-include-->
 | Key | Default | isDynamic | Description | Since | Deprecated |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
+| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written disk bytes. | 0.6.0 |  | 
+| celeborn.quota.cluster.diskFileCount | 9223372036854775807 | true | Cluster level quota dynamic configuration for written disk file count. | 0.6.0 |  | 
+| celeborn.quota.cluster.enabled | true | false | Whether to enable cluster-level quota. | 0.6.0 |  | 
+| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written hdfs bytes. | 0.6.0 |  | 
+| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807 | true | Cluster level quota dynamic configuration for written hdfs file count. | 0.6.0 |  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 |  | 
 | celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 |  | 
-| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 |  | 
-| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 |  | 
-| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 |  | 
-| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 |  | 
+| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written disk bytes. | 0.5.0 |  | 
+| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Tenant level quota dynamic configuration for written disk file count. | 0.5.0 |  | 
+| celeborn.quota.tenant.enabled | true | false | Whether to enable tenant-level quota. | 0.6.0 |  | 
+| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written hdfs bytes. | 0.5.0 |  | 
+| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Tenant level quota dynamic configuration for written hdfs file count. | 0.5.0 |  | 
+| celeborn.quota.user.diskBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written disk bytes. | 0.6.0 |  | 
+| celeborn.quota.user.diskFileCount | 9223372036854775807 | true | User level quota dynamic configuration for written disk file count. | 0.6.0 |  | 
+| celeborn.quota.user.enabled | true | false | Whether to enable user-level quota. | 0.6.0 |  | 
+| celeborn.quota.user.hdfsBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written hdfs bytes. | 0.6.0 |  | 
+| celeborn.quota.user.hdfsFileCount | 9223372036854775807 | true | User level quota dynamic configuration for written hdfs file count. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/migration.md b/docs/migration.md
index 800655e5e..d53495280 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -23,6 +23,14 @@ license: |
 
 # Upgrading from 0.5 to 0.6
 
+- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` if you want to set user level quota.
+
+- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` if you want to set user level quota.
+
+- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.hdfsBytesWritten` to `celeborn.quota.user.hdfsBytesWritten`. Please use `celeborn.quota.user.hdfsBytesWritten` if you want to set user level quota.
+
+- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` if you want to set user level quota.
+
 - Since 0.6.0, Celeborn changed the default value of `celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means Celeborn will involve more workers in offering slots.
 
 - Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a794e059c..ab3449e74 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -250,13 +250,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       WorkerStatus workerStatus,
       boolean highWorkload) {
     WorkerInfo worker =
-        new WorkerInfo(
-            host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption);
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
     AtomicLong availableSlots = new AtomicLong();
     LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
     synchronized (workersMap) {
@@ -264,7 +262,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
       workerInfo.ifPresent(
           info -> {
             info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
-            info.updateThenGetUserResourceConsumption(userResourceConsumption);
             availableSlots.set(info.totalAvailableSlots());
             info.lastHeartbeat_$eq(time);
             info.setWorkerStatus(workerStatus);
@@ -310,8 +307,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
       int replicatePort,
       int internalPort,
       String networkLocation,
-      Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption) {
+      Map<String, DiskInfo> disks) {
     WorkerInfo workerInfo =
         new WorkerInfo(
             host,
@@ -321,7 +317,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
             replicatePort,
             internalPort,
             disks,
-            userResourceConsumption);
+            new HashMap<>());
     workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
     if (networkLocation != null
         && !networkLocation.isEmpty()
@@ -613,4 +609,19 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
           fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v + fallbackCounts.get(k));
     }
   }
+
+  public void updateWorkerResourceConsumptions(
+      String host,
+      int rpcPort,
+      int pushPort,
+      int fetchPort,
+      int replicatePort,
+      Map<UserIdentifier, ResourceConsumption> resourceConsumptions) {
+    WorkerInfo worker =
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, null, null);
+    synchronized (workersMap) {
+      Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
+      workerInfo.ifPresent(info -> info.updateThenGetUserResourceConsumption(resourceConsumptions));
+    }
+  }
 }
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 765beb791..779ef4683 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -124,16 +124,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
       WorkerStatus workerStatus,
       String requestId) {
     updateWorkerHeartbeatMeta(
-        host,
-        rpcPort,
-        pushPort,
-        fetchPort,
-        replicatePort,
-        disks,
-        userResourceConsumption,
-        time,
-        workerStatus,
-        highWorkload);
+        host, rpcPort, pushPort, fetchPort, replicatePort, disks, time, workerStatus, highWorkload);
+    updateWorkerResourceConsumptions(
+        host, rpcPort, pushPort, fetchPort, replicatePort, userResourceConsumption);
   }
 
   @Override
@@ -149,15 +142,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       String requestId) {
     updateRegisterWorkerMeta(
-        host,
-        rpcPort,
-        pushPort,
-        fetchPort,
-        replicatePort,
-        internalPort,
-        networkLocation,
-        disks,
-        userResourceConsumption);
+        host, rpcPort, pushPort, fetchPort, replicatePort, internalPort, networkLocation, disks);
+    updateWorkerResourceConsumptions(
+        host, rpcPort, pushPort, fetchPort, replicatePort, userResourceConsumption);
   }
 
   @Override
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 1b50b5d7a..6d72627f8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -290,13 +290,13 @@ public class HAMasterMetaManager extends AbstractMetaManager {
                       .setFetchPort(fetchPort)
                       .setReplicatePort(replicatePort)
                       .putAllDisks(MetaUtil.toPbDiskInfos(disks))
-                      .putAllUserResourceConsumption(
-                          MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
                       .setWorkerStatus(MetaUtil.toPbWorkerStatus(workerStatus))
                       .setTime(time)
                       .setHighWorkload(highWorkload)
                       .build())
               .build());
+      updateWorkerResourceConsumptions(
+          host, rpcPort, pushPort, fetchPort, replicatePort, userResourceConsumption);
     } catch (CelebornRuntimeException e) {
       LOG.error("Handle worker heartbeat for {} failed!", host, e);
       throw e;
@@ -330,10 +330,10 @@ public class HAMasterMetaManager extends AbstractMetaManager {
                       .setInternalPort(internalPort)
                       .setNetworkLocation(networkLocation)
                       .putAllDisks(MetaUtil.toPbDiskInfos(disks))
-                      .putAllUserResourceConsumption(
-                          MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
                       .build())
               .build());
+      updateWorkerResourceConsumptions(
+          host, rpcPort, pushPort, fetchPort, replicatePort, userResourceConsumption);
     } catch (CelebornRuntimeException e) {
       LOG.error("Handle worker register for {} failed!", host, e);
       throw e;
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index b1ee9b72e..44a82d8d3 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -199,9 +199,6 @@ public class MetaHandler {
           pushPort = request.getWorkerHeartbeatRequest().getPushPort();
           fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
           diskInfos = MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
-          userResourceConsumption =
-              MetaUtil.fromPbUserResourceConsumption(
-                  request.getWorkerHeartbeatRequest().getUserResourceConsumptionMap());
           replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
           boolean highWorkload = request.getWorkerHeartbeatRequest().getHighWorkload();
           if (request.getWorkerHeartbeatRequest().hasWorkerStatus()) {
@@ -212,14 +209,13 @@ public class MetaHandler {
           }
 
           LOG.debug(
-              "Handle worker heartbeat for {} {} {} {} {} {} {}",
+              "Handle worker heartbeat for {} {} {} {} {} {}",
               host,
               rpcPort,
               pushPort,
               fetchPort,
               replicatePort,
-              diskInfos,
-              userResourceConsumption);
+              diskInfos);
           metaSystem.updateWorkerHeartbeatMeta(
               host,
               rpcPort,
@@ -227,7 +223,6 @@ public class MetaHandler {
               fetchPort,
               replicatePort,
               diskInfos,
-              userResourceConsumption,
               request.getWorkerHeartbeatRequest().getTime(),
               workerStatus,
               highWorkload);
@@ -242,19 +237,15 @@ public class MetaHandler {
           String networkLocation = request.getRegisterWorkerRequest().getNetworkLocation();
           int internalPort = request.getRegisterWorkerRequest().getInternalPort();
           diskInfos = MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
-          userResourceConsumption =
-              MetaUtil.fromPbUserResourceConsumption(
-                  request.getRegisterWorkerRequest().getUserResourceConsumptionMap());
           LOG.debug(
-              "Handle worker register for {} {} {} {} {} {} {} {}",
+              "Handle worker register for {} {} {} {} {} {} {}",
               host,
               rpcPort,
               pushPort,
               fetchPort,
               replicatePort,
               internalPort,
-              diskInfos,
-              userResourceConsumption);
+              diskInfos);
           metaSystem.updateRegisterWorkerMeta(
               host,
               rpcPort,
@@ -263,8 +254,7 @@ public class MetaHandler {
               replicatePort,
               internalPort,
               networkLocation,
-              diskInfos,
-              userResourceConsumption);
+              diskInfos);
           break;
 
         case ReportWorkerUnavailable:
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1e52da3e2..55d3ad579 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -190,11 +190,13 @@ private[celeborn] class Master(
   private val hasHDFSStorage = conf.hasHDFSStorage
   private val hasS3Storage = conf.hasS3Storage
 
-  private val quotaManager = new QuotaManager(conf, configService)
+  private val quotaManager = new QuotaManager(
+    statusSystem,
+    masterSource,
+    resourceConsumptionSource,
+    conf,
+    configService)
   private val tagsManager = new TagsManager(Option(configService))
-  private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
-  private val userResourceConsumptions =
-    JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
 
   private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
   private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum
@@ -548,6 +550,7 @@ private[celeborn] class Master(
         .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
       val workersToRemove = new util.ArrayList[WorkerInfo](pb.getWorkersToRemoveList
         .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+
       executeWithLeaderChecker(
         context,
         handleWorkerExclude(
@@ -1059,6 +1062,7 @@ private[celeborn] class Master(
       override def run(): Unit = {
         workersAssignedToApp.remove(appId)
         statusSystem.handleAppLost(appId, requestId)
+        quotaManager.handleAppLost(appId)
         logInfo(s"Removed application $appId")
         if (hasHDFSStorage || hasS3Storage) {
           checkAndCleanExpiredAppDirsOnDFS(appId)
@@ -1148,7 +1152,7 @@ private[celeborn] class Master(
         new util.ArrayList[WorkerInfo](
           (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava),
         new util.ArrayList(appRelatedShuffles),
-        CheckQuotaResponse(isAvailable = true, "")))
+        quotaManager.checkApplicationQuotaStatus(appId)))
     } else {
       context.reply(OneWayMessageResponse)
     }
@@ -1164,78 +1168,11 @@ private[celeborn] class Master(
     }
   }
 
-  private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = {
-    val userResourceConsumption = computeUserResourceConsumption(userIdentifier)
-    gaugeResourceConsumption(userIdentifier)
-    userResourceConsumption
-  }
-
-  private def gaugeResourceConsumption(
-      userIdentifier: UserIdentifier,
-      applicationId: String = null): Unit = {
-    val resourceConsumptionLabel =
-      if (applicationId == null) userIdentifier.toMap
-      else userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId)
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.DISK_FILE_COUNT,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, applicationId).diskFileCount
-    }
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.DISK_BYTES_WRITTEN,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, applicationId).diskBytesWritten
-    }
-    if (hasHDFSStorage) {
-      resourceConsumptionSource.addGauge(
-        ResourceConsumptionSource.HDFS_FILE_COUNT,
-        resourceConsumptionLabel) { () =>
-        computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
-      }
-      resourceConsumptionSource.addGauge(
-        ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
-        resourceConsumptionLabel) { () =>
-        computeResourceConsumption(userIdentifier, applicationId).hdfsBytesWritten
-      }
-    }
-  }
-
-  private def computeResourceConsumption(
-      userIdentifier: UserIdentifier,
-      applicationId: String = null): ResourceConsumption = {
-    val newResourceConsumption = computeUserResourceConsumption(userIdentifier)
-    if (applicationId == null) {
-      val current = System.currentTimeMillis()
-      if (userResourceConsumptions.containsKey(userIdentifier)) {
-        val resourceConsumptionAndUpdateTime = userResourceConsumptions.get(userIdentifier)
-        if (current - resourceConsumptionAndUpdateTime._2 <= masterResourceConsumptionInterval) {
-          return resourceConsumptionAndUpdateTime._1
-        }
-      }
-      userResourceConsumptions.put(userIdentifier, (newResourceConsumption, current))
-      newResourceConsumption
-    } else {
-      newResourceConsumption.subResourceConsumptions.get(applicationId)
-    }
-  }
-
-  // TODO: Support calculate topN app resource consumption.
-  private def computeUserResourceConsumption(
-      userIdentifier: UserIdentifier): ResourceConsumption = {
-    val resourceConsumption = statusSystem.workersMap.values().asScala.flatMap {
-      workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier)
-    }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
-    resourceConsumption
-  }
-
   private[master] def handleCheckQuota(
       userIdentifier: UserIdentifier,
       context: RpcCallContext): Unit = {
-    val userResourceConsumption = handleResourceConsumption(userIdentifier)
     if (conf.quotaEnabled) {
-      val (isAvailable, reason) =
-        quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
-      context.reply(CheckQuotaResponse(isAvailable, reason))
+      context.reply(quotaManager.checkUserQuotaStatus(userIdentifier))
     } else {
       context.reply(CheckQuotaResponse(true, ""))
     }
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 8309209c4..c27ff0949 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -65,4 +65,6 @@ object MasterSource {
   // Capacity
   val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
   val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"
+
+  val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime"
 }
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index 07906b7d3..f9886383c 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -16,96 +16,403 @@
  */
 package org.apache.celeborn.service.deploy.master.quota
 
+import java.util.{Map => JMap}
+import java.util.concurrent.TimeUnit
+import java.util.function.Predicate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.quota.{Quota, ResourceConsumption}
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource
+import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource._
+import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse
+import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
+import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.service.config.ConfigService
+import org.apache.celeborn.service.deploy.master.MasterSource
+import org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME
+import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager
+import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._
+
+class QuotaManager(
+    statusSystem: AbstractMetaManager,
+    masterSource: MasterSource,
+    resourceConsumptionSource: ResourceConsumptionSource,
+    celebornConf: CelebornConf,
+    configService: ConfigService) extends Logging {
+
+  val userQuotaStatus: JMap[UserIdentifier, QuotaStatus] = JavaUtils.newConcurrentHashMap()
+  val tenantQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap()
+  val resourceConsumptionMetricsEnabled = celebornConf.masterResourceConsumptionMetricsEnabled
+  @volatile
+  var clusterQuotaStatus: QuotaStatus = new QuotaStatus()
+  val appQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap()
+  private val quotaChecker =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker")
+  quotaChecker.scheduleWithFixedDelay(
+    new Runnable {
+      override def run(): Unit = {
+        try {
+          updateResourceConsumption()
+        } catch {
+          case t: Throwable => logError("Update user resource consumption 
failed.", t)
+        }
+      }
+    },
+    0L,
+    celebornConf.masterResourceConsumptionInterval,
+    TimeUnit.MILLISECONDS)
+
+  def handleAppLost(appId: String): Unit = {
+    appQuotaStatus.remove(appId)
+  }
 
-class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) extends Logging {
-  private val DEFAULT_QUOTA = Quota(
-    celebornConf.get(CelebornConf.QUOTA_DISK_BYTES_WRITTEN),
-    celebornConf.get(CelebornConf.QUOTA_DISK_FILE_COUNT),
-    celebornConf.get(CelebornConf.QUOTA_HDFS_BYTES_WRITTEN),
-    celebornConf.get(CelebornConf.QUOTA_HDFS_FILE_COUNT))
-  def getQuota(userIdentifier: UserIdentifier): Quota = {
-    if (configService != null) {
-      val config =
-        configService.getTenantUserConfigFromCache(userIdentifier.tenantId, userIdentifier.name)
-      config.getQuota
+  def checkUserQuotaStatus(user: UserIdentifier): CheckQuotaResponse = {
+    val tenantStatus = tenantQuotaStatus.getOrDefault(user.tenantId, QuotaStatus())
+    val userStatus = userQuotaStatus.getOrDefault(user, QuotaStatus())
+    if (userQuotaEnabled && userStatus.exceed) {
+      logInfo(s"user $user quota exceeded, detail: ${userStatus.exceedReason}")
+      CheckQuotaResponse(false, userStatus.exceedReason)
+    } else if (tenantQuotaEnabled && tenantStatus.exceed) {
+      logInfo(s"User $user was rejected because of tenant " +
+        s"${user.tenantId} quota exceeded, detail: 
${tenantStatus.exceedReason}")
+      CheckQuotaResponse(false, tenantStatus.exceedReason)
+    } else if (clusterQuotaEnabled && clusterQuotaStatus.exceed) {
+      logInfo(s"User $user was rejected because of cluster quota exceeded, " +
+        s"detail: ${clusterQuotaStatus.exceedReason}")
+      CheckQuotaResponse(false, clusterQuotaStatus.exceedReason)
     } else {
-      DEFAULT_QUOTA
+      CheckQuotaResponse(true, "")
     }
   }
 
-  def checkQuotaSpaceAvailable(
-      userIdentifier: UserIdentifier,
-      resourceResumption: ResourceConsumption): (Boolean, String) = {
-    val quota = getQuota(userIdentifier)
+  def checkApplicationQuotaStatus(applicationId: String): CheckQuotaResponse = {
+    val status = appQuotaStatus.getOrDefault(applicationId, QuotaStatus())
+    if (status.exceed) {
+      logInfo(s"application $applicationId quota exceeded, detail: 
${status.exceedReason}")
+    }
+    CheckQuotaResponse(!status.exceed, status.exceedReason)
+  }
+
+  def getUserStorageQuota(user: UserIdentifier): StorageQuota = {
+    Option(configService)
+      .map(_.getTenantUserConfigFromCache(user.tenantId, user.name).getUserStorageQuota)
+      .getOrElse(StorageQuota.DEFAULT_QUOTA)
+  }
+
+  def getTenantStorageQuota(tenantId: String): StorageQuota = {
+    Option(configService)
+      .map(_.getTenantConfigFromCache(tenantId).getTenantStorageQuota)
+      .getOrElse(StorageQuota.DEFAULT_QUOTA)
+  }
+
+  def getClusterStorageQuota: StorageQuota = {
+    Option(configService)
+      .map(_.getSystemConfigFromCache.getClusterStorageQuota)
+      .getOrElse(StorageQuota.DEFAULT_QUOTA)
+  }
+
+  private def interruptShuffleEnabled: Boolean = {
+    Option(configService)
+      .map(_.getSystemConfigFromCache.interruptShuffleEnabled())
+      .getOrElse(celebornConf.quotaInterruptShuffleEnabled)
+  }
+
+  private def checkUserQuotaSpace(
+      user: UserIdentifier,
+      consumption: ResourceConsumption): QuotaStatus = {
+    val quota = getUserStorageQuota(user)
+    checkQuotaSpace(s"$USER_EXHAUSTED user: $user. ", consumption, quota)
+  }
+
+  private def checkTenantQuotaSpace(
+      tenantId: String,
+      consumption: ResourceConsumption): QuotaStatus = {
+    val quota = getTenantStorageQuota(tenantId)
+    checkQuotaSpace(s"$USER_EXHAUSTED tenant: $tenantId. ", consumption, quota)
+  }
+
+  private def checkClusterQuotaSpace(consumption: ResourceConsumption): QuotaStatus = {
+    checkQuotaSpace(CLUSTER_EXHAUSTED, consumption, getClusterStorageQuota)
+  }
+
+  private def checkQuotaSpace(
+      reason: String,
+      consumption: ResourceConsumption,
+      quota: StorageQuota): QuotaStatus = {
     val checkResults = Seq(
-      checkDiskBytesWritten(userIdentifier, resourceResumption.diskBytesWritten, quota),
-      checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount, quota),
-      checkHdfsBytesWritten(userIdentifier, resourceResumption.hdfsBytesWritten, quota),
-      checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount, quota))
+      checkQuota(
+        consumption.diskBytesWritten,
+        quota.diskBytesWritten,
+        "DISK_BYTES_WRITTEN",
+        Utils.bytesToString),
+      checkQuota(
+        consumption.diskFileCount,
+        quota.diskFileCount,
+        "DISK_FILE_COUNT",
+        _.toString),
+      checkQuota(
+        consumption.hdfsBytesWritten,
+        quota.hdfsBytesWritten,
+        "HDFS_BYTES_WRITTEN",
+        Utils.bytesToString),
+      checkQuota(
+        consumption.hdfsFileCount,
+        quota.hdfsFileCount,
+        "HDFS_FILE_COUNT",
+        _.toString))
     val exceed = checkResults.foldLeft(false)(_ || _._1)
-    val reason = checkResults.foldLeft("")(_ + _._2)
-    (!exceed, reason)
+    val exceedReason =
+      if (exceed) {
+        s"$reason ${checkResults.foldLeft("")(_ + _._2)}"
+      } else {
+        ""
+      }
+    QuotaStatus(exceed, exceedReason)
   }
 
-  private def checkDiskBytesWritten(
-      userIdentifier: UserIdentifier,
+  private def checkQuota(
       value: Long,
-      quota: Quota): (Boolean, String) = {
-    val exceed = (quota.diskBytesWritten > 0 && value >= quota.diskBytesWritten)
+      quota: Long,
+      quotaType: String,
+      format: Long => String): (Boolean, String) = {
+    val exceed = quota > 0 && value >= quota
     var reason = ""
     if (exceed) {
-      reason = s"User $userIdentifier used diskBytesWritten 
(${Utils.bytesToString(value)}) " +
-        s"exceeds quota (${Utils.bytesToString(quota.diskBytesWritten)}). "
+      reason = s"$quotaType(${format(value)}) exceeds quota(${format(quota)}). 
"
       logWarning(reason)
     }
     (exceed, reason)
   }
 
-  private def checkDiskFileCount(
-      userIdentifier: UserIdentifier,
-      value: Long,
-      quota: Quota): (Boolean, String) = {
-    val exceed = (quota.diskFileCount > 0 && value >= quota.diskFileCount)
-    var reason = ""
-    if (exceed) {
-      reason =
-        s"User $userIdentifier used diskFileCount($value) exceeds 
quota(${quota.diskFileCount}). "
-      logWarning(reason)
+  private def checkConsumptionExceeded(
+      used: ResourceConsumption,
+      threshold: StorageQuota): Boolean = {
+    used.diskBytesWritten >= threshold.diskBytesWritten ||
+    used.diskFileCount >= threshold.diskFileCount ||
+    used.hdfsBytesWritten >= threshold.hdfsBytesWritten ||
+    used.hdfsFileCount >= threshold.hdfsFileCount
+  }
+
+  def updateResourceConsumption(): Unit = {
+    masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) {
+      val clusterQuota = getClusterStorageQuota
+      var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0)
+      val activeUsers = mutable.Set[UserIdentifier]()
+
+      val tenantResourceConsumptions =
+        statusSystem.availableWorkers.asScala.flatMap { workerInfo =>
+          workerInfo.userResourceConsumption.asScala
+        }.groupBy(_._1.tenantId).toSeq.map { case (tenantId, tenantConsumptionList) =>
+          var tenantResourceConsumption = ResourceConsumption(0, 0, 0, 0)
+          val userResourceConsumptions =
+            tenantConsumptionList.groupBy(_._1).map {
+              case (userIdentifier, userConsumptionList) =>
+                activeUsers.add(userIdentifier)
+                // Step 1: Compute user consumption and set quota status.
+                val resourceConsumptionList = userConsumptionList.map(_._2).toSeq
+                val userResourceConsumption =
+                  computeUserResourceConsumption(resourceConsumptionList)
+
+                // Step 2: Update user resource consumption metrics.
+                // For extract metrics
+                registerUserResourceConsumptionMetrics(userIdentifier, userResourceConsumption)
+
+                // Step 3: Expire user level exceeded app except already expired app
+                clusterResourceConsumption = clusterResourceConsumption.add(userResourceConsumption)
+                tenantResourceConsumption = tenantResourceConsumption.add(userResourceConsumption)
+                val quotaStatus = checkUserQuotaSpace(userIdentifier, userResourceConsumption)
+                userQuotaStatus.put(userIdentifier, quotaStatus)
+                if (interruptShuffleEnabled && quotaStatus.exceed) {
+                  val subResourceConsumptions =
+                    checkUserResourceConsumption(
+                      userIdentifier,
+                      resourceConsumptionList,
+                      userResourceConsumption)
+                  (Some(subResourceConsumptions), resourceConsumptionList)
+                } else {
+                  (None, resourceConsumptionList)
+                }
+            }.toSeq
+
+          val quotaStatus = checkTenantQuotaSpace(tenantId, tenantResourceConsumption)
+          tenantQuotaStatus.put(tenantId, quotaStatus)
+          // Expire tenant level exceeded app except already expired app
+          if (interruptShuffleEnabled && quotaStatus.exceed) {
+            val appConsumptions =
+              checkTenantResourceConsumption(
+                tenantId,
+                userResourceConsumptions,
+                tenantResourceConsumption)
+            (Some(appConsumptions), tenantConsumptionList.map(_._2).toSeq)
+          } else {
+            (None, tenantConsumptionList.map(_._2).toSeq)
+          }
+        }
+
+      // Clear expired users/tenant quota status
+      clearQuotaStatus(activeUsers)
+
+      // Expire cluster level exceeded app except already expired app
+      clusterQuotaStatus = checkClusterQuotaSpace(clusterResourceConsumption)
+      if (interruptShuffleEnabled && clusterQuotaStatus.exceed) {
+        checkClusterResourceConsumption(
+          tenantResourceConsumptions,
+          clusterResourceConsumption,
+          clusterQuota)
+      }
     }
-    (exceed, reason)
   }
 
-  private def checkHdfsBytesWritten(
+  def checkUserResourceConsumption(
       userIdentifier: UserIdentifier,
-      value: Long,
-      quota: Quota): (Boolean, String) = {
-    val exceed = (quota.hdfsBytesWritten > 0 && value >= quota.hdfsBytesWritten)
-    var reason = ""
-    if (exceed) {
-      reason = s"User $userIdentifier used 
hdfsBytesWritten(${Utils.bytesToString(value)}) " +
-        s"exceeds quota(${Utils.bytesToString(quota.hdfsBytesWritten)}). "
-      logWarning(reason)
+      resourceConsumptionList: Seq[ResourceConsumption],
+      usedResourceConsumption: ResourceConsumption): Seq[(String, ResourceConsumption)] = {
+    val appConsumptions = computeSubConsumption(resourceConsumptionList).toSeq
+    // Compute expired size
+    val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
+      appQuotaStatus.containsKey(app)
     }
-    (exceed, reason)
+    val notExpiredUserConsumptions =
+      expired.map(_._2).foldLeft(usedResourceConsumption)(_.subtract(_))
+    expireApplication(
+      notExpiredUserConsumptions,
+      getUserStorageQuota(userIdentifier),
+      notExpired,
+      USER_EXHAUSTED)
+    appConsumptions
   }
 
-  private def checkHdfsFileCount(
+  def checkTenantResourceConsumption(
+      tenantId: String,
+      consumptions: Seq[(Option[Seq[(String, ResourceConsumption)]], Seq[ResourceConsumption])],
+      usedResourceConsumption: ResourceConsumption): Seq[(String, ResourceConsumption)] = {
+    val appConsumptions = consumptions.map {
+      case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
+      case (Some(subConsumptions), _) => subConsumptions
+    }.flatMap(_.toSeq).toSeq
+
+    // Compute nonExpired app total usage
+    val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
+      appQuotaStatus.containsKey(app)
+    }
+    val notExpiredResourceConsumption =
+      expired.map(_._2).foldLeft(usedResourceConsumption)(_.subtract(_))
+    expireApplication(
+      notExpiredResourceConsumption,
+      getTenantStorageQuota(tenantId),
+      notExpired,
+      TENANT_EXHAUSTED)
+    appConsumptions
+  }
+
+  def checkClusterResourceConsumption(
+      consumptions: Seq[(Option[Seq[(String, ResourceConsumption)]], Seq[ResourceConsumption])],
+      usedResourceConsumption: ResourceConsumption,
+      clusterQuota: StorageQuota): Unit = {
+    val appConsumptions = consumptions.map {
+      case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
+      case (Some(subConsumptions), _) => subConsumptions
+    }.flatMap(_.toSeq).toSeq
+
+    // Compute nonExpired app total usage
+    val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
+      appQuotaStatus.containsKey(app)
+    }
+    val notExpiredClusterResourceConsumption =
+      expired.map(_._2).foldLeft(usedResourceConsumption)(_.subtract(_))
+    expireApplication(
+      notExpiredClusterResourceConsumption,
+      clusterQuota,
+      notExpired,
+      CLUSTER_EXHAUSTED)
+  }
+
+  private def expireApplication(
+      used: ResourceConsumption,
+      threshold: StorageQuota,
+      appMap: Seq[(String, ResourceConsumption)],
+      expireReason: String): Unit = {
+    var nonExpired = used
+    if (checkConsumptionExceeded(used, threshold)) {
+      val sortedConsumption =
+        appMap.sortBy(_._2)(Ordering.by((r: ResourceConsumption) =>
+          (
+            r.diskBytesWritten,
+            r.diskFileCount,
+            r.hdfsBytesWritten,
+            r.hdfsFileCount)).reverse)
+      for ((appId, consumption) <- sortedConsumption
+        if checkConsumptionExceeded(nonExpired, threshold)) {
+        val reason = s"$expireReason Used: ${consumption.simpleString}, 
Threshold: $threshold"
+        appQuotaStatus.put(appId, QuotaStatus(exceed = true, reason))
+        nonExpired = nonExpired.subtract(consumption)
+      }
+    }
+  }
+
+  private def computeUserResourceConsumption(
+      consumptions: Seq[ResourceConsumption]): ResourceConsumption = {
+    consumptions.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
+  }
+
+  private def computeSubConsumption(
+      resourceConsumptionList: Seq[ResourceConsumption]): Map[String, ResourceConsumption] = {
+    resourceConsumptionList.foldRight(Map.empty[String, ResourceConsumption]) {
+      case (consumption, subConsumption) =>
+        consumption.addSubResourceConsumptions(subConsumption)
+    }
+  }
+
+  private def registerUserResourceConsumptionMetrics(
       userIdentifier: UserIdentifier,
-      value: Long,
-      quota: Quota): (Boolean, String) = {
-    val exceed = (quota.hdfsFileCount > 0 && value >= quota.hdfsFileCount)
-    var reason = ""
-    if (exceed) {
-      reason =
-        s"User $userIdentifier used hdfsFileCount($value) exceeds 
quota(${quota.hdfsFileCount}). "
-      logWarning(reason)
+      resourceConsumption: ResourceConsumption): Unit = {
+    if (resourceConsumptionMetricsEnabled) {
+      resourceConsumptionSource.addGauge(DISK_FILE_COUNT, userIdentifier.toMap) { () =>
+        resourceConsumption.diskFileCount
+      }
+      resourceConsumptionSource.addGauge(DISK_BYTES_WRITTEN, userIdentifier.toMap) { () =>
+        resourceConsumption.diskBytesWritten
+      }
+      resourceConsumptionSource.addGauge(HDFS_FILE_COUNT, userIdentifier.toMap) { () =>
+        resourceConsumption.hdfsFileCount
+      }
+      resourceConsumptionSource.addGauge(HDFS_BYTES_WRITTEN, userIdentifier.toMap) { () =>
+        resourceConsumption.hdfsBytesWritten
+      }
     }
-    (exceed, reason)
+  }
+
+  private def clearQuotaStatus(activeUsers: mutable.Set[UserIdentifier]): Unit = {
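+    // Prune quota status entries for users and tenants that are no longer active.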
+    userQuotaStatus.keySet().removeIf(new Predicate[UserIdentifier] {
+      override def test(userIdentifier: UserIdentifier): Boolean =
+        !activeUsers.contains(userIdentifier)
+    })
+
+    tenantQuotaStatus.keySet().removeIf(new Predicate[String] {
+      override def test(tenantId: String): Boolean =
+        !activeUsers.exists(_.tenantId == tenantId)
+    })
+  }
+
+  def clusterQuotaEnabled: Boolean = {
+    Option(configService)
+      .map(_.getSystemConfigFromCache.clusterQuotaEnabled())
+      .getOrElse(celebornConf.clusterQuotaEnabled)
+  }
+
+  def tenantQuotaEnabled: Boolean = {
+    Option(configService)
+      .map(_.getSystemConfigFromCache.tenantQuotaEnabled())
+      .getOrElse(celebornConf.tenantQuotaEnabled)
+  }
+
+  def userQuotaEnabled: Boolean = {
+    Option(configService)
+      .map(_.getSystemConfigFromCache.userQuotaEnabled())
+      .getOrElse(celebornConf.userQuotaEnabled)
   }
 }
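
For reference, the expiration pass above reduces to the minimal sketch below.
Names are simplified and ResourceConsumption's subtract is assumed to be
field-wise; only diskBytesWritten is compared for brevity:

    case class Consumption(diskBytesWritten: Long) {
      def subtract(o: Consumption): Consumption =
        Consumption(diskBytesWritten - o.diskBytesWritten)
    }

    // Greedily mark the biggest apps as exceeded until the remaining usage
    // drops back under the quota.
    def expire(total: Consumption, quota: Long, apps: Seq[(String, Consumption)]): Set[String] = {
      var remaining = total
      val expired = Set.newBuilder[String]
      for ((app, c) <- apps.sortBy(-_._2.diskBytesWritten) if remaining.diskBytesWritten >= quota) {
        expired += app
        remaining = remaining.subtract(c)
      }
      expired.result()
    }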
diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala
similarity index 58%
rename from common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
rename to master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala
index 8a8452258..2d7d23835 100644
--- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala
@@ -15,22 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.common.quota
+package org.apache.celeborn.service.deploy.master.quota
 
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.util.Utils
+import QuotaStatus._
 
-case class Quota(
-    diskBytesWritten: Long,
-    diskFileCount: Long,
-    hdfsBytesWritten: Long,
-    hdfsFileCount: Long) extends Logging {
-  override def toString: String = {
-    s"Quota[" +
-      s"diskBytesWritten=${Utils.bytesToString(diskBytesWritten)}, " +
-      s"diskFileCount=$diskFileCount, " +
-      s"hdfsBytesWritten=${Utils.bytesToString(hdfsBytesWritten)}, " +
-      s"hdfsFileCount=$hdfsFileCount" +
-      s"]"
-  }
+case class QuotaStatus(exceed: Boolean = false, exceedReason: String = NORMAL)
+
+object QuotaStatus {
+  val NORMAL: String = ""
+  val CLUSTER_EXHAUSTED: String =
+    "Interrupt application caused by the cluster storage usage reach 
threshold."
+  val TENANT_EXHAUSTED: String =
+    "Interrupt application caused by the tenant storage usage reach threshold."
+  val USER_EXHAUSTED: String =
+    "Interrupt or reject application caused by the user storage usage reach 
threshold."
 }
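
A QuotaStatus with exceed = true is what ultimately surfaces to the driver as
a failed quota check. As exercised by the test suite below, the mapping is
roughly:

    val status = QuotaStatus(exceed = true, exceedReason = QuotaStatus.CLUSTER_EXHAUSTED)
    // CheckQuotaResponse(isAvailable, reason), per its usage in QuotaManagerSuite.
    val response = CheckQuotaResponse(!status.exceed, status.exceedReason)
    assert(!response.isAvailable)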
diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota-2.yaml
similarity index 65%
copy from master/src/test/resources/dynamicConfig-quota.yaml
copy to master/src/test/resources/dynamicConfig-quota-2.yaml
index 156a3f692..a30b7430e 100644
--- a/master/src/test/resources/dynamicConfig-quota.yaml
+++ b/master/src/test/resources/dynamicConfig-quota-2.yaml
@@ -16,20 +16,20 @@
 #
 -  level: SYSTEM
    config:
-     celeborn.quota.tenant.diskBytesWritten: 1G
-     celeborn.quota.tenant.diskFileCount: 100
-     celeborn.quota.tenant.hdfsBytesWritten: 1G
+     celeborn.quota.user.diskBytesWritten: 1000G
+     celeborn.quota.user.diskFileCount: 100
+     celeborn.quota.user.hdfsBytesWritten: 1G
+     celeborn.quota.cluster.diskBytesWritten: 130G
+     celeborn.quota.interruptShuffle.enabled: true
 
 -  tenantId: tenant_01
    level: TENANT
    config:
-     celeborn.quota.tenant.diskBytesWritten: 10G
-     celeborn.quota.tenant.diskFileCount: 1000
-     celeborn.quota.tenant.hdfsBytesWritten: 10G
+     celeborn.quota.user.diskBytesWritten: 10G
+     celeborn.quota.user.diskFileCount: 1000
+     celeborn.quota.user.hdfsBytesWritten: 10G
    users:
      - name: Jerry
        config:
-         celeborn.quota.tenant.diskBytesWritten: 100G
-         celeborn.quota.tenant.diskFileCount: 10000
-
-
+         celeborn.quota.user.diskBytesWritten: 100G
+         celeborn.quota.user.diskFileCount: 10000
diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota-3.yaml
similarity index 67%
copy from master/src/test/resources/dynamicConfig-quota.yaml
copy to master/src/test/resources/dynamicConfig-quota-3.yaml
index 156a3f692..30711c07a 100644
--- a/master/src/test/resources/dynamicConfig-quota.yaml
+++ b/master/src/test/resources/dynamicConfig-quota-3.yaml
@@ -16,20 +16,22 @@
 #
 -  level: SYSTEM
    config:
-     celeborn.quota.tenant.diskBytesWritten: 1G
-     celeborn.quota.tenant.diskFileCount: 100
-     celeborn.quota.tenant.hdfsBytesWritten: 1G
+     celeborn.quota.cluster.diskBytesWritten: 300G
+     celeborn.quota.interruptShuffle.enabled: true
 
 -  tenantId: tenant_01
    level: TENANT
    config:
-     celeborn.quota.tenant.diskBytesWritten: 10G
-     celeborn.quota.tenant.diskFileCount: 1000
-     celeborn.quota.tenant.hdfsBytesWritten: 10G
+     celeborn.quota.tenant.diskBytesWritten: 150G
+     celeborn.quota.tenant.diskFileCount: 1500
    users:
      - name: Jerry
        config:
-         celeborn.quota.tenant.diskBytesWritten: 100G
-         celeborn.quota.tenant.diskFileCount: 10000
+         celeborn.quota.user.diskBytesWritten: 100G
+         celeborn.quota.user.diskFileCount: 10000
+     - name: John
+       config:
+         celeborn.quota.user.diskBytesWritten: 100G
+         celeborn.quota.user.diskFileCount: 10000
 
 
diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota.yaml
index 156a3f692..a829b7c93 100644
--- a/master/src/test/resources/dynamicConfig-quota.yaml
+++ b/master/src/test/resources/dynamicConfig-quota.yaml
@@ -16,20 +16,21 @@
 #
 -  level: SYSTEM
    config:
-     celeborn.quota.tenant.diskBytesWritten: 1G
-     celeborn.quota.tenant.diskFileCount: 100
-     celeborn.quota.tenant.hdfsBytesWritten: 1G
+     celeborn.quota.user.diskBytesWritten: 1G
+     celeborn.quota.user.diskFileCount: 100
+     celeborn.quota.user.hdfsBytesWritten: 1G
+     celeborn.quota.interruptShuffle.enabled: true
 
 -  tenantId: tenant_01
    level: TENANT
    config:
-     celeborn.quota.tenant.diskBytesWritten: 10G
-     celeborn.quota.tenant.diskFileCount: 1000
-     celeborn.quota.tenant.hdfsBytesWritten: 10G
+     celeborn.quota.user.diskBytesWritten: 10G
+     celeborn.quota.user.diskFileCount: 1000
+     celeborn.quota.user.hdfsBytesWritten: 10G
    users:
      - name: Jerry
        config:
-         celeborn.quota.tenant.diskBytesWritten: 100G
-         celeborn.quota.tenant.diskFileCount: 10000
+         celeborn.quota.user.diskBytesWritten: 100G
+         celeborn.quota.user.diskFileCount: 10000
 
 
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
index 90da1cfd1..f8182777c 100644
--- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
@@ -17,45 +17,114 @@
 
 package org.apache.celeborn.service.deploy.master.quota
 
-import org.junit.Assert.assertEquals
+import java.util
+
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
+import scala.util.Random
+
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.quota.{Quota, ResourceConsumption}
-import org.apache.celeborn.common.util.Utils
-import org.apache.celeborn.server.common.service.config.DynamicConfigServiceFactory
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource
+import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse
+import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
+import org.apache.celeborn.common.rpc.RpcEnv
+import org.apache.celeborn.common.util.{JavaUtils, Utils}
+import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory, FsConfigServiceImpl}
+import org.apache.celeborn.service.deploy.master.MasterSource
+import org.apache.celeborn.service.deploy.master.clustermeta.{AbstractMetaManager, SingleMasterMetaManager}
 
-class QuotaManagerSuite extends CelebornFunSuite {
+class QuotaManagerSuite extends CelebornFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
   protected var quotaManager: QuotaManager = _
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    DynamicConfigServiceFactory.reset()
+  private var resourceConsumptionSource: ResourceConsumptionSource = _
+
+  val worker = new WorkerInfo(
+    "localhost",
+    10001,
+    10002,
+    10003,
+    10004)
+
+  var statusSystem: AbstractMetaManager = _
+
+  var rpcEnv: RpcEnv = _
 
-    val conf = new CelebornConf()
+  val workerToResourceConsumptions =
+    JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()
+
+  val conf = new CelebornConf()
+
+  var configService: ConfigService = _
+
+  override def beforeAll(): Unit = {
     conf.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
     conf.set(
       CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
       getTestResourceFile("dynamicConfig-quota.yaml").getPath)
-    quotaManager = new QuotaManager(conf, DynamicConfigServiceFactory.getConfigService(conf))
+    conf.set("celeborn.master.userResourceConsumption.metrics.enabled", "true")
+    resourceConsumptionSource = new ResourceConsumptionSource(conf, "Master")
+    DynamicConfigServiceFactory.reset()
+    configService = DynamicConfigServiceFactory.getConfigService(conf)
+
+    rpcEnv = RpcEnv.create(
+      "test-rpc",
+      "rpc",
+      "localhost",
+      9001,
+      conf,
+      "master",
+      None)
+    statusSystem = new SingleMasterMetaManager(rpcEnv, conf)
+    statusSystem.availableWorkers.add(worker)
+    quotaManager = new QuotaManager(
+      statusSystem,
+      new MasterSource(conf),
+      resourceConsumptionSource,
+      conf,
+      configService)
+  }
+
+  override def afterAll(): Unit = {
+    rpcEnv.shutdown()
   }
 
   test("test celeborn quota conf") {
+    configService.refreshCache()
     assertEquals(
-      quotaManager.getQuota(UserIdentifier("tenant_01", "Jerry")),
-      Quota(Utils.byteStringAsBytes("100G"), 10000, 
Utils.byteStringAsBytes("10G"), Long.MaxValue))
+      quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", "Jerry")),
+      StorageQuota(
+        Utils.byteStringAsBytes("100G"),
+        10000,
+        Utils.byteStringAsBytes("10G"),
+        Long.MaxValue))
     // Fallback to tenant level
     assertEquals(
-      quotaManager.getQuota(UserIdentifier("tenant_01", "name_not_exist")),
-      Quota(Utils.byteStringAsBytes("10G"), 1000, 
Utils.byteStringAsBytes("10G"), Long.MaxValue))
+      quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", 
"name_not_exist")),
+      StorageQuota(
+        Utils.byteStringAsBytes("10G"),
+        1000,
+        Utils.byteStringAsBytes("10G"),
+        Long.MaxValue))
     // Fallback to system level
     assertEquals(
-      quotaManager.getQuota(UserIdentifier("tenant_not_exist", "Tom")),
-      Quota(Utils.byteStringAsBytes("1G"), 100, Utils.byteStringAsBytes("1G"), 
Long.MaxValue))
+      quotaManager.getUserStorageQuota(UserIdentifier("tenant_not_exist", 
"Tom")),
+      StorageQuota(
+        Utils.byteStringAsBytes("1G"),
+        100,
+        Utils.byteStringAsBytes("1G"),
+        Long.MaxValue))
   }
 
-  test("test check quota return result") {
+  test("test check user quota return result") {
     val user = UserIdentifier("tenant_01", "Jerry")
     val rc1 =
       ResourceConsumption(Utils.byteStringAsBytes("10G"), 20, 
Utils.byteStringAsBytes("1G"), 40)
@@ -68,22 +137,558 @@ class QuotaManagerSuite extends CelebornFunSuite {
         Utils.byteStringAsBytes("30G"),
         40)
 
-    val res1 = quotaManager.checkQuotaSpaceAvailable(user, rc1)
-    val res2 = quotaManager.checkQuotaSpaceAvailable(user, rc2)
-    val res3 = quotaManager.checkQuotaSpaceAvailable(user, rc3)
+    addUserConsumption(user, rc1)
+    quotaManager.updateResourceConsumption()
+    val res1 = checkUserQuota(user)
+
+    addUserConsumption(user, rc2)
+    quotaManager.updateResourceConsumption()
+    val res2 = checkUserQuota(user)
 
-    val exp1 = (true, "")
-    val exp2 = (
+    addUserConsumption(user, rc3)
+    quotaManager.updateResourceConsumption()
+    val res3 = checkUserQuota(user)
+
+    val exp1 = CheckQuotaResponse(true, "")
+    val exp2 = CheckQuotaResponse(
       false,
-      s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ")
-    val exp3 = (
+      s"Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        s"user: `tenant_01`.`Jerry`.  " +
+        s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ")
+    val exp3 = CheckQuotaResponse(
       false,
-      s"User $user used diskBytesWritten (200.0 GiB) exceeds quota (100.0 
GiB). " +
-        s"User $user used diskFileCount(20000) exceeds quota(10000). " +
-        s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). 
")
+      s"Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        s"user: `tenant_01`.`Jerry`.  " +
+        s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " +
+        s"DISK_FILE_COUNT(20000) exceeds quota(10000). " +
+        s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ")
 
     assert(res1 == exp1)
     assert(res2 == exp2)
     assert(res3 == exp3)
+    clearUserConsumption()
+  }
+
+  test("test check application quota return result") {
+    val user = UserIdentifier("tenant_01", "Jerry")
+    var rc =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("200G"),
+        20000,
+        Utils.byteStringAsBytes("30G"),
+        40)
+    rc.withSubResourceConsumptions(
+      Map(
+        "app1" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          15000,
+          Utils.byteStringAsBytes("25G"),
+          20),
+        "app2" -> ResourceConsumption(
+          Utils.byteStringAsBytes("50G"),
+          5000,
+          Utils.byteStringAsBytes("5G"),
+          20)).asJava)
+
+    addUserConsumption(user, rc)
+    conf.set("celeborn.quota.cluster.diskBytesWritten", "60gb")
+    configService.refreshCache()
+    quotaManager.updateResourceConsumption()
+    var res1 = checkUserQuota(user)
+    var res2 = checkApplicationQuota(user, "app1")
+    var res3 = checkApplicationQuota(user, "app2")
+
+    val succeed = CheckQuotaResponse(true, "")
+    val failed = CheckQuotaResponse(
+      false,
+      s"Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        s"user: `tenant_01`.`Jerry`.  " +
+        s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " +
+        s"DISK_FILE_COUNT(20000) exceeds quota(10000). " +
+        s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ")
+    assert(res1 == failed)
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: " +
+        "ResourceConsumption(" +
+        "diskBytesWritten: 150.0 GiB, " +
+        "diskFileCount: 15000, " +
+        "hdfsBytesWritten: 25.0 GiB, " +
+        "hdfsFileCount: 20), " +
+        "Threshold: " +
+        "Quota[" +
+        "diskBytesWritten=100.0 GiB, " +
+        "diskFileCount=10000, " +
+        "hdfsBytesWritten=10.0 GiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res3 == succeed)
+
+    conf.set("celeborn.quota.cluster.diskBytesWritten", "50gb")
+    configService.refreshCache()
+    quotaManager.updateResourceConsumption()
+    res1 = checkUserQuota(user)
+    res2 = checkApplicationQuota(user, "app1")
+    res3 = checkApplicationQuota(user, "app2")
+
+    assert(res1 == failed)
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 150.0 GiB, " +
+        "diskFileCount: 15000, " +
+        "hdfsBytesWritten: 25.0 GiB, " +
+        "hdfsFileCount: 20), " +
+        "Threshold: Quota[" +
+        "diskBytesWritten=100.0 GiB, " +
+        "diskFileCount=10000, " +
+        "hdfsBytesWritten=10.0 GiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res3 == CheckQuotaResponse(
+      false,
+      "Interrupt application caused by the cluster storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 50.0 GiB, " +
+        "diskFileCount: 5000, " +
+        "hdfsBytesWritten: 5.0 GiB, " +
+        "hdfsFileCount: 20), " +
+        "Threshold: " +
+        "Quota[" +
+        "diskBytesWritten=50.0 GiB, " +
+        "diskFileCount=9223372036854775807, " +
+        "hdfsBytesWritten=8.0 EiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    clearUserConsumption()
+
+    rc =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("50G"),
+        1000,
+        Utils.byteStringAsBytes("5G"),
+        40)
+    rc.withSubResourceConsumptions(
+      Map(
+        "app1" -> ResourceConsumption(
+          Utils.byteStringAsBytes("40G"),
+          500,
+          Utils.byteStringAsBytes("3G"),
+          20),
+        "app2" -> ResourceConsumption(
+          Utils.byteStringAsBytes("10G"),
+          500,
+          Utils.byteStringAsBytes("2G"),
+          20)).asJava)
+
+    addUserConsumption(user, rc)
+    conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb")
+    configService.refreshCache()
+    quotaManager.updateResourceConsumption()
+
+    res1 = checkUserQuota(user)
+    res2 = checkApplicationQuota(user, "app1")
+    res3 = checkApplicationQuota(user, "app2")
+
+    assert(res1 == CheckQuotaResponse(
+      false,
+      "Interrupt application caused by the cluster storage usage reach 
threshold. " +
+        "DISK_BYTES_WRITTEN(50.0 GiB) exceeds quota(20.0 GiB). "))
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt application caused by the cluster storage usage reach 
threshold. " +
+        "Used: " +
+        "ResourceConsumption(" +
+        "diskBytesWritten: 40.0 GiB, " +
+        "diskFileCount: 500, " +
+        "hdfsBytesWritten: 3.0 GiB, " +
+        "hdfsFileCount: 20), " +
+        "Threshold: " +
+        "Quota[diskBytesWritten=20.0 GiB, " +
+        "diskFileCount=9223372036854775807, " +
+        "hdfsBytesWritten=8.0 EiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res3 == CheckQuotaResponse(true, ""))
+
+    clearUserConsumption()
+  }
+
+  test("test handleResourceConsumption time - case1") {
+    // 1000 users with 1000 applications each (1,000,000 in total), all exceeding quota
+    conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb")
+    conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb")
+    configService.refreshCache()
+    val MAX = 2L * 1024 * 1024 * 1024
+    val MIN = 1L * 1024 * 1024 * 1024
+    val random = new Random()
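+    // Per-app consumption drawn from [1 GiB, 2 GiB), far above the 1 MiB
+    // tenant/cluster quotas set above, so every quota check should fail.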
+    for (i <- 0 until 1000) {
+      val user = UserIdentifier("default", s"user$i")
+      val subResourceConsumption = (0 until 1000).map {
+        index =>
+          val appId = s"$user$i app$index"
+          val consumption = ResourceConsumption(
+            MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+            MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+            MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+            MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+          (appId, consumption)
+      }.toMap
+      val userConsumption = subResourceConsumption.values.foldRight(
+        ResourceConsumption(0, 0, 0, 0))(_ add _)
+      userConsumption.subResourceConsumptions = subResourceConsumption.asJava
+      addUserConsumption(user, userConsumption)
+    }
+
+    val start = System.currentTimeMillis()
+    quotaManager.updateResourceConsumption()
+    val duration = System.currentTimeMillis() - start
+    print(s"duration=$duration")
+
+    val res = resourceConsumptionSource.getMetrics
+    for (i <- 0 until 1000) {
+      val user = UserIdentifier("default", s"user$i")
+      assert(res.contains(
+        s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_diskBytesWritten_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable)
+      (0 until 1000).foreach {
+        index =>
+          val appId = s"$user$i app$index"
+          assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable)
+      }
+    }
+    clearUserConsumption()
+  }
+
+  test("test handleResourceConsumption time - case2") {
+    // 1000 users with 1000 applications each; only the first 100 users' applications exceed quota
+    conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb")
+    conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb")
+    configService.refreshCache()
+    val MAX = 2L * 1024 * 1024 * 1024
+    val MIN = 1L * 1024 * 1024 * 1024
+    val random = new Random()
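+    // Only the first 100 users get non-zero consumption; the remaining users'
+    // applications must stay available after the quota pass.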
+    for (i <- 0 until 1000) {
+      val user = UserIdentifier("default", s"user$i")
+      val subResourceConsumption =
+        if (i < 100) {
+          (0 until 1000).map {
+            index =>
+              val appId = s"$user$i case2_app$index"
+              val consumption = ResourceConsumption(
+                MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+                MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+                MIN + Math.abs(random.nextLong()) % (MAX - MIN),
+                MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+              (appId, consumption)
+          }.toMap
+        } else {
+          (0 until 1000).map {
+            index =>
+              val appId = s"$user$i case2_app$index"
+              val consumption = ResourceConsumption(0, 0, 0, 0)
+              (appId, consumption)
+          }.toMap
+        }
+      val userConsumption = subResourceConsumption.values.foldRight(
+        ResourceConsumption(0, 0, 0, 0))(_ add _)
+      userConsumption.subResourceConsumptions = subResourceConsumption.asJava
+      addUserConsumption(user, userConsumption)
+    }
+
+    val start = System.currentTimeMillis()
+    quotaManager.updateResourceConsumption()
+    val duration = System.currentTimeMillis() - start
+    print(s"duration=$duration")
+
+    val res = resourceConsumptionSource.getMetrics
+    for (i <- 0 until 1000) {
+      val user = UserIdentifier("default", s"user$i")
+      assert(res.contains(
+        s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_diskBytesWritten_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assert(res.contains(
+        s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}"""))
+      assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable)
+      (0 until 1000).foreach {
+        index =>
+          val appId = s"$user$i case2_app$index"
+          if (i < 100) {
+            assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable)
+          } else {
+            assertTrue(quotaManager.checkApplicationQuotaStatus(appId).isAvailable)
+          }
+      }
+    }
+    clearUserConsumption()
+  }
+
+  test("test user level conf") {
+    val conf1 = new CelebornConf()
+    conf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
+    conf1.set(
+      CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
+      getTestResourceFile("dynamicConfig-quota-2.yaml").getPath)
+    val rpcEnv = RpcEnv.create(
+      "test-rpc",
+      "rpc",
+      "localhost",
+      9002,
+      conf,
+      "master",
+      None)
+    val statusSystem1 = new SingleMasterMetaManager(rpcEnv, conf)
+    statusSystem1.availableWorkers.add(worker)
+    val quotaManager1 = new QuotaManager(
+      statusSystem1,
+      new MasterSource(conf1),
+      resourceConsumptionSource,
+      conf1,
+      new FsConfigServiceImpl(conf1))
+
+    val user = UserIdentifier("tenant_01", "Jerry")
+    val user1 = UserIdentifier("tenant_01", "John")
+
+    val rc =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("200G"),
+        20000,
+        Utils.byteStringAsBytes("30G"),
+        40)
+    rc.withSubResourceConsumptions(
+      Map(
+        "app1" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          15000,
+          Utils.byteStringAsBytes("25G"),
+          20),
+        "app2" -> ResourceConsumption(
+          Utils.byteStringAsBytes("50G"),
+          5000,
+          Utils.byteStringAsBytes("5G"),
+          20)).asJava)
+
+    val rc1 =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("80G"),
+        0,
+        0,
+        0)
+
+    rc1.withSubResourceConsumptions(
+      Map(
+        "app3" -> ResourceConsumption(
+          Utils.byteStringAsBytes("80G"),
+          0,
+          0,
+          0)).asJava)
+
+    addUserConsumption(user, rc)
+    addUserConsumption(user1, rc1)
+
+    quotaManager1.updateResourceConsumption()
+    val res1 = quotaManager1.checkUserQuotaStatus(user)
+    val res2 = quotaManager1.checkApplicationQuotaStatus("app1")
+    val res3 = quotaManager1.checkApplicationQuotaStatus("app2")
+    val res4 = quotaManager1.checkApplicationQuotaStatus("app3")
+    assert(res1 == CheckQuotaResponse(
+      false,
+      s"Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        s"user: `tenant_01`.`Jerry`.  " +
+        s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " +
+        s"DISK_FILE_COUNT(20000) exceeds quota(10000). " +
+        s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). "))
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 150.0 GiB, " +
+        "diskFileCount: 15000, " +
+        "hdfsBytesWritten: 25.0 GiB, " +
+        "hdfsFileCount: 20), " +
+        "Threshold: " +
+        "Quota[" +
+        "diskBytesWritten=100.0 GiB, " +
+        "diskFileCount=10000, " +
+        "hdfsBytesWritten=10.0 GiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res3 == CheckQuotaResponse(true, ""))
+    assert(res4 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: " +
+        "ResourceConsumption(" +
+        "diskBytesWritten: 80.0 GiB, " +
+        "diskFileCount: 0, " +
+        "hdfsBytesWritten: 0.0 B, " +
+        "hdfsFileCount: 0), " +
+        "Threshold: " +
+        "Quota[" +
+        "diskBytesWritten=10.0 GiB, " +
+        "diskFileCount=1000, " +
+        "hdfsBytesWritten=10.0 GiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+
+    clearUserConsumption()
+  }
+
+  test("test tenant level conf") {
+    val conf1 = new CelebornConf()
+    conf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
+    conf1.set(
+      CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
+      getTestResourceFile("dynamicConfig-quota-3.yaml").getPath)
+    val rpcEnv = RpcEnv.create(
+      "test-rpc",
+      "rpc",
+      "localhost",
+      9003,
+      conf,
+      "master",
+      None)
+    val statusSystem1 = new SingleMasterMetaManager(rpcEnv, conf)
+    statusSystem1.availableWorkers.add(worker)
+    val quotaManager1 = new QuotaManager(
+      statusSystem1,
+      new MasterSource(conf1),
+      resourceConsumptionSource,
+      conf1,
+      new FsConfigServiceImpl(conf1))
+
+    val user1 = UserIdentifier("tenant_01", "Jerry")
+    val user2 = UserIdentifier("tenant_01", "John")
+
+    val rc1 =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("230G"),
+        0,
+        0,
+        0)
+    rc1.withSubResourceConsumptions(
+      Map(
+        "app1" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          0,
+          0,
+          0),
+        "app2" -> ResourceConsumption(
+          Utils.byteStringAsBytes("80G"),
+          0,
+          0,
+          0)).asJava)
+
+    val rc2 =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("220G"),
+        0,
+        0,
+        0)
+
+    rc2.withSubResourceConsumptions(
+      Map(
+        "app3" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          0,
+          0,
+          0),
+        "app4" -> ResourceConsumption(
+          Utils.byteStringAsBytes("70G"),
+          0,
+          0,
+          0)).asJava)
+
+    addUserConsumption(user1, rc1)
+    addUserConsumption(user2, rc2)
+
+    quotaManager1.updateResourceConsumption()
+    val res1 = quotaManager1.checkUserQuotaStatus(user1)
+    val res2 = quotaManager1.checkUserQuotaStatus(user2)
+    val res3 = quotaManager1.checkApplicationQuotaStatus("app1")
+    val res4 = quotaManager1.checkApplicationQuotaStatus("app2")
+    val res5 = quotaManager1.checkApplicationQuotaStatus("app3")
+    val res6 = quotaManager1.checkApplicationQuotaStatus("app4")
+    assert(res1 == CheckQuotaResponse(
+      false,
+      "" +
+        "Interrupt or reject application caused by the user storage usage 
reach threshold. " +
+        "user: `tenant_01`.`Jerry`.  DISK_BYTES_WRITTEN(230.0 GiB) exceeds 
quota(100.0 GiB). "))
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "user: `tenant_01`.`John`.  DISK_BYTES_WRITTEN(220.0 GiB) exceeds 
quota(100.0 GiB). "))
+    assert(res3 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 150.0 GiB, " +
+        "diskFileCount: 0, " +
+        "hdfsBytesWritten: 0.0 B, " +
+        "hdfsFileCount: 0), " +
+        "Threshold: Quota[" +
+        "diskBytesWritten=100.0 GiB, " +
+        "diskFileCount=10000, " +
+        "hdfsBytesWritten=8.0 EiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res4 == CheckQuotaResponse(
+      false,
+      "Interrupt application caused by the tenant storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 80.0 GiB, " +
+        "diskFileCount: 0, " +
+        "hdfsBytesWritten: 0.0 B, " +
+        "hdfsFileCount: 0), " +
+        "Threshold: Quota[" +
+        "diskBytesWritten=150.0 GiB, " +
+        "diskFileCount=1500, " +
+        "hdfsBytesWritten=8.0 EiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res5 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach 
threshold. " +
+        "Used: ResourceConsumption(" +
+        "diskBytesWritten: 150.0 GiB, " +
+        "diskFileCount: 0, " +
+        "hdfsBytesWritten: 0.0 B, " +
+        "hdfsFileCount: 0), " +
+        "Threshold: Quota[" +
+        "diskBytesWritten=100.0 GiB, " +
+        "diskFileCount=10000, " +
+        "hdfsBytesWritten=8.0 EiB, " +
+        "hdfsFileCount=9223372036854775807]"))
+    assert(res6 == CheckQuotaResponse(true, ""))
+    clearUserConsumption()
+  }
+
+  def checkUserQuota(userIdentifier: UserIdentifier): CheckQuotaResponse = {
+    quotaManager.checkUserQuotaStatus(userIdentifier)
+  }
+
+  def checkApplicationQuota(
+      userIdentifier: UserIdentifier,
+      applicationId: String): CheckQuotaResponse = {
+    quotaManager.checkApplicationQuotaStatus(applicationId)
+  }
+
+  def addUserConsumption(
+      userIdentifier: UserIdentifier,
+      resourceConsumption: ResourceConsumption): Unit = {
+    worker.userResourceConsumption.put(userIdentifier, resourceConsumption)
+    workerToResourceConsumptions.put(worker.toUniqueId, worker.userResourceConsumption)
+  }
+
+  def clearUserConsumption(): Unit = {
+    val applicationSet = worker.userResourceConsumption.asScala.values.flatMap { consumption =>
+      Option(consumption.subResourceConsumptions).map(_.asScala.keySet)
+    }.flatten.toSet
+
+    applicationSet.foreach(quotaManager.handleAppLost)
+    worker.userResourceConsumption.clear()
   }
 }
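
A pattern worth noting from the suite: quota thresholds are read through the
dynamic config cache, so tightening a quota at runtime is a three-step
sequence (keys as in the YAML fixtures above):

    conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb") // new threshold
    configService.refreshCache()             // reload the dynamic config cache
    quotaManager.updateResourceConsumption() // re-evaluate quota status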
diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
index c09ac0823..d55145f33 100644
--- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
+++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.internal.config.ConfigEntry;
-import org.apache.celeborn.common.quota.Quota;
+import org.apache.celeborn.common.quota.StorageQuota;
 import org.apache.celeborn.common.quota.UserTrafficQuota;
 import org.apache.celeborn.common.quota.WorkerTrafficQuota;
 import org.apache.celeborn.common.tags.WorkerTagsMeta;
@@ -44,7 +44,6 @@ import org.apache.celeborn.common.util.Utils;
 public abstract class DynamicConfig {
   private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class);
   protected volatile Map<String, String> configs = new HashMap<>();
-  protected volatile Quota quota = null;
   protected volatile Map<String, Set<String>> tags = null;
 
   public abstract DynamicConfig getParentLevelConfig();
@@ -94,37 +93,26 @@ public abstract class DynamicConfig {
     return null;
   }
 
-  public Quota getQuota() {
-    if (quota == null) {
-      synchronized (DynamicConfig.class) {
-        if (quota == null) {
-          quota = currentQuota();
-        }
-      }
-    }
-    return quota;
-  }
-
-  protected Quota currentQuota() {
-    return new Quota(
+  public StorageQuota getTenantStorageQuota() {
+    return new StorageQuota(
         getValue(
-            CelebornConf.QUOTA_DISK_BYTES_WRITTEN().key(),
-            CelebornConf.QUOTA_DISK_BYTES_WRITTEN(),
+            CelebornConf.QUOTA_TENANT_DISK_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_TENANT_DISK_BYTES_WRITTEN(),
             Long.TYPE,
             ConfigType.BYTES),
         getValue(
-            CelebornConf.QUOTA_DISK_FILE_COUNT().key(),
-            CelebornConf.QUOTA_DISK_FILE_COUNT(),
+            CelebornConf.QUOTA_TENANT_DISK_FILE_COUNT().key(),
+            CelebornConf.QUOTA_TENANT_DISK_FILE_COUNT(),
             Long.TYPE,
             ConfigType.STRING),
         getValue(
-            CelebornConf.QUOTA_HDFS_BYTES_WRITTEN().key(),
-            CelebornConf.QUOTA_HDFS_BYTES_WRITTEN(),
+            CelebornConf.QUOTA_TENANT_HDFS_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_TENANT_HDFS_BYTES_WRITTEN(),
             Long.TYPE,
             ConfigType.BYTES),
         getValue(
-            CelebornConf.QUOTA_HDFS_FILE_COUNT().key(),
-            CelebornConf.QUOTA_HDFS_FILE_COUNT(),
+            CelebornConf.QUOTA_TENANT_HDFS_FILE_COUNT().key(),
+            CelebornConf.QUOTA_TENANT_HDFS_FILE_COUNT(),
             Long.TYPE,
             ConfigType.STRING));
   }
@@ -181,6 +169,86 @@ public abstract class DynamicConfig {
             ConfigType.STRING));
   }
 
+  public StorageQuota getClusterStorageQuota() {
+    return new StorageQuota(
+        getValue(
+            CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT().key(),
+            CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING),
+        getValue(
+            CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT().key(),
+            CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING));
+  }
+
+  public StorageQuota getUserStorageQuota() {
+    return new StorageQuota(
+        getValue(
+            CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_USER_DISK_FILE_COUNT().key(),
+            CelebornConf.QUOTA_USER_DISK_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING),
+        getValue(
+            CelebornConf.QUOTA_USER_HDFS_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_USER_HDFS_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_USER_HDFS_FILE_COUNT().key(),
+            CelebornConf.QUOTA_USER_HDFS_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING));
+  }
+
+  public boolean interruptShuffleEnabled() {
+    return getValue(
+        CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key(),
+        CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED(),
+        Boolean.TYPE,
+        ConfigType.BOOLEAN);
+  }
+
+  public boolean clusterQuotaEnabled() {
+    return getValue(
+        CelebornConf.CLUSTER_QUOTA_ENABLED().key(),
+        CelebornConf.CLUSTER_QUOTA_ENABLED(),
+        Boolean.TYPE,
+        ConfigType.BOOLEAN);
+  }
+
+  public boolean tenantQuotaEnabled() {
+    return getValue(
+        CelebornConf.TENANT_QUOTA_ENABLED().key(),
+        CelebornConf.TENANT_QUOTA_ENABLED(),
+        Boolean.TYPE,
+        ConfigType.BOOLEAN);
+  }
+
+  public boolean userQuotaEnabled() {
+    return getValue(
+        CelebornConf.USER_QUOTA_ENABLED().key(),
+        CelebornConf.USER_QUOTA_ENABLED(),
+        Boolean.TYPE,
+        ConfigType.BOOLEAN);
+  }
+
   public Map<String, String> getConfigs() {
     return configs;
   }
@@ -212,6 +280,7 @@ public abstract class DynamicConfig {
     BYTES,
     STRING,
     TIME_MS,
+    BOOLEAN
   }
 
   public static <T> T convert(Class<T> clazz, String value) {
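
On the master side these getters are reached through the cached system-level
config; a sketch in the style of the QuotaManager accessors above:

    val sysConf = configService.getSystemConfigFromCache
    val clusterQuota = sysConf.getClusterStorageQuota // cluster-level StorageQuota
    val interruptEnabled = sysConf.interruptShuffleEnabled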
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 5a5ab2506..a3c808a67 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -919,18 +919,15 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         }
         // userIdentifier -> List((userIdentifier, (applicationId, fileInfo)))
         .groupBy(_._1)
-        .map { case (userIdentifier, userWithFileInfoList) =>
+        .mapValues { userWithFileInfoList =>
           // collect resource consumed by each user on this worker
-          val userFileInfos = userWithFileInfoList.map(_._2)
-          (
-            userIdentifier,
-            resourceConsumption(
-              userFileInfos.map(_._2),
-              userFileInfos.groupBy(_._1).map {
-                case (applicationId, appWithFileInfoList) =>
-                  (applicationId, resourceConsumption(appWithFileInfoList.map(_._2)))
-              }.asJava))
-        }
+          val subResourceConsumption = userWithFileInfoList.map(_._2).groupBy(_._1).map {
+            case (applicationId, appWithFileInfoList) =>
+              (applicationId, resourceConsumption(appWithFileInfoList.map(_._2)))
+          }
+          subResourceConsumption.values.foldLeft(ResourceConsumption(0, 0, 0, 0))(_ add _)
+            .withSubResourceConsumptions(subResourceConsumption.asJava)
+        }.toMap
     }
   }
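
The reworked aggregation can be read stand-alone as the following sketch
(byte counts only, for brevity):

    // Group a user's files by applicationId, sum per-app bytes, and keep both
    // the user total and the per-app breakdown (the sub-consumptions that the
    // worker reports to the master).
    def aggregate(files: Seq[(String, Long)]): (Long, Map[String, Long]) = {
      val perApp = files.groupBy(_._1).map { case (app, sizes) => app -> sizes.map(_._2).sum }
      (perApp.values.sum, perApp)
    }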
 

