This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4625484d2 [CELEBORN-830] Check available workers in 
CelebornShuffleFallbackPolicyRunner
4625484d2 is described below

commit 4625484d2c129e32667ce8426170c0db00222730
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 29 16:56:56 2023 +0800

    [CELEBORN-830] Check available workers in 
CelebornShuffleFallbackPolicyRunner
    
    ### What changes were proposed in this pull request?
    
    `CelebornShuffleFallbackPolicyRunner` can now check not only quota, but also 
whether the cluster has available workers. If there are no available workers, 
it falls back to external shuffle.
    
    ### Why are the changes needed?
    
    `CelebornShuffleFallbackPolicyRunner` adds a check for available workers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - `SparkShuffleManagerSuite#testClusterNotAvailableWithAvailableWorkers`
    
    Closes #1814 from SteNicholas/CELEBORN-830.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../CelebornShuffleFallbackPolicyRunner.scala      | 25 +++++++++++++++++++---
 .../CelebornShuffleFallbackPolicyRunner.scala      | 15 ++++++++++++-
 .../apache/celeborn/client/LifecycleManager.scala  | 13 +++++++++++
 common/src/main/proto/TransportMessages.proto      |  9 ++++++++
 .../common/protocol/message/ControlMessages.scala  | 25 ++++++++++++++++++++++
 .../celeborn/service/deploy/master/Master.scala    |  7 ++++++
 6 files changed, 90 insertions(+), 4 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 904252e13..6248e08ff 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,14 +25,20 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
     applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
-    !checkQuota(lifecycleManager)
+    !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
   }
 
   /**
    * if celeborn.shuffle.forceFallback.enabled is true, fallback to external 
shuffle
    * @return return celeborn.shuffle.forceFallback.enabled
    */
-  def applyForceFallbackPolicy(): Boolean = conf.shuffleForceFallbackEnabled
+  def applyForceFallbackPolicy(): Boolean = {
+    if (conf.shuffleForceFallbackEnabled) {
+      val conf = CelebornConf.SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED
+      logWarning(s"${conf.alternatives.foldLeft(conf.key)((x, y) => s"$x or 
$y")} is enabled, which will force fallback.")
+    }
+    conf.shuffleForceFallbackEnabled
+  }
 
   /**
    * if shuffle partitions > 
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external 
shuffle
@@ -43,7 +49,7 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) 
extends Logging {
     val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
     val needFallback = numPartitions >= confNumPartitions
     if (needFallback) {
-      logInfo(s"Shuffle num of partitions: $numPartitions" +
+      logWarning(s"Shuffle num of partitions: $numPartitions" +
         s" is bigger than the limit: $confNumPartitions," +
         s" need fallback to spark shuffle")
     }
@@ -67,4 +73,17 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
     }
     resp.isAvailable
   }
+
+  /**
+   * If celeborn cluster has no available workers, fallback to external 
shuffle.
+   *
+   * @return if celeborn cluster has available workers.
+   */
+  def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    val resp = lifecycleManager.checkWorkersAvailable()
+    if (!resp.getAvailable) {
+      logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
+    }
+    resp.getAvailable
+  }
 }
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 9158e9469..6248e08ff 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,7 +25,7 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) 
extends Logging {
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
     applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
-    !checkQuota(lifecycleManager)
+    !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
   }
 
   /**
@@ -73,4 +73,17 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
     }
     resp.isAvailable
   }
+
+  /**
+   * If celeborn cluster has no available workers, fallback to external 
shuffle.
+   *
+   * @return if celeborn cluster has available workers.
+   */
+  def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    val resp = lifecycleManager.checkWorkersAvailable()
+    if (!resp.getAvailable) {
+      logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
+    }
+    resp.getAvailable
+  }
 }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 85748f4f8..33c56bee4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1090,6 +1090,19 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
     }
   }
 
