This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 5aa2eb5e9 [CELEBORN-922] Improve celeborn shuffle manager fallback log message
5aa2eb5e9 is described below

commit 5aa2eb5e98bb7b22646f2d091f5165dcce6fc9fc
Author: Xiduo You <[email protected]>
AuthorDate: Sat Aug 26 13:26:26 2023 +0800

    [CELEBORN-922] Improve celeborn shuffle manager fallback log message
    
    ### What changes were proposed in this pull request?
    
    When the Celeborn shuffle manager falls back to the vanilla Spark shuffle manager, we should make sure that DRA (dynamic resource allocation) is disabled; otherwise, reducer tasks may fail when fetching blocks.
    
    This PR improves the log by printing the fallback message at error level if DRA is enabled.
    
    ### Why are the changes needed?
    
    Improve the log message.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    PASS CI
    
    Closes #1842 from ulysses-you/log.
    
    Authored-by: Xiduo You <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit ae39a97548ef4c04a644525e54b738001bc25451)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 85ccea491..cb536c0e6 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -129,7 +129,15 @@ public class SparkShuffleManager implements ShuffleManager {
 
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
-      logger.warn("Fallback to SortShuffleManager!");
+      if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+        logger.error(
+            "DRA is enabled but we fallback to vanilla Spark SortShuffleManager for "
+                + "shuffle: {} due to fallback policy. It may cause block can not found when reducer "
+                + "task fetch data.",
+            shuffleId);
+      } else {
+        logger.warn("Fallback to vanilla Spark SortShuffleManager for shuffle: {}", shuffleId);
+      }
       sortShuffleIds.add(shuffleId);
       return sortShuffleManager().registerShuffle(shuffleId, dependency);
     } else {
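
The decision made in the hunk above can be sketched in isolation as follows. This is a minimal illustration, not the code from the commit: the class `FallbackLogSketch`, its `Level` enum, and the use of a plain `Map<String, String>` in place of Spark's configuration object are all assumptions made so the example runs without Spark or a logging library. Only the key `spark.dynamicAllocation.enabled`, its default of `false`, and the two log levels come from the diff; the message texts are paraphrased.

```java
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the Celeborn code base.
public class FallbackLogSketch {
    // Stand-in for the logger levels used in the diff (warn vs. error).
    enum Level { WARN, ERROR }

    // Reads the DRA flag from a plain map; a missing key counts as disabled,
    // matching the default of false used in the diff.
    static boolean isDraEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(
            conf.getOrDefault("spark.dynamicAllocation.enabled", "false"));
    }

    // Error level when DRA is enabled (reducers may fail to fetch blocks),
    // warn level otherwise.
    static Level fallbackLogLevel(Map<String, String> conf) {
        return isDraEnabled(conf) ? Level.ERROR : Level.WARN;
    }

    // Builds a message that names the shuffle id; wording is paraphrased.
    static String fallbackMessage(Map<String, String> conf, int shuffleId) {
        if (isDraEnabled(conf)) {
            return "DRA is enabled but shuffle " + shuffleId
                + " falls back to vanilla Spark SortShuffleManager;"
                + " reducer tasks may fail to fetch blocks.";
        }
        return "Fallback to vanilla Spark SortShuffleManager for shuffle: " + shuffleId;
    }

    public static void main(String[] args) {
        Map<String, String> draOn = Map.of("spark.dynamicAllocation.enabled", "true");
        Map<String, String> draOff = Map.of();
        System.out.println(fallbackLogLevel(draOn) + " " + fallbackMessage(draOn, 7));
        System.out.println(fallbackLogLevel(draOff) + " " + fallbackMessage(draOff, 7));
    }
}
```

Separating the level decision from the message text keeps the DRA check in one place, so a caller could also use it to refuse the fallback instead of only logging it.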
