This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e0c00ecd3 [CELEBORN-839][MR] Support Hadoop MapReduce
e0c00ecd3 is described below
commit e0c00ecd3828abcf8a11d1fac39c183825db934d
Author: mingji <[email protected]>
AuthorDate: Thu Sep 14 14:12:53 2023 +0800
[CELEBORN-839][MR] Support Hadoop MapReduce
### What changes were proposed in this pull request?
1. Map-side merge and push.
2. Support Hadoop 2 & 3.
3. Reduce-side in-memory merge.
4. Integrate LifecycleManager into the MR ApplicationMaster.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster.
I tested this PR on a 4-node cluster (16 CPUs, 64 GB memory, and 4 ESSDs per node).
Hadoop 2.8.5
1TB Terasort, 8400 mappers, 1000 reducers
Celeborn 81min vs MR shuffle 89min


1GB wordcount, 8 mappers, 8 reducers
Celeborn 35s vs MR shuffle 38s


Closes #1830 from FMX/CELEBORN-839.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
README.md | 37 +--
build/make-distribution.sh | 50 ++-
client-mr/mr-shaded/pom.xml | 138 +++++++++
client-mr/mr/pom.xml | 58 ++++
.../mapreduce/v2/app/MRAppMasterWithCeleborn.java | 173 +++++++++++
.../java/org/apache/celeborn/util/HadoopUtils.java | 43 +++
.../hadoop/mapred/CelebornMapOutputCollector.java | 132 ++++++++
.../hadoop/mapred/CelebornSortBasedPusher.java | 336 +++++++++++++++++++++
.../task/reduce/CelebornShuffleConsumer.java | 165 ++++++++++
.../task/reduce/CelebornShuffleFetcher.java | 223 ++++++++++++++
.../celeborn/client/read/CelebornInputStream.java | 24 ++
common/pom.xml | 6 +-
.../org/apache/celeborn/common/CelebornConf.scala | 10 +
dev/reformat | 1 +
docs/configuration/client.md | 1 +
master/pom.xml | 14 +-
.../master/SlotsAllocatorRackAwareSuiteJ.java | 13 +-
.../master/network/CelebornRackResolverSuite.scala | 12 +-
pom.xml | 45 ++-
19 files changed, 1430 insertions(+), 51 deletions(-)
diff --git a/README.md b/README.md
index ed7c256bd..0d5866061 100644
--- a/README.md
+++ b/README.md
@@ -41,12 +41,12 @@ Celeborn Worker's slot count is decided by `total usable disk size / average shu
Celeborn worker's slot count decreases when a partition is allocated and increments when a partition is freed.
## Build
-1.Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4 and flink 1.14/1.15/1.17.
+1.Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4, Flink 1.14/1.15/1.17 and Hadoop MapReduce 2/3.
2.Celeborn tested under Java 8 environment.
Build Celeborn
```shell
-./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17
+./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pmr
```
package apache-celeborn-${project.version}-bin.tgz will be generated.
@@ -54,7 +54,7 @@ package apache-celeborn-${project.version}-bin.tgz will be generated.
### Package Details
Build procedure will create a compressed package.
-Spark package layout:
+General package layout:
```
├── RELEASE
├── bin
@@ -62,20 +62,10 @@ Spark package layout:
├── jars // common jars for master and worker
├── master-jars
├── worker-jars
- ├── sbin
- └── spark // Spark client jars
-```
-
-Flink package layout:
-```
- ├── RELEASE
- ├── bin
- ├── conf
- ├── jars // common jars for master and worker
- ├── master-jars
- ├── worker-jars
- ├── sbin
- └── flink // flink client jars
+ ├── spark // Spark client jars if spark profiles are activated
+ ├── flink // Flink client jars if flink profiles are activated
+ ├── mr // MapReduce client jars if mr profile is activated
+ └── sbin
```
### Compatibility
@@ -293,6 +283,19 @@ taskmanager.network.memory.buffers-per-channel: 0
taskmanager.memory.task.off-heap.size: 512m
```
+### Deploy MapReduce client
+Add $CELEBORN_HOME/mr/*.jar to `mapreduce.application.classpath` and `yarn.application.classpath`.
+Then set the following options in the YARN and MapReduce configuration.
+```bash
+-Dyarn.app.mapreduce.am.job.recovery.enable=false
+-Dmapreduce.job.reduce.slowstart.completedmaps=1
+-Dmapreduce.celeborn.master.endpoints=<master-1-1>:9097
+-Dyarn.app.mapreduce.am.command-opts=org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn
+-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
+-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer
+```
+
+
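The same settings can also be wired up in code before job submission. Below is a minimal sketch, not part of this patch; the class name is hypothetical and `<master-1-1>:9097` is the placeholder endpoint from the README text above.

```java
import org.apache.hadoop.mapred.JobConf;

// Sketch only: mirrors the -D flags listed above, applied to a JobConf.
public class CelebornMRJobSetup {
  public static JobConf withCeleborn(JobConf conf) {
    // AM job recovery is disabled, per the README's required settings.
    conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", false);
    // slowstart = 1.0: reducers launch only after every map completes.
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 1.0f);
    conf.set("mapreduce.celeborn.master.endpoints", "<master-1-1>:9097");
    conf.set("yarn.app.mapreduce.am.command-opts",
        "org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn");
    conf.set("mapreduce.job.map.output.collector.class",
        "org.apache.hadoop.mapred.CelebornMapOutputCollector");
    conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
        "org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer");
    return conf;
  }
}
```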
### Best Practice
If you want to set up a production-ready Celeborn cluster, your cluster should have at least 3 masters and at least 4 workers.
Masters and workers can be deployed on the same node, but you should not deploy multiple masters or workers on the same node.
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 1b2037c9b..42a2d20ba 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -218,6 +218,24 @@ function build_flink_client {
cp "$PROJECT_DIR"/client-flink/flink-$FLINK_BINARY_VERSION-shaded/target/celeborn-client-flink-${FLINK_BINARY_VERSION}-shaded_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/flink/"
}
+function build_mr_client {
+ VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null \
+ | grep -v "INFO" \
+ | grep -v "WARNING" \
+ | tail -n 1)
+ BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl :celeborn-client-mr-shaded_${SCALA_VERSION} -am $@)
+
+ # Actually build the jar
+ echo -e "\nBuilding with..."
+ echo -e "\$ ${BUILD_COMMAND[@]}\n"
+
+ "${BUILD_COMMAND[@]}"
+
+ ## mr client jars
+ mkdir -p "$DIST_DIR/mr"
+ cp "$PROJECT_DIR"/client-mr/mr-shaded/target/celeborn-client-mr-shaded_${SCALA_VERSION}-$VERSION.jar "$DIST_DIR/mr/"
+}
+
if [ "$RELEASE" == "true" ]; then
build_service
build_spark_client -Pspark-2.4
@@ -225,19 +243,35 @@ if [ "$RELEASE" == "true" ]; then
build_flink_client -Pflink-1.14
build_flink_client -Pflink-1.15
build_flink_client -Pflink-1.17
+ build_mr_client -Pmr
else
## build release package on demand
build_service $@
- if [[ $@ != *"spark"* && $@ != *"flink"* ]]; then
- echo "Skip building client."
- elif [[ $@ == *"spark"* && $@ != *"flink"* ]]; then
- build_spark_client $@
- elif [[ $@ == *"flink"* && $@ != *"spark"* ]]; then
- build_flink_client $@
- else
+ echo "build client with $@"
+ ENGINE_COUNT=0
+ ENGINES=("spark" "flink" "mr")
+ for single_engine in ${ENGINES[@]}
+ do
+ echo $single_engine
+ if [[ $@ == *"${single_engine}"* ]];then
+ ENGINE_COUNT=`expr ${ENGINE_COUNT} + 1`
+ fi
+ done
+ if [[ ${ENGINE_COUNT} -eq 0 ]]; then
+ echo "Skip building client."
+ elif [[ ${ENGINE_COUNT} -ge 2 ]]; then
echo "Error: unsupported build options: $@"
- echo " currently we do not support compiling Spark and Flink clients at the same time."
+ echo " currently we do not support compiling different engine clients at the same time."
exit -1
+ elif [[ $@ == *"spark"* ]]; then
+ echo "build spark clients"
+ build_spark_client $@
+ elif [[ $@ == *"flink"* ]]; then
+ echo "build flink clients"
+ build_flink_client $@
+ elif [[ $@ == *"mr"* ]]; then
+ echo "build mr clients"
+ build_mr_client $@
fi
fi
diff --git a/client-mr/mr-shaded/pom.xml b/client-mr/mr-shaded/pom.xml
new file mode 100644
index 000000000..c81a9c808
--- /dev/null
+++ b/client-mr/mr-shaded/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-mr-shaded_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Shaded Client for MapReduce</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client-mr_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.scala-lang</pattern>
+ <shadedPattern>${shading.prefix}.org.scala-lang</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.lz4</pattern>
+ <shadedPattern>${shading.prefix}.org.lz4</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+ <shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>org.apache.celeborn:*</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>io.netty:*</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>org.scala-lang:scala-library</include>
+ <include>org.lz4:lz4-java</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.plugin.antrun.version}</version>
+ <executions>
+ <execution>
+ <id>rename-native-library</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <echo message="unpacking netty jar"></echo>
+ <unzip dest="${project.build.directory}/unpacked/" src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+ <echo message="renaming native epoll library"></echo>
+ <move includeemptydirs="false" todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_x86_64.so" to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so" type="glob"></mapper>
+ </move>
+ <move includeemptydirs="false" todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_aarch_64.so" to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so" type="glob"></mapper>
+ </move>
+ <echo message="deleting native kqueue library"></echo>
+ <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+ <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+ <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+ <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+ <echo message="repackaging netty jar"></echo>
+ <jar basedir="${project.build.directory}/unpacked" destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/client-mr/mr/pom.xml b/client-mr/mr/pom.xml
new file mode 100644
index 000000000..402ffa507
--- /dev/null
+++ b/client-mr/mr/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-mr_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Client for MapReduce</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
new file mode 100644
index 000000000..f6d06acdd
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+ private static final Logger logger = LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+ public MRAppMasterWithCeleborn(
+ ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId,
+ String nmHost,
+ int nmPort,
+ int nmHttpPort,
+ long appSubmitTime,
+ JobConf jobConf)
+ throws CelebornIOException {
+ super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, appSubmitTime);
+
+ int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ if (numReducers > 0) {
+ CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+ LifecycleManager lifecycleManager =
+ new LifecycleManager(applicationAttemptId.toString(), conf);
+ String lmHost = lifecycleManager.getHost();
+ int lmPort = lifecycleManager.getPort();
+ logger.info("RMAppMaster initialized with {} {} {}", lmHost, lmPort, applicationAttemptId);
+ JobConf lmConf = new JobConf();
+ lmConf.clear();
+ lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
+ lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + "");
+ lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, applicationAttemptId.toString());
+ writeLifecycleManagerConfToTask(jobConf, lmConf);
+ }
+ }
+
+ private void writeLifecycleManagerConfToTask(JobConf conf, JobConf lmConf)
+ throws CelebornIOException {
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+ Path celebornConfPath = new Path(jobDirStr, HadoopUtils.MR_CELEBORN_CONF);
+
+ try (FSDataOutputStream out =
+ FileSystem.create(
+ fs, celebornConfPath, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
+ lmConf.writeXml(out);
+ }
+ FileStatus status = fs.getFileStatus(celebornConfPath);
+ long currentTs = status.getModificationTime();
+ String uri = fs.getUri() + Path.SEPARATOR + celebornConfPath.toUri();
+ String files = conf.get(MRJobConfig.CACHE_FILES);
+ conf.set(MRJobConfig.CACHE_FILES, files == null ? uri : uri + "," + files);
+ String ts = conf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+ conf.set(
+ MRJobConfig.CACHE_FILE_TIMESTAMPS,
+ ts == null ? String.valueOf(currentTs) : currentTs + "," + ts);
+ String vis = conf.get(MRJobConfig.CACHE_FILE_VISIBILITIES);
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, vis == null ? "false" : "false" + "," + vis);
+ long size = status.getLen();
+ String sizes = conf.get(MRJobConfig.CACHE_FILES_SIZES);
+ conf.set(
+ MRJobConfig.CACHE_FILES_SIZES, sizes == null ? String.valueOf(size) : size + "," + sizes);
+ } catch (IOException e) {
+ logger.error("Upload extra conf exception", e);
+ throw new CelebornIOException("Upload extra conf exception ", e);
+ }
+ }
+
+ private static String ensureGetSysEnv(String envName) throws IOException {
+ String value = System.getenv(envName);
+ if (value == null) {
+ String msg = envName + " is null";
+ logger.error(msg);
+ throw new CelebornIOException(msg);
+ }
+ return value;
+ }
+
+ public static void main(String[] args) {
+ JobConf rmAppConf = new JobConf(new YarnConfiguration());
+ rmAppConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+ try {
+ Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ String containerIdStr = ensureGetSysEnv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ String nodeHostString = ensureGetSysEnv(ApplicationConstants.Environment.NM_HOST.name());
+ String nodePortString = ensureGetSysEnv(ApplicationConstants.Environment.NM_PORT.name());
+ String nodeHttpPortString = ensureGetSysEnv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+ String appSubmitTimeStr = ensureGetSysEnv("APP_SUBMIT_TIME_ENV");
+ ContainerId containerId = ContainerId.fromString(containerIdStr);
+ ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+ if (applicationAttemptId != null) {
+ CallerContext.setCurrent(
+ (new CallerContext.Builder("mr_app_master_with_celeborn_" + applicationAttemptId))
+ .build());
+ }
+
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+ MRAppMasterWithCeleborn appMaster =
+ new MRAppMasterWithCeleborn(
+ applicationAttemptId,
+ containerId,
+ nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString),
+ appSubmitTime,
+ rmAppConf);
+ ShutdownHookManager.get()
+ .addShutdownHook(
+ () -> {
+ logger.info("MRAppMasterWithCeleborn received stop signal.");
+ appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry);
+ appMaster.stop();
+ },
+ 30);
+ MRWebAppUtil.initialize(rmAppConf);
+ String systemPropsToLog = MRApps.getSystemPropertiesToLog(rmAppConf);
+ if (systemPropsToLog != null) {
+ logger.info(systemPropsToLog);
+ }
+ String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
+ rmAppConf.set("mapreduce.job.user.name", jobUserName);
+ initAndStartAppMaster(appMaster, rmAppConf, jobUserName);
+ } catch (Throwable t) {
+ logger.error("Error starting MRAppMaster", t);
+ ExitUtil.terminate(1, t);
+ }
+ }
+}
diff --git a/client-mr/mr/src/main/java/org/apache/celeborn/util/HadoopUtils.java b/client-mr/mr/src/main/java/org/apache/celeborn/util/HadoopUtils.java
new file mode 100644
index 000000000..718d82d5a
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/celeborn/util/HadoopUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.util;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class HadoopUtils {
+ public static final String MR_CELEBORN_CONF = "celeborn.xml";
+ public static final String MR_CELEBORN_LM_HOST = "celeborn.lifecycleManager.host";
+ public static final String MR_CELEBORN_LM_PORT = "celeborn.lifecycleManager.port";
+ public static final String MR_CELEBORN_APPLICATION_ID = "celeborn.applicationId";
+
+ public static CelebornConf fromYarnConf(JobConf conf) {
+ CelebornConf tmpCelebornConf = new CelebornConf();
+ for (Map.Entry<String, String> property : conf) {
+ String proName = property.getKey();
+ String proValue = property.getValue();
+ if (proName.startsWith("mapreduce.celeborn")) {
+ tmpCelebornConf.set(proName.substring("mapreduce.".length()), proValue);
+ }
+ }
+ return tmpCelebornConf;
+ }
+}
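As a small usage sketch (illustrative only, not shipped code; it assumes `CelebornConf` exposes a `get(String)` accessor alongside the `set` used above): `fromYarnConf` copies every `mapreduce.celeborn.*` job property into the `CelebornConf` with the `mapreduce.` prefix stripped.

```java
import org.apache.hadoop.mapred.JobConf;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.util.HadoopUtils;

// Sketch only: demonstrates the prefix-stripping behavior of fromYarnConf.
public class FromYarnConfDemo {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf(false); // skip loading default resources
    jobConf.set("mapreduce.celeborn.master.endpoints", "host-a:9097");
    CelebornConf celebornConf = HadoopUtils.fromYarnConf(jobConf);
    // Expected to print "host-a:9097": the key is re-homed under celeborn.*
    System.out.println(celebornConf.get("celeborn.master.endpoints"));
  }
}
```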
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
new file mode 100644
index 000000000..fcecf85fb
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornMapOutputCollector<K extends Object, V extends Object>
+ implements MapOutputCollector<K, V> {
+ private static final Logger logger = LoggerFactory.getLogger(CelebornMapOutputCollector.class);
+ private Class<K> keyClass;
+ private Class<V> valClass;
+ private Task.TaskReporter reporter;
+ private CelebornSortBasedPusher<K, V> celebornSortBasedPusher;
+ private int numReducers;
+
+ @Override
+ public void init(Context context) throws IOException {
+ JobConf jobConf = context.getJobConf();
+ reporter = context.getReporter();
+ keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
+ valClass = (Class<V>) jobConf.getMapOutputValueClass();
+ context.getMapTask().getTaskID().getId();
+ numReducers = jobConf.getNumReduceTasks();
+
+ int IOBufferSize = jobConf.getInt(JobContext.IO_SORT_MB, 100);
+ // the sort buffer is a Java byte array, so io.sort.mb is capped at 2047 to keep the buffer size below Integer.MAX_VALUE
+ if ((IOBufferSize & 0x7FF) != IOBufferSize) {
+ throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + IOBufferSize);
+ }
+
+ CelebornConf celebornConf = HadoopUtils.fromYarnConf(jobConf);
+ JobConf celebornAppendConf = new JobConf(HadoopUtils.MR_CELEBORN_CONF);
+ String lmHost = celebornAppendConf.get(HadoopUtils.MR_CELEBORN_LM_HOST);
+ int lmPort = Integer.parseInt(celebornAppendConf.get(HadoopUtils.MR_CELEBORN_LM_PORT));
+ String applicationAttemptId = celebornAppendConf.get(HadoopUtils.MR_CELEBORN_APPLICATION_ID);
+ logger.info(
+ "Mapper initialized with celeborn {} {} {} {}",
+ lmHost,
+ lmPort,
+ applicationAttemptId,
+ IOBufferSize);
+ UserIdentifier userIdentifier =
+ new UserIdentifier(
+ celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName());
+
+ final float spiller = jobConf.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float) 0.8);
+ int pushSize = (int) ((IOBufferSize << 20) * spiller);
+
+ SerializationFactory serializationFactory = new SerializationFactory(jobConf);
+ celebornSortBasedPusher =
+ new CelebornSortBasedPusher<>(
+ jobConf.getNumMapTasks(),
+ jobConf.getNumReduceTasks(),
+ // this is map id
+ context.getMapTask().getTaskID().getTaskID().getId(),
+ // this is attempt id
+ context.getMapTask().getTaskID().getId(),
+ serializationFactory.getSerializer(keyClass),
+ serializationFactory.getSerializer(valClass),
+ IOBufferSize << 20,
+ pushSize,
+ jobConf.getOutputKeyComparator(),
+ reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES),
+ reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS),
+ ShuffleClient.get(applicationAttemptId, lmHost, lmPort, celebornConf, userIdentifier),
+ celebornConf);
+ }
+
+ @Override
+ public void collect(K key, V value, int partition) throws IOException {
+ reporter.progress();
+ if (key.getClass() != keyClass) {
+ throw new IOException(
+ "Type mismatch in key from map: expected "
+ + keyClass.getName()
+ + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException(
+ "Type mismatch in value from map: expected "
+ + valClass.getName()
+ + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= numReducers) {
+ throw new IOException("Illegal partition for " + key + " (" + partition + ")");
+ }
+ celebornSortBasedPusher.checkException();
+ celebornSortBasedPusher.insert(key, value, partition);
+ }
+
+ @Override
+ public void close() {
+ logger.info("Mapper collector close");
+ reporter.progress();
+ celebornSortBasedPusher.close();
+ }
+
+ @Override
+ public void flush() {
+ logger.info("Mapper collector flush");
+ celebornSortBasedPusher.flush();
+ reporter.progress();
+ }
+}
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
new file mode 100644
index 000000000..7aa3857bd
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+ private final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+ private final int mapId;
+ private final int attempt;
+ private final int numMappers;
+ private final int numReducers;
+ private final ShuffleClient shuffleClient;
+ private final int maxIOBufferSize;
+ private final int spillIOBufferSize;
+ private final Serializer<K> kSer;
+ private final Serializer<V> vSer;
+ private final RawComparator<K> comparator;
+ private final AtomicReference<Exception> exception = new AtomicReference<>();
+ private final Counters.Counter mapOutputByteCounter;
+ private final Counters.Counter mapOutputRecordCounter;
+ private final Map<Integer, List<SerializedKV>> partitionedKVs;
+ private int writePos;
+ private byte[] serializedKV;
+ private final int maxPushDataSize;
+
+ public CelebornSortBasedPusher(
+ int numMappers,
+ int numReducers,
+ int mapId,
+ int attemptId,
+ Serializer<K> kSer,
+ Serializer<V> vSer,
+ int maxIOBufferSize,
+ int spillIOBufferSize,
+ RawComparator<K> comparator,
+ Counters.Counter mapOutputByteCounter,
+ Counters.Counter mapOutputRecordCounter,
+ ShuffleClient shuffleClient,
+ CelebornConf celebornConf) {
+ this.numMappers = numMappers;
+ this.numReducers = numReducers;
+ this.mapId = mapId;
+ this.attempt = attemptId;
+ this.kSer = kSer;
+ this.vSer = vSer;
+ this.maxIOBufferSize = maxIOBufferSize;
+ this.spillIOBufferSize = spillIOBufferSize;
+ this.mapOutputByteCounter = mapOutputByteCounter;
+ this.mapOutputRecordCounter = mapOutputRecordCounter;
+ this.comparator = comparator;
+ this.shuffleClient = shuffleClient;
+ partitionedKVs = new HashMap<>();
+ serializedKV = new byte[maxIOBufferSize];
+ maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+ logger.info(
+ "Sort based push initialized with"
+ + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+ + " maxIOBufferSize:{} spillIOBufferSize:{}",
+ numMappers,
+ numReducers,
+ mapId,
+ attemptId,
+ maxIOBufferSize,
+ spillIOBufferSize);
+ try {
+ kSer.open(this);
+ vSer.open(this);
+ } catch (IOException e) {
+ exception.compareAndSet(null, e);
+ }
+ }
+
+ public void insert(K key, V value, int partition) {
+ try {
+ if (writePos >= spillIOBufferSize) {
+ // needs to sort and flush data
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Data is large enough {}/{}/{}, trigger sort and flush",
+ Utils.bytesToString(writePos),
+ Utils.bytesToString(spillIOBufferSize),
+ Utils.bytesToString(maxIOBufferSize));
+ }
+ sortKVs();
+ sendKVAndUpdateWritePos();
+ }
+ int dataLen = insertRecordInternal(key, value, partition);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Sort based pusher insert into partition:{} with {} bytes", partition, dataLen);
+ }
+ mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(dataLen);
+ } catch (IOException e) {
+ exception.compareAndSet(null, e);
+ }
+ }
+
+ private void sendKVAndUpdateWritePos() throws IOException {
+ Iterator<Map.Entry<Integer, List<SerializedKV>>> entryIter =
+ partitionedKVs.entrySet().iterator();
+ while (entryIter.hasNext()) {
+ Map.Entry<Integer, List<SerializedKV>> entry = entryIter.next();
+ entryIter.remove();
+ int partition = entry.getKey();
+ List<SerializedKV> kvs = entry.getValue();
+ List<SerializedKV> localKVs = new ArrayList<>();
+ int partitionKVTotalLen = 0;
+ // process buffers for specific partition
+ for (SerializedKV kv : kvs) {
+ partitionKVTotalLen += kv.kLen + kv.vLen;
+ localKVs.add(kv);
+ if (partitionKVTotalLen > maxPushDataSize) {
+ // limit max size of pushdata to avoid possible memory issue in Celeborn worker
+ // data layout
+ // pushdata header (16) + pushDataLen(4) +
+ // [varKeyLen+varValLen+serializedRecord(x)][...]
+ sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+ localKVs.clear();
+ partitionKVTotalLen = 0;
+ }
+ }
+ if (!localKVs.isEmpty()) {
+ sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+ }
+ kvs.clear();
+ }
+ // all data sent
+ partitionedKVs.clear();
+ writePos = 0;
+ }
+
+ private void sendSortedBuffersPartition(
+ int partition, List<SerializedKV> localKVs, int partitionKVTotalLen) throws IOException {
+ int extraSize = 0;
+ for (SerializedKV localKV : localKVs) {
+ extraSize += WritableUtils.getVIntSize(localKV.kLen);
+ extraSize += WritableUtils.getVIntSize(localKV.vLen);
+ }
+ // copied from hadoop logic
+ extraSize += WritableUtils.getVIntSize(-1);
+ extraSize += WritableUtils.getVIntSize(-1);
+ // whole buffer's size + [(keyLen+valueLen)+(serializedKey+serializedValue)]
+ byte[] pkvs = new byte[4 + extraSize + partitionKVTotalLen];
+ int pkvsPos = 4;
+ Platform.putInt(pkvs, Platform.BYTE_ARRAY_OFFSET, partitionKVTotalLen + extraSize);
+ for (SerializedKV kv : localKVs) {
+ int recordLen = kv.kLen + kv.vLen;
+ // write key len
+ pkvsPos = writeVLong(pkvs, pkvsPos, kv.kLen);
+ // write value len
+ pkvsPos = writeVLong(pkvs, pkvsPos, kv.vLen);
+ // write serialized record
+ System.arraycopy(serializedKV, kv.offset, pkvs, pkvsPos, recordLen);
+ pkvsPos += recordLen;
+ }
+ // finally write -1 two times
+ pkvsPos = writeVLong(pkvs, pkvsPos, -1);
+ writeVLong(pkvs, pkvsPos, -1);
+ int compressedSize =
+ shuffleClient.pushData(
+ // there is only 1 shuffle for a mr application
+ 0,
+ mapId,
+ attempt,
+ partition,
+ pkvs,
+ 0,
+ 4 + extraSize + partitionKVTotalLen,
+ numMappers,
+ numReducers);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Send sorted buffer mapId:{} attemptId:{} to partition:{} uncompressed size:{} compressed size:{}",
+ mapId,
+ attempt,
+ partition,
+ Utils.bytesToString(4 + extraSize + partitionKVTotalLen),
+ Utils.bytesToString(compressedSize));
+ }
+ }
+
+ /**
+ * Write variable length int to array. Modified from
+ * org.apache.hadoop.io.WritableUtils#writeVLong(java.io.DataOutput, long)
+ */
+ private int writeVLong(byte[] data, int offset, long dataInt) {
+ if (dataInt >= -112L && dataInt <= 127L) {
+ data[offset++] = (byte) ((int) dataInt);
+ return offset;
+ }
+
+ int len = -112;
+ if (dataInt < 0L) {
+ dataInt ^= -1L;
+ len = -120;
+ }
+
+ long tmp = dataInt;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ data[offset++] = (byte) len;
+
+ len = len < -120 ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; --idx) {
+ int shiftBits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftBits;
+ data[offset++] = ((byte) ((int) ((dataInt & mask) >> shiftBits)));
+ }
+ return offset;
+ }
+
+ private void sortKVs() {
+ for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
+ partitionKVEntry
+ .getValue()
+ .sort(
+ (o1, o2) ->
+ comparator.compare(
+ serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+ }
+ }
+
+ private int insertRecordInternal(K key, V value, int partition) throws IOException {
+ int offset = writePos;
+ int keyLen;
+ int valLen;
+ kSer.serialize(key);
+ keyLen = writePos - offset;
+ vSer.serialize(value);
+ valLen = writePos - keyLen - offset;
+ List<SerializedKV> serializedKVs =
+ partitionedKVs.computeIfAbsent(partition, v -> new ArrayList<>());
+ serializedKVs.add(new SerializedKV(offset, keyLen, valLen));
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Pusher insert into buffer partition:{} offset:{} keyLen:{} valueLen:{} size:{}",
+ partition,
+ offset,
+ keyLen,
+ valLen,
+ partitionedKVs.size());
+ }
+ return keyLen + valLen;
+ }
+
+ public void checkException() throws IOException {
+ if (exception.get() != null) {
+ throw new IOException("Write data to celeborn failed", exception.get());
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (writePos < maxIOBufferSize) {
+ serializedKV[writePos] = (byte) b;
+ writePos++;
+ } else {
+ logger.warn("Sort push memory high, write pos {} max size {}", writePos, maxIOBufferSize);
+ throw new IOException("Sort pusher memory exhausted.");
+ }
+ }
+
+ public void flush() {
+ logger.info("Sort based pusher called flush");
+ try {
+ sortKVs();
+ sendKVAndUpdateWritePos();
+ } catch (IOException e) {
+ exception.compareAndSet(null, e);
+ }
+ }
+
+ public void close() {
+ flush();
+ try {
+ logger.info(
+ "Call mapper end shuffleId:{} mapId:{} attemptId:{} numMappers:{}",
+ 0,
+ mapId,
+ attempt,
+ numMappers);
+ shuffleClient.mapperEnd(0, mapId, attempt, numMappers);
+ } catch (IOException e) {
+ logger.error("Mapper end failed, data lost", e);
+ }
+ partitionedKVs.clear();
+ serializedKV = null;
+ }
+
+ static class SerializedKV {
+ final int offset;
+ final int kLen;
+ final int vLen;
+
+ public SerializedKV(int offset, int kLen, int vLen) {
+ this.offset = offset;
+ this.kLen = kLen;
+ this.vLen = vLen;
+ }
+ }
+}
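Because `writeVLong` above is a port of Hadoop's `WritableUtils#writeVLong`, the key/value length prefixes and the doubled `-1` end marker use the standard Hadoop var-length encoding. Here is a self-contained sketch of that framing (illustrative only; the class name is hypothetical and it calls `WritableUtils` directly rather than the private method above):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Sketch only: key length, value length, then the record bytes would follow;
// -1 written twice marks the end of the frame, as in the pusher above.
public class VLongFramingDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    WritableUtils.writeVLong(out, 12);  // key length
    WritableUtils.writeVLong(out, 345); // value length
    WritableUtils.writeVLong(out, -1);  // end-of-frame marker
    WritableUtils.writeVLong(out, -1);
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(WritableUtils.readVLong(in)); // 12
    System.out.println(WritableUtils.readVLong(in)); // 345
    System.out.println(WritableUtils.readVLong(in)); // -1
  }
}
```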
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
new file mode 100644
index 000000000..b3292b600
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle.ShuffleError;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornShuffleConsumer<K, V>
+ implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+ private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleConsumer.class);
+ private JobConf mrJobConf;
+ private MergeManager<K, V> merger;
+ private Throwable throwable = null;
+ private Progress copyPhase;
+ private TaskStatus taskStatus;
+ private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+ private TaskUmbilicalProtocol umbilical;
+ private Reporter reporter;
+ private ShuffleClientMetrics metrics;
+ private Task reduceTask;
+ private ShuffleClient shuffleClient;
+
+ @Override
+ public void init(Context<K, V> context) {
+
+ reduceId = context.getReduceId();
+ mrJobConf = context.getJobConf();
+ JobConf celebornJobConf = new JobConf(HadoopUtils.MR_CELEBORN_CONF);
+
+ umbilical = context.getUmbilical();
+ reporter = context.getReporter();
+ try {
+ this.metrics = createMetrics(reduceId, mrJobConf);
+ } catch (Exception e) {
+ logger.error("Fatal error occurred, failed to get shuffle client metrics.", e);
+ reportException(e);
+ }
+ copyPhase = context.getCopyPhase();
+ taskStatus = context.getStatus();
+ reduceTask = context.getReduceTask();
+
+ String appId = celebornJobConf.get(HadoopUtils.MR_CELEBORN_APPLICATION_ID);
+ String lmHost = celebornJobConf.get(HadoopUtils.MR_CELEBORN_LM_HOST);
+ int lmPort = Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LM_PORT));
+ logger.info("Reducer initialized with celeborn {} {} {}", appId, lmHost, lmPort);
+ CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf);
+ shuffleClient =
+ ShuffleClient.get(
+ appId,
+ lmHost,
+ lmPort,
+ celebornConf,
+ new UserIdentifier(
+ celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName());
+ this.merger =
+ new MergeManagerImpl<>(
+ reduceId,
+ mrJobConf,
+ context.getLocalFS(),
+ context.getLocalDirAllocator(),
+ reporter,
+ context.getCodec(),
+ context.getCombinerClass(),
+ context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(),
+ this,
+ context.getMergePhase(),
+ context.getMapOutputFile());
+ }
+
+ private ShuffleClientMetrics createMetrics(
+ org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf)
+ throws NoSuchMethodException {
+ // for hadoop 3
+ try {
+ return DynMethods.builder("create")
+ .impl(
+ ShuffleClientMetrics.class,
+ org.apache.hadoop.mapreduce.TaskAttemptID.class,
+ JobConf.class)
+ .buildStaticChecked()
+ .invoke(taskAttemptID, jobConf);
+ } catch (Exception e) {
+ // ignore this exception; it likely means we are on hadoop2, which lacks the static create factory
+ }
+ // for hadoop 2
+ return DynConstructors.builder(ShuffleClientMetrics.class)
+ .hiddenImpl(new Class[] {org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class})
+ .buildChecked()
+ .invoke(null, taskAttemptID, jobConf);
+ }
+
+ @Override
+ public RawKeyValueIterator run() throws IOException {
+ logger.info(
+ "In reduce:{}, Celeborn mr client start to read shuffle data."
+ + " Create inputstream with params: shuffleId 0 reduceId:{} attemptId:{}",
+ reduceId,
+ reduceId.getTaskID().getId(),
+ reduceId.getId());
+
+ CelebornInputStream shuffleInputStream =
+ shuffleClient.readPartition(
+ 0, reduceId.getTaskID().getId(), reduceId.getId(), 0, Integer.MAX_VALUE);
+ CelebornShuffleFetcher<K, V> shuffleReader =
+ new CelebornShuffleFetcher<>(
+ reduceId, taskStatus, merger, copyPhase, reporter, metrics, shuffleInputStream);
+ shuffleReader.fetchAndMerge();
+
+ copyPhase.complete();
+ taskStatus.setPhase(TaskStatus.Phase.SORT);
+ reduceTask.statusUpdate(umbilical);
+
+ RawKeyValueIterator kvIter;
+ try {
+ kvIter = merger.close();
+ } catch (Throwable e) {
+ throw new ShuffleError("Error while doing final merge ", e);
+ }
+
+ logger.info("In reduce: {} Celeborn mr client read shuffle data complete", reduceId);
+
+ return kvIter;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void reportException(Throwable throwable) {
+ if (this.throwable == null) {
+ this.throwable = throwable;
+ }
+ }
+}
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
new file mode 100644
index 000000000..3944869f8
--- /dev/null
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.unsafe.Platform;
+
+public class CelebornShuffleFetcher<K, V> {
+ private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleFetcher.class);
+ private final TaskAttemptID reduceId;
+ private final Reporter reporter;
+ private final TaskStatus status;
+ private final MergeManager<K, V> merger;
+ private final Progress progress;
+ private final ShuffleClientMetrics metrics;
+ private final CelebornInputStream celebornInputStream;
+ private volatile boolean stopped = false;
+ private int uniqueMapId = 0;
+ private final Counters.Counter ioErrs;
+ private boolean hasPendingData = false;
+ private long inputShuffleSize;
+ private byte[] shuffleData;
+
+ public CelebornShuffleFetcher(
+ TaskAttemptID reduceId,
+ TaskStatus status,
+ MergeManager<K, V> merger,
+ Progress progress,
+ Reporter reporter,
+ ShuffleClientMetrics metrics,
+ CelebornInputStream input) {
+ this.reduceId = reduceId;
+ this.reporter = reporter;
+ this.status = status;
+ this.merger = merger;
+ this.progress = progress;
+ this.metrics = metrics;
+ this.celebornInputStream = input;
+
+ ioErrs = reporter.getCounter("Shuffle Errors", "IO_ERROR");
+ }
+
+ // fetch all push data and merge
+ public void fetchAndMerge() {
+ while (!stopped) {
+ try {
+ // If merge is on, block
+ merger.waitForResource();
+ // Do shuffle
+ metrics.threadBusy();
+ // read blocks
+ fetchToLocalAndMerge();
+ } catch (Exception e) {
+ logger.error("Celeborn shuffle fetcher fetch data failed.", e);
+ } finally {
+ metrics.threadFree();
+ }
+ }
+ }
+
+ private byte[] getShuffleBlock() throws IOException {
+ // get len
+ byte[] header = new byte[4];
+ int count = celebornInputStream.read(header);
+ if (count == -1) {
+ stopped = true;
+ return null;
+ }
+ while (count != header.length) {
+ count += celebornInputStream.read(header, count, 4 - count);
+ }
+
+ // get data
+ int blockLen = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
+ inputShuffleSize += blockLen;
+ byte[] shuffleData = new byte[blockLen];
+ count = celebornInputStream.read(shuffleData);
+ while (count != shuffleData.length) {
+ count += celebornInputStream.read(shuffleData, count, blockLen - count);
+ if (count == -1) {
+ // read shuffle is done.
+ stopped = true;
+ throw new CelebornIOException("Read mr shuffle failed.");
+ }
+ }
+ return shuffleData;
+ }
+
+ private void fetchToLocalAndMerge() throws IOException {
+ if (!hasPendingData) {
+ shuffleData = getShuffleBlock();
+ }
+
+ if (shuffleData != null) {
+ // start to merge
+ if (wrapMapOutput(shuffleData)) {
+ hasPendingData = false;
+ } else {
+ return;
+ }
+
+ updateStatus();
+ reporter.progress();
+ } else {
+ celebornInputStream.close();
+ metrics.inputBytes(inputShuffleSize);
+ logger.info("reduce task {} read {} bytes", reduceId, inputShuffleSize);
+ stopped = true;
+ }
+ }
+
+ private boolean wrapMapOutput(byte[] shuffleData) throws IOException {
+ // treat push data as mapoutput
+ TaskAttemptID mapId =
+ new TaskAttemptID(new TaskID(reduceId.getJobID(), TaskType.MAP, uniqueMapId++), 0);
+ MapOutput<K, V> mapOutput = null;
+ try {
+ mapOutput = merger.reserve(mapId, shuffleData.length, 0);
+ } catch (IOException ioe) {
+ ioErrs.increment(1);
+ throw ioe;
+ }
+ if (mapOutput == null) {
+ logger.info(
+ "Celeborn fetcher returned status wait because reserving a merge buffer for shuffle data returned null");
+ hasPendingData = true;
+ return false;
+ }
+
+ // write data to mapOutput
+ try {
+ writeShuffle(mapOutput, shuffleData);
+ // let the merger know this block is ready for merging
+ mapOutput.commit();
+ } catch (Throwable t) {
+ ioErrs.increment(1);
+ mapOutput.abort();
+ throw new CelebornIOException(
+ "Reduce: "
+ + reduceId
+ + " fetch failed to "
+ + mapOutput.getClass().getSimpleName()
+ + " due to: "
+ + t.getClass().getName());
+ }
+ return true;
+ }
+
+ private Decompressor getDecompressor(InMemoryMapOutput inMemoryMapOutput)
+ throws CelebornIOException {
+ try {
+ Class clazz = Class.forName(InMemoryMapOutput.class.getName());
+ Field deCompressorField = clazz.getDeclaredField("decompressor");
+ deCompressorField.setAccessible(true);
+ return (Decompressor) deCompressorField.get(inMemoryMapOutput);
+ } catch (Exception e) {
+ throw new CelebornIOException("Get Decompressor fail " + e.getMessage());
+ }
+ }
+
+ private void writeShuffle(MapOutput mapOutput, byte[] shuffle) throws CelebornIOException {
+ if (mapOutput instanceof InMemoryMapOutput) {
+ InMemoryMapOutput inMemoryMapOutput = (InMemoryMapOutput) mapOutput;
+ CodecPool.returnDecompressor(getDecompressor(inMemoryMapOutput));
+ byte[] memory = inMemoryMapOutput.getMemory();
+ System.arraycopy(shuffle, 0, memory, 0, shuffle.length);
+ } else if (mapOutput instanceof OnDiskMapOutput) {
+ throw new IllegalStateException(
+ "Celeborn map reduce client does not support OnDiskMapOutput. Try to increase mapreduce.reduce.shuffle.memory.limit.percent");
+ } else {
+ throw new IllegalStateException(
+ "Merger reserved an unknown type of MapOutput: " + mapOutput.getClass().getCanonicalName());
+ }
+ }
+
+ private void updateStatus() {
+ progress.set(
+ (float) celebornInputStream.partitionsRead() / celebornInputStream.totalPartitionsToRead());
+ String statusString =
+ celebornInputStream.partitionsRead()
+ + " / "
+ + celebornInputStream.totalPartitionsToRead()
+ + " copied.";
+ status.setStateString(statusString);
+
+ progress.setStatus(
+ "copy("
+ + celebornInputStream.partitionsRead()
+ + " of "
+ + celebornInputStream.totalPartitionsToRead()
+ + ")");
+ }
+}
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8645cc4c1..705792597 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -94,8 +94,22 @@ public abstract class CelebornInputStream extends InputStream {
@Override
public void setCallback(MetricsCallback callback) {}
+
+ @Override
+ public int totalPartitionsToRead() {
+ return 0;
+ }
+
+ @Override
+ public int partitionsRead() {
+ return 0;
+ }
};
+ public abstract int totalPartitionsToRead();
+
+ public abstract int partitionsRead();
+
private static final class CelebornInputStreamImpl extends CelebornInputStream {
private static final Random RAND = new Random();
@@ -592,5 +606,15 @@ public abstract class CelebornInputStream extends InputStream {
}
return hasData;
}
+
+ @Override
+ public int totalPartitionsToRead() {
+ return locations.length;
+ }
+
+ @Override
+ public int partitionsRead() {
+ return fileIndex;
+ }
}
}
diff --git a/common/pom.xml b/common/pom.xml
index c9b0c5293..b88552ac6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -106,19 +106,22 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
+ <version>${hadoop.version}</version>
</dependency>
+
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
-
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
@@ -152,4 +155,5 @@
</extension>
</extensions>
</build>
+
</project>
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8134bbe38..1534a6921 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -708,6 +708,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
def clientExcludeReplicaOnFailureEnabled: Boolean = get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
+ def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
// //////////////////////////////////////////////////////
// Shuffle Compression //
@@ -3804,6 +3805,15 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+  val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] =
+    buildConf("celeborn.client.mr.pushData.max")
+      .categories("client")
+      .version("0.4.0")
+      .doc("Max size for a push data sent from the MR client.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(nVal => nVal < 2147483548, "Max size for a push data should be less than 2GB-100 bytes.")
+      .createWithDefaultString("32m")
+
val ACTIVE_STORAGE_TYPES: ConfigEntry[String] =
buildConf("celeborn.storage.activeTypes")
.categories("master", "worker")
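
The new entry is read back through the clientMrMaxPushData accessor above; a minimal sketch of overriding the 32m default, assuming only the SparkConf-style set method CelebornConf already exposes (the example class is ours):

```java
import org.apache.celeborn.common.CelebornConf;

public class MrPushDataMaxExample {
  public static void main(String[] args) {
    CelebornConf conf = new CelebornConf();
    // Cap each push batch at 16 MiB instead of the 32 MiB default (illustrative value).
    conf.set("celeborn.client.mr.pushData.max", "16m");
    long maxBytes = conf.clientMrMaxPushData(); // 16 * 1024 * 1024 bytes
    System.out.println("max push data bytes: " + maxBytes);
  }
}
```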
diff --git a/dev/reformat b/dev/reformat
index e7ff64ab3..11b6e6775 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -24,3 +24,4 @@ ${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.15
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.17
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
+${PROJECT_DIR}/build/mvn spotless:apply -Pmr
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f3d8dca02..793794a4d 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -39,6 +39,7 @@ license: |
| celeborn.client.flink.resultPartition.memory | 64m | Memory reserved for a result partition. | 0.3.0 |
| celeborn.client.flink.resultPartition.minMemory | 8m | Min memory reserved for a result partition. | 0.3.0 |
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | Whether to support floating buffer for result partitions. | 0.3.0 |
+| celeborn.client.mr.pushData.max | 32m | Max size for a push data sent from the MR client. | 0.4.0 |
| celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 |
| celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 |
| celeborn.client.push.excludeWorkerOnFailure.enabled | false | Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0 |
diff --git a/master/pom.xml b/master/pom.xml
index 0f9abb0a3..8facb5585 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -82,16 +82,22 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- </dependency>
</dependencies>
<build>
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index bc7b54b33..8ca6d3100 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
@@ -29,8 +30,6 @@ import scala.Tuple2;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.TableMapping;
-import org.apache.hadoop.shaded.com.google.common.base.Charsets;
-import org.apache.hadoop.shaded.com.google.common.io.Files;
import org.junit.Assert;
import org.junit.Test;
@@ -47,10 +46,12 @@ public class SlotsAllocatorRackAwareSuiteJ {
    conf.set(CelebornConf.CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED().key(), "true");
File mapFile = File.createTempFile("testResolve1", ".txt");
-    Files.asCharSink(mapFile, Charsets.UTF_8)
-        .write(
-            "host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n"
-                + "host4 /default/rack2\nhost5 /default/rack2\nhost6 /default/rack2\n");
+    FileWriter mapFileWriter = new FileWriter(mapFile);
+    mapFileWriter.write(
+        "host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n"
+            + "host4 /default/rack2\nhost5 /default/rack2\nhost6 /default/rack2\n");
+    mapFileWriter.flush();
+    mapFileWriter.close();
mapFile.deleteOnExit();
conf.set(
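
The explicit flush-then-close pair above works; an equivalent try-with-resources form would also release the writer if write throws. A sketch with the same file contents as the test, relying on the java.io.FileWriter import added in this diff:

```java
// Equivalent to the flush()/close() pair above.
try (FileWriter writer = new FileWriter(mapFile)) {
  // close() implies flush(), and the writer is released even if write() throws.
  writer.write(
      "host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n"
          + "host4 /default/rack2\nhost5 /default/rack2\nhost6 /default/rack2\n");
}
```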
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
index 5d1a94ebe..7c05dfa7d 100644
--- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
@@ -17,12 +17,12 @@
package org.apache.celeborn.service.deploy.master.network
-import java.io.File
+import java.io.{File, FileWriter}
+import com.google.common.io.Files
+import org.apache.commons.io.Charsets
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
import org.apache.hadoop.net.{Node, TableMapping}
-import org.apache.hadoop.shaded.com.google.common.base.Charsets
-import org.apache.hadoop.shaded.com.google.common.io.Files
import org.junit.Assert.assertEquals
import org.scalatest.funsuite.AnyFunSuite
@@ -34,8 +34,10 @@ class CelebornRackResolverSuite extends AnyFunSuite {
val hostName1 = "1.2.3.4"
val hostName2 = "5.6.7.8"
    val mapFile: File = File.createTempFile(getClass.getSimpleName + ".testResolve1", ".txt")
- Files.asCharSink(mapFile, Charsets.UTF_8).write(
- hostName1 + " /rack1\n" + hostName2 + "\t/rack2\n")
+ val mapFileWriter = new FileWriter(mapFile)
+ mapFileWriter.write(hostName1 + " /rack1\n" + hostName2 + "\t/rack2\n")
+ mapFileWriter.flush()
+ mapFileWriter.close()
mapFile.deleteOnExit()
val conf = new CelebornConf
diff --git a/pom.xml b/pom.xml
index 3af1d24d1..2d3daa6ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,9 +64,11 @@
<maven.version>3.8.8</maven.version>
<flink.version>1.14.6</flink.version>
- <hadoop.version>3.2.4</hadoop.version>
<spark.version>3.3.2</spark.version>
+    <!-- use Hadoop 3 as the default -->
+ <hadoop.version>3.2.4</hadoop.version>
+
<codahale.metrics.version>3.2.6</codahale.metrics.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<commons-io.version>2.13.0</commons-io.version>
@@ -367,6 +369,16 @@
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>${roaringbitmap.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>${snakeyaml.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -378,15 +390,21 @@
<version>${hadoop.version}</version>
</dependency>
<dependency>
- <groupId>org.roaringbitmap</groupId>
- <artifactId>RoaringBitmap</artifactId>
- <version>${roaringbitmap.version}</version>
- </dependency>
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>${snakeyaml.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -1104,6 +1122,14 @@
</properties>
</profile>
+ <profile>
+ <id>mr</id>
+ <modules>
+ <module>client-mr/mr</module>
+ <module>client-mr/mr-shaded</module>
+ </modules>
+ </profile>
+
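With this profile in place, the MR client and shaded-client modules build only on request, e.g. `build/mvn install -Pmr -DskipTests` (standard Maven flags, shown for illustration), matching the spotless entry added to dev/reformat above.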
<profile>
<id>google-mirror</id>
<properties>
@@ -1126,7 +1152,6 @@
<execution>
<id>source-release-assembly</id>
<phase>none</phase>
-
</execution>
</executions>
</plugin>