This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-tez-support
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-tez-support by this 
push:
     new ce0fee615 [CELEBORN-1545] Add Tez plugin skeleton and dag app master 
(#2667)
ce0fee615 is described below

commit ce0fee6156baaea8520dfd719a8e178ccafe8478
Author: Ethan Feng <[email protected]>
AuthorDate: Tue Aug 13 16:43:01 2024 +0800

    [CELEBORN-1545] Add Tez plugin skeleton and dag app master (#2667)
    
    [CELEBORN-1545] Add Tez plugin skeleton and dag app master
---
 .github/workflows/style.yml                        |   1 +
 build/make-distribution.sh                         |  31 +-
 client-tez/tez-shaded/pom.xml                      | 148 +++++++++
 client-tez/tez/pom.xml                             | 193 +++++++++++
 .../apache/celeborn/tez/plugin/util/TezUtils.java  |  67 ++++
 .../apache/tez/dag/app/CelebornDagAppMaster.java   | 358 +++++++++++++++++++++
 dev/reformat                                       |   1 +
 pom.xml                                            | 154 +++++++++
 project/CelebornBuild.scala                        | 204 +++++++++++-
 tests/tez-it/pom.xml                               | 177 ++++++++++
 10 files changed, 1326 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index 717e813ea..7e82a84e8 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -56,3 +56,4 @@ jobs:
           build/mvn spotless:check -Pgoogle-mirror,spark-2.4
           build/mvn spotless:check -Pgoogle-mirror,spark-3.3
           build/mvn spotless:check -Pgoogle-mirror,mr
+          build/mvn spotless:check -Pgoogle-mirror,tez
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 1f093bd12..fafc945da 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -63,11 +63,6 @@ while (( "$#" )); do
       echo "Error: $1 is not supported"
       exit_with_usage
       ;;
-    -P*)
-      if [[ "$1" == *"hadoop-aws"* ]]; then
-        HADOOP_AWS_ENABLED="true"
-      fi
-      ;;
     -*)
       break
       ;;
@@ -247,6 +242,23 @@ function build_mr_client {
     cp 
"$PROJECT_DIR"/client-mr/mr-shaded/target/celeborn-client-mr-shaded_${SCALA_VERSION}-$VERSION.jar
 "$DIST_DIR/mr/"
 }
 
+# Builds the shaded Celeborn Tez client with Maven and copies the resulting jar
+# into "$DIST_DIR/tez".
+# Arguments: extra Maven options (for example -Ptez); they are forwarded to both
+# Maven invocations below.
+function build_tez_client {
+  # Take the last line of the help:evaluate output, after dropping INFO and
+  # WARNING log lines, as the project version.
+  VERSION=$("$MVN" help:evaluate -Dexpression=project.version "$@" 2>/dev/null \
+        | grep -v "INFO" \
+        | grep -v "WARNING" \
+        | tail -n 1)
+  # NOTE(review): $MVN_DIST_OPT stays unquoted, presumably so that it can expand
+  # to several options - confirm against where it is defined.
+  BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl :celeborn-client-tez-shaded_${SCALA_VERSION} -am "$@")
+
+  # Actually build the jar
+  echo -e "\nBuilding with..."
+  echo -e "\$ ${BUILD_COMMAND[@]}\n"
+
+  "${BUILD_COMMAND[@]}"
+
+  ## tez client jars
+  mkdir -p "$DIST_DIR/tez"
+  cp "$PROJECT_DIR"/client-tez/tez-shaded/target/celeborn-client-tez-shaded_${SCALA_VERSION}-$VERSION.jar "$DIST_DIR/tez/"
+}
 
 #########################
 #     sbt functions     #
@@ -322,10 +334,11 @@ if [ "$SBT_ENABLED" == "true" ]; then
     sbt_build_client -Pflink-1.18
     sbt_build_client -Pflink-1.19
     sbt_build_client -Pmr
+    sbt_build_client -Ptez
   else
     echo "build client with $@"
     ENGINE_COUNT=0
-    ENGINES=("spark" "flink" "mr")
+    ENGINES=("spark" "flink" "mr" "tez")
     for single_engine in ${ENGINES[@]}
     do
       echo $single_engine
@@ -356,12 +369,13 @@ else
     build_flink_client -Pflink-1.18
     build_flink_client -Pflink-1.19
     build_mr_client -Pmr
+    build_tez_client -Ptez
   else
     ## build release package on demand
     build_service $@
     echo "build client with $@"
     ENGINE_COUNT=0
