This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 41a0f7ae9 [CELEBORN-1526] Fix MR plugin can not run on Hadoop 3.1.0
41a0f7ae9 is described below

commit 41a0f7ae91037ae90da37e2602295b93cf0c727c
Author: mingji <[email protected]>
AuthorDate: Mon Jul 29 20:16:44 2024 +0800

    [CELEBORN-1526] Fix MR plugin can not run on Hadoop 3.1.0
    
    ### What changes were proposed in this pull request?
    Fix a NullPointerException when running the Celeborn MapReduce client on Hadoop 3.1.0.
    
    ### Why are the changes needed?
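    Adapt to the Hadoop 3.1 `ShuffleClientMetrics` API change: also try the static
    `ShuffleClientMetrics.create(TaskAttemptID, JobConf)` factory (MAPREDUCE-6526)
    before falling back to the Hadoop 2 constructor, and make the shuffle fetcher
    tolerate a null metrics object.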
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    Cluster test.
    
    Closes #2647 from FMX/b1526.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../mapreduce/task/reduce/CelebornShuffleConsumer.java       | 12 +++++++++++-
 .../hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java | 12 +++++++++---
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
index ee48e3a30..052b340ab 100644
--- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
@@ -102,7 +102,7 @@ public class CelebornShuffleConsumer<K, V>
   private ShuffleClientMetrics createMetrics(
       org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf)
       throws NoSuchMethodException {
-    // for hadoop 3
+    // for hadoop 3.1+ see MAPREDUCE-6861
     try {
       return DynMethods.builder("create")
           .impl(
@@ -114,6 +114,16 @@ public class CelebornShuffleConsumer<K, V>
     } catch (Exception e) {
       // ignore this exception because the createMetrics might use hadoop2
     }
+
+    // for hadoop 3.1 see MAPREDUCE-6526
+    try {
+      return DynMethods.builder("create")
+          .impl(ShuffleClientMetrics.class)
+          .buildStaticChecked()
+          .invoke(taskAttemptID, jobConf);
+    } catch (Exception e) {
+    }
+
     // for hadoop 2
     return DynConstructors.builder(ShuffleClientMetrics.class)
         .hiddenImpl(new Class[] 
{org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class})
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
index 3944869f8..9f807e165 100644
--- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
@@ -78,13 +78,17 @@ public class CelebornShuffleFetcher<K, V> {
         // If merge is on, block
         merger.waitForResource();
         // Do shuffle
-        metrics.threadBusy();
+        if (metrics != null) {
+          metrics.threadBusy();
+        }
         // read blocks
         fetchToLocalAndMerge();
       } catch (Exception e) {
         logger.error("Celeborn shuffle fetcher fetch data failed.", e);
       } finally {
-        metrics.threadFree();
+        if (metrics != null) {
+          metrics.threadFree();
+        }
       }
     }
   }
@@ -134,7 +138,9 @@ public class CelebornShuffleFetcher<K, V> {
       reporter.progress();
     } else {
       celebornInputStream.close();
-      metrics.inputBytes(inputShuffleSize);
+      if (metrics != null) {
+        metrics.inputBytes(inputShuffleSize);
+      }
       logger.info("reduce task {} read {} bytes", reduceId, inputShuffleSize);
       stopped = true;
     }

Reply via email to