This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3590fa778 [CELEBORN-1545] Add Tez plugin skeleton and dag app master
3590fa778 is described below
commit 3590fa778e97c22feabc8a35c9aeb177c4369bcc
Author: mingji <[email protected]>
AuthorDate: Fri Nov 22 18:38:25 2024 +0800
[CELEBORN-1545] Add Tez plugin skeleton and dag app master
### What changes were proposed in this pull request?
1. Add directories for Apache Tez framework
2. Add a CelebornDagAppMaster with LifecycleManager
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2939 from GH-Gloway/b1545-1.
Authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
---
client-tez/tez/pom.xml | 198 +++++++++++
.../celeborn/tez/plugin/util/CelebornTezUtils.java | 97 +++++
.../apache/tez/dag/app/CelebornDagAppMaster.java | 390 +++++++++++++++++++++
dev/reformat | 1 +
pom.xml | 152 ++++++++
5 files changed, 838 insertions(+)
diff --git a/client-tez/tez/pom.xml b/client-tez/tez/pom.xml
new file mode 100644
index 000000000..1211303c7
--- /dev/null
+++ b/client-tez/tez/pom.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-client-tez_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Client for Tez</name>
+
+  <dependencies>
+    <!-- Celeborn modules this client is built on. -->
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- SLF4J binding only; the reload4j implementation itself is excluded. -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-reload4j</artifactId>
+      <version>1.7.36</version>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.reload4j</groupId>
+          <artifactId>reload4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!--
+      Tez and Hadoop artifacts. Their versions and exclusions are declared once, in the
+      dependencyManagement section of the "tez" profile of the root pom.xml, which is the
+      same profile that adds this module to the build. They are intentionally not repeated
+      here so the two places cannot drift apart.
+    -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <!-- Test-only dependencies. -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git
a/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/CelebornTezUtils.java
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/CelebornTezUtils.java
new file mode 100644
index 000000000..17cb1a62f
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/CelebornTezUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tez.plugin.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
+
+public class CelebornTezUtils {
+ public static final String TEZ_PREFIX = "tez.";
+ public static final String TEZ_CELEBORN_LM_HOST =
"celeborn.lifecycleManager.host";
+ public static final String TEZ_CELEBORN_LM_PORT =
"celeborn.lifecycleManager.port";
+ public static final String TEZ_CELEBORN_APPLICATION_ID =
"celeborn.applicationId";
+ public static final String TEZ_SHUFFLE_ID = "celeborn.tez.shuffle.id";
+ public static final String TEZ_BROADCAST_OR_ONETOONE =
"celeborn.tez.broadcastOrOneToOne";
+
+ public static final CelebornConf fromTezConfiguration(Configuration
tezConfig) {
+ CelebornConf tmpCelebornConf = new CelebornConf();
+ for (Map.Entry<String, String> property : tezConfig) {
+ String proName = property.getKey();
+ String proValue = property.getValue();
+ if (proName.startsWith(TEZ_PREFIX + "celeborn")) {
+ tmpCelebornConf.set(proName.substring(TEZ_PREFIX.length()), proValue);
+ }
+ }
+ return tmpCelebornConf;
+ }
+
+ public static Object getPrivateField(Object object, String name) {
+ try {
+ Field f = object.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(object);
+ } catch (Exception e) {
+ throw new CelebornRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Object getParentPrivateField(Object object, String name) {
+ try {
+ Field f = object.getClass().getSuperclass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(object);
+ } catch (Exception e) {
+ throw new CelebornRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static void setParentPrivateField(Object object, String name, Object
value) {
+ try {
+ Field f = object.getClass().getSuperclass().getDeclaredField(name);
+ f.setAccessible(true);
+ f.set(object, value);
+ } catch (Exception e) {
+ throw new CelebornRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
+ if (uniqueIdentifier == null) {
+ throw new CelebornRuntimeException("uniqueIdentifier should not be
null");
+ }
+ String[] ids = uniqueIdentifier.split("_");
+ return StringUtils.join(ids, "_", 0, 7);
+ }
+
+ public static String ensureGetSysEnv(String envName) throws IOException {
+ String value = System.getenv(envName);
+ if (value == null) {
+ String msg = envName + " is null";
+ throw new CelebornIOException(msg);
+ }
+ return value;
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
new file mode 100644
index 000000000..c9c51cbef
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static
org.apache.celeborn.tez.plugin.util.CelebornTezUtils.getPrivateField;
+import static
org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+public class CelebornDagAppMaster extends DAGAppMaster {
+
+ private static final Logger Logger =
LoggerFactory.getLogger(CelebornDagAppMaster.class);
+ private static final String MASTER_ENDPOINTS_ENV =
"CELEBORN_MASTER_ENDPOINTS";
+
+ private CelebornConf celebornConf;
+ private LifecycleManager lifecycleManager;
+ private ApplicationAttemptId appAttemptId;
+ private String lifecycleManagerHost;
+ private int lifecycleManagerPort;
+ private AtomicInteger shuffleIdGenerator = new AtomicInteger(0);
+
+ public CelebornDagAppMaster(
+ ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId,
+ String nmHost,
+ int nmPort,
+ int nmHttpPort,
+ Clock clock,
+ long appSubmitTime,
+ boolean isSession,
+ String workingDirectory,
+ String[] localDirs,
+ String[] logDirs,
+ String clientVersion,
+ Credentials credentials,
+ String jobUserName,
+ DAGProtos.AMPluginDescriptorProto pluginDescriptorProto) {
+ super(
+ applicationAttemptId,
+ containerId,
+ nmHost,
+ nmPort,
+ nmHttpPort,
+ clock,
+ appSubmitTime,
+ isSession,
+ workingDirectory,
+ localDirs,
+ logDirs,
+ clientVersion,
+ credentials,
+ jobUserName,
+ pluginDescriptorProto);
+ appAttemptId = applicationAttemptId;
+ }
+
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+ lifecycleManager = new LifecycleManager(appAttemptId.toString(),
celebornConf);
+ lifecycleManagerHost = lifecycleManager.getHost();
+ lifecycleManagerPort = lifecycleManager.getPort();
+ Logger.info("Init Celeborn lifecycle manager");
+ }
+
+ private static void validateInputParam(String value, String param) throws
IOException {
+ if (value == null) {
+ String msg = param + " is null";
+ Logger.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ @Override
+ protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
+ DAG dag = super.createDAG(dagPB);
+
+ List<Integer> currentDagShuffleIds = new ArrayList<>();
+
+ StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag,
"stateMachine");
+ stateMachine.registerStateEnteredCallback(
+ DAGState.INITED,
+ (OnStateChangedCallback<DAGState, DAGImpl>)
+ (tmpDag, dagState) -> {
+ try {
+ Map<String, Edge> edges = (Map<String, Edge>)
getPrivateField(tmpDag, "edges");
+ for (Map.Entry<String, Edge> entry : edges.entrySet()) {
+ Edge edge = entry.getValue();
+
+ EdgeProperty.DataMovementType dataMovementType =
+ edge.getEdgeProperty().getDataMovementType();
+
+ boolean broadCastOrOneToOne = false;
+ if (dataMovementType ==
EdgeProperty.DataMovementType.BROADCAST
+ || dataMovementType ==
EdgeProperty.DataMovementType.ONE_TO_ONE) {
+ broadCastOrOneToOne = true;
+ }
+
+ Configuration edgeSourceConf =
+ org.apache.tez.common.TezUtils.createConfFromUserPayload(
+
edge.getEdgeProperty().getEdgeSource().getUserPayload());
+ int shuffleId = shuffleIdGenerator.getAndIncrement();
+ currentDagShuffleIds.add(shuffleId);
+ edgeSourceConf.setInt(CelebornTezUtils.TEZ_SHUFFLE_ID,
shuffleId);
+ edgeSourceConf.set(
+ CelebornTezUtils.TEZ_CELEBORN_APPLICATION_ID,
appAttemptId.toString());
+ edgeSourceConf.set(CelebornTezUtils.TEZ_CELEBORN_LM_HOST,
lifecycleManagerHost);
+ edgeSourceConf.setInt(
+ CelebornTezUtils.TEZ_CELEBORN_LM_PORT,
lifecycleManagerPort);
+ edgeSourceConf.set(
+ CelebornTezUtils.TEZ_BROADCAST_OR_ONETOONE,
+ String.valueOf(broadCastOrOneToOne));
+ for (Tuple2<String, String> stringStringTuple2 :
celebornConf.getAll()) {
+ edgeSourceConf.set(stringStringTuple2._1,
stringStringTuple2._2);
+ }
+
+ edge.getEdgeProperty()
+ .getEdgeSource()
+ .setUserPayload(
+
org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+ edge.getEdgeProperty()
+ .getEdgeDestination()
+ .setUserPayload(
+
org.apache.tez.common.TezUtils.createUserPayloadFromConf(edgeSourceConf));
+
+ // rename output class name
+ OutputDescriptor outputDescriptor =
edge.getEdgeProperty().getEdgeSource();
+ Field outputClassNameField =
+
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ outputClassNameField.setAccessible(true);
+ String outputClassName = (String)
outputClassNameField.get(outputDescriptor);
+ outputClassNameField.set(
+ outputDescriptor,
getNewOutputClassName(outputClassName));
+
+ // rename input class name
+ InputDescriptor inputDescriptor =
edge.getEdgeProperty().getEdgeDestination();
+ Field inputClassNameField =
+
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
+ inputClassNameField.setAccessible(true);
+ String inputClassName = (String)
outputClassNameField.get(inputDescriptor);
+ outputClassNameField.set(inputDescriptor,
getNewInputClassName(inputClassName));
+ }
+ } catch (IOException | IllegalAccessException |
NoSuchFieldException e) {
+ Logger.error("Reconfigure failed after dag was inited, caused
by {}", e);
+ throw new TezUncheckedException(e);
+ }
+ });
+
+ // process dag final status
+ List<DAGState> finalStates =
+ Arrays.asList(DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
DAGState.ERROR);
+ Map callbackMap = (Map) getPrivateField(stateMachine, "callbackMap");
+ finalStates.forEach(
+ finalState ->
+ callbackMap.put(
+ finalState,
+ (OnStateChangedCallback<DAGState, DAGImpl>)
+ (tmpDag, dagState) -> {
+ // clean all shuffle for this Dag
+ for (Integer shuffleId : currentDagShuffleIds) {
+ lifecycleManager.unregisterShuffle(shuffleId);
+ }
+ }));
+ try {
+ Logger.info("Tez with Celeborn dag :{}",
DAGUtils.generateSimpleJSONPlan(dagPB));
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ return dag;
+ }
+
+ // tez-runtime-library may be shaded, so we need to use reflection to get
the class name
+ private static String getNewOutputClassName(String oldClassName) {
+ // todo
+ return "";
+ }
+
+ private static String getNewInputClassName(String oldClassName) {
+ // todo
+ return "";
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ lifecycleManager.stop();
+ super.serviceStop();
+ }
+
+ public static void main(String[] args) {
+ try {
+
+ boolean sessionModeCliOption = false;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].startsWith("-D")) {
+ String[] property = args[i].split("=");
+ if (property.length < 2) {
+ System.setProperty(property[0].substring(2), "");
+ } else {
+ System.setProperty(property[0].substring(2), property[1]);
+ }
+ } else if (args[i].contains("--session") || args[i].contains("-s")) {
+ sessionModeCliOption = true;
+ }
+ }
+
+ Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
+ final String pid = System.getenv().get("JVM_PID");
+ String containerIdStr =
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ String nodeHostString =
System.getenv(ApplicationConstants.Environment.NM_HOST.name());
+ String nodePortString =
System.getenv(ApplicationConstants.Environment.NM_PORT.name());
+ String nodeHttpPortString =
+ System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+ String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+ String clientVersion =
System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
+ if (clientVersion == null) {
+ clientVersion = VersionInfo.UNKNOWN;
+ }
+
+ validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
+
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+
+ String jobUserName =
System.getenv(ApplicationConstants.Environment.USER.name());
+
+ Logger.info(
+ "Creating CelebornDAGAppMaster for "
+ + "applicationId={}"
+ + ", attemptNum={}"
+ + ", AMContainerId={}"
+ + ", jvmPid={}"
+ + ", userFromEnv={}"
+ + ", cliSessionOption={}"
+ + ", pwd={}"
+ + ", localDirs={}"
+ + ", logDirs={}",
+ applicationAttemptId.getApplicationId(),
+ applicationAttemptId.getAttemptId(),
+ containerId,
+ pid,
+ jobUserName,
+ sessionModeCliOption,
+ System.getenv(ApplicationConstants.Environment.PWD.name()),
+ System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()),
+ System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
+
+ Configuration conf = new Configuration(new YarnConfiguration());
+
+ DAGProtos.ConfigurationProto confProto =
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(
+ System.getenv(ApplicationConstants.Environment.PWD.name()));
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(conf,
confProto.getConfKeyValuesList());
+
+ DAGProtos.AMPluginDescriptorProto amPluginDescriptorProto = null;
+ if (confProto.hasAmPluginDescriptor()) {
+ amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+ }
+
+ // disable tez slow start
+
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
1.0f);
+
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
1.0f);
+ // disable transfer shuffle from event
+
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
false);
+ conf.setBoolean(
+
TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
false);
+ // disable pipelined shuffle
+
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED,
false);
+ // disable reschedule task on unhealthy nodes because shuffle data are
stored in Celeborn
+ conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
+
+ // support set celeborn master endpoints from env
+ String masterEndpointsKey =
+ CelebornTezUtils.TEZ_PREFIX + CelebornConf.MASTER_ENDPOINTS().key();
+ String masterEndpointsVal = conf.get(masterEndpointsKey);
+ if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
+ Logger.info(
+ "MRAppMaster sets {} via environment variable {}.",
+ masterEndpointsKey,
+ MASTER_ENDPOINTS_ENV);
+ conf.set(masterEndpointsKey,
CelebornTezUtils.ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
+ }
+
+ UserGroupInformation.setConfiguration(conf);
+ Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+
+ TezUtilsInternal.setSecurityUtilConfigration(Logger, conf);
+
+ CelebornDagAppMaster appMaster =
+ new CelebornDagAppMaster(
+ applicationAttemptId,
+ containerId,
+ nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString),
+ new SystemClock(),
+ appSubmitTime,
+ sessionModeCliOption,
+ System.getenv(ApplicationConstants.Environment.PWD.name()),
+ TezCommonUtils.getTrimmedStrings(
+
System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
+ TezCommonUtils.getTrimmedStrings(
+
System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
+ clientVersion,
+ credentials,
+ jobUserName,
+ amPluginDescriptorProto);
+ ShutdownHookManager.get()
+ .addShutdownHook(new DAGAppMasterShutdownHook(appMaster),
SHUTDOWN_HOOK_PRIORITY);
+
+ // log the system properties
+ if (Logger.isInfoEnabled()) {
+ String systemPropsToLog =
TezCommonUtils.getSystemPropertiesToLog(conf);
+ if (systemPropsToLog != null) {
+ Logger.info(systemPropsToLog);
+ }
+ }
+
+ initAndStartAppMaster(appMaster, conf);
+
+ } catch (Throwable t) {
+ Logger.error("Error starting DAGAppMaster", t);
+ System.exit(1);
+ }
+ }
+}
diff --git a/dev/reformat b/dev/reformat
index b2ba562ae..0784c3c45 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -33,4 +33,5 @@ else
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
${PROJECT_DIR}/build/mvn spotless:apply -Pmr
+ ${PROJECT_DIR}/build/mvn spotless:apply -Ptez
fi
diff --git a/pom.xml b/pom.xml
index a8549b5fc..22bce25ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1800,6 +1800,158 @@
</modules>
</profile>
+ <!-- Opt-in profile (id "tez") that adds the Celeborn Tez client module to the build. -->
+ <profile>
+ <id>tez</id>
+ <modules>
+ <module>client-tez/tez</module>
+ </modules>
+ <properties>
+ <!-- Tez release that every org.apache.tez artifact managed below is pinned to. -->
+ <tez.version>0.10.3</tez.version>
+ </properties>
+ <!--
+   Managed versions and exclusions for the Tez and Hadoop artifacts. Each Tez artifact
+   excludes the listed org.apache.hadoop artifacts from its transitive dependencies.
+ -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- tez-dag additionally excludes the YARN client, server-common and web-proxy artifacts. -->
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- tez-api additionally excludes hadoop-auth, hadoop-hdfs and the YARN client. -->
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- hadoop-common without jersey-json, httpclient and the slf4j-log4j12 binding. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
<profile>
<id>google-mirror</id>
<properties>