This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 41a0f7ae9 [CELEBORN-1526] Fix MR plugin can not run on Hadoop 3.1.0
41a0f7ae9 is described below
commit 41a0f7ae91037ae90da37e2602295b93cf0c727c
Author: mingji <[email protected]>
AuthorDate: Mon Jul 29 20:16:44 2024 +0800
[CELEBORN-1526] Fix MR plugin can not run on Hadoop 3.1.0
### What changes were proposed in this pull request?
Fix an NPE that occurs when running the Celeborn MapReduce plugin on Hadoop 3.1.0.
### Why are the changes needed?
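Adapt to a Hadoop API change: how `ShuffleClientMetrics` is created differs across Hadoop versions, so `createMetrics` now tries an additional static `create` variant before falling back to the Hadoop 2 constructor, and `CelebornShuffleFetcher` null-checks `metrics` before using it.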
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster test.
Closes #2647 from FMX/b1526.
Authored-by: mingji <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../mapreduce/task/reduce/CelebornShuffleConsumer.java | 12 +++++++++++-
.../hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java | 12 +++++++++---
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
index ee48e3a30..052b340ab 100644
---
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
+++
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
@@ -102,7 +102,7 @@ public class CelebornShuffleConsumer<K, V>
private ShuffleClientMetrics createMetrics(
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf)
throws NoSuchMethodException {
- // for hadoop 3
+ // for hadoop 3.1+ see MAPREDUCE-6861
try {
return DynMethods.builder("create")
.impl(
@@ -114,6 +114,16 @@ public class CelebornShuffleConsumer<K, V>
} catch (Exception e) {
// ignore this exception because the createMetrics might use hadoop2
}
+
+ // for hadoop 3.1 see MAPREDUCE-6526
+ try {
+ return DynMethods.builder("create")
+ .impl(ShuffleClientMetrics.class)
+ .buildStaticChecked()
+ .invoke(taskAttemptID, jobConf);
+ } catch (Exception e) {
+ }
+
// for hadoop 2
return DynConstructors.builder(ShuffleClientMetrics.class)
.hiddenImpl(new Class[]
{org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class})
diff --git
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
index 3944869f8..9f807e165 100644
---
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
+++
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
@@ -78,13 +78,17 @@ public class CelebornShuffleFetcher<K, V> {
// If merge is on, block
merger.waitForResource();
// Do shuffle
- metrics.threadBusy();
+ if (metrics != null) {
+ metrics.threadBusy();
+ }
// read blocks
fetchToLocalAndMerge();
} catch (Exception e) {
logger.error("Celeborn shuffle fetcher fetch data failed.", e);
} finally {
- metrics.threadFree();
+ if (metrics != null) {
+ metrics.threadFree();
+ }
}
}
}
@@ -134,7 +138,9 @@ public class CelebornShuffleFetcher<K, V> {
reporter.progress();
} else {
celebornInputStream.close();
- metrics.inputBytes(inputShuffleSize);
+ if (metrics != null) {
+ metrics.inputBytes(inputShuffleSize);
+ }
logger.info("reduce task {} read {} bytes", reduceId, inputShuffleSize);
stopped = true;
}