This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ddab27a1 [CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse (#1093)
ddab27a1 is described below

commit ddab27a1d777f7758fb59f95f5e4797c4617eb35
Author: nafiy <[email protected]>
AuthorDate: Thu Dec 15 18:16:34 2022 +0800

    [CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse (#1093)
    
    * [CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse
---
 .../celeborn/RssShuffleFallbackPolicyRunner.scala  | 11 ++--
 .../celeborn/RssShuffleFallbackPolicyRunner.scala  | 11 ++--
 .../apache/celeborn/client/LifecycleManager.scala  |  9 ++--
 common/src/main/proto/TransportMessages.proto      |  1 +
 .../common/protocol/message/ControlMessages.scala  |  9 ++--
 .../org/apache/celeborn/common/quota/Quota.scala   | 56 ++++++++++++--------
 .../apache/celeborn/common/quota/QuotaSuite.scala  | 61 ++++++++++++++++++++++
 .../celeborn/service/deploy/master/Master.scala    |  5 +-
 8 files changed, 123 insertions(+), 40 deletions(-)

diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
index 05e90ac1..107591bd 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
@@ -53,17 +53,18 @@ class RssShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
   /**
    * If rss cluster is exceed current user's quota, fallback to external 
shuffle
    *
-   * @return if rss cluster usage of current user's percent is overhead the 
limit
+   * @return if rss cluster have available space for current user
    */
   def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
     if (!conf.quotaEnabled) {
       return true
     }
 
-    val available = lifecycleManager.checkQuota()
-    if (!available) {
-      logWarning(s"Quota exceed for current user 
${lifecycleManager.getUserIdentifier}.")
+    val resp = lifecycleManager.checkQuota()
+    if (!resp.isAvailable) {
+      logWarning(
+        s"Quota exceed for current user ${lifecycleManager.getUserIdentifier}. 
Because: ${resp.reason}")
     }
-    available
+    resp.isAvailable
   }
 }
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
index b1b33524..a52f0b37 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
@@ -59,17 +59,18 @@ class RssShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
   /**
    * If rss cluster is exceed current user's quota, fallback to external 
shuffle
    *
-   * @return if rss cluster have available space for current user.
+   * @return if rss cluster have available space for current user
    */
   def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
     if (!conf.quotaEnabled) {
       return true
     }
 
-    val available = lifecycleManager.checkQuota()
-    if (!available) {
-      logWarning(s"Quota exceed for current user 
${lifecycleManager.getUserIdentifier}.")
+    val resp = lifecycleManager.checkQuota()
+    if (!resp.isAvailable) {
+      logWarning(
+        s"Quota exceed for current user ${lifecycleManager.getUserIdentifier}. 
Because: ${resp.reason}")
     }
-    available
+    resp.isAvailable
   }
 }
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 06fbb2a5..d6e39d2c 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1291,15 +1291,16 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     }
   }
 
-  def checkQuota(): Boolean = {
+  def checkQuota(): CheckQuotaResponse = {
     try {
       rssHARetryClient.askSync[CheckQuotaResponse](
         CheckQuota(userIdentifier),
-        classOf[CheckQuotaResponse]).isAvailable
+        classOf[CheckQuotaResponse])
     } catch {
       case e: Exception =>
-        logError(s"AskSync Cluster check quota for $userIdentifier failed.", e)
-        false
+        val msg = s"AskSync Cluster check quota for $userIdentifier failed."
+        logError(msg, e)
+        CheckQuotaResponse(false, msg)
     }
   }
 
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index ae4f3816..a19ed793 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -295,6 +295,7 @@ message PbCheckQuota {
 
 message PbCheckQuotaResponse {
   bool available = 1;
+  string reason = 2;
 }
 
 message PbReportWorkerUnavailable {
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index d8255005..a786807b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -327,7 +327,7 @@ object ControlMessages extends Logging {
 
   case class CheckQuota(userIdentifier: UserIdentifier) extends Message
 
-  case class CheckQuotaResponse(isAvailable: Boolean) extends Message
+  case class CheckQuotaResponse(isAvailable: Boolean, reason: String) extends 
Message
 
   case class ReportWorkerUnavailable(
       unavailable: util.List[WorkerInfo],
@@ -634,9 +634,10 @@ object ControlMessages extends Logging {
         MessageType.CHECK_QUOTA,
         builder.build().toByteArray)
 
-    case CheckQuotaResponse(available) =>
+    case CheckQuotaResponse(available, reason) =>
       val payload = PbCheckQuotaResponse.newBuilder()
         .setAvailable(available)
+        .setReason(reason)
         .build().toByteArray
       new TransportMessage(MessageType.CHECK_QUOTA_RESPONSE, payload)
 
@@ -950,7 +951,9 @@ object ControlMessages extends Logging {
       case CHECK_QUOTA_RESPONSE =>
         val pbCheckAvailableResponse = PbCheckQuotaResponse
           .parseFrom(message.getPayload)
-        CheckQuotaResponse(pbCheckAvailableResponse.getAvailable)
+        CheckQuotaResponse(
+          pbCheckAvailableResponse.getAvailable,
+          pbCheckAvailableResponse.getReason)
 
       case REPORT_WORKER_FAILURE =>
         val pbReportWorkerUnavailable = 
PbReportWorkerUnavailable.parseFrom(message.getPayload)
diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
index d2280f8d..6b3baa92 100644
--- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
@@ -39,47 +39,61 @@ case class Quota(
 
   def checkQuotaSpaceAvailable(
       userIdentifier: UserIdentifier,
-      resourceResumption: ResourceConsumption): Boolean = {
-    val exceed =
-      checkDiskBytesWritten(userIdentifier, 
resourceResumption.diskBytesWritten) ||
-        checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount) ||
-        checkHdfsBytesWritten(userIdentifier, 
resourceResumption.hdfsBytesWritten) ||
-        checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount)
-    !exceed
+      resourceResumption: ResourceConsumption): (Boolean, String) = {
+    val checkResults = Seq(
+      checkDiskBytesWritten(userIdentifier, 
resourceResumption.diskBytesWritten),
+      checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount),
+      checkHdfsBytesWritten(userIdentifier, 
resourceResumption.hdfsBytesWritten),
+      checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount))
+    val exceed = checkResults.foldLeft(false)(_ || _._1)
+    val reason = checkResults.foldLeft("")(_ + _._2)
+    (!exceed, reason)
   }
 
-  private def checkDiskBytesWritten(userIdentifier: UserIdentifier, value: 
Long): Boolean = {
+  private def checkDiskBytesWritten(
+      userIdentifier: UserIdentifier,
+      value: Long): (Boolean, String) = {
     val exceed = (diskBytesWritten > 0 && value >= diskBytesWritten)
+    var reason = ""
     if (exceed) {
-      logWarning(s"User $userIdentifier quota exceed diskBytesWritten, " +
-        s"${Utils.bytesToString(value)} >= 
${Utils.bytesToString(diskBytesWritten)}")
+      reason = s"User $userIdentifier used diskBytesWritten 
(${Utils.bytesToString(value)}) " +
+        s"exceeds quota (${Utils.bytesToString(diskBytesWritten)}). "
+      logWarning(reason)
     }
-    exceed
+    (exceed, reason)
   }
 
-  private def checkDiskFileCount(userIdentifier: UserIdentifier, value: Long): 
Boolean = {
+  private def checkDiskFileCount(userIdentifier: UserIdentifier, value: Long): 
(Boolean, String) = {
     val exceed = (diskFileCount > 0 && value >= diskFileCount)
+    var reason = ""
     if (exceed) {
-      logWarning(s"User $userIdentifier quota exceed diskFileCount, $value >= 
$diskFileCount")
+      reason = s"User $userIdentifier used diskFileCount($value) exceeds 
quota($diskFileCount). "
+      logWarning(reason)
     }
-    exceed
+    (exceed, reason)
   }
 
-  private def checkHdfsBytesWritten(userIdentifier: UserIdentifier, value: 
Long): Boolean = {
+  private def checkHdfsBytesWritten(
+      userIdentifier: UserIdentifier,
+      value: Long): (Boolean, String) = {
     val exceed = (hdfsBytesWritten > 0 && value >= hdfsBytesWritten)
+    var reason = ""
     if (exceed) {
-      logWarning(s"User $userIdentifier quota exceed hdfsBytesWritten, " +
-        s"${Utils.bytesToString(value)} >= 
${Utils.bytesToString(hdfsBytesWritten)}")
+      reason = s"User $userIdentifier used 
hdfsBytesWritten(${Utils.bytesToString(value)}) " +
+        s"exceeds quota(${Utils.bytesToString(hdfsBytesWritten)}). "
+      logWarning(reason)
     }
-    exceed
+    (exceed, reason)
   }
 
-  private def checkHdfsFileCount(userIdentifier: UserIdentifier, value: Long): 
Boolean = {
+  private def checkHdfsFileCount(userIdentifier: UserIdentifier, value: Long): 
(Boolean, String) = {
     val exceed = (hdfsFileCount > 0 && value >= hdfsFileCount)
+    var reason = ""
     if (exceed) {
-      logWarning(s"User $userIdentifier quota exceed hdfsFileCount, $value >= 
$hdfsFileCount")
+      reason = s"User $userIdentifier used hdfsFileCount($value) exceeds 
quota($hdfsFileCount). "
+      logWarning(reason)
     }
-    exceed
+    (exceed, reason)
   }
 
   override def toString: String = {
diff --git a/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala b/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala
new file mode 100644
index 00000000..11666ce2
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.quota
+
+import org.apache.celeborn.common.identity.UserIdentifier
+
+class QuotaSuite extends BaseQuotaManagerSuite {
+
+  val quota1 = Quota(-1, -1, 300, 400)
+  val quota2 = Quota(100, 200, 300, 400)
+  val user1 = UserIdentifier("mock", "mock")
+  val resourceConsumption1 = ResourceConsumption(10, 20, 30, 40)
+  val resourceConsumption2 = ResourceConsumption(10, 20, 600, 800)
+  val resourceConsumption3 = ResourceConsumption(1000, 2000, 3000, 4000)
+
+  test("test check quota return result") {
+    val res1 = quota1.checkQuotaSpaceAvailable(user1, resourceConsumption1)
+    val res2 = quota1.checkQuotaSpaceAvailable(user1, resourceConsumption2)
+    val res3 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption1)
+    val res4 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption2)
+    val res5 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption3)
+
+    val exp1 = (true, "")
+    val exp2 = (
+      false,
+      "User `mock`.`mock` used hdfsBytesWritten(600.0 B) exceeds quota(300.0 
B). " +
+        "User `mock`.`mock` used hdfsFileCount(800) exceeds quota(400). ")
+    val exp3 = (true, "")
+    val exp4 = (
+      false,
+      "User `mock`.`mock` used hdfsBytesWritten(600.0 B) exceeds quota(300.0 
B). " +
+        "User `mock`.`mock` used hdfsFileCount(800) exceeds quota(400). ")
+    val exp5 = (
+      false,
+      "User `mock`.`mock` used diskBytesWritten (1000.0 B) exceeds quota 
(100.0 B). " +
+        "User `mock`.`mock` used diskFileCount(2000) exceeds quota(200). " +
+        "User `mock`.`mock` used hdfsBytesWritten(2.9 KB) exceeds quota(300.0 
B). " +
+        "User `mock`.`mock` used hdfsFileCount(4000) exceeds quota(400). ")
+
+    assert(res1 == exp1)
+    assert(res2 == exp2)
+    assert(res3 == exp3)
+    assert(res4 == exp4)
+    assert(res5 == exp5)
+  }
+}
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a930c98f..62a7f113 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -631,8 +631,9 @@ private[celeborn] class Master(
       workerInfo.userResourceConsumption.asScala.get(userIdentifier)
     }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
     val quota = quotaManager.getQuota(userIdentifier)
-    val isAvailable = quota.checkQuotaSpaceAvailable(userIdentifier, 
userResourceConsumption)
-    context.reply(CheckQuotaResponse(isAvailable))
+    val (isAvailable, reason) =
+      quota.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
+    context.reply(CheckQuotaResponse(isAvailable, reason))
   }
 
   private def workersNotBlacklisted(

Reply via email to