SteNicholas commented on code in PR #2444:
URL: https://github.com/apache/celeborn/pull/2444#discussion_r1576085334


##########
client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala:
##########
@@ -19,25 +19,32 @@ package org.apache.spark.shuffle.celeborn
 
 import org.apache.celeborn.client.LifecycleManager
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
-    applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
-    !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
+    val needFallback =
+      applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
+        !checkQuota(lifecycleManager) || 
!checkWorkersAvailable(lifecycleManager)
+    if (needFallback && 
FallbackPolicy.NEVER.equals(conf.shuffleFallbackPolicy)) {
+      throw new CelebornIOException("Fallback to Spark's default shuffle is 
prohibited.")
+    }
+    needFallback

Review Comment:
   @littlexyw, could you take a look at the above comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to