This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ae39a9754 [CELEBORN-922] Improve celeborn shuffle manager fallback log message
ae39a9754 is described below
commit ae39a97548ef4c04a644525e54b738001bc25451
Author: Xiduo You <[email protected]>
AuthorDate: Sat Aug 26 13:26:26 2023 +0800
[CELEBORN-922] Improve celeborn shuffle manager fallback log message
### What changes were proposed in this pull request?
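When the Celeborn shuffle manager falls back to the vanilla Spark shuffle manager, we
should make sure that dynamic resource allocation (DRA) is disabled. Otherwise, reducer
tasks may fail when fetching blocks, because the fallen-back shuffle writes its output
to executor-local disks and DRA may release those executors.
This PR logs the fallback message at error level when DRA is enabled (and at warn
level otherwise), and includes the shuffle ID in both messages.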
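To make the decision concrete, here is a minimal Java sketch of the level selection this PR adds. It is not the Celeborn code: `FallbackLogDecision` and its methods are hypothetical, and a plain `Map` stands in for `SparkConf`. Only the `spark.dynamicAllocation.enabled` key, its `false` default, and the warn-message wording come from the diff.

```java
import java.util.Map;

// Hypothetical sketch of the fallback log-level decision in CELEBORN-922.
// A Map<String, String> stands in for SparkConf; names are illustrative only.
final class FallbackLogDecision {

  // Same key as the conf.getBoolean(...) call in the diff.
  static final String DRA_KEY = "spark.dynamicAllocation.enabled";

  static boolean draEnabled(Map<String, String> conf) {
    // Like getBoolean(key, false), a missing key means DRA is disabled.
    return Boolean.parseBoolean(conf.getOrDefault(DRA_KEY, "false"));
  }

  // Level at which the fallback message would be logged.
  static String fallbackLogLevel(Map<String, String> conf) {
    return draEnabled(conf) ? "ERROR" : "WARN";
  }

  // The warn-level message with the {} placeholder filled in.
  static String warnMessage(int shuffleId) {
    return "Fallback to vanilla Spark SortShuffleManager for shuffle: " + shuffleId;
  }

  public static void main(String[] args) {
    System.out.println(fallbackLogLevel(Map.of(DRA_KEY, "true"))); // ERROR
    System.out.println(fallbackLogLevel(Map.of()));                // WARN
    System.out.println(warnMessage(3)); // Fallback to vanilla Spark SortShuffleManager for shuffle: 3
  }
}
```

In the actual patch the two branches call SLF4J's `logger.error` and `logger.warn` with a `{}` placeholder; the sketch returns the level as a string only so the decision can be checked without a logging backend.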
### Why are the changes needed?
Improve the log message.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Passed CI.
Closes #1842 from ulysses-you/log.
Authored-by: Xiduo You <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 85ccea491..cb536c0e6 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -129,7 +129,15 @@ public class SparkShuffleManager implements ShuffleManager {
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
-      logger.warn("Fallback to SortShuffleManager!");
+      if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+        logger.error(
+            "DRA is enabled but we fallback to vanilla Spark SortShuffleManager for "
+                + "shuffle: {} due to fallback policy. It may cause block can not found when reducer "
+                + "task fetch data.",
+            shuffleId);
+      } else {
+        logger.warn("Fallback to vanilla Spark SortShuffleManager for shuffle: {}", shuffleId);
+      }
       sortShuffleIds.add(shuffleId);
       return sortShuffleManager().registerShuffle(shuffleId, dependency);
     } else {