This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ddab27a1 [CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse (#1093)
ddab27a1 is described below
commit ddab27a1d777f7758fb59f95f5e4797c4617eb35
Author: nafiy <[email protected]>
AuthorDate: Thu Dec 15 18:16:34 2022 +0800
[CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse (#1093)
* [CELEBORN-145][REFACTOR] Add reason in CheckQuotaResponse
---
.../celeborn/RssShuffleFallbackPolicyRunner.scala | 11 ++--
.../celeborn/RssShuffleFallbackPolicyRunner.scala | 11 ++--
.../apache/celeborn/client/LifecycleManager.scala | 9 ++--
common/src/main/proto/TransportMessages.proto | 1 +
.../common/protocol/message/ControlMessages.scala | 9 ++--
.../org/apache/celeborn/common/quota/Quota.scala | 56 ++++++++++++--------
.../apache/celeborn/common/quota/QuotaSuite.scala | 61 ++++++++++++++++++++++
.../celeborn/service/deploy/master/Master.scala | 5 +-
8 files changed, 123 insertions(+), 40 deletions(-)
diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
index 05e90ac1..107591bd 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
@@ -53,17 +53,18 @@ class RssShuffleFallbackPolicyRunner(conf: CelebornConf)
extends Logging {
/**
* If rss cluster is exceed current user's quota, fallback to external
shuffle
*
- * @return if rss cluster usage of current user's percent is overhead the
limit
+ * @return if rss cluster have available space for current user
*/
def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
if (!conf.quotaEnabled) {
return true
}
- val available = lifecycleManager.checkQuota()
- if (!available) {
- logWarning(s"Quota exceed for current user
${lifecycleManager.getUserIdentifier}.")
+ val resp = lifecycleManager.checkQuota()
+ if (!resp.isAvailable) {
+ logWarning(
+ s"Quota exceed for current user ${lifecycleManager.getUserIdentifier}.
Because: ${resp.reason}")
}
- available
+ resp.isAvailable
}
}
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
index b1b33524..a52f0b37 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleFallbackPolicyRunner.scala
@@ -59,17 +59,18 @@ class RssShuffleFallbackPolicyRunner(conf: CelebornConf)
extends Logging {
/**
* If rss cluster is exceed current user's quota, fallback to external
shuffle
*
- * @return if rss cluster have available space for current user.
+ * @return if rss cluster have available space for current user
*/
def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
if (!conf.quotaEnabled) {
return true
}
- val available = lifecycleManager.checkQuota()
- if (!available) {
- logWarning(s"Quota exceed for current user
${lifecycleManager.getUserIdentifier}.")
+ val resp = lifecycleManager.checkQuota()
+ if (!resp.isAvailable) {
+ logWarning(
+ s"Quota exceed for current user ${lifecycleManager.getUserIdentifier}.
Because: ${resp.reason}")
}
- available
+ resp.isAvailable
}
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 06fbb2a5..d6e39d2c 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1291,15 +1291,16 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
}
- def checkQuota(): Boolean = {
+ def checkQuota(): CheckQuotaResponse = {
try {
rssHARetryClient.askSync[CheckQuotaResponse](
CheckQuota(userIdentifier),
- classOf[CheckQuotaResponse]).isAvailable
+ classOf[CheckQuotaResponse])
} catch {
case e: Exception =>
- logError(s"AskSync Cluster check quota for $userIdentifier failed.", e)
- false
+ val msg = s"AskSync Cluster check quota for $userIdentifier failed."
+ logError(msg, e)
+ CheckQuotaResponse(false, msg)
}
}
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index ae4f3816..a19ed793 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -295,6 +295,7 @@ message PbCheckQuota {
message PbCheckQuotaResponse {
bool available = 1;
+ string reason = 2;
}
message PbReportWorkerUnavailable {
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index d8255005..a786807b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -327,7 +327,7 @@ object ControlMessages extends Logging {
case class CheckQuota(userIdentifier: UserIdentifier) extends Message
- case class CheckQuotaResponse(isAvailable: Boolean) extends Message
+ case class CheckQuotaResponse(isAvailable: Boolean, reason: String) extends
Message
case class ReportWorkerUnavailable(
unavailable: util.List[WorkerInfo],
@@ -634,9 +634,10 @@ object ControlMessages extends Logging {
MessageType.CHECK_QUOTA,
builder.build().toByteArray)
- case CheckQuotaResponse(available) =>
+ case CheckQuotaResponse(available, reason) =>
val payload = PbCheckQuotaResponse.newBuilder()
.setAvailable(available)
+ .setReason(reason)
.build().toByteArray
new TransportMessage(MessageType.CHECK_QUOTA_RESPONSE, payload)
@@ -950,7 +951,9 @@ object ControlMessages extends Logging {
case CHECK_QUOTA_RESPONSE =>
val pbCheckAvailableResponse = PbCheckQuotaResponse
.parseFrom(message.getPayload)
- CheckQuotaResponse(pbCheckAvailableResponse.getAvailable)
+ CheckQuotaResponse(
+ pbCheckAvailableResponse.getAvailable,
+ pbCheckAvailableResponse.getReason)
case REPORT_WORKER_FAILURE =>
val pbReportWorkerUnavailable =
PbReportWorkerUnavailable.parseFrom(message.getPayload)
diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
index d2280f8d..6b3baa92 100644
--- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala
@@ -39,47 +39,61 @@ case class Quota(
def checkQuotaSpaceAvailable(
userIdentifier: UserIdentifier,
- resourceResumption: ResourceConsumption): Boolean = {
- val exceed =
- checkDiskBytesWritten(userIdentifier,
resourceResumption.diskBytesWritten) ||
- checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount) ||
- checkHdfsBytesWritten(userIdentifier,
resourceResumption.hdfsBytesWritten) ||
- checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount)
- !exceed
+ resourceResumption: ResourceConsumption): (Boolean, String) = {
+ val checkResults = Seq(
+ checkDiskBytesWritten(userIdentifier,
resourceResumption.diskBytesWritten),
+ checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount),
+ checkHdfsBytesWritten(userIdentifier,
resourceResumption.hdfsBytesWritten),
+ checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount))
+ val exceed = checkResults.foldLeft(false)(_ || _._1)
+ val reason = checkResults.foldLeft("")(_ + _._2)
+ (!exceed, reason)
}
- private def checkDiskBytesWritten(userIdentifier: UserIdentifier, value:
Long): Boolean = {
+ private def checkDiskBytesWritten(
+ userIdentifier: UserIdentifier,
+ value: Long): (Boolean, String) = {
val exceed = (diskBytesWritten > 0 && value >= diskBytesWritten)
+ var reason = ""
if (exceed) {
- logWarning(s"User $userIdentifier quota exceed diskBytesWritten, " +
- s"${Utils.bytesToString(value)} >=
${Utils.bytesToString(diskBytesWritten)}")
+ reason = s"User $userIdentifier used diskBytesWritten
(${Utils.bytesToString(value)}) " +
+ s"exceeds quota (${Utils.bytesToString(diskBytesWritten)}). "
+ logWarning(reason)
}
- exceed
+ (exceed, reason)
}
- private def checkDiskFileCount(userIdentifier: UserIdentifier, value: Long):
Boolean = {
+ private def checkDiskFileCount(userIdentifier: UserIdentifier, value: Long):
(Boolean, String) = {
val exceed = (diskFileCount > 0 && value >= diskFileCount)
+ var reason = ""
if (exceed) {
- logWarning(s"User $userIdentifier quota exceed diskFileCount, $value >=
$diskFileCount")
+ reason = s"User $userIdentifier used diskFileCount($value) exceeds
quota($diskFileCount). "
+ logWarning(reason)
}
- exceed
+ (exceed, reason)
}
- private def checkHdfsBytesWritten(userIdentifier: UserIdentifier, value:
Long): Boolean = {
+ private def checkHdfsBytesWritten(
+ userIdentifier: UserIdentifier,
+ value: Long): (Boolean, String) = {
val exceed = (hdfsBytesWritten > 0 && value >= hdfsBytesWritten)
+ var reason = ""
if (exceed) {
- logWarning(s"User $userIdentifier quota exceed hdfsBytesWritten, " +
- s"${Utils.bytesToString(value)} >=
${Utils.bytesToString(hdfsBytesWritten)}")
+ reason = s"User $userIdentifier used
hdfsBytesWritten(${Utils.bytesToString(value)}) " +
+ s"exceeds quota(${Utils.bytesToString(hdfsBytesWritten)}). "
+ logWarning(reason)
}
- exceed
+ (exceed, reason)
}
- private def checkHdfsFileCount(userIdentifier: UserIdentifier, value: Long):
Boolean = {
+ private def checkHdfsFileCount(userIdentifier: UserIdentifier, value: Long):
(Boolean, String) = {
val exceed = (hdfsFileCount > 0 && value >= hdfsFileCount)
+ var reason = ""
if (exceed) {
- logWarning(s"User $userIdentifier quota exceed hdfsFileCount, $value >=
$hdfsFileCount")
+ reason = s"User $userIdentifier used hdfsFileCount($value) exceeds
quota($hdfsFileCount). "
+ logWarning(reason)
}
- exceed
+ (exceed, reason)
}
override def toString: String = {
diff --git a/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala b/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala
new file mode 100644
index 00000000..11666ce2
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/quota/QuotaSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.quota
+
+import org.apache.celeborn.common.identity.UserIdentifier
+
+class QuotaSuite extends BaseQuotaManagerSuite {
+
+ val quota1 = Quota(-1, -1, 300, 400)
+ val quota2 = Quota(100, 200, 300, 400)
+ val user1 = UserIdentifier("mock", "mock")
+ val resourceConsumption1 = ResourceConsumption(10, 20, 30, 40)
+ val resourceConsumption2 = ResourceConsumption(10, 20, 600, 800)
+ val resourceConsumption3 = ResourceConsumption(1000, 2000, 3000, 4000)
+
+ test("test check quota return result") {
+ val res1 = quota1.checkQuotaSpaceAvailable(user1, resourceConsumption1)
+ val res2 = quota1.checkQuotaSpaceAvailable(user1, resourceConsumption2)
+ val res3 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption1)
+ val res4 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption2)
+ val res5 = quota2.checkQuotaSpaceAvailable(user1, resourceConsumption3)
+
+ val exp1 = (true, "")
+ val exp2 = (
+ false,
+ "User `mock`.`mock` used hdfsBytesWritten(600.0 B) exceeds quota(300.0
B). " +
+ "User `mock`.`mock` used hdfsFileCount(800) exceeds quota(400). ")
+ val exp3 = (true, "")
+ val exp4 = (
+ false,
+ "User `mock`.`mock` used hdfsBytesWritten(600.0 B) exceeds quota(300.0
B). " +
+ "User `mock`.`mock` used hdfsFileCount(800) exceeds quota(400). ")
+ val exp5 = (
+ false,
+ "User `mock`.`mock` used diskBytesWritten (1000.0 B) exceeds quota
(100.0 B). " +
+ "User `mock`.`mock` used diskFileCount(2000) exceeds quota(200). " +
+ "User `mock`.`mock` used hdfsBytesWritten(2.9 KB) exceeds quota(300.0
B). " +
+ "User `mock`.`mock` used hdfsFileCount(4000) exceeds quota(400). ")
+
+ assert(res1 == exp1)
+ assert(res2 == exp2)
+ assert(res3 == exp3)
+ assert(res4 == exp4)
+ assert(res5 == exp5)
+ }
+}
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a930c98f..62a7f113 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -631,8 +631,9 @@ private[celeborn] class Master(
workerInfo.userResourceConsumption.asScala.get(userIdentifier)
}.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
val quota = quotaManager.getQuota(userIdentifier)
- val isAvailable = quota.checkQuotaSpaceAvailable(userIdentifier,
userResourceConsumption)
- context.reply(CheckQuotaResponse(isAvailable))
+ val (isAvailable, reason) =
+ quota.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
+ context.reply(CheckQuotaResponse(isAvailable, reason))
}
private def workersNotBlacklisted(