This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd2315c [#983] improvement(tez): Optimize tez client delivery 
configuration (#985)
3bd2315c is described below

commit 3bd2315cf2f2fb8c9a8f094c7d0c53b7d6a6116d
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jul 4 11:16:55 2023 +0800

    [#983] improvement(tez): Optimize tez client delivery configuration (#985)
    
    ### What changes were proposed in this pull request?
    
    This pull request makes three improvements to configuration delivery.
    
    - 1 Currently, the Tez client uses rss_conf.xml to deliver configuration. Now that 
https://github.com/apache/incubator-uniffle/pull/966 has been applied, we can 
deliver configuration through the edge conf.
    - 2 Deliver dynamic configuration from the coordinator, then override the Tez 
client configuration.
    - 3 Deliver configuration from the client side.
    
    ### Why are the changes needed?
    
    - 1. rss_conf.xml is unnecessary.
    - 2. Dynamic configuration from the coordinator is not applied.
    - 3. Configuration set on the client side cannot be delivered to input/output.
    
    ### How was this patch tested?
    
    integration test, unit test, test in yarn cluster, test in tez local mode.
---
 .../java/org/apache/tez/common/RssTezConfig.java   |  10 +-
 .../java/org/apache/tez/common/RssTezUtils.java    |  36 +++++
 .../java/org/apache/tez/common/UmbilicalUtils.java |  25 +--
 .../org/apache/tez/dag/app/RssDAGAppMaster.java    | 171 ++++++++-------------
 .../tez/dag/app/TezRemoteShuffleManager.java       |   6 +-
 .../orderedgrouped/RssShuffleScheduler.java        |   3 +-
 .../output/RssOrderedPartitionedKVOutput.java      |  21 +--
 .../library/output/RssUnorderedKVOutput.java       |  22 +--
 .../output/RssUnorderedPartitionedKVOutput.java    |  21 +--
 .../org/apache/tez/common/RssTezUtilsTest.java     |  27 ++++
 .../apache/tez/dag/app/RssDAGAppMasterTest.java    | 101 +++++++-----
 .../tez/dag/app/TezRemoteShuffleManagerTest.java   |   4 +-
 12 files changed, 225 insertions(+), 222 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index 25a382c7..b575f2fc 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -18,7 +18,9 @@
 package org.apache.tez.common;
 
 import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.uniffle.client.util.RssClientConfig;
@@ -153,8 +155,6 @@ public class RssTezConfig {
   public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
       RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE;
 
-  public static final String RSS_CONF_FILE = "rss_conf.xml";
-
   public static final String RSS_REMOTE_STORAGE_PATH =
       TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
 
@@ -169,6 +169,12 @@ public class RssTezConfig {
 
   public static final String RSS_REDUCE_INITIAL_MEMORY = TEZ_RSS_CONFIG_PREFIX 
+ "rss.reduce.initial.memory";
 
+  public static final String RSS_ACCESS_TIMEOUT_MS = TEZ_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
+  public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = 
RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;
+
+  public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
+      ImmutableSet.of(RSS_STORAGE_TYPE, RSS_REMOTE_STORAGE_PATH);
+
   public static RssConf toRssConf(Configuration jobConf) {
     RssConf rssConf = new RssConf();
     for (Map.Entry<String, String> entry : jobConf) {
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index c980ddb8..81b334d3 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -393,4 +393,40 @@ public class RssTezUtils {
       return className;
     }
   }
+
+  public static void applyDynamicClientConf(Configuration conf, Map<String, 
String> confItems) {
+    if (conf == null) {
+      LOG.warn("Tez conf is null");
+      return;
+    }
+
+    if (confItems == null || confItems.isEmpty()) {
+      LOG.warn("Empty conf items");
+      return;
+    }
+
+    for (Map.Entry<String, String> kv : confItems.entrySet()) {
+      String tezConfKey = kv.getKey();
+      if (!tezConfKey.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)) {
+        tezConfKey = RssTezConfig.TEZ_RSS_CONFIG_PREFIX + tezConfKey;
+      }
+      String tezConfVal = kv.getValue();
+      if (StringUtils.isEmpty(conf.get(tezConfKey, ""))
+          || RssTezConfig.RSS_MANDATORY_CLUSTER_CONF.contains(tezConfKey)) {
+        LOG.warn("Use conf dynamic conf {} = {}", tezConfKey, tezConfVal);
+        conf.set(tezConfKey, tezConfVal);
+      }
+    }
+  }
+
+  public static Configuration filterRssConf(Configuration extraConf) {
+    Configuration conf = new Configuration(false);
+    for (Map.Entry<String, String> entry : extraConf) {
+      String key = entry.getKey();
+      if (key.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)) {
+        conf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return conf;
+  }
 }
diff --git a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java 
b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
index 152a013e..26e0dc31 100644
--- a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
@@ -23,11 +23,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -38,24 +35,15 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+
 public class UmbilicalUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(UmbilicalUtils.class);
 
   private UmbilicalUtils() {
   }
 
-  /**
-   *
-   * @return Get Application Master host and port from config file
-   */
-  private static Pair<String, Integer> getAmHostPort() {
-    JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
-    String host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null 
host");
-    int port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-    LOG.info("Got RssConf am info, host is: {}, port is: {}", host, port);
-    return new ImmutablePair<>(host, port);
-  }
-
   /**
    *
    * @param applicationId Application Id of this task
@@ -74,8 +62,9 @@ public class UmbilicalUtils {
             int shuffleId) throws IOException, InterruptedException, 
TezException {
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(applicationId.toString());
 
-    Pair<String, Integer> amHostPort = getAmHostPort();
-    final InetSocketAddress address = 
NetUtils.createSocketAddrForHost(amHostPort.getLeft(), amHostPort.getRight());
+    String host = conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+    int port = conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
     TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
         .doAs(new 
PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
           @Override
@@ -92,7 +81,7 @@ public class UmbilicalUtils {
         .getShuffleAssignmentsInfo()
         .getPartitionToServers();
     LOG.info("RequestShuffleServer applicationId:{}, taskAttemptId:{}, 
host:{}, port:{}, shuffleId:{}, worker:{}",
-        applicationId, taskAttemptId, amHostPort.getLeft(), 
amHostPort.getRight(), shuffleId, partitionToServers);
+        applicationId, taskAttemptId, host, port, shuffleId, 
partitionToServers);
     return partitionToServers;
   }
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index cb6146bf..6e024354 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -17,10 +17,9 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executors;
@@ -29,12 +28,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -42,7 +35,6 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -54,13 +46,12 @@ import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.common.TezClassLoader;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
@@ -80,15 +71,15 @@ import org.apache.uniffle.common.util.ThreadUtils;
 
 import static org.apache.log4j.LogManager.CONFIGURATOR_CLASS_KEY;
 import static org.apache.log4j.LogManager.DEFAULT_CONFIGURATION_KEY;
-import static org.apache.tez.common.TezCommonUtils.TEZ_SYSTEM_SUB_DIR;
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
 
 public class RssDAGAppMaster extends DAGAppMaster {
   private static final Logger LOG = 
LoggerFactory.getLogger(RssDAGAppMaster.class);
   private ShuffleWriteClient shuffleWriteClient;
   private TezRemoteShuffleManager tezRemoteShuffleManager;
-  private static final String rssConfFileLocalResourceName = "rss_conf.xml";
+  private Map<String, String> clusterClientConf;
 
-  private DAGProtos.PlanLocalResource rssConfFileLocalResource;
   final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
           ThreadUtils.getThreadFactory("AppHeartbeat")
   );
@@ -101,6 +92,12 @@ public class RssDAGAppMaster extends DAGAppMaster {
             workingDirectory, localDirs, logDirs, clientVersion, credentials, 
jobUserName, pluginDescriptorProto);
   }
 
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    initAndStartRSSClient(this, conf);
+  }
+
   public ShuffleWriteClient getShuffleWriteClient() {
     return shuffleWriteClient;
   }
@@ -117,15 +114,17 @@ public class RssDAGAppMaster extends DAGAppMaster {
     this.tezRemoteShuffleManager = tezRemoteShuffleManager;
   }
 
+  public Map<String, String> getClusterClientConf() {
+    return clusterClientConf;
+  }
+
   /**
    * Init and Start Rss Client
    * @param appMaster
    * @param conf
-   * @param applicationAttemptId
    * @throws Exception
    */
-  public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, 
Configuration conf,
-          ApplicationAttemptId applicationAttemptId) throws Exception {
+  public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, 
Configuration conf) throws Exception {
 
     ShuffleWriteClient client = RssTezUtils.createShuffleClient(conf);
     appMaster.setShuffleWriteClient(client);
@@ -134,7 +133,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
     LOG.info("Registering coordinators {}", coordinators);
     client.registerCoordinators(coordinators);
 
-    String strAppAttemptId = applicationAttemptId.toString();
+    String strAppAttemptId = appMaster.getAttemptID().toString();
     long heartbeatInterval = conf.getLong(RssTezConfig.RSS_HEARTBEAT_INTERVAL,
             RssTezConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
     long heartbeatTimeout = conf.getLong(RssTezConfig.RSS_HEARTBEAT_TIMEOUT, 
heartbeatInterval / 2);
@@ -158,15 +157,13 @@ public class RssDAGAppMaster extends DAGAppMaster {
     appMaster.getTezRemoteShuffleManager().initialize();
     appMaster.getTezRemoteShuffleManager().start();
 
-    TezConfiguration extraConf = new TezConfiguration(false);
-    extraConf.clear();
-
-    String strAppId = applicationAttemptId.getApplicationId().toString();
-    extraConf.set(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS,
-            appMaster.getTezRemoteShuffleManager().address.getHostName());
-    extraConf.setInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT,
-            appMaster.getTezRemoteShuffleManager().address.getPort());
-    writeExtraConf(appMaster, conf, extraConf, strAppId);
+    // apply dynamic configuration
+    boolean dynamicConfEnabled = 
conf.getBoolean(RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
+        RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
+    if (dynamicConfEnabled) {
+      appMaster.clusterClientConf = client.fetchClientConf(
+          conf.getInt(RssTezConfig.RSS_ACCESS_TIMEOUT_MS, 
RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
+    }
 
     mayCloseTezSlowStart(conf);
   }
@@ -174,52 +171,10 @@ public class RssDAGAppMaster extends DAGAppMaster {
   @Override
   protected DAG createDAG(DAGProtos.DAGPlan dagPB) {
     DAGImpl dag = createDAG(dagPB, null);
-    registerStateEnteredCallback(dag);
+    registerStateEnteredCallback(dag, this);
     return dag;
   }
 
-  @Override
-  public String submitDAGToAppMaster(DAGProtos.DAGPlan dagPlan, Map<String, 
LocalResource> additionalResources)
-          throws TezException {
-
-    addAdditionalResource(dagPlan, getRssConfFileLocalResource());
-
-    return super.submitDAGToAppMaster(dagPlan, additionalResources);
-  }
-
-  public DAGProtos.PlanLocalResource getRssConfFileLocalResource() {
-    return rssConfFileLocalResource;
-  }
-
-  public static void addAdditionalResource(DAGProtos.DAGPlan dagPlan, 
DAGProtos.PlanLocalResource additionalResource)
-          throws TezException {
-    List<DAGProtos.PlanLocalResource> planLocalResourceList = 
dagPlan.getLocalResourceList();
-
-    if (planLocalResourceList == null) {
-      LOG.warn("planLocalResourceList is null, add new list");
-      planLocalResourceList = new ArrayList<>();
-    } else {
-      planLocalResourceList = new ArrayList<>(planLocalResourceList);
-    }
-
-    try {
-      planLocalResourceList.add(additionalResource);
-      Field field = DAGProtos.DAGPlan.class.getDeclaredField("localResource_");
-      field.setAccessible(true);
-      field.set(dagPlan, planLocalResourceList);
-      field.setAccessible(false);
-    } catch (Exception e) {
-      LOG.error("submitDAGToAppMaster reflect error", e);
-      throw new TezException(e.getMessage());
-    }
-
-    if (LOG.isDebugEnabled()) {
-      for (DAGProtos.PlanLocalResource localResource : 
dagPlan.getLocalResourceList()) {
-        LOG.debug("localResource: {}", localResource.toString());
-      }
-    }
-  }
-
   /**
    * main method
    * @param args
@@ -321,7 +276,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
         }
       }
 
-      initAndStartRSSClient(appMaster, conf, applicationAttemptId);
       initAndStartAppMaster(appMaster, conf);
     } catch (Throwable t) {
       LOG.error("Error starting RssDAGAppMaster", t);
@@ -329,37 +283,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
     }
   }
 
-  static void writeExtraConf(final RssDAGAppMaster appMaster, Configuration 
conf,
-          TezConfiguration extraConf, String strAppId) {
-    try {
-      Path baseStagingPath = TezCommonUtils.getTezBaseStagingPath(conf);
-      Path tezStagingDir = new Path(new Path(baseStagingPath, 
TEZ_SYSTEM_SUB_DIR), strAppId);
-
-      FileSystem fs = tezStagingDir.getFileSystem(conf);
-      Path rssConfFilePath = new Path(tezStagingDir, 
RssTezConfig.RSS_CONF_FILE);
-
-      try (FSDataOutputStream out =
-                   FileSystem.create(fs, rssConfFilePath,
-                           new 
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
-        extraConf.writeXml(out);
-      }
-      FileStatus rsrcStat = fs.getFileStatus(rssConfFilePath);
-
-      appMaster.rssConfFileLocalResource = 
DAGProtos.PlanLocalResource.newBuilder()
-              .setName(appMaster.rssConfFileLocalResourceName)
-              .setUri(rsrcStat.getPath().toString())
-              .setSize(rsrcStat.getLen())
-              .setTimeStamp(rsrcStat.getModificationTime())
-              .setType(DAGProtos.PlanLocalResourceType.FILE)
-              .setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION)
-              .build();
-      LOG.info("Upload extra conf success!");
-    } catch (Exception e) {
-      LOG.error("Upload extra conf exception!", e);
-      throw new RssException("Upload extra conf exception ", e);
-    }
-  }
-
   static void mayCloseTezSlowStart(Configuration conf) {
     if (!conf.getBoolean(RssTezConfig.RSS_AM_SLOW_START_ENABLE, 
RssTezConfig.RSS_AM_SLOW_START_ENABLE_DEFAULT)) {
       
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 
1.0f);
@@ -395,21 +318,41 @@ public class RssDAGAppMaster extends DAGAppMaster {
   }
 
   @VisibleForTesting
-  public static void registerStateEnteredCallback(DAGImpl dag) {
-    StateMachineTez
-        stateMachine = (StateMachineTez) getPrivateField(dag, "stateMachine");
-    stateMachine.registerStateEnteredCallback(DAGState.INITED, new 
DagInitialCallback());
+  public static void registerStateEnteredCallback(DAGImpl dag, RssDAGAppMaster 
appMaster) {
+    StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag, 
"stateMachine");
+    stateMachine.registerStateEnteredCallback(DAGState.INITED, new 
DagInitialCallback(appMaster));
   }
 
   static class DagInitialCallback implements OnStateChangedCallback<DAGState, 
DAGImpl> {
 
+    private RssDAGAppMaster appMaster;
+    
+    DagInitialCallback(RssDAGAppMaster appMaster) {
+      this.appMaster = appMaster;
+    } 
+
     @Override
     public void onStateChanged(DAGImpl dag, DAGState dagState) {
       try {
+        // get rss config from client
+        Configuration filterRssConf = 
RssTezUtils.filterRssConf(appMaster.getConfig());
         Map<String, Edge> edges = (Map<String, Edge>) getPrivateField(dag, 
"edges");
         for (Map.Entry<String, Edge> entry : edges.entrySet()) {
           Edge edge = entry.getValue();
 
+          // add user defined config to edge source conf
+          Configuration edgeSourceConf =
+              
TezUtils.createConfFromUserPayload(edge.getEdgeProperty().getEdgeSource().getUserPayload());
+          edgeSourceConf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS,
+              
this.appMaster.getTezRemoteShuffleManager().getAddress().getHostName());
+          edgeSourceConf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT,
+              
this.appMaster.getTezRemoteShuffleManager().getAddress().getPort());
+          edgeSourceConf.addResource(filterRssConf);
+          RssTezUtils.applyDynamicClientConf(edgeSourceConf, 
this.appMaster.getClusterClientConf());
+          edge.getEdgeProperty().getEdgeSource()
+              
.setUserPayload(TezUtils.createUserPayloadFromConf(edgeSourceConf));
+
+          // rename output class name
           OutputDescriptor outputDescriptor = 
edge.getEdgeProperty().getEdgeSource();
           Field outputClassNameField = 
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
           outputClassNameField.setAccessible(true);
@@ -417,6 +360,19 @@ public class RssDAGAppMaster extends DAGAppMaster {
           String rssOutputClassName = 
RssTezUtils.replaceRssOutputClassName(outputClassName);
           outputClassNameField.set(outputDescriptor, rssOutputClassName);
 
+          // add user defined config to edge destination conf
+          Configuration edgeDestinationConf =
+              
TezUtils.createConfFromUserPayload(edge.getEdgeProperty().getEdgeSource().getUserPayload());
+          edgeDestinationConf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS,
+              
this.appMaster.getTezRemoteShuffleManager().getAddress().getHostName());
+          edgeDestinationConf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT,
+              
this.appMaster.getTezRemoteShuffleManager().getAddress().getPort());
+          edgeDestinationConf.addResource(filterRssConf);
+          RssTezUtils.applyDynamicClientConf(edgeDestinationConf, 
this.appMaster.getClusterClientConf());
+          edge.getEdgeProperty().getEdgeDestination()
+              
.setUserPayload(TezUtils.createUserPayloadFromConf(edgeDestinationConf));
+
+          // rename input class name
           InputDescriptor inputDescriptor = 
edge.getEdgeProperty().getEdgeDestination();
           Field inputClassNameField = 
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
           inputClassNameField.setAccessible(true);
@@ -424,7 +380,8 @@ public class RssDAGAppMaster extends DAGAppMaster {
           String rssInputClassName = 
RssTezUtils.replaceRssInputClassName(inputClassName);
           outputClassNameField.set(inputDescriptor, rssInputClassName);
         }
-      } catch (Exception e) {
+      } catch (IOException | IllegalAccessException | NoSuchFieldException e) {
+        LOG.error("Reconfigure failed after dag was inited, caused by {}", e);
         throw new TezUncheckedException(e);
       }
     }
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 2fe9dc3f..09f33957 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -67,7 +67,7 @@ import static 
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER
 public class TezRemoteShuffleManager implements ServicePluginLifecycle {
   private static final Logger LOG = 
LoggerFactory.getLogger(TezRemoteShuffleManager.class);
 
-  protected InetSocketAddress address;
+  private InetSocketAddress address;
 
   protected volatile Server server;
   private String tokenIdentifier;
@@ -102,6 +102,10 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
     server.stop();
   }
 
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
   private class TezRemoteShuffleUmbilicalProtocolImpl implements 
TezRemoteShuffleUmbilicalProtocol {
     private Map<Integer, ShuffleAssignmentsInfo> shuffleIdToShuffleAssignsInfo 
= new HashMap<>();
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index c1d816f8..5af487b1 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1609,7 +1609,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
           shuffleServerInfoList,
           readerJobConf,
           new TezIdHelper(),
-          expectedTaskIdsBitmapFilterEnable);
+          expectedTaskIdsBitmapFilterEnable,
+          RssTezConfig.toRssConf(conf));
 
       ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssTezShuffleDataFetcher fetcher = new RssTezShuffleDataFetcher(
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
index 5ccb9ad8..ac7a4a62 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.GetShuffleServerRequest;
 import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,7 +63,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 
-
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
 
 /**
  * {@link RssOrderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} 
which
@@ -112,18 +111,6 @@ public class RssOrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     LOG.info("Initialized RssOrderedPartitionedKVOutput.");
   }
 
-  private void getRssConf() {
-    try {
-      JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
-      this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null 
host");
-      this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
-      LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
-    } catch (Exception e) {
-      LOG.warn("debugRssConf error: ", e);
-    }
-  }
-
   @Override
   public List<Event> initialize() throws Exception {
     this.startTime = System.nanoTime();
@@ -135,12 +122,12 @@ public class RssOrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     getContext().requestInitialMemory(memRequestSize, 
memoryUpdateCallbackHandler);
     LOG.info("Got initialMemory.");
 
-    getRssConf();
-
     this.sendEmptyPartitionDetails = conf.getBoolean(
         
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
         
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
 
+    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
     final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
 
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index b1cef02e..b40ef8e7 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.GetShuffleServerRequest;
 import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,7 +63,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 
-
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
 
 /**
  * {@link RssUnorderedKVOutput} is an {@link AbstractLogicalOutput} which
@@ -113,19 +112,6 @@ public class RssUnorderedKVOutput extends 
AbstractLogicalOutput {
     LOG.info("Initialized RssUnorderedKVOutput.");
   }
 
-  private void getRssConf() {
-    try {
-      JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
-      this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null 
host");
-      this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-      LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
-
-    } catch (Exception e) {
-      LOG.warn("debugRssConf error: ", e);
-    }
-  }
-
-
   @Override
   public List<Event> initialize() throws Exception {
     this.startTime = System.nanoTime();
@@ -137,12 +123,12 @@ public class RssUnorderedKVOutput extends 
AbstractLogicalOutput {
     getContext().requestInitialMemory(memRequestSize, 
memoryUpdateCallbackHandler);
     LOG.info("Got initialMemory.");
 
-    getRssConf();
-
     this.sendEmptyPartitionDetails = conf.getBoolean(
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
 
+    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
     final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
 
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
index 0eb26e0f..6cd8c1d2 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.GetShuffleServerRequest;
 import org.apache.tez.common.GetShuffleServerResponse;
-import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
@@ -65,6 +63,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
 
 /**
  * {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} 
which
@@ -112,18 +112,6 @@ public class RssUnorderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
   }
 
-  private void getRssConf() {
-    try {
-      JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
-      this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null 
host");
-      this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
-      LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
-    } catch (Exception e) {
-      LOG.warn("debugRssConf error: ", e);
-    }
-  }
-
   @Override
   public List<Event> initialize() throws Exception {
     this.startTime = System.nanoTime();
@@ -135,13 +123,12 @@ public class RssUnorderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     getContext().requestInitialMemory(memRequestSize, 
memoryUpdateCallbackHandler);
     LOG.info("Got initialMemory.");
 
-    getRssConf();
-
     this.sendEmptyPartitionDetails = conf.getBoolean(
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
 
-
+    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
+    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
     final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
 
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(this.applicationId.toString());
diff --git 
a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java 
b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index abf03be2..a888baec 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -29,12 +29,15 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.util.StorageType;
 
+import static org.apache.tez.common.RssTezConfig.RSS_STORAGE_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -192,4 +195,28 @@ public class RssTezUtilsTest {
     Integer[] expectPartitionArr = new Integer[]{0, 1, 2, 3, 4, 5};
     assertTrue(Arrays.equals(expectPartitionArr, 
rssWorker.keySet().toArray(new Integer[0])));
   }
+
+  @Test
+  public void testApplyDynamicClientConf() {
+    Configuration conf = new Configuration(false);
+    conf.set("tez.config1", "value1");
+    conf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+    Map<String, String> dynamic = new HashMap<>();
+    dynamic.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    dynamic.put("config2", "value2");
+    RssTezUtils.applyDynamicClientConf(conf, dynamic);
+    Assertions.assertEquals("value1", conf.get("tez.config1"));
+    Assertions.assertEquals("value2", conf.get("tez.config2"));
+    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
+  }
+
+  @Test
+  public void testFilterRssConf() {
+    Configuration conf1 = new Configuration(false);
+    conf1.set("tez.config1", "value1");
+    conf1.set("config2", "value2");
+    Configuration conf2 = RssTezUtils.filterRssConf(conf1);
+    Assertions.assertEquals("value1", conf2.get("tez.config1"));
+    Assertions.assertNull(conf2.get("config2"));
+  }
 }
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java 
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 72023953..535ba953 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -17,13 +17,16 @@
 
 package org.apache.tez.dag.app;
 
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -41,13 +45,12 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
@@ -69,8 +72,13 @@ import 
org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.storage.util.StorageType;
+
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static org.apache.tez.common.RssTezConfig.RSS_STORAGE_TYPE;
+import static 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES;
 import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -78,37 +86,7 @@ import static org.mockito.Mockito.when;
 public class RssDAGAppMasterTest {
 
   @Test
-  public void testAddAdditionalResource() throws TezException {
-    DAGProtos.DAGPlan dagPlan = DAGProtos.DAGPlan.getDefaultInstance();
-    List<DAGProtos.PlanLocalResource> originalResources = 
dagPlan.getLocalResourceList();
-    if (originalResources == null) {
-      originalResources = new ArrayList<>();
-    } else {
-      originalResources = new ArrayList<>(originalResources);
-    }
-
-    DAGProtos.PlanLocalResource additionalResource = 
DAGProtos.PlanLocalResource.newBuilder()
-            .setName("rss_conf.xml")
-            .setUri("/data1/test")
-            .setSize(12)
-            .setTimeStamp(System.currentTimeMillis())
-            .setType(DAGProtos.PlanLocalResourceType.FILE)
-            .setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION)
-            .build();
-
-    RssDAGAppMaster.addAdditionalResource(dagPlan, additionalResource);
-    List<DAGProtos.PlanLocalResource> newResources = 
dagPlan.getLocalResourceList();
-
-    originalResources.add(additionalResource);
-
-    assertEquals(originalResources.size(), newResources.size());
-    for (int i = 0; i < originalResources.size(); i++) {
-      assertEquals(originalResources.get(i), newResources.get(i));
-    }
-  }
-
-  @Test
-  public void testRenameRssIOClassName() throws Exception {
+  public void testHookAfterDagInited() throws Exception {
     // 1 Init and mock some basic module
     AppContext appContext = mock(AppContext.class);
     ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
@@ -122,6 +100,20 @@ public class RssDAGAppMasterTest {
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     ACLManager aclManager = new ACLManager("amUser");
     doReturn(aclManager).when(appContext).getAMACLManager();
+    RssDAGAppMaster appMaster = mock(RssDAGAppMaster.class);
+    TezRemoteShuffleManager shuffleManager = 
mock(TezRemoteShuffleManager.class);
+    InetSocketAddress address = NetUtils.createSocketAddrForHost("host", 0);
+    when(shuffleManager.getAddress()).thenReturn(address);
+    when(appMaster.getTezRemoteShuffleManager()).thenReturn(shuffleManager);
+    Configuration clientConf = new Configuration(false);
+    clientConf.set(RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+    clientConf.set("tez.config1", "value1");
+    clientConf.set("config2", "value2");
+    Map<String, String> dynamicConf = new HashMap();
+    dynamicConf.put(RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    dynamicConf.put("tez.config3", "value3");
+    when(appMaster.getClusterClientConf()).thenReturn(dynamicConf);
+    when(appMaster.getConfig()).thenReturn(clientConf);
 
     // 2 init dispatcher
     AsyncDispatcher dispatcher = new AsyncDispatcher("core");
@@ -136,7 +128,7 @@ public class RssDAGAppMasterTest {
     when(appContext.getCurrentDAG()).thenReturn(dagImpl);
 
     // 4 register call back function
-    RssDAGAppMaster.registerStateEnteredCallback(dagImpl);
+    RssDAGAppMaster.registerStateEnteredCallback(dagImpl, appMaster);
 
     // 5 register DAGEvent, init and start dispatcher
     EventHandler<DAGEvent> dagEventDispatcher = new EventHandler<DAGEvent>() {
@@ -164,19 +156,49 @@ public class RssDAGAppMasterTest {
     verfiyInput(dagImpl, "vertex4", RssUnorderedKVInput.class.getName());
   }
 
-  public static void verfiyInput(DAGImpl dag, String name, String 
expectedInputClassName) throws AMUserCodeException {
+  public static void verfiyInput(DAGImpl dag, String name, String 
expectedInputClassName) throws Exception {
+    // 1 verify rename rss io class name
     List<InputSpec> inputSpecs = dag.getVertex(name).getInputSpecList(0);
     Assertions.assertEquals(1, inputSpecs.size());
     Assertions.assertEquals(expectedInputClassName, 
inputSpecs.get(0).getInputDescriptor().getClassName());
+    // 2 verify the address and port of shuffle manager
+    UserPayload payload = 
inputSpecs.get(0).getInputDescriptor().getUserPayload();
+    Configuration conf = TezUtils.createConfFromUserPayload(payload);
+    Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+    Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+    // 3 verify the config
+    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
+    Assertions.assertEquals("value1", conf.get("tez.config1"));
+    Assertions.assertEquals("value3", conf.get("tez.config3"));
+    Assertions.assertNull(conf.get("tez.config2"));
+    // TEZ_RUNTIME_IFILE_READAHEAD_BYTES is in getConfigurationKeySet, so the 
config from client should deliver
+    // to Input/Output. But tez.config.from.client is not in 
getConfigurationKeySet, so the config from client
+    // should not deliver to Input/Output.
+    Assertions.assertEquals(12345, 
conf.getInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, -1));
+    Assertions.assertNull(conf.get("tez.config.from.client"));
   }
 
-  public static void verfiyOutput(DAGImpl dag, String name, String 
expectedOutputClassName) throws AMUserCodeException {
+  public static void verfiyOutput(DAGImpl dag, String name, String 
expectedOutputClassName) throws Exception {
+    // 1 verify rename rss io class name
     List<OutputSpec> outputSpecs = dag.getVertex(name).getOutputSpecList(0);
     Assertions.assertEquals(1, outputSpecs.size());
     Assertions.assertEquals(expectedOutputClassName, 
outputSpecs.get(0).getOutputDescriptor().getClassName());
+    // 2 verify the address and port of shuffle manager
+    UserPayload payload = 
outputSpecs.get(0).getOutputDescriptor().getUserPayload();
+    Configuration conf = TezUtils.createConfFromUserPayload(payload);
+    Assertions.assertEquals("host", conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS));
+    Assertions.assertEquals(0, conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1));
+    // 3 verify the config
+    Assertions.assertEquals(StorageType.LOCALFILE.name(), 
conf.get(RSS_STORAGE_TYPE));
+    Assertions.assertEquals("value1", conf.get("tez.config1"));
+    Assertions.assertEquals("value3", conf.get("tez.config3"));
+    Assertions.assertNull(conf.get("tez.config2"));
   }
 
   private static DAG createDAG(String dageName, Configuration conf) {
+    conf.setInt(TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 12345);
+    conf.set("tez.config.from.client", "value.from.client");
+
     DataSourceDescriptor dummyInput = DataSourceDescriptor.create(
         InputDescriptor.create("dummyclass"), 
InputInitializerDescriptor.create(""), null);
 
@@ -193,7 +215,8 @@ public class RssDAGAppMasterTest {
         
OrderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(), 
NullWritable.class.getName(),
             
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
     UnorderedKVEdgeConfig edgeConf23 =
-        UnorderedKVEdgeConfig.newBuilder(NullWritable.class.getName(), 
NullWritable.class.getName()).build();
+        UnorderedKVEdgeConfig.newBuilder(NullWritable.class.getName(), 
NullWritable.class.getName())
+            .setFromConfiguration(conf).build();
     UnorderedPartitionedKVEdgeConfig edgeConf34 =
         
UnorderedPartitionedKVEdgeConfig.newBuilder(NullWritable.class.getName(), 
NullWritable.class.getName(),
             
HashPartitioner.class.getName()).setFromConfiguration(conf).build();
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
index 8d304e40..3af94aa0 100644
--- 
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
@@ -102,8 +102,8 @@ public class TezRemoteShuffleManagerTest {
       tezRemoteShuffleManager.initialize();
       tezRemoteShuffleManager.start();
 
-      String host = tezRemoteShuffleManager.address.getHostString();
-      int port = tezRemoteShuffleManager.address.getPort();
+      String host = tezRemoteShuffleManager.getAddress().getHostString();
+      int port = tezRemoteShuffleManager.getAddress().getPort();
       final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
 
       String tokenIdentifier = appId.toString();


Reply via email to