RexXiong commented on code in PR #2819:
URL: https://github.com/apache/celeborn/pull/2819#discussion_r1998856916


##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java:
##########
@@ -227,7 +227,6 @@ public ResourceResponse 
handleWriteRequest(ResourceProtos.ResourceRequest reques
               fetchPort,
               replicatePort,
               diskInfos,
-              userResourceConsumption,
               request.getWorkerHeartbeatRequest().getTime(),

Review Comment:
   The `userResourceConsumption` at line 202 and line 244 can also be removed.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -6051,4 +6053,84 @@ object CelebornConf extends Logging {
       .doubleConf
       .checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
       .createWithDefault(1)
+
+  val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)

Review Comment:
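   Ditto: this entry is a file count, not a byte size, so use `longConf` instead of `bytesConf(ByteUnit.BYTE)`.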



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -6051,4 +6053,84 @@ object CelebornConf extends Logging {
       .doubleConf
       .checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
       .createWithDefault(1)
+
+  val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)

Review Comment:
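   Ditto: this entry is a file count, not a byte size, so use `longConf` instead of `bytesConf(ByteUnit.BYTE)`.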



##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala:
##########
@@ -16,96 +16,393 @@
  */
 package org.apache.celeborn.service.deploy.master.quota
 
+import java.util.{Map => JMap}
+import java.util.concurrent.TimeUnit
+import java.util.function.Predicate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.quota.{Quota, ResourceConsumption}
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource
+import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource._
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse
+import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
+import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.service.config.ConfigService
+import org.apache.celeborn.service.deploy.master.MasterSource
+import 
org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME
+import 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager
+import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._
+
+class QuotaManager(
+    statusSystem: AbstractMetaManager,
+    masterSource: MasterSource,
+    resourceConsumptionSource: ResourceConsumptionSource,
+    celebornConf: CelebornConf,
+    configService: ConfigService) extends Logging {
 
-class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) 
extends Logging {
-  private val DEFAULT_QUOTA = Quota(
-    celebornConf.get(CelebornConf.QUOTA_DISK_BYTES_WRITTEN),
-    celebornConf.get(CelebornConf.QUOTA_DISK_FILE_COUNT),
-    celebornConf.get(CelebornConf.QUOTA_HDFS_BYTES_WRITTEN),
-    celebornConf.get(CelebornConf.QUOTA_HDFS_FILE_COUNT))
-  def getQuota(userIdentifier: UserIdentifier): Quota = {
-    if (configService != null) {
-      val config =
-        configService.getTenantUserConfigFromCache(userIdentifier.tenantId, 
userIdentifier.name)
-      config.getQuota
+  val userQuotaStatus: JMap[UserIdentifier, QuotaStatus] = 
JavaUtils.newConcurrentHashMap()
+  val tenantQuotaStatus: JMap[String, QuotaStatus] = 
JavaUtils.newConcurrentHashMap()
+  val resourceConsumptionMetricsEnabled = 
celebornConf.masterResourceConsumptionMetricsEnabled
+  @volatile
+  var clusterQuotaStatus: QuotaStatus = new QuotaStatus()
+  val appQuotaStatus: JMap[String, QuotaStatus] = 
JavaUtils.newConcurrentHashMap()
+  private val quotaChecker =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker")
+  quotaChecker.scheduleWithFixedDelay(
+    new Runnable {
+      override def run(): Unit = {
+        try {
+          updateResourceConsumption()
+        } catch {
+          case t: Throwable => logError("Update user resource consumption 
failed.", t)
+        }
+      }
+    },
+    0L,
+    celebornConf.masterResourceConsumptionInterval,
+    TimeUnit.MILLISECONDS)
+
+  def handleAppLost(appId: String): Unit = {
+    appQuotaStatus.remove(appId)
+  }
+
+  def checkUserQuotaStatus(user: UserIdentifier): CheckQuotaResponse = {
+    val quotaEnabled =

Review Comment:
   I think it's acceptable to check 
`_.getSystemConfigFromCache().quotaEnabledForTenant/User/Cluster` to determine 
if quota checking is enabled at the system level. Supporting configuration at 
the tenant or user level adds unnecessary complexity.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -6051,4 +6053,84 @@ object CelebornConf extends Logging {
       .doubleConf
       .checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
       .createWithDefault(1)
+
+  val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs file 
count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)

Review Comment:
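   Use `longConf` here: this entry is a file count, not a byte size, so `bytesConf(ByteUnit.BYTE)` is the wrong builder.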



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to