pan3793 commented on code in PR #2494:
URL: https://github.com/apache/celeborn/pull/2494#discussion_r1600993486
##########
client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala:
##########
@@ -25,47 +25,51 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+ private val checkWorkerEnabled = conf.checkWorkerEnabled
def applyAllFallbackPolicy(lifecycleManager: LifecycleManager,
numPartitions: Int): Boolean = {
val needFallback =
applyForceFallbackPolicy() ||
applyShufflePartitionsFallbackPolicy(numPartitions) ||
!checkQuota(lifecycleManager) ||
!checkWorkersAvailable(lifecycleManager)
if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
- throw new CelebornIOException("Fallback to Spark's default shuffle is
prohibited.")
+ throw new CelebornIOException(
+ "Fallback to spark built-in shuffle implementation is prohibited.")
}
needFallback
}
/**
- * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
external shuffle
- * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+ * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
spark built-in shuffle implementation
+ * @return return true if celeborn.client.spark.shuffle.fallback.policy is
ALWAYS, otherwise false
*/
def applyForceFallbackPolicy(): Boolean = {
if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
logWarning(
- s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+ s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, " +
+ s"forcibly fallback to spark built-in shuffle implementation.")
}
FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
}
/**
- * if shuffle partitions >
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external
shuffle
+ * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold,
fallback to spark built-in
+ * shuffle implementation
* @param numPartitions shuffle partitions
- * @return return if shuffle partitions bigger than limit
+ * @return return true if shuffle partitions bigger than limit, otherwise
false
*/
def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
- val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
+ val confNumPartitions = conf.shuffleFallbackPartitionThreshold
Review Comment:
resolved
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]