pan3793 commented on code in PR #2494:
URL: https://github.com/apache/celeborn/pull/2494#discussion_r1600993486


##########
client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala:
##########
@@ -25,47 +25,51 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
   private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+  private val checkWorkerEnabled = conf.checkWorkerEnabled
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
     val needFallback =
       applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
         !checkQuota(lifecycleManager) || 
!checkWorkersAvailable(lifecycleManager)
     if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
-      throw new CelebornIOException("Fallback to Spark's default shuffle is 
prohibited.")
+      throw new CelebornIOException(
+        "Fallback to spark built-in shuffle implementation is prohibited.")
     }
     needFallback
   }
 
   /**
-   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
external shuffle
-   * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
spark built-in shuffle implementation
+   * @return return true if celeborn.client.spark.shuffle.fallback.policy is 
ALWAYS, otherwise false
    */
   def applyForceFallbackPolicy(): Boolean = {
     if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
       logWarning(
-        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, " +
+          s"forcibly fallback to spark built-in shuffle implementation.")
     }
     FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
   }
 
   /**
-   * if shuffle partitions > 
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external 
shuffle
+   * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold, 
fallback to spark built-in
+   * shuffle implementation
    * @param numPartitions shuffle partitions
-   * @return return if shuffle partitions bigger than limit
+   * @return return true if shuffle partitions bigger than limit, otherwise 
false
    */
   def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
-    val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
+    val confNumPartitions = conf.shuffleFallbackPartitionThreshold

Review Comment:
   resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to