This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6ad02f14a [CELEBORN-1577][PHASE1] Storage quota should support
interrupt shuffle
6ad02f14a is described below
commit 6ad02f14a9e71fcd036dd077138a0236374a6154
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Oct 30 16:28:09 2024 +0800
[CELEBORN-1577][PHASE1] Storage quota should support interrupt shuffle
### What changes were proposed in this pull request?
Support interrupting shuffles on the client side.
I will develop the following features in order:
1. The client supports interrupting shuffles.
2. The master supports calculating app-level shuffle usage.
### Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot
limit the writing of existing shuffles. In our production environment, there is
such a scenario: the cluster is small, but a single shuffle of the user's app is
large and occupies disk resources; we want to interrupt those shuffles.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This part cannot be tested independently; additional tests will be added
after completing the second part.
Closes #2801 from leixm/CELEBORN-1577-1.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../shuffle/celeborn/SparkShuffleManager.java | 1 +
.../apache/spark/shuffle/celeborn/SparkUtils.java | 24 ++++++++++++++++++++++
.../shuffle/celeborn/SparkShuffleManager.java | 1 +
.../apache/spark/shuffle/celeborn/SparkUtils.java | 23 +++++++++++++++++++++
.../celeborn/client/ApplicationHeartbeater.scala | 15 +++++++++++---
.../apache/celeborn/client/LifecycleManager.scala | 24 +++++++++++++++++++---
.../celeborn/client/WorkerStatusTrackerSuite.scala | 3 ++-
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 9 ++++++++
.../common/protocol/message/ControlMessages.scala | 13 +++++++++---
docs/configuration/client.md | 1 +
docs/configuration/quota.md | 1 +
.../celeborn/service/deploy/master/Master.scala | 3 ++-
13 files changed, 108 insertions(+), 11 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 52f383905..8c25dc5c2 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -97,6 +97,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4f38c9815..3f2e47097 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -22,14 +22,19 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.LongAdder;
+import scala.Option;
+import scala.Some;
import scala.Tuple2;
import org.apache.spark.BarrierTaskContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
+import org.apache.spark.SparkContext$;
import org.apache.spark.TaskContext;
+import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.scheduler.ShuffleMapStage;
import org.apache.spark.sql.execution.UnsafeRowSerializer;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.storage.BlockManagerId;
@@ -39,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.util.Utils;
+import org.apache.celeborn.reflect.DynFields;
public class SparkUtils {
private static final Logger logger =
LoggerFactory.getLogger(SparkUtils.class);
@@ -179,4 +185,22 @@ public class SparkUtils {
shuffleClient.reportBarrierTaskFailure(appShuffleId,
appShuffleIdentifier);
});
}
+
+ private static final DynFields.UnboundField shuffleIdToMapStage_FIELD =
+ DynFields.builder().hiddenImpl(DAGScheduler.class,
"shuffleIdToMapStage").build();
+
+ public static void cancelShuffle(int shuffleId, String reason) {
+ if (SparkContext$.MODULE$.getActive().nonEmpty()) {
+ DAGScheduler scheduler =
SparkContext$.MODULE$.getActive().get().dagScheduler();
+ scala.collection.mutable.Map<Integer, ShuffleMapStage>
shuffleIdToMapStageValue =
+ (scala.collection.mutable.Map<Integer, ShuffleMapStage>)
+ shuffleIdToMapStage_FIELD.bind(scheduler).get();
+ Option<ShuffleMapStage> shuffleMapStage =
shuffleIdToMapStageValue.get(shuffleId);
+ if (shuffleMapStage.nonEmpty()) {
+ scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason));
+ }
+ } else {
+ logger.error("Can not get active SparkContext, skip cancelShuffle.");
+ }
+ }
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8e04126a1..fdb13102d 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -139,6 +139,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 47317474e..d8a237bc4 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -19,15 +19,20 @@ package org.apache.spark.shuffle.celeborn;
import java.util.concurrent.atomic.LongAdder;
+import scala.Option;
+import scala.Some;
import scala.Tuple2;
import org.apache.spark.BarrierTaskContext;
import org.apache.spark.MapOutputTrackerMaster;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
+import org.apache.spark.SparkContext$;
import org.apache.spark.TaskContext;
+import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.scheduler.ShuffleMapStage;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
@@ -266,6 +271,9 @@ public class SparkUtils {
.orNoop()
.build();
+ private static final DynFields.UnboundField shuffleIdToMapStage_FIELD =
+ DynFields.builder().hiddenImpl(DAGScheduler.class,
"shuffleIdToMapStage").build();
+
public static void unregisterAllMapOutput(
MapOutputTrackerMaster mapOutputTracker, int shuffleId) {
if (!UnregisterAllMapAndMergeOutput_METHOD.isNoop()) {
@@ -296,4 +304,19 @@ public class SparkUtils {
shuffleClient.reportBarrierTaskFailure(appShuffleId,
appShuffleIdentifier);
});
}
+
+ public static void cancelShuffle(int shuffleId, String reason) {
+ if (SparkContext$.MODULE$.getActive().nonEmpty()) {
+ DAGScheduler scheduler =
SparkContext$.MODULE$.getActive().get().dagScheduler();
+ scala.collection.mutable.Map<Integer, ShuffleMapStage>
shuffleIdToMapStageValue =
+ (scala.collection.mutable.Map<Integer, ShuffleMapStage>)
+ shuffleIdToMapStage_FIELD.bind(scheduler).get();
+ Option<ShuffleMapStage> shuffleMapStage =
shuffleIdToMapStageValue.get(shuffleId);
+ if (shuffleMapStage.nonEmpty()) {
+ scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason));
+ }
+ } else {
+ LOG.error("Can not get active SparkContext, skip cancelShuffle.");
+ }
+ }
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index 4b90a8bca..b73582745 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -29,7 +29,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse
-import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
ApplicationLostResponse, HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
+import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
ApplicationLostResponse, CheckQuotaResponse, HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
@@ -39,7 +39,8 @@ class ApplicationHeartbeater(
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker,
- registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean])
extends Logging {
+ registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
+ cancelAllActiveStages: String => Unit) extends Logging {
private var stopped = false
private val reviseLostShuffles = conf.reviseLostShufflesEnabled
@@ -77,6 +78,7 @@ class ApplicationHeartbeater(
if (response.statusCode == StatusCode.SUCCESS) {
logDebug("Successfully send app heartbeat.")
workerStatusTracker.handleHeartbeatResponse(response)
+ checkQuotaExceeds(response.checkQuotaResponse)
// revise shuffle id if there are lost shuffles
if (reviseLostShuffles) {
val masterRecordedShuffleIds = response.registeredShuffles
@@ -132,7 +134,8 @@ class ApplicationHeartbeater(
List.empty.asJava,
List.empty.asJava,
List.empty.asJava,
- List.empty.asJava)
+ List.empty.asJava,
+ CheckQuotaResponse(isAvailable = true, ""))
}
}
@@ -149,6 +152,12 @@ class ApplicationHeartbeater(
}
}
+ private def checkQuotaExceeds(response: CheckQuotaResponse): Unit = {
+ if (conf.quotaInterruptShuffleEnabled && !response.isAvailable) {
+ cancelAllActiveStages(response.reason)
+ }
+ }
+
def stop(): Unit = {
stopped.synchronized {
if (!stopped) {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 38b9f28ba..92ccc66e2 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -22,9 +22,9 @@ import java.nio.ByteBuffer
import java.security.SecureRandom
import java.util
import java.util.{function, List => JList}
-import java.util.concurrent.{Callable, ConcurrentHashMap, LinkedBlockingQueue,
ScheduledFuture, TimeUnit}
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
-import java.util.function.Consumer
+import java.util.function.{BiConsumer, Consumer}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
@@ -211,7 +211,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
masterClient,
() => commitManager.commitMetrics(),
workerStatusTracker,
- registeredShuffle)
+ registeredShuffle,
+ reason => cancelAllActiveStages(reason))
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)
@@ -1763,6 +1764,11 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
appShuffleDeterminateMap.put(appShuffleId, determinate)
}
+ @volatile private var cancelShuffleCallback: Option[BiConsumer[Integer,
String]] = None
+ def registerCancelShuffleCallback(callback: BiConsumer[Integer, String]):
Unit = {
+ cancelShuffleCallback = Some(callback)
+ }
+
// Initialize at the end of LifecycleManager construction.
initialize()
@@ -1781,4 +1787,16 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
rnd.nextBytes(secretBytes)
JavaUtils.bytesToString(ByteBuffer.wrap(secretBytes))
}
+
+ def cancelAllActiveStages(reason: String): Unit = cancelShuffleCallback
match {
+ case Some(c) =>
+ shuffleAllocatedWorkers
+ .asScala
+ .keys
+ .filter(!commitManager.isStageEnd(_))
+ .foreach(c.accept(_, reason))
+
+ case _ =>
+ }
+
}
diff --git
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index 27196e8d9..66219f957 100644
---
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -197,7 +197,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
unknownWorkers,
shuttingWorkers,
availableWorkers,
- new util.ArrayList[Integer]())
+ new util.ArrayList[Integer](),
+ null)
}
private def mockWorkers(workerHosts: Array[String]):
util.ArrayList[WorkerInfo] = {
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 245b78dc0..3ed029676 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -452,6 +452,7 @@ message PbHeartbeatFromApplicationResponse {
repeated PbWorkerInfo shuttingWorkers = 4;
repeated int32 registeredShuffles = 5;
repeated PbWorkerInfo availableWorkers = 6;
+ PbCheckQuotaResponse checkQuotaResponse = 7;
}
message PbCheckQuota {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 97befab96..8253ec298 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -886,6 +886,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
def quotaUserSpecificTenant: String = get(QUOTA_USER_SPECIFIC_TENANT)
def quotaUserSpecificUserName: String = get(QUOTA_USER_SPECIFIC_USERNAME)
+ def quotaInterruptShuffleEnabled: Boolean =
get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)
// //////////////////////////////////////////////////////
// Client //
@@ -5333,6 +5334,14 @@ object CelebornConf extends Logging {
.longConf
.createWithDefault(Long.MaxValue)
+ val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.quota.interruptShuffle.enabled")
+ .categories("quota", "client")
+ .version("0.6.0")
+ .doc("Whether to enable interrupt shuffle when quota exceeds.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.columnarShuffle.enabled")
.withAlternative("celeborn.columnar.shuffle.enabled")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index e5ea22fd4..57086c035 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -423,7 +423,8 @@ object ControlMessages extends Logging {
unknownWorkers: util.List[WorkerInfo],
shuttingWorkers: util.List[WorkerInfo],
availableWorkers: util.List[WorkerInfo],
- registeredShuffles: util.List[Integer]) extends Message
+ registeredShuffles: util.List[Integer],
+ checkQuotaResponse: CheckQuotaResponse) extends Message
case class CheckQuota(userIdentifier: UserIdentifier) extends Message
@@ -832,7 +833,10 @@ object ControlMessages extends Logging {
unknownWorkers,
shuttingWorkers,
availableWorkers,
- registeredShuffles) =>
+ registeredShuffles,
+ checkQuotaResponse) =>
+ val pbCheckQuotaResponse =
PbCheckQuotaResponse.newBuilder().setAvailable(
+ checkQuotaResponse.isAvailable).setReason(checkQuotaResponse.reason)
val payload = PbHeartbeatFromApplicationResponse.newBuilder()
.setStatus(statusCode.getValue)
.addAllExcludedWorkers(
@@ -844,6 +848,7 @@ object ControlMessages extends Logging {
.addAllAvailableWorkers(
availableWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true,
true)).toList.asJava)
.addAllRegisteredShuffles(registeredShuffles)
+ .setCheckQuotaResponse(pbCheckQuotaResponse)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE,
payload)
@@ -1221,6 +1226,7 @@ object ControlMessages extends Logging {
case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE =>
val pbHeartbeatFromApplicationResponse =
PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload)
+ val pbCheckQuotaResponse =
pbHeartbeatFromApplicationResponse.getCheckQuotaResponse
HeartbeatFromApplicationResponse(
Utils.toStatusCode(pbHeartbeatFromApplicationResponse.getStatus),
pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala
@@ -1231,7 +1237,8 @@ object ControlMessages extends Logging {
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getAvailableWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
- pbHeartbeatFromApplicationResponse.getRegisteredShufflesList)
+ pbHeartbeatFromApplicationResponse.getRegisteredShufflesList,
+ CheckQuotaResponse(pbCheckQuotaResponse.getAvailable,
pbCheckQuotaResponse.getReason))
case CHECK_QUOTA_VALUE =>
val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 492bd8133..7dad000b0 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -126,6 +126,7 @@ license: |
| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0 [...]
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.identity.user-specific.userName | default | false | User name
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
+| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable
interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available
options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical.
| 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md
index 4e79050e1..da3ff1bb3 100644
--- a/docs/configuration/quota.md
+++ b/docs/configuration/quota.md
@@ -23,6 +23,7 @@ license: |
| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0 [...]
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.identity.user-specific.userName | default | false | User name
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
+| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable
interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota
dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota
dynamic configuration for written disk file count. | 0.5.0 | |
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota
dynamic configuration for written hdfs bytes. | 0.5.0 | |
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index bec47890e..368eb5c66 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1145,7 +1145,8 @@ private[celeborn] class Master(
new util.ArrayList[WorkerInfo](
(statusSystem.shutdownWorkers.asScala ++
statusSystem.decommissionWorkers.asScala).asJava),
availableWorksSentToClient,
- new util.ArrayList(appRelatedShuffles)))
+ new util.ArrayList(appRelatedShuffles),
+ CheckQuotaResponse(isAvailable = true, "")))
} else {
context.reply(OneWayMessageResponse)
}