This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd2315c [#983] improvement(tez): Optimize tez client delivery
configuration (#985)
3bd2315c is described below
commit 3bd2315cf2f2fb8c9a8f094c7d0c53b7d6a6116d
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jul 4 11:16:55 2023 +0800
[#983] improvement(tez): Optimize tez client delivery configuration (#985)
### What changes were proposed in this pull request?
This issue makes three improvements to how configuration is delivered.
- 1 Currently, the tez client uses rss_conf.xml to deliver configuration. Now that
https://github.com/apache/incubator-uniffle/pull/966 has been applied, we can
deliver configuration through the edge conf instead.
- 2 Deliver dynamic configuration from the coordinator, which then overrides the
tez client configuration.
- 3 Deliver configuration from the client side.
### Why are the changes needed?
- 1. rss_conf.xml is unnecessary.
- 2. Dynamic configuration from the coordinator is not applied.
- 3. Configuration set on the client side cannot be delivered to inputs/outputs.
### How was this patch tested?
integration test, unit test, test in yarn cluster, test in tez local mode.
---
.../java/org/apache/tez/common/RssTezConfig.java | 10 +-
.../java/org/apache/tez/common/RssTezUtils.java | 36 +++++
.../java/org/apache/tez/common/UmbilicalUtils.java | 25 +--
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 171 ++++++++-------------
.../tez/dag/app/TezRemoteShuffleManager.java | 6 +-
.../orderedgrouped/RssShuffleScheduler.java | 3 +-
.../output/RssOrderedPartitionedKVOutput.java | 21 +--
.../library/output/RssUnorderedKVOutput.java | 22 +--
.../output/RssUnorderedPartitionedKVOutput.java | 21 +--
.../org/apache/tez/common/RssTezUtilsTest.java | 27 ++++
.../apache/tez/dag/app/RssDAGAppMasterTest.java | 101 +++++++-----
.../tez/dag/app/TezRemoteShuffleManagerTest.java | 4 +-
12 files changed, 225 insertions(+), 222 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index 25a382c7..b575f2fc 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -18,7 +18,9 @@
package org.apache.tez.common;
import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.uniffle.client.util.RssClientConfig;
@@ -153,8 +155,6 @@ public class RssTezConfig {
public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE;
- public static final String RSS_CONF_FILE = "rss_conf.xml";
-
public static final String RSS_REMOTE_STORAGE_PATH =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
@@ -169,6 +169,12 @@ public class RssTezConfig {
public static final String RSS_REDUCE_INITIAL_MEMORY = TEZ_RSS_CONFIG_PREFIX
+ "rss.reduce.initial.memory";
+ public static final String RSS_ACCESS_TIMEOUT_MS = TEZ_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
+ public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE =
RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;
+
+ public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
+ ImmutableSet.of(RSS_STORAGE_TYPE, RSS_REMOTE_STORAGE_PATH);
+
public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index c980ddb8..81b334d3 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -393,4 +393,40 @@ public class RssTezUtils {
return className;
}
}
+
+ /**
+ * Merges the dynamic client configuration fetched from the coordinator into {@code conf}.
+ *
+ * <p>Every key is normalized to start with {@link RssTezConfig#TEZ_RSS_CONFIG_PREFIX}. A value
+ * is applied only when that key is unset or empty in {@code conf}, except for keys contained in
+ * {@link RssTezConfig#RSS_MANDATORY_CLUSTER_CONF}, which always take the coordinator's value.
+ *
+ * @param conf the configuration to update in place; nothing happens if it is null
+ * @param confItems key/value pairs from the coordinator; nothing happens if null or empty
+ */
+ public static void applyDynamicClientConf(Configuration conf, Map<String, String> confItems) {
+ if (conf == null) {
+ LOG.warn("Tez conf is null");
+ return;
+ }
+
+ // Expected whenever dynamic client conf is disabled, so this is not a warning.
+ if (confItems == null || confItems.isEmpty()) {
+ LOG.info("Empty conf items");
+ return;
+ }
+
+ for (Map.Entry<String, String> kv : confItems.entrySet()) {
+ if (kv.getKey() == null || kv.getValue() == null) {
+ // Hadoop Configuration#set rejects null names/values; skip instead of failing the DAG.
+ LOG.warn("Skip dynamic conf entry with null key or value: {} = {}", kv.getKey(), kv.getValue());
+ continue;
+ }
+ String tezConfKey = kv.getKey();
+ if (!tezConfKey.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)) {
+ tezConfKey = RssTezConfig.TEZ_RSS_CONFIG_PREFIX + tezConfKey;
+ }
+ String tezConfVal = kv.getValue();
+ if (StringUtils.isEmpty(conf.get(tezConfKey, ""))
+ || RssTezConfig.RSS_MANDATORY_CLUSTER_CONF.contains(tezConfKey)) {
+ LOG.info("Use dynamic conf {} = {}", tezConfKey, tezConfVal);
+ conf.set(tezConfKey, tezConfVal);
+ }
+ }
+ }
+
+ /**
+ * Builds a configuration, created without Hadoop default resources, that holds only the
+ * entries of {@code extraConf} whose keys start with {@link RssTezConfig#TEZ_RSS_CONFIG_PREFIX}.
+ *
+ * @param extraConf the configuration to filter
+ * @return a new configuration containing only the RSS-prefixed entries
+ */
+ public static Configuration filterRssConf(Configuration extraConf) {
+ Configuration rssOnly = new Configuration(false);
+ String prefix = RssTezConfig.TEZ_RSS_CONFIG_PREFIX;
+ for (Map.Entry<String, String> property : extraConf) {
+ if (!property.getKey().startsWith(prefix)) {
+ continue;
+ }
+ rssOnly.set(property.getKey(), property.getValue());
+ }
+ return rssOnly;
+ }
}
diff --git a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
index 152a013e..26e0dc31 100644
--- a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
@@ -23,11 +23,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -38,24 +35,15 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+
public class UmbilicalUtils {
private static final Logger LOG =
LoggerFactory.getLogger(UmbilicalUtils.class);
private UmbilicalUtils() {
}
- /**
- *
- * @return Get Application Master host and port from config file
- */
- private static Pair<String, Integer> getAmHostPort() {
- JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
- String host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null
host");
- int port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
- LOG.info("Got RssConf am info, host is: {}, port is: {}", host, port);
- return new ImmutablePair<>(host, port);
- }
-
/**
*
* @param applicationId Application Id of this task
@@ -74,8 +62,9 @@ public class UmbilicalUtils {
int shuffleId) throws IOException, InterruptedException,
TezException {
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(applicationId.toString());
- Pair<String, Integer> amHostPort = getAmHostPort();
- final InetSocketAddress address =
NetUtils.createSocketAddrForHost(amHostPort.getLeft(), amHostPort.getRight());
+ String host = conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+ int port = conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+ final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
.doAs(new
PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
@Override
@@ -92,7 +81,7 @@ public class UmbilicalUtils {
.getShuffleAssignmentsInfo()
.getPartitionToServers();
LOG.info("RequestShuffleServer applicationId:{}, taskAttemptId:{},
host:{}, port:{}, shuffleId:{}, worker:{}",
- applicationId, taskAttemptId, amHostPort.getLeft(),
amHostPort.getRight(), shuffleId, partitionToServers);
+ applicationId, taskAttemptId, host, port, shuffleId,
partitionToServers);
return partitionToServers;
}
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index cb6146bf..6e024354 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -17,10 +17,9 @@
package org.apache.tez.dag.app;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -29,12 +28,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
@@ -42,7 +35,6 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -54,13 +46,12 @@ import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezClassLoader;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
@@ -80,15 +71,15 @@ import org.apache.uniffle.common.util.ThreadUtils;
import static org.apache.log4j.LogManager.CONFIGURATOR_CLASS_KEY;
import static org.apache.log4j.LogManager.DEFAULT_CONFIGURATION_KEY;
-import static org.apache.tez.common.TezCommonUtils.TEZ_SYSTEM_SUB_DIR;
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
public class RssDAGAppMaster extends DAGAppMaster {
private static final Logger LOG =
LoggerFactory.getLogger(RssDAGAppMaster.class);
private ShuffleWriteClient shuffleWriteClient;
private TezRemoteShuffleManager tezRemoteShuffleManager;
- private static final String rssConfFileLocalResourceName = "rss_conf.xml";
+ private Map<String, String> clusterClientConf;
- private DAGProtos.PlanLocalResource rssConfFileLocalResource;
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("AppHeartbeat")
);
@@ -101,6 +92,12 @@ public class RssDAGAppMaster extends DAGAppMaster {
workingDirectory, localDirs, logDirs, clientVersion, credentials,
jobUserName, pluginDescriptorProto);
}
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ initAndStartRSSClient(this, conf);
+ }
+
public ShuffleWriteClient getShuffleWriteClient() {
return shuffleWriteClient;
}
@@ -117,15 +114,17 @@ public class RssDAGAppMaster extends DAGAppMaster {
this.tezRemoteShuffleManager = tezRemoteShuffleManager;
}
+ public Map<String, String> getClusterClientConf() {
+ return clusterClientConf;
+ }
+
/**
* Init and Start Rss Client
* @param appMaster
* @param conf
- * @param applicationAttemptId
* @throws Exception
*/
- public static void initAndStartRSSClient(final RssDAGAppMaster appMaster,
Configuration conf,
- ApplicationAttemptId applicationAttemptId) throws Exception {
+ public static void initAndStartRSSClient(final RssDAGAppMaster appMaster,
Configuration conf) throws Exception {
ShuffleWriteClient client = RssTezUtils.createShuffleClient(conf);
appMaster.setShuffleWriteClient(client);
@@ -134,7 +133,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
LOG.info("Registering coordinators {}", coordinators);
client.registerCoordinators(coordinators);
- String strAppAttemptId = applicationAttemptId.toString();
+ String strAppAttemptId = appMaster.getAttemptID().toString();
long heartbeatInterval = conf.getLong(RssTezConfig.RSS_HEARTBEAT_INTERVAL,
RssTezConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout = conf.getLong(RssTezConfig.RSS_HEARTBEAT_TIMEOUT,
heartbeatInterval / 2);
@@ -158,15 +157,13 @@ public class RssDAGAppMaster extends DAGAppMaster {
appMaster.getTezRemoteShuffleManager().initialize();
appMaster.getTezRemoteShuffleManager().start();
- TezConfiguration extraConf = new TezConfiguration(false);
- extraConf.clear();
-
- String strAppId = applicationAttemptId.getApplicationId().toString();
- extraConf.set(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS,
- appMaster.getTezRemoteShuffleManager().address.getHostName());
- extraConf.setInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT,
- appMaster.getTezRemoteShuffleManager().address.getPort());
- writeExtraConf(appMaster, conf, extraConf, strAppId);
+ // apply dynamic configuration
+ boolean dynamicConfEnabled =
conf.getBoolean(RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
+ RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
+ if (dynamicConfEnabled) {
+ appMaster.clusterClientConf = client.fetchClientConf(
+ conf.getInt(RssTezConfig.RSS_ACCESS_TIMEOUT_MS,
RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
+ }
mayCloseTezSlowStart(conf);
}
@@ -174,52 +171,10 @@ public class RssDAGAppMaster extends DAGAppMaster {
@Override
protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
DAGImpl dag = createDAG(dagPB, null);
- registerStateEnteredCallback(dag);
+ registerStateEnteredCallback(dag, this);
return dag;
}
- @Override
- public String submitDAGToAppMaster(DAGProtos.DAGPlan dagPlan, Map<String,
LocalResource> additionalResources)
- throws TezException {
-
- addAdditionalResource(dagPlan, getRssConfFileLocalResource());
-
- return super.submitDAGToAppMaster(dagPlan, additionalResources);
- }
-
- public DAGProtos.PlanLocalResource getRssConfFileLocalResource() {
- return rssConfFileLocalResource;
- }
-
- public static void addAdditionalResource(DAGProtos.DAGPlan dagPlan,
DAGProtos.PlanLocalResource additionalResource)
- throws TezException {
- List<DAGProtos.PlanLocalResource> planLocalResourceList =
dagPlan.getLocalResourceList();
-
- if (planLocalResourceList == null) {
- LOG.warn("planLocalResourceList is null, add new list");
- planLocalResourceList = new ArrayList<>();
- } else {
- planLocalResourceList = new ArrayList<>(planLocalResourceList);
- }
-
- try {
- planLocalResourceList.add(additionalResource);
- Field field = DAGProtos.DAGPlan.class.getDeclaredField("localResource_");
- field.setAccessible(true);
- field.set(dagPlan, planLocalResourceList);
- field.setAccessible(false);
- } catch (Exception e) {
- LOG.error("submitDAGToAppMaster reflect error", e);
- throw new TezException(e.getMessage());
- }
-
- if (LOG.isDebugEnabled()) {
- for (DAGProtos.PlanLocalResource localResource :
dagPlan.getLocalResourceList()) {
- LOG.debug("localResource: {}", localResource.toString());
- }
- }
- }
-
/**
* main method
* @param args
@@ -321,7 +276,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
}
}
- initAndStartRSSClient(appMaster, conf, applicationAttemptId);
initAndStartAppMaster(appMaster, conf);
} catch (Throwable t) {
LOG.error("Error starting RssDAGAppMaster", t);
@@ -329,37 +283,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
}
}
- static void writeExtraConf(final RssDAGAppMaster appMaster, Configuration
conf,
- TezConfiguration extraConf, String strAppId) {
- try {
- Path baseStagingPath = TezCommonUtils.getTezBaseStagingPath(conf);
- Path tezStagingDir = new Path(new Path(baseStagingPath,
TEZ_SYSTEM_SUB_DIR), strAppId);
-
- FileSystem fs = tezStagingDir.getFileSystem(conf);
- Path rssConfFilePath = new Path(tezStagingDir,
RssTezConfig.RSS_CONF_FILE);
-
- try (FSDataOutputStream out =
- FileSystem.create(fs, rssConfFilePath,
- new
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
- extraConf.writeXml(out);
- }
- FileStatus rsrcStat = fs.getFileStatus(rssConfFilePath);
-
- appMaster.rssConfFileLocalResource =
DAGProtos.PlanLocalResource.newBuilder()
- .setName(appMaster.rssConfFileLocalResourceName)
- .setUri(rsrcStat.getPath().toString())
- .setSize(rsrcStat.getLen())
- .setTimeStamp(rsrcStat.getModificationTime())
- .setType(DAGProtos.PlanLocalResourceType.FILE)
- .setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION)
- .build();
- LOG.info("Upload extra conf success!");
- } catch (Exception e) {
- LOG.error("Upload extra conf exception!", e);
- throw new RssException("Upload extra conf exception ", e);
- }
- }
-
static void mayCloseTezSlowStart(Configuration conf) {
if (!conf.getBoolean(RssTezConfig.RSS_AM_SLOW_START_ENABLE,
RssTezConfig.RSS_AM_SLOW_START_ENABLE_DEFAULT)) {
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
1.0f);
@@ -395,21 +318,41 @@ public class RssDAGAppMaster extends DAGAppMaster {
}
@VisibleForTesting
- public static void registerStateEnteredCallback(DAGImpl dag) {
- StateMachineTez
- stateMachine = (StateMachineTez) getPrivateField(dag, "stateMachine");
- stateMachine.registerStateEnteredCallback(DAGState.INITED, new
DagInitialCallback());
+ public static void registerStateEnteredCallback(DAGImpl dag, RssDAGAppMaster
appMaster) {
+ StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag,
"stateMachine");
+ stateMachine.registerStateEnteredCallback(DAGState.INITED, new
DagInitialCallback(appMaster));
}
static class DagInitialCallback implements OnStateChangedCallback<DAGState,
DAGImpl> {
+ private RssDAGAppMaster appMaster;
+
+ DagInitialCallback(RssDAGAppMaster appMaster) {
+ this.appMaster = appMaster;
+ }
+
@Override
public void onStateChanged(DAGImpl dag, DAGState dagState) {
try {
+ // get rss config from client
+ Configuration filterRssConf =
RssTezUtils.filterRssConf(appMaster.getConfig());
Map<String, Edge> edges = (Map<String, Edge>) getPrivateField(dag,
"edges");
for (Map.Entry<String, Edge> entry : edges.entrySet()) {
Edge edge = entry.getValue();
+ // add user defined config to edge source conf
+ Configuration edgeSourceConf =
+
TezUtils.createConfFromUserPayload(edge.getEdgeProperty().getEdgeSource().getUserPayload());
+ edgeSourceConf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS,
+
this.appMaster.getTezRemoteShuffleManager().getAddress().getHostName());
+ edgeSourceConf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT,
+
this.appMaster.getTezRemoteShuffleManager().getAddress().getPort());
+ edgeSourceConf.addResource(filterRssConf);
+ RssTezUtils.applyDynamicClientConf(edgeSourceConf,
this.appMaster.getClusterClientConf());
+ edge.getEdgeProperty().getEdgeSource()
+
.setUserPayload(TezUtils.createUserPayloadFromConf(edgeSourceConf));
+
+ // rename output class name
OutputDescriptor outputDescriptor =
edge.getEdgeProperty().getEdgeSource();
Field outputClassNameField =
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
outputClassNameField.setAccessible(true);
@@ -417,6 +360,19 @@ public class RssDAGAppMaster extends DAGAppMaster {
String rssOutputClassName =
RssTezUtils.replaceRssOutputClassName(outputClassName);
outputClassNameField.set(outputDescriptor, rssOutputClassName);
+ // add user defined config to edge destination conf.
+ // Read the destination (input) payload here, not the source one: the source payload was
+ // just replaced with the output conf above, and copying it would discard the input's settings.
+ Configuration edgeDestinationConf =
+ TezUtils.createConfFromUserPayload(edge.getEdgeProperty().getEdgeDestination().getUserPayload());
+ edgeDestinationConf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS,
+ this.appMaster.getTezRemoteShuffleManager().getAddress().getHostName());
+ edgeDestinationConf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT,
+ this.appMaster.getTezRemoteShuffleManager().getAddress().getPort());
+ edgeDestinationConf.addResource(filterRssConf);
+ RssTezUtils.applyDynamicClientConf(edgeDestinationConf, this.appMaster.getClusterClientConf());
+ edge.getEdgeProperty().getEdgeDestination()
+ .setUserPayload(TezUtils.createUserPayloadFromConf(edgeDestinationConf));
+
+ // rename input class name
InputDescriptor inputDescriptor =
edge.getEdgeProperty().getEdgeDestination();
Field inputClassNameField =
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
inputClassNameField.setAccessible(true);
@@ -424,7 +380,8 @@ public class RssDAGAppMaster extends DAGAppMaster {
String rssInputClassName =
RssTezUtils.replaceRssInputClassName(inputClassName);
outputClassNameField.set(inputDescriptor, rssInputClassName);
}
- } catch (Exception e) {
+ } catch (IOException | IllegalAccessException | NoSuchFieldException e) {
+ // Pass the exception as the trailing throwable (no placeholder) so SLF4J logs its stack trace.
+ LOG.error("Reconfigure failed after dag was inited", e);
throw new TezUncheckedException(e);
}
}
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 2fe9dc3f..09f33957 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -67,7 +67,7 @@ import static
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER
public class TezRemoteShuffleManager implements ServicePluginLifecycle {
private static final Logger LOG =
LoggerFactory.getLogger(TezRemoteShuffleManager.class);
- protected InetSocketAddress address;
+ private InetSocketAddress address;
protected volatile Server server;
private String tokenIdentifier;
@@ -102,6 +102,10 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
server.stop();
}
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
private class TezRemoteShuffleUmbilicalProtocolImpl implements
TezRemoteShuffleUmbilicalProtocol {
private Map<Integer, ShuffleAssignmentsInfo> shuffleIdToShuffleAssignsInfo
= new HashMap<>();
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index c1d816f8..5af487b1 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1609,7 +1609,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
shuffleServerInfoList,
readerJobConf,
new TezIdHelper(),
- expectedTaskIdsBitmapFilterEnable);
+ expectedTaskIdsBitmapFilterEnable,
+ RssTezConfig.toRssConf(conf));
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssTezShuffleDataFetcher fetcher = new RssTezShuffleDataFetcher(
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
index 5ccb9ad8..ac7a4a62 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.GetShuffleServerRequest;
import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,7 +63,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
-
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
/**
* {@link RssOrderedPartitionedKVOutput} is an {@link AbstractLogicalOutput}
which
@@ -112,18 +111,6 @@ public class RssOrderedPartitionedKVOutput extends
AbstractLogicalOutput {
LOG.info("Initialized RssOrderedPartitionedKVOutput.");
}
- private void getRssConf() {
- try {
- JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
- this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null
host");
- this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
- LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
- } catch (Exception e) {
- LOG.warn("debugRssConf error: ", e);
- }
- }
-
@Override
public List<Event> initialize() throws Exception {
this.startTime = System.nanoTime();
@@ -135,12 +122,12 @@ public class RssOrderedPartitionedKVOutput extends
AbstractLogicalOutput {
getContext().requestInitialMemory(memRequestSize,
memoryUpdateCallbackHandler);
LOG.info("Got initialMemory.");
- getRssConf();
-
this.sendEmptyPartitionDetails = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+ this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+ this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index b1cef02e..b40ef8e7 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.GetShuffleServerRequest;
import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,7 +63,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
-
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
/**
* {@link RssUnorderedKVOutput} is an {@link AbstractLogicalOutput} which
@@ -113,19 +112,6 @@ public class RssUnorderedKVOutput extends
AbstractLogicalOutput {
LOG.info("Initialized RssUnorderedKVOutput.");
}
- private void getRssConf() {
- try {
- JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
- this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null
host");
- this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
- LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
-
- } catch (Exception e) {
- LOG.warn("debugRssConf error: ", e);
- }
- }
-
-
@Override
public List<Event> initialize() throws Exception {
this.startTime = System.nanoTime();
@@ -137,12 +123,12 @@ public class RssUnorderedKVOutput extends
AbstractLogicalOutput {
getContext().requestInitialMemory(memRequestSize,
memoryUpdateCallbackHandler);
LOG.info("Got initialMemory.");
- getRssConf();
-
this.sendEmptyPartitionDetails = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+ this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+ this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
index 0eb26e0f..6cd8c1d2 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.GetShuffleServerRequest;
import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,6 +63,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
/**
* {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput}
which
@@ -112,18 +112,6 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
}
- private void getRssConf() {
- try {
- JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
- this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null
host");
- this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
- LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
- } catch (Exception e) {
- LOG.warn("debugRssConf error: ", e);
- }
- }
-
@Override
public List<Event> initialize() throws Exception {
this.startTime = System.nanoTime();
@@ -135,13 +123,12 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
getContext().requestInitialMemory(memRequestSize,
memoryUpdateCallbackHandler);
LOG.info("Got initialMemory.");
- getRssConf();
-
this.sendEmptyPartitionDetails = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
+ this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+ this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git
a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index abf03be2..a888baec 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -29,12 +29,15 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.util.StorageType;
+import static org.apache.tez.common.RssTezConfig.RSS_STORAGE_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -192,4 +195,28 @@ public class RssTezUtilsTest {
Integer[] expectPartitionArr = new Integer[]{0, 1, 2, 3, 4, 5};
assertTrue(Arrays.equals(expectPartitionArr,
rssWorker.keySet().toArray(new Integer[0])));
}
+
+ @Test
+ public void testApplyDynamicClientConf() { // dynamic conf must be applied on top of the client conf
+ Configuration conf = new Configuration(false);
+ conf.set("tez.config1", "value1"); // client-only key, expected to survive untouched
+ conf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name()); // client value, expected to be overridden
+ Map<String, String> dynamic = new HashMap<>();
+ dynamic.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); // dynamic value should win over the client value
+ dynamic.put("config2", "value2"); // key given without the "tez." prefix
+ RssTezUtils.applyDynamicClientConf(conf, dynamic);
+ Assertions.assertEquals("value1", conf.get("tez.config1"));
+ Assertions.assertEquals("value2", conf.get("tez.config2")); // "config2" is expected under "tez.config2"
+ Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
+ }
+
+ @Test
+ public void testFilterRssConf() { // filterRssConf is expected to keep "tez."-prefixed keys and drop the rest
+ Configuration conf1 = new Configuration(false);
+ conf1.set("tez.config1", "value1");
+ conf1.set("config2", "value2"); // no "tez." prefix, so it should be filtered out
+ Configuration conf2 = RssTezUtils.filterRssConf(conf1);
+ Assertions.assertEquals("value1", conf2.get("tez.config1"));
+ Assertions.assertNull(conf2.get("config2"));
+ }
}
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 72023953..535ba953 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -17,13 +17,16 @@
package org.apache.tez.dag.app;
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -41,13 +45,12 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
@@ -69,8 +72,13 @@ import
org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static org.apache.tez.common.RssTezConfig.RSS_STORAGE_TYPE;
+import static
org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES;
import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -78,37 +86,7 @@ import static org.mockito.Mockito.when;
public class RssDAGAppMasterTest {
@Test
- public void testAddAdditionalResource() throws TezException {
- DAGProtos.DAGPlan dagPlan = DAGProtos.DAGPlan.getDefaultInstance();
- List<DAGProtos.PlanLocalResource> originalResources =
dagPlan.getLocalResourceList();
- if (originalResources == null) {
- originalResources = new ArrayList<>();
- } else {
- originalResources = new ArrayList<>(originalResources);
- }
-
- DAGProtos.PlanLocalResource additionalResource =
DAGProtos.PlanLocalResource.newBuilder()
- .setName("rss_conf.xml")
- .setUri("/data1/test")
- .setSize(12)
- .setTimeStamp(System.currentTimeMillis())
- .setType(DAGProtos.PlanLocalResourceType.FILE)
- .setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION)
- .build();
-
- RssDAGAppMaster.addAdditionalResource(dagPlan, additionalResource);
- List<DAGProtos.PlanLocalResource> newResources =
dagPlan.getLocalResourceList();
-
- originalResources.add(additionalResource);
-
- assertEquals(originalResources.size(), newResources.size());
- for (int i = 0; i < originalResources.size(); i++) {
- assertEquals(originalResources.get(i), newResources.get(i));
- }
- }
-
- @Test
- public void testRenameRssIOClassName() throws Exception {
+ public void testHookAfterDagInited() throws Exception {
// 1 Init and mock some basic module
AppContext appContext = mock(AppContext.class);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
@@ -122,6 +100,20 @@ public class RssDAGAppMasterTest {
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
ACLManager aclManager = new ACLManager("amUser");
doReturn(aclManager).when(appContext).getAMACLManager();
+ RssDAGAppMaster appMaster = mock(RssDAGAppMaster.class);
+ TezRemoteShuffleManager shuffleManager =
mock(TezRemoteShuffleManager.class);
+ InetSocketAddress address = NetUtils.createSocketAddrForHost("host", 0);
+ when(shuffleManager.getAddress()).thenReturn(address);
+ when(appMaster.getTezRemoteShuffleManager()).thenReturn(shuffleManager);
+ Configuration clientConf = new Configuration(false);
+ clientConf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+ clientConf.set("tez.config1", "value1");
+ clientConf.set("config2", "value2");
+ Map<String, String> dynamicConf = new HashMap();
+ dynamicConf.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+ dynamicConf.put("tez.config3", "value3");
+ when(appMaster.getClusterClientConf()).thenReturn(dynamicConf);
+ when(appMaster.getConfig()).thenReturn(clientConf);
// 2 init dispatcher
AsyncDispatcher dispatcher = new AsyncDispatcher("core");
@@ -136,7 +128,7 @@ public class RssDAGAppMasterTest {
when(appContext.getCurrentDAG()).thenReturn(dagImpl);
// 4 register call back function
- RssDAGAppMaster.registerStateEnteredCallback(dagImpl);
+ RssDAGAppMaster.registerStateEnteredCallback(dagImpl, appMaster);
// 5 register DAGEvent, init and start dispatcher
EventHandler<DAGEvent> dagEventDispatcher = new EventHandler<DAGEvent>() {
@@ -164,19 +156,49 @@ public class RssDAGAppMasterTest {
verfiyInput(dagImpl, "vertex4", RssUnorderedKVInput.class.getName());
}
- public static void verfiyInput(DAGImpl dag, String name, String
expectedInputClassName) throws AMUserCodeException {
+ public static void verfiyInput(DAGImpl dag, String name, String
expectedInputClassName) throws Exception {
+ // 1 verify the input class name was renamed to the rss implementation
List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
Assertions.assertEquals(1, inputSpecs.size());
Assertions.assertEquals(expectedInputClassName,
inputSpecs.get(0).getInputDescriptor().getClassName());
+ // 2 verify the address and port of the shuffle manager in the edge payload
+ UserPayload payload =
inputSpecs.get(0).getInputDescriptor().getUserPayload();
+ Configuration conf = TezUtils.createConfFromUserPayload(payload);
+ Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+ Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+ // 3 verify the merged config (the dynamic conf overrides the client conf)
+ Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
+ Assertions.assertEquals("value1", conf.get("tez.config1"));
+ Assertions.assertEquals("value3", conf.get("tez.config3"));
+ Assertions.assertNull(conf.get("tez.config2"));
+ // TEZ_RUNTIME_IFILE_READAHEAD_BYTES is in getConfigurationKeySet, so the
config from the client should be delivered
+ // to Input/Output. But tez.config.from.client is not in
getConfigurationKeySet, so that config from the client
+ // should not be delivered to Input/Output.
+ Assertions.assertEquals(12345,
conf.getInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, -1));
+ Assertions.assertNull(conf.get("tez.config.from.client"));
}
- public static void verfiyOutput(DAGImpl dag, String name, String
expectedOutputClassName) throws AMUserCodeException {
+ public static void verfiyOutput(DAGImpl dag, String name, String
expectedOutputClassName) throws Exception {
+ // 1 verify the output class name was renamed to the rss implementation
List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
Assertions.assertEquals(1, outputSpecs.size());
Assertions.assertEquals(expectedOutputClassName,
outputSpecs.get(0).getOutputDescriptor().getClassName());
+ // 2 verify the address and port of the shuffle manager in the edge payload
+ UserPayload payload =
outputSpecs.get(0).getOutputDescriptor().getUserPayload();
+ Configuration conf = TezUtils.createConfFromUserPayload(payload);
+ Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+ Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+ // 3 verify the merged config (the dynamic conf overrides the client conf)
+ Assertions.assertEquals(StorageType.LOCALFILE.name(),
conf.get(RSS_STORAGE_TYPE));
+ Assertions.assertEquals("value1", conf.get("tez.config1"));
+ Assertions.assertEquals("value3", conf.get("tez.config3"));
+ Assertions.assertNull(conf.get("tez.config2")); // NOTE(review): the client set "config2" (no prefix); consider also asserting conf.get("config2") is null
}
private static DAG createDAG(String dageName, Configuration conf) {
+ conf.setInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 12345);
+ conf.set("tez.config.from.client", "value.from.client");
+
DataSourceDescriptor dummyInput = DataSourceDescriptor.create(
InputDescriptor.create("dummyclass"),
InputInitializerDescriptor.create(""), null);
@@ -193,7 +215,8 @@ public class RssDAGAppMasterTest {
OrderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName(),
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
UnorderedKVEdgeConfig edgeConf23 =
- UnorderedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName()).build();
+ UnorderedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName())
+ .setFromConfiguration(conf).build();
UnorderedPartitionedKVEdgeConfig edgeConf34 =
UnorderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(),
NullWritable.class.getName(),
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
index 8d304e40..3af94aa0 100644
---
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
@@ -102,8 +102,8 @@ public class TezRemoteShuffleManagerTest {
tezRemoteShuffleManager.initialize();
tezRemoteShuffleManager.start();
- String host = tezRemoteShuffleManager.address.getHostString();
- int port = tezRemoteShuffleManager.address.getPort();
+ String host = tezRemoteShuffleManager.getAddress().getHostString();
+ int port = tezRemoteShuffleManager.getAddress().getPort();
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
String tokenIdentifier = appId.toString();