+  def checkWorkersAvailable(): PbCheckWorkersAvailableResponse = {
+    try {
+      masterClient.askSync[PbCheckWorkersAvailableResponse](
+        CheckWorkersAvailable(),
+        classOf[PbCheckWorkersAvailableResponse])
+    } catch {
+      case e: Exception =>
+        val msg = s"AskSync Cluster check workers available for 
$userIdentifier failed."
+        logError(msg, e)
+        CheckWorkersAvailableResponse(false)
+    }
+  }
+
   // Once a partition is released, it will be never needed anymore
   def releasePartition(shuffleId: Int, partitionId: Int): Unit = {
     commitManager.releasePartitionResource(shuffleId, partitionId)
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 74b043598..b303e6482 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -73,6 +73,8 @@ enum MessageType {
   CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
   OPEN_STREAM = 51;
   STREAM_HANDLER = 52;
+  CHECK_WORKERS_AVAILABLE = 53;
+  CHECK_WORKERS_AVAILABLE_RESPONSE = 54;
 }
 
 message PbStorageInfo {
@@ -319,6 +321,13 @@ message PbCheckQuotaResponse {
   string reason = 2;
 }
 
+message PbCheckWorkersAvailable {
+}
+
+message PbCheckWorkersAvailableResponse {
+  bool available = 1;
+}
+
 message PbReportWorkerUnavailable {
   repeated PbWorkerInfo unavailable = 1;
   string requestId = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 18dc2675d..0c8f2abf9 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -347,6 +347,19 @@ object ControlMessages extends Logging {
       unavailable: util.List[WorkerInfo],
       override var requestId: String = ZERO_UUID) extends MasterRequestMessage
 
+  object CheckWorkersAvailable {
+    def apply(): PbCheckWorkersAvailable = {
+      PbCheckWorkersAvailable.newBuilder().build()
+    }
+  }
+
+  object CheckWorkersAvailableResponse {
+    def apply(isAvailable: Boolean): PbCheckWorkersAvailableResponse =
+      PbCheckWorkersAvailableResponse.newBuilder()
+        .setAvailable(isAvailable)
+        .build()
+  }
+
   /**
    * ==========================================
    *         handled by worker
@@ -765,6 +778,12 @@ object ControlMessages extends Logging {
 
     case OneWayMessageResponse =>
       new TransportMessage(MessageType.ONE_WAY_MESSAGE_RESPONSE, null)
+
+    case pb: PbCheckWorkersAvailable =>
+      new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE, pb.toByteArray)
+
+    case pb: PbCheckWorkersAvailableResponse =>
+      new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE, 
pb.toByteArray)
   }
 
   // TODO change return type to GeneratedMessageV3
@@ -1070,6 +1089,12 @@ object ControlMessages extends Logging {
       case STAGE_END_RESPONSE_VALUE =>
         val pbStageEndResponse = 
PbStageEndResponse.parseFrom(message.getPayload)
         StageEndResponse(Utils.toStatusCode(pbStageEndResponse.getStatus))
+
+      case CHECK_WORKERS_AVAILABLE_VALUE =>
+        PbCheckWorkersAvailable.parseFrom(message.getPayload)
+
+      case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
+        PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)
     }
   }
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index c3dd27125..39e030e98 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -373,6 +373,9 @@ private[celeborn] class Master(
 
     case CheckQuota(userIdentifier) =>
       executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, 
context))
+
+    case _: PbCheckWorkersAvailable =>
+      executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))
   }
 
   private def timeoutDeadWorkers() {
@@ -768,6 +771,10 @@ private[celeborn] class Master(
     context.reply(CheckQuotaResponse(isAvailable, reason))
   }
 
+  private def handleCheckWorkersAvailable(context: RpcCallContext): Unit = {
+    context.reply(CheckWorkersAvailableResponse(!workersAvailable().isEmpty))
+  }
+
   private def workersAvailable(
       tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): 
util.List[WorkerInfo] = {
     workersSnapShot.asScala.filter { w =>

Reply via email to