This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d39aab2e4 [CELEBORN-830] Check available workers in
CelebornShuffleFallbackPolicyRunner
d39aab2e4 is described below
commit d39aab2e4950e87980e39e2d5ee759765bb8b2af
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 29 16:56:56 2023 +0800
[CELEBORN-830] Check available workers in
CelebornShuffleFallbackPolicyRunner
### What changes were proposed in this pull request?
`CelebornShuffleFallbackPolicyRunner` should not only check quota, but also
check whether the cluster has available workers. If there are no available
workers, fall back to external shuffle.
### Why are the changes needed?
`CelebornShuffleFallbackPolicyRunner` adds a check for available workers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `SparkShuffleManagerSuite#testClusterNotAvailableWithAvailableWorkers`
Closes #1814 from SteNicholas/CELEBORN-830.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../CelebornShuffleFallbackPolicyRunner.scala | 25 +++++++++++++++++++---
.../CelebornShuffleFallbackPolicyRunner.scala | 15 ++++++++++++-
.../apache/celeborn/client/LifecycleManager.scala | 13 +++++++++++
common/src/main/proto/TransportMessages.proto | 9 ++++++++
.../common/protocol/message/ControlMessages.scala | 25 ++++++++++++++++++++++
.../celeborn/service/deploy/master/Master.scala | 7 ++++++
6 files changed, 90 insertions(+), 4 deletions(-)
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 904252e13..6248e08ff 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,14 +25,20 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
def applyAllFallbackPolicy(lifecycleManager: LifecycleManager,
numPartitions: Int): Boolean = {
applyForceFallbackPolicy() ||
applyShufflePartitionsFallbackPolicy(numPartitions) ||
- !checkQuota(lifecycleManager)
+ !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
}
/**
* if celeborn.shuffle.forceFallback.enabled is true, fallback to external
shuffle
* @return return celeborn.shuffle.forceFallback.enabled
*/
- def applyForceFallbackPolicy(): Boolean = conf.shuffleForceFallbackEnabled
+ def applyForceFallbackPolicy(): Boolean = {
+ if (conf.shuffleForceFallbackEnabled) {
+ val conf = CelebornConf.SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED
+ logWarning(s"${conf.alternatives.foldLeft(conf.key)((x, y) => s"$x or
$y")} is enabled, which will force fallback.")
+ }
+ conf.shuffleForceFallbackEnabled
+ }
/**
* if shuffle partitions >
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external
shuffle
@@ -43,7 +49,7 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf)
extends Logging {
val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
val needFallback = numPartitions >= confNumPartitions
if (needFallback) {
- logInfo(s"Shuffle num of partitions: $numPartitions" +
+ logWarning(s"Shuffle num of partitions: $numPartitions" +
s" is bigger than the limit: $confNumPartitions," +
s" need fallback to spark shuffle")
}
@@ -67,4 +73,17 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
}
resp.isAvailable
}
+
+ /**
+ * If celeborn cluster has no available workers, fallback to external
shuffle.
+ *
+ * @return if celeborn cluster has available workers.
+ */
+ def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+ val resp = lifecycleManager.checkWorkersAvailable()
+ if (!resp.getAvailable) {
+ logWarning(s"No workers available for current user
${lifecycleManager.getUserIdentifier}.")
+ }
+ resp.getAvailable
+ }
}
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 9158e9469..6248e08ff 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,7 +25,7 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf)
extends Logging {
def applyAllFallbackPolicy(lifecycleManager: LifecycleManager,
numPartitions: Int): Boolean = {
applyForceFallbackPolicy() ||
applyShufflePartitionsFallbackPolicy(numPartitions) ||
- !checkQuota(lifecycleManager)
+ !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
}
/**
@@ -73,4 +73,17 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
}
resp.isAvailable
}
+
+ /**
+ * If celeborn cluster has no available workers, fallback to external
shuffle.
+ *
+ * @return if celeborn cluster has available workers.
+ */
+ def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+ val resp = lifecycleManager.checkWorkersAvailable()
+ if (!resp.getAvailable) {
+ logWarning(s"No workers available for current user
${lifecycleManager.getUserIdentifier}.")
+ }
+ resp.getAvailable
+ }
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 85748f4f8..33c56bee4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1090,6 +1090,19 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
}
}
+ def checkWorkersAvailable(): PbCheckWorkersAvailableResponse = {
+ try {
+ masterClient.askSync[PbCheckWorkersAvailableResponse](
+ CheckWorkersAvailable(),
+ classOf[PbCheckWorkersAvailableResponse])
+ } catch {
+ case e: Exception =>
+ val msg = s"AskSync Cluster check workers available for
$userIdentifier failed."
+ logError(msg, e)
+ CheckWorkersAvailableResponse(false)
+ }
+ }
+
// Once a partition is released, it will be never needed anymore
def releasePartition(shuffleId: Int, partitionId: Int): Unit = {
commitManager.releasePartitionResource(shuffleId, partitionId)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 5051f34bf..405c8d100 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -73,6 +73,8 @@ enum MessageType {
CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
OPEN_STREAM = 51;
STREAM_HANDLER = 52;
+ CHECK_WORKERS_AVAILABLE = 53;
+ CHECK_WORKERS_AVAILABLE_RESPONSE = 54;
}
message PbStorageInfo {
@@ -329,6 +331,13 @@ message PbCheckQuotaResponse {
string reason = 2;
}
+message PbCheckWorkersAvailable {
+}
+
+message PbCheckWorkersAvailableResponse {
+ bool available = 1;
+}
+
message PbReportWorkerUnavailable {
repeated PbWorkerInfo unavailable = 1;
string requestId = 2;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index ec8e951f1..725cf899b 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -354,6 +354,19 @@ object ControlMessages extends Logging {
unavailable: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID) extends MasterRequestMessage
+ object CheckWorkersAvailable {
+ def apply(): PbCheckWorkersAvailable = {
+ PbCheckWorkersAvailable.newBuilder().build()
+ }
+ }
+
+ object CheckWorkersAvailableResponse {
+ def apply(isAvailable: Boolean): PbCheckWorkersAvailableResponse =
+ PbCheckWorkersAvailableResponse.newBuilder()
+ .setAvailable(isAvailable)
+ .build()
+ }
+
/**
* ==========================================
* handled by worker
@@ -791,6 +804,12 @@ object ControlMessages extends Logging {
case OneWayMessageResponse =>
new TransportMessage(MessageType.ONE_WAY_MESSAGE_RESPONSE, null)
+
+ case pb: PbCheckWorkersAvailable =>
+ new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE, pb.toByteArray)
+
+ case pb: PbCheckWorkersAvailableResponse =>
+ new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE,
pb.toByteArray)
}
// TODO change return type to GeneratedMessageV3
@@ -1111,6 +1130,12 @@ object ControlMessages extends Logging {
case STAGE_END_RESPONSE_VALUE =>
val pbStageEndResponse =
PbStageEndResponse.parseFrom(message.getPayload)
StageEndResponse(Utils.toStatusCode(pbStageEndResponse.getStatus))
+
+ case CHECK_WORKERS_AVAILABLE_VALUE =>
+ PbCheckWorkersAvailable.parseFrom(message.getPayload)
+
+ case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
+ PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)
}
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 815b4d708..3bdb2fdbb 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -376,6 +376,9 @@ private[celeborn] class Master(
case CheckQuota(userIdentifier) =>
executeWithLeaderChecker(context, handleCheckQuota(userIdentifier,
context))
+
+ case _: PbCheckWorkersAvailable =>
+ executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))
}
private def timeoutDeadWorkers() {
@@ -780,6 +783,10 @@ private[celeborn] class Master(
context.reply(CheckQuotaResponse(isAvailable, reason))
}
+ private def handleCheckWorkersAvailable(context: RpcCallContext): Unit = {
+ context.reply(CheckWorkersAvailableResponse(!workersAvailable().isEmpty))
+ }
+
private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty):
util.List[WorkerInfo] = {
workersSnapShot.asScala.filter { w =>