-    ENGINES=("spark" "flink" "mr")
+    ENGINES=("spark" "flink" "mr" "tez")
     for single_engine in ${ENGINES[@]}
     do
       echo $single_engine
@@ -384,6 +398,9 @@ else
     elif [[  $@ == *"mr"* ]]; then
       echo "build mr clients"
       build_mr_client $@
+    elif [[  $@ == *"tez"* ]]; then
+      echo "build tez clients"
+      build_tez_client $@
     fi
   fi
 fi
diff --git a/client-tez/tez-shaded/pom.xml b/client-tez/tez-shaded/pom.xml
new file mode 100644
index 000000000..db96af902
--- /dev/null
+++ b/client-tez/tez-shaded/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-client-tez-shaded_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Shaded Client for Tez</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-tez_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <relocations>
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons</pattern>
+              
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.scala-lang</pattern>
+              <shadedPattern>${shading.prefix}.org.scala-lang</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.lz4</pattern>
+              <shadedPattern>${shading.prefix}.org.lz4</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.roaringbitmap</pattern>
+              
<shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+            </relocation>
+          </relocations>
+          <artifactSet>
+            <includes>
+              <include>org.apache.celeborn:*</include>
+              <include>com.google.protobuf:protobuf-java</include>
+              <include>com.google.guava:guava</include>
+              <include>com.google.guava:failureaccess</include>
+              <include>io.netty:*</include>
+              <include>org.apache.commons:commons-lang3</include>
+              <include>org.scala-lang:scala-library</include>
+              <include>org.lz4:lz4-java</include>
+              <include>com.github.luben:zstd-jni</include>
+              <include>org.roaringbitmap:RoaringBitmap</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>**/*.proto</exclude>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+                <exclude>**/log4j.properties</exclude>
+                <exclude>META-INF/LICENSE.txt</exclude>
+                <exclude>META-INF/NOTICE.txt</exclude>
+                <exclude>LICENSE.txt</exclude>
+                <exclude>NOTICE.txt</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <transformers>
+            <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+          </transformers>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>${maven.plugin.antrun.version}</version>
+        <executions>
+          <execution>
+            <id>rename-native-library</id>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <echo message="unpacking netty jar"></echo>
+                <unzip dest="${project.build.directory}/unpacked/" 
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+                <echo message="renaming native epoll library"></echo>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_x86_64.so" 
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so" 
type="glob"></mapper>
+                </move>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_aarch_64.so" 
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so" 
type="glob"></mapper>
+                </move>
+                <echo message="deleting native kqueue library"></echo>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+                <echo message="repackaging netty jar"></echo>
+                <jar basedir="${project.build.directory}/unpacked" 
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/client-tez/tez/pom.xml b/client-tez/tez/pom.xml
new file mode 100644
index 000000000..a29306a83
--- /dev/null
+++ b/client-tez/tez/pom.xml
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-client-tez_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Client for Tez</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-reload4j</artifactId>
+      <version>1.7.36</version>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.reload4j</groupId>
+          <artifactId>reload4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+      <version>${tez.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git 
a/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
 
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
new file mode 100644
index 000000000..65aecdf11
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/TezUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tez.plugin.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
+
+public class TezUtils {
+  public static final String TEZ_PREFIX = "tez.";
+  public static final String TEZ_CELEBORN_LM_HOST = 
"celeborn.lifecycleManager.host";
+  public static final String TEZ_CELEBORN_LM_PORT = 
"celeborn.lifecycleManager.port";
+  public static final String TEZ_CELEBORN_APPLICATION_ID = 
"celeborn.applicationId";
+  public static final String TEZ_SHUFFLE_ID = "celeborn.tez.shuffle.id";
+
+  public static final CelebornConf fromTezConfiguration(Configuration 
tezConfig) {
+    CelebornConf tmpCelebornConf = new CelebornConf();
+    for (Map.Entry<String, String> property : tezConfig) {
+      String proName = property.getKey();
+      String proValue = property.getValue();
+      if (proName.startsWith(TEZ_PREFIX + "celeborn")) {
+        tmpCelebornConf.set(proName.substring(TEZ_PREFIX.length()), proValue);
+      }
+    }
+    return tmpCelebornConf;
+  }
+
+  public static Object getPrivateField(Object object, String name) {
+    try {
+      Field f = object.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.get(object);
+    } catch (Exception e) {
+      throw new CelebornRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  public static String ensureGetSysEnv(String envName) throws IOException {
+    String value = System.getenv(envName);
+    if (value == null) {
+      String msg = envName + " is null";
+      throw new CelebornIOException(msg);
+    }
+    return value;
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java 
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
new file mode 100644
index 000000000..7f570db2b
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.apache.celeborn.tez.plugin.util.TezUtils.getPrivateField;
+import static 
org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.*;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.tez.plugin.util.TezUtils;
+
+public class CelebornDagAppMaster extends DAGAppMaster {
+  private static final Logger Logger = 
LoggerFactory.getLogger(CelebornDagAppMaster.class);
+  private static final String MASTER_ENDPOINTS_ENV = 
"CELEBORN_MASTER_ENDPOINTS";
+
+  private CelebornConf celebornConf;
+  private LifecycleManager lifecycleManager;
+  private ApplicationAttemptId appAttemptId;
+  private String lifecycleManagerHost;
+  private int lifecycleManagerPort;
+  private AtomicInteger shuffleIdGenerator = new AtomicInteger(0);
+
+  /**
+   * Creates the application master by handing every argument, unchanged and in the same order, to
+   * the {@link DAGAppMaster} constructor, and keeps the application attempt id in
+   * {@code appAttemptId} for later use by this class.
+   */
+  public CelebornDagAppMaster(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      Clock clock,
+      long appSubmitTime,
+      boolean isSession,
+      String workingDirectory,
+      String[] localDirs,
+      String[] logDirs,
+      String clientVersion,
+      Credentials credentials,
+      String jobUserName,
+      DAGProtos.AMPluginDescriptorProto pluginDescriptorProto) {
+    super(
+        applicationAttemptId,
+        containerId,
+        nmHost,
+        nmPort,
+        nmHttpPort,
+        clock,
+        appSubmitTime,
+        isSession,
+        workingDirectory,
+        localDirs,
+        logDirs,
+        clientVersion,
+        credentials,
+        jobUserName,
+        pluginDescriptorProto);
+    // Kept locally: the attempt id is used to name the lifecycle manager and the edge payloads.
+    appAttemptId = applicationAttemptId;
+  }
+
+  /**
+   * Initializes the parent service, then derives the Celeborn configuration from {@code conf} and
+   * creates the lifecycle manager, recording the host and port it reports.
+   *
+   * @param conf configuration passed to the parent and scanned for Celeborn settings
+   * @throws Exception if the parent initialization or the lifecycle manager creation fails
+   */
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+    this.celebornConf = TezUtils.fromTezConfiguration(conf);
+
+    final String managerAppId = appAttemptId.toString();
+    final LifecycleManager manager = new LifecycleManager(managerAppId, this.celebornConf);
+    this.lifecycleManager = manager;
+    this.lifecycleManagerHost = manager.getHost();
+    this.lifecycleManagerPort = manager.getPort();
+
+    Logger.info("Init Celeborn lifecycle manager");
+  }
+
+  private static void validateInputParam(String value, String param) throws 
IOException {
+    if (value == null) {
+      String msg = param + " is null";
+      Logger.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Creates the DAG through the parent implementation and hooks Celeborn into its state machine.
+   *
+   * <p>When the DAG enters {@link DAGState#INITED}, every edge is assigned a new shuffle id and the
+   * configuration decoded from the edge source payload is extended with that id, the application
+   * attempt id, the lifecycle manager address and all entries of {@code celebornConf}. The
+   * resulting payload is stored on both the edge source and the edge destination, and the class
+   * names of both descriptors are replaced by the values returned from
+   * {@code getNewOutputClassName} and {@code getNewInputClassName}.
+   *
+   * <p>For the states SUCCEEDED, FAILED, KILLED and ERROR a callback is put into the state
+   * machine's callback map that unregisters every shuffle id handed out for this DAG.
+   *
+   * @param dagPB plan passed unchanged to {@code super.createDAG}
+   * @return the DAG created by the parent class
+   * @throws TezUncheckedException from the INITED callback if a payload cannot be converted or a
+   *     descriptor field cannot be accessed
+   */
+  @Override
+  protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
+    DAG dag = super.createDAG(dagPB);
+
+    // Shuffle ids handed out for this DAG; filled on INITED and read by the final-state callback.
+    List<Integer> currentDagShuffleIds = new ArrayList<>();
+
+    StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag, "stateMachine");
+    stateMachine.registerStateEnteredCallback(
+        DAGState.INITED,
+        (OnStateChangedCallback<DAGState, DAGImpl>)
+            (tmpDag, dagState) -> {
+              try {
+                Map<String, Edge> edges = (Map<String, Edge>) getPrivateField(tmpDag, "edges");
+                for (Map.Entry<String, Edge> entry : edges.entrySet()) {
+                  Edge edge = entry.getValue();
+
+                  Configuration edgeSourceConf =
+                      org.apache.tez.common.TezUtils.createConfFromUserPayload(
+                          edge.getEdgeProperty().getEdgeSource().getUserPayload());
+                  int shuffleId = shuffleIdGenerator.getAndIncrement();
+                  currentDagShuffleIds.add(shuffleId);
+                  edgeSourceConf.setInt(TezUtils.TEZ_SHUFFLE_ID, shuffleId);
+                  edgeSourceConf.set(TezUtils.TEZ_CELEBORN_APPLICATION_ID, appAttemptId.toString());
+                  edgeSourceConf.set(TezUtils.TEZ_CELEBORN_LM_HOST, lifecycleManagerHost);
+                  edgeSourceConf.setInt(TezUtils.TEZ_CELEBORN_LM_PORT, lifecycleManagerPort);
+                  for (Tuple2<String, String> stringStringTuple2 : celebornConf.getAll()) {
+                    edgeSourceConf.set(stringStringTuple2._1, stringStringTuple2._2);
+                  }
+
+                  edge.getEdgeProperty()
+                      .getEdgeSource()
+                      .setUserPayload(
+                          org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+                  // NOTE(review): the destination receives the payload built from the source
+                  // configuration, replacing whatever payload it had - confirm this is intended.
+                  edge.getEdgeProperty()
+                      .getEdgeDestination()
+                      .setUserPayload(
+                          org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+
+                  EdgeProperty.DataMovementType dataMovementType =
+                      edge.getEdgeProperty().getDataMovementType();
+
+                  // rename output class name
+                  OutputDescriptor outputDescriptor = edge.getEdgeProperty().getEdgeSource();
+                  Field outputClassNameField =
+                      outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+                  outputClassNameField.setAccessible(true);
+                  String outputClassName = (String) outputClassNameField.get(outputDescriptor);
+                  outputClassNameField.set(
+                      outputDescriptor, getNewOutputClassName(dataMovementType, outputClassName));
+
+                  // rename input class name; the field is resolved from the input descriptor's
+                  // own class hierarchy and used for both the read and the write
+                  InputDescriptor inputDescriptor = edge.getEdgeProperty().getEdgeDestination();
+                  Field inputClassNameField =
+                      inputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+                  inputClassNameField.setAccessible(true);
+                  String inputClassName = (String) inputClassNameField.get(inputDescriptor);
+                  inputClassNameField.set(
+                      inputDescriptor, getNewInputClassName(dataMovementType, inputClassName));
+                }
+              } catch (IOException | IllegalAccessException | NoSuchFieldException e) {
+                // The exception is passed as the trailing argument so SLF4J logs its stack trace;
+                // no placeholder is used because a trailing throwable never fills one.
+                Logger.error("Reconfigure failed after dag was inited", e);
+                throw new TezUncheckedException(e);
+              }
+            });
+
+    // process dag final status
+    List<DAGState> finalStates =
+        Arrays.asList(DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED, DAGState.ERROR);
+    Map callbackMap = (Map) getPrivateField(stateMachine, "callbackMap");
+    finalStates.forEach(
+        finalState ->
+            callbackMap.put(
+                finalState,
+                (OnStateChangedCallback<DAGState, DAGImpl>)
+                    (tmpDag, dagState) -> {
+                      // clean all shuffle for this Dag
+                      for (Integer shuffleId : currentDagShuffleIds) {
+                        lifecycleManager.unregisterShuffle(shuffleId);
+                      }
+                    }));
+    return dag;
+  }
+
+  /**
+   * Chooses the output class name to store on an edge source descriptor.
+   *
+   * <p>Placeholder: until a mapping is implemented the existing class name is returned unchanged,
+   * so the caller's write-back leaves the descriptor as it was instead of blanking it.
+   *
+   * @param movementType data movement type of the edge (not used yet)
+   * @param oldClassName class name currently stored on the descriptor
+   * @return the class name to store on the descriptor
+   */
+  private static String getNewOutputClassName(
+      EdgeProperty.DataMovementType movementType, String oldClassName) {
+    // TODO: Implement real logic here
+    return oldClassName;
+  }
+
+  /**
+   * Chooses the input class name to store on an edge destination descriptor.
+   *
+   * <p>Placeholder: until a mapping is implemented the existing class name is returned unchanged,
+   * so the caller's write-back leaves the descriptor as it was instead of blanking it.
+   *
+   * @param movementType data movement type of the edge (not used yet)
+   * @param oldClassName class name currently stored on the descriptor
+   * @return the class name to store on the descriptor
+   */
+  private static String getNewInputClassName(
+      EdgeProperty.DataMovementType movementType, String oldClassName) {
+    // TODO: Implement real logic here
+    return oldClassName;
+  }
+
+  /**
+   * Stops the Celeborn lifecycle manager, if one was created, and then the parent service.
+   *
+   * <p>{@code lifecycleManager} is only assigned in {@code serviceInit} after the parent has been
+   * initialized, so it can still be {@code null} when this method runs after a failed
+   * initialization. The parent is stopped in a {@code finally} block so that a failure while
+   * stopping the lifecycle manager does not leave the parent service running.
+   *
+   * @throws Exception if stopping the lifecycle manager or the parent service fails
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      if (lifecycleManager != null) {
+        lifecycleManager.stop();
+      }
+    } finally {
+      super.serviceStop();
+    }
+  }
+
+  /**
+   * Entry point of the Celeborn-enabled Tez DAG application master.
+   *
+   * <p>Reads the container/node information from the environment variables, parses the session
+   * mode command line option, builds the configuration (slow start disabled, rescheduling of
+   * tasks on unhealthy nodes disabled, Celeborn master endpoints taken from the environment
+   * when not configured), then creates, registers a shutdown hook for, and starts a
+   * {@link CelebornDagAppMaster}.
+   *
+   * <p>Any {@link Throwable} raised during startup is logged and the JVM exits with status 1.
+   *
+   * @param args command line arguments; only the session mode option is recognized
+   */
+  public static void main(String[] args) {
+    try {
+      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      final String pid = System.getenv().get("JVM_PID");
+      String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+      String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
+      String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
+      String nodeHttpPortString =
+          System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+      String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
+      if (clientVersion == null) {
+        clientVersion = VersionInfo.UNKNOWN;
+      }
+
+      validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+      ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+
+      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+
+      String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
+
+      // Command line options
+      Options opts = new Options();
+      opts.addOption(
+          TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
+          false,
+          "Run Tez Application Master in Session mode");
+
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+      boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+      Logger.info(
+          "Creating CelebornDAGAppMaster for "
+              + "applicationId={}"
+              + ", attemptNum={}"
+              + ", AMContainerId={}"
+              + ", jvmPid={}"
+              + ", userFromEnv={}"
+              + ", cliSessionOption={}"
+              + ", pwd={}"
+              + ", localDirs={}"
+              + ", logDirs={}",
+          applicationAttemptId.getApplicationId(),
+          applicationAttemptId.getAttemptId(),
+          containerId,
+          pid,
+          jobUserName,
+          sessionModeCliOption,
+          System.getenv(ApplicationConstants.Environment.PWD.name()),
+          System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()),
+          System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
+
+      // disable tez slow start
+      // NOTE(review): the user-specified Tez configuration is merged into conf further below;
+      // confirm that it cannot override the values forced here.
+      Configuration conf = new Configuration(new YarnConfiguration());
+      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1.0f);
+      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1.0f);
+      // disable reschedule task on unhealthy nodes because shuffle data are stored in Celeborn
+      conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
+
+      // support set celeborn master endpoints from env
+      String masterEndpointsKey = TezUtils.TEZ_PREFIX + CelebornConf.MASTER_ENDPOINTS().key();
+      String masterEndpointsVal = conf.get(masterEndpointsKey);
+      if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
+        Logger.info(
+            "CelebornDagAppMaster sets {} via environment variable {}.",
+            masterEndpointsKey,
+            MASTER_ENDPOINTS_ENV);
+        conf.set(masterEndpointsKey, TezUtils.ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
+      }
+
+      DAGProtos.ConfigurationProto confProto =
+          TezUtilsInternal.readUserSpecifiedTezConfiguration(
+              System.getenv(ApplicationConstants.Environment.PWD.name()));
+      TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
+
+      // The AM plugin descriptor is optional; null is passed on when it is absent.
+      DAGProtos.AMPluginDescriptorProto amPluginDescriptorProto = null;
+      if (confProto.hasAmPluginDescriptor()) {
+        amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+      }
+
+      UserGroupInformation.setConfiguration(conf);
+      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
+      TezUtilsInternal.setSecurityUtilConfigration(Logger, conf);
+
+      CelebornDagAppMaster appMaster =
+          new CelebornDagAppMaster(
+              applicationAttemptId,
+              containerId,
+              nodeHostString,
+              Integer.parseInt(nodePortString),
+              Integer.parseInt(nodeHttpPortString),
+              new SystemClock(),
+              appSubmitTime,
+              sessionModeCliOption,
+              System.getenv(ApplicationConstants.Environment.PWD.name()),
+              TezCommonUtils.getTrimmedStrings(
+                  System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
+              TezCommonUtils.getTrimmedStrings(
+                  System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
+              clientVersion,
+              credentials,
+              jobUserName,
+              amPluginDescriptorProto);
+      ShutdownHookManager.get()
+          .addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+
+      // log the system properties
+      if (Logger.isInfoEnabled()) {
+        String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
+        if (systemPropsToLog != null) {
+          Logger.info(systemPropsToLog);
+        }
+      }
+
+      initAndStartAppMaster(appMaster, conf);
+
+    } catch (Throwable t) {
+      // Startup failures are fatal: log and terminate the AM process.
+      Logger.error("Error starting DAGAppMaster", t);
+      System.exit(1);
+    }
+  }
+}
diff --git a/dev/reformat b/dev/reformat
index e848c189d..87f9bdddd 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -32,4 +32,5 @@ else
   ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
   ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
   ${PROJECT_DIR}/build/mvn spotless:apply -Pmr
+  ${PROJECT_DIR}/build/mvn spotless:apply -Ptez
 fi
diff --git a/pom.xml b/pom.xml
index b4f822920..cd1f59acf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1744,6 +1744,160 @@
       </modules>
     </profile>
 
+    <profile>
+      <id>tez</id>
+      <!-- Builds the Tez client, its shaded jar and the Tez integration tests. -->
+      <modules>
+        <module>client-tez/tez</module>
+        <module>client-tez/tez-shaded</module>
+        <module>tests/tez-it</module>
+      </modules>
+      <properties>
+        <tez.version>0.9.1</tez.version>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-internals</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-client</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-server-common</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-auth</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-client</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-json</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpclient</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
     <profile>
       <id>google-mirror</id>
       <properties>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 0cd078e67..9645c9347 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -88,6 +88,9 @@ object Dependencies {
   val protocVersion = "3.21.7"
   val protoVersion = "3.21.7"
 
+  // Tez version used by the Tez client modules; the Maven `tez` profile declares the
+  // same value as `tez.version`, so keep the two in sync.
+  val tezVersion = "0.9.1"
+
   val apLoader = "me.bechberger" % "ap-loader-all" % apLoaderVersion
   val commonsCompress = "org.apache.commons" % "commons-compress" % 
commonsCompressVersion
   val commonsCrypto = "org.apache.commons" % "commons-crypto" % 
commonsCryptoVersion excludeAll(
@@ -203,6 +206,47 @@ object Dependencies {
   // SSL support
   val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % 
bouncycastleVersion % "test"
   val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" % 
bouncycastleVersion % "test"
+
+  // Tez support
+  // The Hadoop/YARN artifacts are excluded from the transitive dependencies of every Tez module.
+  val tezCommon = "org.apache.tez" % "tez-common" % tezVersion excludeAll(
+    ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-common")
+  )
+  val tezRuntimeLibrary = "org.apache.tez" % "tez-runtime-library" % tezVersion excludeAll(
+    ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-common")
+  )
+  val tezRuntimeInternals = "org.apache.tez" % "tez-runtime-internals" % tezVersion excludeAll(
+    ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-client"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-common"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
+  )
+  val tezDag = "org.apache.tez" % "tez-dag" % tezVersion excludeAll(
+    ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-client"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-common"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
+  )
+  val tezApi = "org.apache.tez" % "tez-api" % tezVersion excludeAll(
+    ExclusionRule("org.apache.hadoop", "hadoop-annotations"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-api"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-common"),
+    ExclusionRule("org.apache.hadoop", "hadoop-auth"),
+    ExclusionRule("org.apache.hadoop", "hadoop-hdfs"),
+    ExclusionRule("org.apache.hadoop", "hadoop-yarn-client")
+  )
+  val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % hadoopVersion excludeAll(
+    ExclusionRule("com.sun.jersey", "jersey-json"),
+    ExclusionRule("org.apache.httpcomponents", "httpclient"),
+    ExclusionRule("org.slf4j", "slf4j-log4j12")
+  )
 }
 
 object CelebornCommonSettings {
@@ -355,7 +399,7 @@ object CelebornBuild extends sbt.internal.BuildDef {
       CelebornClient.client,
       CelebornService.service,
       CelebornWorker.worker,
-      CelebornMaster.master) ++ maybeSparkClientModules ++ 
maybeFlinkClientModules ++ maybeMRClientModules
+      CelebornMaster.master) ++ maybeSparkClientModules ++ 
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeTezClientModules
   }
 
   // ThisBuild / parallelExecution := false
@@ -421,6 +465,15 @@ object Utils {
 
   lazy val maybeMRClientModules: Seq[Project] = 
mrClientProjects.map(_.modules).getOrElse(Seq.empty)
 
+  // First active profile whose name starts with "tez", if any.
+  val TEZ_VERSION = profiles.filter(_.startsWith("tez")).headOption
+
+  // Only the exact profile name "tez" enables the Tez client projects.
+  lazy val tezClientProjects = TEZ_VERSION match {
+    case Some("tez") => Some(TezClientProjects)
+    case _ => None
+  }
+
+  // Tez modules to add to the build; empty when the tez profile is not active.
+  lazy val maybeTezClientModules: Seq[Project] = tezClientProjects.map(_.modules).getOrElse(Seq.empty)
+
+
   def defaultScalaVersion(): String = {
     // 1. Inherit the scala version of the spark project
     // 2. if the spark profile not specified, using the DEFAULT_SCALA_VERSION
@@ -1286,6 +1339,155 @@ object MRClientProjects {
   )
 }
 
+////////////////////////////////////////////////////////
+//                   Tez Client                        //
+////////////////////////////////////////////////////////
+// sbt project definitions for the Tez client: the plain client, its shaded assembly,
+// the integration tests and a test-only aggregate.
+object TezClientProjects {
+
+  // Tez client module; depends on the Celeborn common and client modules.
+  def tezClient: Project = {
+    Project("celeborn-client-tez", file("client-tez/tez"))
+      .dependsOn(CelebornCommon.common, CelebornClient.client)
+      .settings(
+        commonSettings,
+        libraryDependencies ++= Seq(
+          Dependencies.tezCommon,
+          Dependencies.tezRuntimeLibrary,
+          Dependencies.tezRuntimeInternals,
+          Dependencies.tezDag,
+          Dependencies.tezApi,
+          Dependencies.hadoopCommon
+        ) ++ commonUnitTestDependencies,
+        dependencyOverrides += Dependencies.commonsCompress
+      )
+  }
+
+  // Tez integration tests; copies the compile classpath jars before test compilation
+  // (see copyDepsSettings).
+  def tezIt: Project = {
+    Project("celeborn-tez-it", file("tests/tez-it"))
+      // ref: https://www.scala-sbt.org/1.x/docs/Multi-Project.html#Classpath+dependencies
+      .dependsOn(CelebornCommon.common % "test->test;compile->compile")
+      .dependsOn(CelebornClient.client % "test->test;compile->compile")
+      .dependsOn(CelebornMaster.master % "test->test;compile->compile")
+      .dependsOn(CelebornWorker.worker % "test->test;compile->compile")
+      .dependsOn(tezClient % "test->test;compile->compile")
+      .settings(
+        commonSettings,
+        copyDepsSettings,
+        libraryDependencies ++= commonUnitTestDependencies
+      )
+  }
+
+  // Shaded (assembly) jar of the Tez client with relocated third-party packages.
+  def tezClientShade: Project = {
+    Project("celeborn-client-tez-shaded", file("client-tez/tez-shaded"))
+      .dependsOn(tezClient)
+      .disablePlugins(AddMetaInfLicenseFiles)
+      .settings(
+        commonSettings,
+        releaseSettings,
+
+        // align final shaded jar name with maven.
+        (assembly / assemblyJarName) := {
+          val extension = artifact.value.extension
+          s"${moduleName.value}_${scalaBinaryVersion.value}-${version.value}.$extension"
+        },
+
+        (assembly / test) := {},
+
+        (assembly / logLevel) := Level.Info,
+
+        // include `scala-library` from assembly.
+        (assembly / assemblyPackageScala / assembleArtifact) := true,
+
+        // Exclude every jar whose name does not start with one of the prefixes below.
+        (assembly / assemblyExcludedJars) := {
+          val cp = (assembly / fullClasspath).value
+          cp filter { v =>
+            val name = v.data.getName
+            !(name.startsWith("celeborn-") ||
+              name.startsWith("protobuf-java-") ||
+              name.startsWith("guava-") ||
+              name.startsWith("failureaccess-") ||
+              name.startsWith("netty-") ||
+              name.startsWith("commons-lang3-") ||
+              name.startsWith("RoaringBitmap-") ||
+              name.startsWith("lz4-java-") ||
+              name.startsWith("zstd-jni-") ||
+              name.startsWith("scala-library-"))
+          }
+        },
+
+        // Relocate bundled third-party packages under org.apache.celeborn.shaded.
+        (assembly / assemblyShadeRules) := Seq(
+          ShadeRule.rename("com.google.protobuf.**" -> "org.apache.celeborn.shaded.com.google.protobuf.@1").inAll,
+          ShadeRule.rename("com.google.common.**" -> "org.apache.celeborn.shaded.com.google.common.@1").inAll,
+          ShadeRule.rename("io.netty.**" -> "org.apache.celeborn.shaded.io.netty.@1").inAll,
+          ShadeRule.rename("org.apache.commons.**" -> "org.apache.celeborn.shaded.org.apache.commons.@1").inAll,
+          ShadeRule.rename("org.roaringbitmap.**" -> "org.apache.celeborn.shaded.org.roaringbitmap.@1").inAll
+        ),
+
+        (assembly / assemblyMergeStrategy) := {
+          case m if m.toLowerCase(Locale.ROOT).endsWith("manifest.mf") => MergeStrategy.discard
+          // For netty-3.x.y.Final.jar
+          case m if m.startsWith("META-INF/license/") => MergeStrategy.discard
+          // the LicenseAndNoticeMergeStrategy always picks the license/notice file from the current project
+          case m@("META-INF/LICENSE" | "META-INF/NOTICE") => CustomMergeStrategy("LicenseAndNoticeMergeStrategy") { conflicts =>
+            val entry = conflicts.head
+            val projectLicenseFile = (Compile / resourceDirectory).value / entry.target
+            val stream = () => new java.io.BufferedInputStream(new java.io.FileInputStream(projectLicenseFile))
+            Right(Vector(JarEntry(entry.target, stream)))
+          }
+          case PathList(ps@_*) if Assembly.isLicenseFile(ps.last) => MergeStrategy.discard
+          // Drop all proto files that are not needed as artifacts of the build.
+          case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") => MergeStrategy.discard
+          case m if m.toLowerCase(Locale.ROOT).startsWith("meta-inf/native-image") => MergeStrategy.discard
+          // Drop netty jnilib
+          case m if m.toLowerCase(Locale.ROOT).endsWith(".jnilib") => MergeStrategy.discard
+          // rename netty native lib
+          case "META-INF/native/libnetty_transport_native_epoll_x86_64.so" => CustomMergeStrategy.rename(_ => "META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so")
+          case "META-INF/native/libnetty_transport_native_epoll_aarch_64.so" => CustomMergeStrategy.rename(_ => "META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so")
+          case _ => MergeStrategy.first
+        },
+
+        // The packaged binary of this project is the assembly jar itself.
+        Compile / packageBin := assembly.value,
+        pomPostProcess := removeDependenciesTransformer
+      )
+  }
+
+  // All Tez-related projects contributed to the build.
+  def modules: Seq[Project] = {
+    Seq(tezClient, tezIt, tezGroup, tezClientShade)
+  }
+
+  // for test only, don't use this group for any other projects
+  lazy val tezGroup = (project withId "celeborn-tez-group").aggregate(tezClient, tezIt)
+
+  val copyDeps = TaskKey[Unit]("copyDeps", "Copies needed dependencies to the build directory.")
+  // Same directory name as the copy-dependencies output directory in tests/tez-it/pom.xml.
+  val destPath = (Compile / crossTarget) {
+    _ / "mapreduce_lib"
+  }
+
+  lazy val copyDepsSettings = Seq(
+    copyDeps := {
+      val dest = destPath.value
+      if (!dest.isDirectory() && !dest.mkdirs()) {
+        throw new java.io.IOException("Failed to create jars directory.")
+      }
+
+      // Copy every jar file on the compile classpath, replacing a stale copy if present.
+      (Compile / dependencyClasspath).value.map(_.data)
+        .filter { jar => jar.isFile() }
+        .foreach { jar =>
+          val destJar = new File(dest, jar.getName())
+          if (destJar.isFile()) {
+            destJar.delete()
+          }
+          Files.copy(jar.toPath(), destJar.toPath())
+        }
+    },
+    // Run copyDeps before compiling the tests.
+    (Test / compile) := {
+      copyDeps.value
+      (Test / compile).value
+    }
+  )
+}
+
+
 object CelebornOpenApi {
   val openApiSpecDir = "openapi/openapi-client/src/main/openapi3"
   val openApiMasterInternalOutputDir = 
"openapi/openapi-client/target/master/generated-sources/java"
diff --git a/tests/tez-it/pom.xml b/tests/tez-it/pom.xml
new file mode 100644
index 000000000..fc499e780
--- /dev/null
+++ b/tests/tez-it/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-tez-it_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Tez Integration Test</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-master_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-tez-shaded_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-examples</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- The compile scope is to generate mapreduce_lib -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <outputDirectory>${project.build.directory}/mapreduce_lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Reply via email to