This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-tez-support
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-tez-support by this
push:
new ce0fee615 [CELEBORN-1545] Add Tez plugin skeleton and dag app master
(#2667)
ce0fee615 is described below
commit ce0fee6156baaea8520dfd719a8e178ccafe8478
Author: Ethan Feng <[email protected]>
AuthorDate: Tue Aug 13 16:43:01 2024 +0800
[CELEBORN-1545] Add Tez plugin skeleton and dag app master (#2667)
[CELEBORN-1545] Add Tez plugin skeleton and dag app master
---
.github/workflows/style.yml | 1 +
build/make-distribution.sh | 31 +-
client-tez/tez-shaded/pom.xml | 148 +++++++++
client-tez/tez/pom.xml | 193 +++++++++++
.../apache/celeborn/tez/plugin/util/TezUtils.java | 67 ++++
.../apache/tez/dag/app/CelebornDagAppMaster.java | 358 +++++++++++++++++++++
dev/reformat | 1 +
pom.xml | 154 +++++++++
project/CelebornBuild.scala | 204 +++++++++++-
tests/tez-it/pom.xml | 177 ++++++++++
10 files changed, 1326 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index 717e813ea..7e82a84e8 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -56,3 +56,4 @@ jobs:
build/mvn spotless:check -Pgoogle-mirror,spark-2.4
build/mvn spotless:check -Pgoogle-mirror,spark-3.3
build/mvn spotless:check -Pgoogle-mirror,mr
+ build/mvn spotless:check -Pgoogle-mirror,tez
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 1f093bd12..fafc945da 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -63,11 +63,6 @@ while (( "$#" )); do
echo "Error: $1 is not supported"
exit_with_usage
;;
- -P*)
- if [[ "$1" == *"hadoop-aws"* ]]; then
- HADOOP_AWS_ENABLED="true"
- fi
- ;;
-*)
break
;;
@@ -247,6 +242,23 @@ function build_mr_client {
cp
"$PROJECT_DIR"/client-mr/mr-shaded/target/celeborn-client-mr-shaded_${SCALA_VERSION}-$VERSION.jar
"$DIST_DIR/mr/"
}
+function build_tez_client {
+ VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null \
+ | grep -v "INFO" \
+ | grep -v "WARNING" \
+ | tail -n 1)
+ BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl
:celeborn-client-tez-shaded_${SCALA_VERSION} -am $@)
+
+ # Actually build the jar
+ echo -e "\nBuilding with..."
+ echo -e "\$ ${BUILD_COMMAND[@]}\n"
+
+ "${BUILD_COMMAND[@]}"
+
+ ## tez client jars
+ mkdir -p "$DIST_DIR/tez"
+ cp
"$PROJECT_DIR"/client-tez/tez-shaded/target/celeborn-client-tez-shaded_${SCALA_VERSION}-$VERSION.jar
"$DIST_DIR/tez/"
+}
#########################
# sbt functions #
@@ -322,10 +334,11 @@ if [ "$SBT_ENABLED" == "true" ]; then
sbt_build_client -Pflink-1.18
sbt_build_client -Pflink-1.19
sbt_build_client -Pmr
+ sbt_build_client -Ptez
else
echo "build client with $@"
ENGINE_COUNT=0
- ENGINES=("spark" "flink" "mr")
+ ENGINES=("spark" "flink" "mr" "tez")
for single_engine in ${ENGINES[@]}
do
echo $single_engine
@@ -356,12 +369,13 @@ else
build_flink_client -Pflink-1.18
build_flink_client -Pflink-1.19
build_mr_client -Pmr
+ build_tez_client -Ptez
else
## build release package on demand
build_service $@
echo "build client with $@"
ENGINE_COUNT=0
- ENGINES=("spark" "flink" "mr")
+ ENGINES=("spark" "flink" "mr" "tez")
for single_engine in ${ENGINES[@]}
do
echo $single_engine
@@ -384,6 +398,9 @@ else
elif [[ $@ == *"mr"* ]]; then
echo "build mr clients"
build_mr_client $@
+ elif [[ $@ == *"tez"* ]]; then
+ echo "build tez clients"
+ build_tez_client $@
fi
fi
fi
diff --git a/client-tez/tez-shaded/pom.xml b/client-tez/tez-shaded/pom.xml
new file mode 100644
index 000000000..db96af902
--- /dev/null
+++ b/client-tez/tez-shaded/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-tez-shaded_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Shaded Client for Tez</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client-tez_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.scala-lang</pattern>
+ <shadedPattern>${shading.prefix}.org.scala-lang</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.lz4</pattern>
+ <shadedPattern>${shading.prefix}.org.lz4</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>org.apache.celeborn:*</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.guava:failureaccess</include>
+ <include>io.netty:*</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>org.scala-lang:scala-library</include>
+ <include>org.lz4:lz4-java</include>
+ <include>com.github.luben:zstd-jni</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>**/*.proto</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/log4j.properties</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/NOTICE.txt</exclude>
+ <exclude>LICENSE.txt</exclude>
+ <exclude>NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+ </transformers>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.plugin.antrun.version}</version>
+ <executions>
+ <execution>
+ <id>rename-native-library</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <echo message="unpacking netty jar"></echo>
+ <unzip dest="${project.build.directory}/unpacked/"
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+ <echo message="renaming native epoll library"></echo>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_x86_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so"
type="glob"></mapper>
+ </move>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_aarch_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so"
type="glob"></mapper>
+ </move>
+ <echo message="deleting native kqueue library"></echo>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+ <echo message="repackaging netty jar"></echo>
+ <jar basedir="${project.build.directory}/unpacked"
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/client-tez/tez/pom.xml b/client-tez/tez/pom.xml
new file mode 100644
index 000000000..a29306a83
--- /dev/null
+++ b/client-tez/tez/pom.xml
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-tez_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Client for Tez</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ <version>1.7.36</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
new file mode 100644
index 000000000..65aecdf11
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tez.plugin.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
+
+public class TezUtils {
+ public static final String TEZ_PREFIX = "tez.";
+ public static final String TEZ_CELEBORN_LM_HOST =
"celeborn.lifecycleManager.host";
+ public static final String TEZ_CELEBORN_LM_PORT =
"celeborn.lifecycleManager.port";
+ public static final String TEZ_CELEBORN_APPLICATION_ID =
"celeborn.applicationId";
+ public static final String TEZ_SHUFFLE_ID = "celeborn.tez.shuffle.id";
+
+ public static final CelebornConf fromTezConfiguration(Configuration
tezConfig) {
+ CelebornConf tmpCelebornConf = new CelebornConf();
+ for (Map.Entry<String, String> property : tezConfig) {
+ String proName = property.getKey();
+ String proValue = property.getValue();
+ if (proName.startsWith(TEZ_PREFIX + "celeborn")) {
+ tmpCelebornConf.set(proName.substring(TEZ_PREFIX.length()), proValue);
+ }
+ }
+ return tmpCelebornConf;
+ }
+
+ public static Object getPrivateField(Object object, String name) {
+ try {
+ Field f = object.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(object);
+ } catch (Exception e) {
+ throw new CelebornRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static String ensureGetSysEnv(String envName) throws IOException {
+ String value = System.getenv(envName);
+ if (value == null) {
+ String msg = envName + " is null";
+ throw new CelebornIOException(msg);
+ }
+ return value;
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
new file mode 100644
index 000000000..7f570db2b
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.apache.celeborn.tez.plugin.util.TezUtils.getPrivateField;
+import static
org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.*;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.tez.plugin.util.TezUtils;
+
+public class CelebornDagAppMaster extends DAGAppMaster {
+ private static final Logger Logger =
LoggerFactory.getLogger(CelebornDagAppMaster.class);
+ private static final String MASTER_ENDPOINTS_ENV =
"CELEBORN_MASTER_ENDPOINTS";
+
+ private CelebornConf celebornConf;
+ private LifecycleManager lifecycleManager;
+ private ApplicationAttemptId appAttemptId;
+ private String lifecycleManagerHost;
+ private int lifecycleManagerPort;
+ private AtomicInteger shuffleIdGenerator = new AtomicInteger(0);
+
+ public CelebornDagAppMaster(
+ ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId,
+ String nmHost,
+ int nmPort,
+ int nmHttpPort,
+ Clock clock,
+ long appSubmitTime,
+ boolean isSession,
+ String workingDirectory,
+ String[] localDirs,
+ String[] logDirs,
+ String clientVersion,
+ Credentials credentials,
+ String jobUserName,
+ DAGProtos.AMPluginDescriptorProto pluginDescriptorProto) {
+ super(
+ applicationAttemptId,
+ containerId,
+ nmHost,
+ nmPort,
+ nmHttpPort,
+ clock,
+ appSubmitTime,
+ isSession,
+ workingDirectory,
+ localDirs,
+ logDirs,
+ clientVersion,
+ credentials,
+ jobUserName,
+ pluginDescriptorProto);
+ appAttemptId = applicationAttemptId;
+ }
+
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ celebornConf = TezUtils.fromTezConfiguration(conf);
+ lifecycleManager = new LifecycleManager(appAttemptId.toString(),
celebornConf);
+ lifecycleManagerHost = lifecycleManager.getHost();
+ lifecycleManagerPort = lifecycleManager.getPort();
+ Logger.info("Init Celeborn lifecycle manager");
+ }
+
+ private static void validateInputParam(String value, String param) throws
IOException {
+ if (value == null) {
+ String msg = param + " is null";
+ Logger.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ @Override
+ protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
+ DAG dag = super.createDAG(dagPB);
+
+ List<Integer> currentDagShuffleIds = new ArrayList<>();
+
+ StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag,
"stateMachine");
+ stateMachine.registerStateEnteredCallback(
+ DAGState.INITED,
+ (OnStateChangedCallback<DAGState, DAGImpl>)
+ (tmpDag, dagState) -> {
+ try {
+ Map<String, Edge> edges = (Map<String, Edge>)
getPrivateField(tmpDag, "edges");
+ for (Map.Entry<String, Edge> entry : edges.entrySet()) {
+ Edge edge = entry.getValue();
+
+ Configuration edgeSourceConf =
+ org.apache.tez.common.TezUtils.createConfFromUserPayload(
+
edge.getEdgeProperty().getEdgeSource().getUserPayload());
+ int shuffleId = shuffleIdGenerator.getAndIncrement();
+ currentDagShuffleIds.add(shuffleId);
+ edgeSourceConf.setInt(TezUtils.TEZ_SHUFFLE_ID, shuffleId);
+ edgeSourceConf.set(TezUtils.TEZ_CELEBORN_APPLICATION_ID,
appAttemptId.toString());
+ edgeSourceConf.set(TezUtils.TEZ_CELEBORN_LM_HOST,
lifecycleManagerHost);
+ edgeSourceConf.setInt(TezUtils.TEZ_CELEBORN_LM_PORT,
lifecycleManagerPort);
+ for (Tuple2<String, String> stringStringTuple2 :
celebornConf.getAll()) {
+ edgeSourceConf.set(stringStringTuple2._1,
stringStringTuple2._2);
+ }
+
+ edge.getEdgeProperty()
+ .getEdgeSource()
+ .setUserPayload(
+
org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+ edge.getEdgeProperty()
+ .getEdgeDestination()
+ .setUserPayload(
+
org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+
+ EdgeProperty.DataMovementType dataMovementType =
+ edge.getEdgeProperty().getDataMovementType();
+
+ // rename output class name
+ OutputDescriptor outputDescriptor =
edge.getEdgeProperty().getEdgeSource();
+ Field outputClassNameField =
+
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ outputClassNameField.setAccessible(true);
+ String outputClassName = (String)
outputClassNameField.get(outputDescriptor);
+ outputClassNameField.set(
+ outputDescriptor,
getNewOutputClassName(dataMovementType, outputClassName));
+
+ // rename input class name
+ InputDescriptor inputDescriptor =
edge.getEdgeProperty().getEdgeDestination();
+ Field inputClassNameField =
+
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ inputClassNameField.setAccessible(true);
+ String inputClassName = (String)
outputClassNameField.get(inputDescriptor);
+ outputClassNameField.set(
+ inputDescriptor, getNewInputClassName(dataMovementType,
inputClassName));
+ }
+ } catch (IOException | IllegalAccessException |
NoSuchFieldException e) {
+ Logger.error("Reconfigure failed after dag was inited, caused
by {}", e);
+ throw new TezUncheckedException(e);
+ }
+ });
+
+ // process dag final status
+ List<DAGState> finalStates =
+ Arrays.asList(DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
DAGState.ERROR);
+ Map callbackMap = (Map) getPrivateField(stateMachine, "callbackMap");
+ finalStates.forEach(
+ finalState ->
+ callbackMap.put(
+ finalState,
+ (OnStateChangedCallback<DAGState, DAGImpl>)
+ (tmpDag, dagState) -> {
+ // clean up all shuffles registered for this DAG
+ for (Integer shuffleId : currentDagShuffleIds) {
+ lifecycleManager.unregisterShuffle(shuffleId);
+ }
+ }));
+ return dag;
+ }
+
+ private static String getNewOutputClassName(
+ EdgeProperty.DataMovementType movementType, String oldClassName) {
+ // TODO: Implement real logic here
+ return "";
+ }
+
+ private static String getNewInputClassName(
+ EdgeProperty.DataMovementType movementType, String oldClassName) {
+ // TODO: Implement real logic here
+ return "";
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ lifecycleManager.stop();
+ super.serviceStop();
+ }
+
+ public static void main(String[] args) {
+ try {
+ Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
+ final String pid = System.getenv().get("JVM_PID");
+ String containerIdStr =
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ String nodeHostString =
System.getenv(ApplicationConstants.Environment.NM_HOST.name());
+ String nodePortString =
System.getenv(ApplicationConstants.Environment.NM_PORT.name());
+ String nodeHttpPortString =
+ System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+ String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+ String clientVersion =
System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
+ if (clientVersion == null) {
+ clientVersion = VersionInfo.UNKNOWN;
+ }
+
+ validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
+
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+
+ String jobUserName =
System.getenv(ApplicationConstants.Environment.USER.name());
+
+ // Command line options
+ Options opts = new Options();
+ opts.addOption(
+ TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
+ false,
+ "Run Tez Application Master in Session mode");
+
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ boolean sessionModeCliOption =
cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+ Logger.info(
+ "Creating CelebornDAGAppMaster for "
+ + "applicationId={}"
+ + ", attemptNum={}"
+ + ", AMContainerId={}"
+ + ", jvmPid={}"
+ + ", userFromEnv={}"
+ + ", cliSessionOption={}"
+ + ", pwd={}"
+ + ", localDirs={}"
+ + ", logDirs={}",
+ applicationAttemptId.getApplicationId(),
+ applicationAttemptId.getAttemptId(),
+ containerId,
+ pid,
+ jobUserName,
+ sessionModeCliOption,
+ System.getenv(ApplicationConstants.Environment.PWD.name()),
+ System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()),
+ System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
+
+ // disable tez slow start
+ Configuration conf = new Configuration(new YarnConfiguration());
+
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
1.0f);
+
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
1.0f);
+ // disable rescheduling of tasks on unhealthy nodes because shuffle data are
stored in Celeborn
+ conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
+
+ // support setting the Celeborn master endpoints from an environment variable
+ String masterEndpointsKey = TezUtils.TEZ_PREFIX +
CelebornConf.MASTER_ENDPOINTS().key();
+ String masterEndpointsVal = conf.get(masterEndpointsKey);
+ if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
+ Logger.info(
+ "MRAppMaster sets {} via environment variable {}.",
+ masterEndpointsKey,
+ MASTER_ENDPOINTS_ENV);
+ conf.set(masterEndpointsKey,
TezUtils.ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
+ }
+
+ DAGProtos.ConfigurationProto confProto =
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(
+ System.getenv(ApplicationConstants.Environment.PWD.name()));
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(conf,
confProto.getConfKeyValuesList());
+
+ DAGProtos.AMPluginDescriptorProto amPluginDescriptorProto = null;
+ if (confProto.hasAmPluginDescriptor()) {
+ amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+ }
+
+ UserGroupInformation.setConfiguration(conf);
+ Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+
+ TezUtilsInternal.setSecurityUtilConfigration(Logger, conf);
+
+ CelebornDagAppMaster appMaster =
+ new CelebornDagAppMaster(
+ applicationAttemptId,
+ containerId,
+ nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString),
+ new SystemClock(),
+ appSubmitTime,
+ sessionModeCliOption,
+ System.getenv(ApplicationConstants.Environment.PWD.name()),
+ TezCommonUtils.getTrimmedStrings(
+
System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
+ TezCommonUtils.getTrimmedStrings(
+
System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
+ clientVersion,
+ credentials,
+ jobUserName,
+ amPluginDescriptorProto);
+ ShutdownHookManager.get()
+ .addShutdownHook(new DAGAppMasterShutdownHook(appMaster),
SHUTDOWN_HOOK_PRIORITY);
+
+ // log the system properties
+ if (Logger.isInfoEnabled()) {
+ String systemPropsToLog =
TezCommonUtils.getSystemPropertiesToLog(conf);
+ if (systemPropsToLog != null) {
+ Logger.info(systemPropsToLog);
+ }
+ }
+
+ initAndStartAppMaster(appMaster, conf);
+
+ } catch (Throwable t) {
+ Logger.error("Error starting DAGAppMaster", t);
+ System.exit(1);
+ }
+ }
+}
diff --git a/dev/reformat b/dev/reformat
index e848c189d..87f9bdddd 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -32,4 +32,5 @@ else
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
${PROJECT_DIR}/build/mvn spotless:apply -Pmr
+ ${PROJECT_DIR}/build/mvn spotless:apply -Ptez
fi
diff --git a/pom.xml b/pom.xml
index b4f822920..cd1f59acf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1744,6 +1744,160 @@
</modules>
</profile>
+ <profile>
+ <id>tez</id>
+ <modules>
+ <module>client-tez/tez</module>
+ <module>client-tez/tez-shaded</module>
+ <!-- Tez integration tests live in tests/tez-it (not the MapReduce tests/mr-it module). -->
+ <module>tests/tez-it</module>
+ </modules>
+ <properties>
+ <tez.version>0.9.1</tez.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
<profile>
<id>google-mirror</id>
<properties>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 0cd078e67..9645c9347 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -88,6 +88,9 @@ object Dependencies {
val protocVersion = "3.21.7"
val protoVersion = "3.21.7"
+ // Tez
+ val tezVersion = "0.9.1"
+
val apLoader = "me.bechberger" % "ap-loader-all" % apLoaderVersion
val commonsCompress = "org.apache.commons" % "commons-compress" %
commonsCompressVersion
val commonsCrypto = "org.apache.commons" % "commons-crypto" %
commonsCryptoVersion excludeAll(
@@ -203,6 +206,47 @@ object Dependencies {
// SSL support
val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" %
bouncycastleVersion % "test"
val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" %
bouncycastleVersion % "test"
+
+ // Tez support: the Tez artifacts (all at tezVersion) plus hadoop-common (at hadoopVersion).
+ // Each definition drops the transitive artifacts named in its ExclusionRule entries.
+ val tezCommon = "org.apache.tez" % "tez-common" % tezVersion excludeAll(
+ ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-common")
+ )
+ val tezRuntimeLibrary = "org.apache.tez" % "tez-runtime-library" %
tezVersion excludeAll(
+ ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-common")
+ )
+ // NOTE(review): the Maven `tez` profile in pom.xml excludes only hadoop-annotations,
+ // hadoop-yarn-api and hadoop-yarn-common for tez-runtime-internals, while this sbt
+ // definition excludes three further YARN artifacts. Confirm which list is intended.
+ val tezRuntimeInternals = "org.apache.tez" % "tez-runtime-internals" %
tezVersion excludeAll(
+ ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-client"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-common"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
+ )
+ val tezDag = "org.apache.tez" % "tez-dag" % tezVersion excludeAll(
+ ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-client"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-common"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
+ )
+ val tezApi = "org.apache.tez" % "tez-api" % tezVersion excludeAll(
+ ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+ ExclusionRule("org.apache.hadoop", "hadoop-auth"),
+ ExclusionRule("org.apache.hadoop", "hadoop-hdfs"),
+ ExclusionRule("org.apache.hadoop", "hadoop-yarn-client")
+ )
+ val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % hadoopVersion
excludeAll(
+ ExclusionRule("com.sun.jersey", "jersey-json"),
+ ExclusionRule("org.apache.httpcomponents", "httpclient"),
+ ExclusionRule("org.slf4j", "slf4j-log4j12")
+ )
}
object CelebornCommonSettings {
@@ -355,7 +399,7 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornClient.client,
CelebornService.service,
CelebornWorker.worker,
- CelebornMaster.master) ++ maybeSparkClientModules ++
maybeFlinkClientModules ++ maybeMRClientModules
+ CelebornMaster.master) ++ maybeSparkClientModules ++
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeTezClientModules
}
// ThisBuild / parallelExecution := false
@@ -421,6 +465,15 @@ object Utils {
lazy val maybeMRClientModules: Seq[Project] =
mrClientProjects.map(_.modules).getOrElse(Seq.empty)
+ // First entry of `profiles` whose name begins with "tez", if there is one.
+ val TEZ_VERSION = profiles.find(_.startsWith("tez"))
+
+ // The Tez client project set is selected only when that entry is exactly "tez".
+ lazy val tezClientProjects = TEZ_VERSION.collect { case "tez" => TezClientProjects }
+
+ // sbt projects contributed by the Tez client; empty when no Tez project set is selected.
+ lazy val maybeTezClientModules: Seq[Project] =
+   tezClientProjects.fold(Seq.empty[Project])(_.modules)
+
def defaultScalaVersion(): String = {
// 1. Inherit the scala version of the spark project
// 2. if the spark profile not specified, using the DEFAULT_SCALA_VERSION
@@ -1286,6 +1339,155 @@ object MRClientProjects {
)
}
+////////////////////////////////////////////////////////
+// Tez Client //
+////////////////////////////////////////////////////////
+object TezClientProjects {
+
+ // sbt project "celeborn-client-tez", rooted at client-tez/tez. It depends on the
+ // common and client projects and on the Tez / hadoop-common libraries from Dependencies.
+ def tezClient: Project = {
+ Project("celeborn-client-tez", file("client-tez/tez"))
+ .dependsOn(CelebornCommon.common, CelebornClient.client)
+ .settings(
+ commonSettings,
+ libraryDependencies ++= Seq(
+ Dependencies.tezCommon,
+ Dependencies.tezRuntimeLibrary,
+ Dependencies.tezRuntimeInternals,
+ Dependencies.tezDag,
+ Dependencies.tezApi,
+ Dependencies.hadoopCommon
+ ) ++ commonUnitTestDependencies,
+ // Override any transitively resolved commons-compress with the one declared in Dependencies.
+ dependencyOverrides += Dependencies.commonsCompress
+ )
+ }
+
+ // sbt project "celeborn-tez-it", rooted at tests/tez-it.
+ def tezIt: Project = {
+   // Each upstream project is wired in with both its compile and its test classpath, see
+   // https://www.scala-sbt.org/1.x/docs/Multi-Project.html#Classpath+dependencies
+   val bothScopes = "test->test;compile->compile"
+   Project("celeborn-tez-it", file("tests/tez-it"))
+     .dependsOn(
+       CelebornCommon.common % bothScopes,
+       CelebornClient.client % bothScopes,
+       CelebornMaster.master % bothScopes,
+       CelebornWorker.worker % bothScopes,
+       tezClient % bothScopes)
+     .settings(
+       commonSettings,
+       copyDepsSettings,
+       libraryDependencies ++= commonUnitTestDependencies
+     )
+ }
+
+ // sbt project "celeborn-client-tez-shaded", rooted at client-tez/tez-shaded. It builds an
+ // assembly jar on top of tezClient, relocating the bundled third-party packages under
+ // org.apache.celeborn.shaded. The AddMetaInfLicenseFiles plugin is disabled for this project.
+ def tezClientShade: Project = {
+ Project("celeborn-client-tez-shaded", file("client-tez/tez-shaded"))
+ .dependsOn(tezClient)
+ .disablePlugins(AddMetaInfLicenseFiles)
+ .settings(
+ commonSettings,
+ releaseSettings,
+
+ // align final shaded jar name with maven.
+ (assembly / assemblyJarName) := {
+ val extension = artifact.value.extension
+
s"${moduleName.value}_${scalaBinaryVersion.value}-${version.value}.$extension"
+ },
+
+ // the assembly task's test step is a no-op.
+ (assembly / test) := {},
+
+ (assembly / logLevel) := Level.Info,
+
+ // include `scala-library` in the assembly.
+ (assembly / assemblyPackageScala / assembleArtifact) := true,
+
+ // Exclude every classpath jar whose file name does NOT start with one of the prefixes
+ // below, so only the listed jars end up inside the assembly.
+ (assembly / assemblyExcludedJars) := {
+ val cp = (assembly / fullClasspath).value
+ cp filter { v =>
+ val name = v.data.getName
+ !(name.startsWith("celeborn-") ||
+ name.startsWith("protobuf-java-") ||
+ name.startsWith("guava-") ||
+ name.startsWith("failureaccess-") ||
+ name.startsWith("netty-") ||
+ name.startsWith("commons-lang3-") ||
+ name.startsWith("RoaringBitmap-") ||
+ name.startsWith("lz4-java-") ||
+ name.startsWith("zstd-jni-") ||
+ name.startsWith("scala-library-"))
+ }
+ },
+
+ // Relocate the bundled third-party packages under org.apache.celeborn.shaded.
+ (assembly / assemblyShadeRules) := Seq(
+ ShadeRule.rename("com.google.protobuf.**" ->
"org.apache.celeborn.shaded.com.google.protobuf.@1").inAll,
+ ShadeRule.rename("com.google.common.**" ->
"org.apache.celeborn.shaded.com.google.common.@1").inAll,
+ ShadeRule.rename("io.netty.**" ->
"org.apache.celeborn.shaded.io.netty.@1").inAll,
+ ShadeRule.rename("org.apache.commons.**" ->
"org.apache.celeborn.shaded.org.apache.commons.@1").inAll,
+ ShadeRule.rename("org.roaringbitmap.**" ->
"org.apache.celeborn.shaded.org.roaringbitmap.@1").inAll
+ ),
+
+ // Cases are tried top to bottom; anything not matched falls through to MergeStrategy.first.
+ (assembly / assemblyMergeStrategy) := {
+ case m if m.toLowerCase(Locale.ROOT).endsWith("manifest.mf") =>
MergeStrategy.discard
+ // For netty-3.x.y.Final.jar
+ case m if m.startsWith("META-INF/license/") => MergeStrategy.discard
+ // the LicenseAndNoticeMergeStrategy always picks the license/notice
file from the current project
+ case m@("META-INF/LICENSE" | "META-INF/NOTICE") =>
CustomMergeStrategy("LicenseAndNoticeMergeStrategy") { conflicts =>
+ val entry = conflicts.head
+ val projectLicenseFile = (Compile / resourceDirectory).value /
entry.target
+ val stream = () => new java.io.BufferedInputStream(new
java.io.FileInputStream(projectLicenseFile))
+ Right(Vector(JarEntry(entry.target, stream)))
+ }
+ case PathList(ps@_*) if Assembly.isLicenseFile(ps.last) =>
MergeStrategy.discard
+ // Drop all proto files that are not needed as artifacts of the
build.
+ case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") =>
MergeStrategy.discard
+ case m if
m.toLowerCase(Locale.ROOT).startsWith("meta-inf/native-image") =>
MergeStrategy.discard
+ // Drop netty jnilib
+ case m if m.toLowerCase(Locale.ROOT).endsWith(".jnilib") =>
MergeStrategy.discard
+ // rename netty native lib
+ case "META-INF/native/libnetty_transport_native_epoll_x86_64.so" =>
CustomMergeStrategy.rename(_ =>
"META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so")
+ case "META-INF/native/libnetty_transport_native_epoll_aarch_64.so"
=> CustomMergeStrategy.rename(_ =>
"META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so")
+ case _ => MergeStrategy.first
+ },
+
+ // The project's regular packaged jar is the assembly jar.
+ Compile / packageBin := assembly.value,
+ // NOTE(review): removeDependenciesTransformer is defined elsewhere; presumably it strips
+ // the dependency list from the published POM - confirm.
+ pomPostProcess := removeDependenciesTransformer
+ )
+ }
+
+ // Every sbt project contributed by the Tez client: client, integration tests, test group, shaded jar.
+ def modules: Seq[Project] = Seq(tezClient, tezIt, tezGroup, tezClientShade)
+
+ // for test only, don't use this group for any other projects
+ lazy val tezGroup = (project withId
"celeborn-tez-group").aggregate(tezClient, tezIt)
+
+ // Task key for copying dependency jars into the build directory.
+ val copyDeps = TaskKey[Unit]("copyDeps", "Copies needed dependencies to the
build directory.")
+ // Destination directory: "mapreduce_lib" under Compile / crossTarget. The directory name
+ // matches the outputDirectory used by the Maven tests/tez-it pom.
+ val destPath = (Compile / crossTarget) {
+ _ / "mapreduce_lib"
+ }
+
+ // Settings that define copyDeps and make Test / compile depend on it.
+ lazy val copyDepsSettings = Seq(
+ copyDeps := {
+ val dest = destPath.value
+ // Create the destination directory if missing; fail the task when that is not possible.
+ if (!dest.isDirectory() && !dest.mkdirs()) {
+ throw new java.io.IOException("Failed to create jars directory.")
+ }
+
+ // Copy every regular file on the compile dependency classpath (directories are skipped).
+ (Compile / dependencyClasspath).value.map(_.data)
+ .filter { jar => jar.isFile() }
+ .foreach { jar =>
+ val destJar = new File(dest, jar.getName())
+ // Remove an existing file of the same name before copying.
+ if (destJar.isFile()) {
+ destJar.delete()
+ }
+ Files.copy(jar.toPath(), destJar.toPath())
+ }
+ },
+ // Test / compile additionally depends on copyDeps and still returns the original compile result.
+ (Test / compile) := {
+ copyDeps.value
+ (Test / compile).value
+ }
+ )
+}
+
+
object CelebornOpenApi {
val openApiSpecDir = "openapi/openapi-client/src/main/openapi3"
val openApiMasterInternalOutputDir =
"openapi/openapi-client/target/master/generated-sources/java"
diff --git a/tests/tez-it/pom.xml b/tests/tez-it/pom.xml
new file mode 100644
index 000000000..fc499e780
--- /dev/null
+++ b/tests/tez-it/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-tez-it_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Tez Integration Test</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-master_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+
<artifactId>celeborn-client-tez-shaded_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-examples</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Declared with provided scope so that copy-dependencies places these jars in mapreduce_lib -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+
<outputDirectory>${project.build.directory}/mapreduce_lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>