This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cd5609971 [CELEBORN-1434] Support MRAppMasterWithCeleborn to disable
job recovery and job reduce slow start by default
cd5609971 is described below
commit cd5609971f38b31b6f2cae6462c12bcb2c1aaa66
Author: SteNicholas <[email protected]>
AuthorDate: Wed May 22 15:32:41 2024 +0800
[CELEBORN-1434] Support MRAppMasterWithCeleborn to disable job recovery and
job reduce slow start by default
### What changes were proposed in this pull request?
`MRAppMasterWithCeleborn` disables
`yarn.app.mapreduce.am.job.recovery.enable` and sets
`mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
### Why are the changes needed?
MapReduce does not set the flag in ApplicationSubmissionContext that indicates
whether to keep containers across application attempts, so no container is
shared between attempts and job recovery cannot be relied on. In addition,
reduces must be scheduled only after all maps have completed. Therefore,
`MRAppMasterWithCeleborn` disables
`yarn.app.mapreduce.am.job.recovery.enable` and sets
`mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`WordCountTest`
Closes #2525 from SteNicholas/CELEBORN-1434.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
README.md | 1 +
.../mapreduce/v2/app/MRAppMasterWithCeleborn.java | 18 +++++++++++++++++-
docs/README.md | 2 ++
docs/deploy.md | 1 +
.../org/apache/celeborn/tests/mr/WordCountTest.scala | 4 ----
5 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 912c2fccc..525b97894 100644
--- a/README.md
+++ b/README.md
@@ -345,6 +345,7 @@ Meanwhile, configure the following settings in YARN and
MapReduce config.
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer
```
+**Note**: `MRAppMasterWithCeleborn` disables
`yarn.app.mapreduce.am.job.recovery.enable` and sets
`mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
### Best Practice
If you want to set up a production-ready Celeborn cluster, your cluster should
have at least 3 masters and at least 4 workers.
diff --git
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
index 12483e562..fc5e0b3a4 100644
---
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
+++
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
@@ -68,7 +68,7 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
new LifecycleManager(applicationAttemptId.toString(), conf);
String lmHost = lifecycleManager.getHost();
int lmPort = lifecycleManager.getPort();
- logger.info("RMAppMaster initialized with {} {} {}", lmHost, lmPort,
applicationAttemptId);
+ logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort,
applicationAttemptId);
JobConf lmConf = new JobConf();
lmConf.clear();
lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
@@ -124,6 +124,7 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
public static void main(String[] args) {
JobConf rmAppConf = new JobConf(new YarnConfiguration());
rmAppConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+ checkJobConf(rmAppConf);
try {
Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
String containerIdStr =
ensureGetSysEnv(ApplicationConstants.Environment.CONTAINER_ID.name());
@@ -182,4 +183,19 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
ExitUtil.terminate(1, t);
}
}
+
+ public static void checkJobConf(JobConf conf) {
+ if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)) {
+ logger.warn("MRAppMaster disables job recovery.");
+ // MapReduce does not set the flag which indicates whether to keep
containers across
+ // application attempts in ApplicationSubmissionContext. Therefore,
there is no container
+ // shared between attempts.
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+ }
+ if (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.05f)
!= 1.0f) {
+ logger.warn("MRAppMaster disables job reduce slow start.");
+ // Make sure reduces are scheduled only after all map are completed.
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+ }
+ }
}
diff --git a/docs/README.md b/docs/README.md
index 348318b0b..6fb0ffe31 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -207,6 +207,8 @@ cp $CELEBORN_HOME/mr/<Celeborn Client Jar>
<yarn.application.classpath>
</property>
</configuration>
```
+**Note**: `MRAppMasterWithCeleborn` disables
`yarn.app.mapreduce.am.job.recovery.enable` and sets
`mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
+
Then deploy the example word count to the running cluster for verifying
whether above configurations are correct.
```shell
cd $HADOOP_HOME
diff --git a/docs/deploy.md b/docs/deploy.md
index 7b1cff7c3..4d554460c 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -243,3 +243,4 @@ Meanwhile, configure the following settings in YARN and
MapReduce config.
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer
```
+**Note**: `MRAppMasterWithCeleborn` disables
`yarn.app.mapreduce.am.job.recovery.enable` and sets
`mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
diff --git
a/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
index be8726d51..221b9e103 100644
---
a/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
+++
b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
@@ -109,7 +109,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
try {
val conf = new Configuration(yarnCluster.getConfig)
// YARN config
- conf.set("yarn.app.mapreduce.am.job.recovery.enable", "false")
conf.set(
"yarn.app.mapreduce.am.command-opts",
"org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn")
@@ -118,7 +117,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
conf.set("mapreduce.framework.name", "yarn")
conf.set("mapreduce.job.user.classpath.first", "true")
- conf.set("mapreduce.job.reduce.slowstart.completedmaps", "1")
conf.set(
"mapreduce.celeborn.master.endpoints",
s"localhost:${master.conf.get(CelebornConf.MASTER_PORT)}")
@@ -196,7 +194,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
var exitCode = false
val conf = new Configuration(yarnCluster.getConfig)
// YARN config
- conf.set("yarn.app.mapreduce.am.job.recovery.enable", "false")
conf.set(
"yarn.app.mapreduce.am.command-opts",
"org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn")
@@ -205,7 +202,6 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
conf.set("mapreduce.framework.name", "yarn")
conf.set("mapreduce.job.user.classpath.first", "true")
- conf.set("mapreduce.job.reduce.slowstart.completedmaps", "1")
conf.set(
"mapreduce.celeborn.master.endpoints",
s"errorhost:${master.conf.get(CelebornConf.MASTER_PORT)}")