This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b30491c [#1051] fix(mr): Ensure configurations in both 
mapred-site.xml and dynamic_client.conf take effect. (#1112)
9b30491c is described below

commit 9b30491c134d38694c2e292c15bc602fda6967a3
Author: SevenAddSix <[email protected]>
AuthorDate: Thu Aug 17 19:53:25 2023 +0800

    [#1051] fix(mr): Ensure configurations in both mapred-site.xml and 
dynamic_client.conf take effect. (#1112)
    
    ### What changes were proposed in this pull request?
    Ensure configurations in both mapred-site.xml and dynamic_client.conf take 
effect.
    
    ### Why are the changes needed?
    Fix: #1051
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    tested in our cluster and UT
    
    Co-authored-by: qijiale <[email protected]>
---
 .../hadoop/mapred/RssMapOutputCollector.java       |  17 +---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   | 104 ++++++++++-----------
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |  77 ++++++++++-----
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   |  19 +---
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  44 +++++----
 .../apache/hadoop/mapreduce/RssMRUtilsTest.java    |   4 +-
 .../{LargeSorterTest.java => DynamicConfTest.java} |  29 +++---
 .../{LargeSorterTest.java => HadoopConfTest.java}  |  30 +++---
 .../org/apache/uniffle/test/LargeSorterTest.java   |  18 +---
 .../apache/uniffle/test/MRIntegrationTestBase.java |  28 +++++-
 .../org/apache/uniffle/test/SecondarySortTest.java |  17 +---
 .../org/apache/uniffle/test/WordCountTest.java     |  17 +---
 12 files changed, 194 insertions(+), 210 deletions(-)

diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 06351981..1066960c 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -67,11 +68,10 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
     }
     partitions = mrJobConf.getNumReduceTasks();
     MapTask mapTask = context.getMapTask();
-    JobConf rssJobConf = new JobConf(RssMRConfig.RSS_CONF_FILE);
+    Configuration rssJobConf = new JobConf(RssMRConfig.RSS_CONF_FILE);
     double sortThreshold =
         RssMRUtils.getDouble(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
             RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
     if (sortThreshold <= 0 || Double.compare(sortThreshold, 1.0) > 0) {
@@ -81,14 +81,12 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
     int batch =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
             RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
     RawComparator<K> comparator = mrJobConf.getOutputKeyComparator();
     double memoryThreshold =
         RssMRUtils.getDouble(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD,
             RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
     ApplicationAttemptId applicationAttemptId = 
RssMRUtils.getApplicationAttemptId();
@@ -99,30 +97,26 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
     double sendThreshold =
         RssMRUtils.getDouble(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SEND_THRESHOLD,
             RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
 
     long sendCheckInterval =
         RssMRUtils.getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);
     long sendCheckTimeout =
         RssMRUtils.getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
     int bitmapSplitNum =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_BITMAP_NUM,
             RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
     int numMaps = mrJobConf.getNumMapTasks();
-    String storageType = RssMRUtils.getString(rssJobConf, mrJobConf, 
RssMRConfig.RSS_STORAGE_TYPE);
+    String storageType = RssMRUtils.getString(rssJobConf, 
RssMRConfig.RSS_STORAGE_TYPE);
     if (StringUtils.isEmpty(storageType)) {
       throw new RssException("storage type mustn't be empty");
     }
@@ -133,19 +127,16 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
     long maxSegmentSize =
         RssMRUtils.getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE,
             RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE);
     int sendThreadNum =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM,
             RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
     long maxBufferSize =
         RssMRUtils.getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_WRITER_BUFFER_SIZE,
             RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
     shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
@@ -177,7 +168,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
             RssMRConfig.toRssConf(rssJobConf));
   }
 
-  private Map<Integer, List<ShuffleServerInfo>> createAssignmentMap(JobConf 
jobConf) {
+  private Map<Integer, List<ShuffleServerInfo>> 
createAssignmentMap(Configuration jobConf) {
     Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
Maps.newHashMap();
     for (int i = 0; i < partitions; i++) {
       String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX + i);
diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 251c6204..004aa4bc 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -21,126 +21,124 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.config.RssConf;
 
 public class RssMRConfig {
 
-  public static final String MR_RSS_CONFIG_PREFIX = "mapreduce.";
+  public static final String MR_CONFIG_PREFIX = "mapreduce.";
+
+  public static final String MR_RSS_CONFIG_PREFIX = "mapreduce.rss.";
+
   public static final String RSS_CLIENT_HEARTBEAT_THREAD_NUM =
-      MR_RSS_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
+      MR_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
   public static final int RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE = 4;
-  public static final String RSS_CLIENT_TYPE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE;
+  public static final String RSS_CLIENT_TYPE = MR_CONFIG_PREFIX + 
RssClientConfig.RSS_CLIENT_TYPE;
   public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE;
   public static final String RSS_CLIENT_RETRY_MAX =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
   public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE;
   public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
   public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE;
   public static final String RSS_COORDINATOR_QUORUM =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
-  public static final String RSS_DATA_REPLICA =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
+  public static final String RSS_DATA_REPLICA = MR_CONFIG_PREFIX + 
RssClientConfig.RSS_DATA_REPLICA;
   public static final int RSS_DATA_REPLICA_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_WRITE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
   public static final int RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_READ =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
   public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
   public static final String RSS_DATA_TRANSFER_POOL_SIZE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
   public static final String RSS_DATA_COMMIT_POOL_SIZE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
   public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
 
   public static final String RSS_CLIENT_SEND_THREAD_NUM =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
   public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
       RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM;
   public static final String RSS_CLIENT_SEND_THRESHOLD =
-      MR_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
+      MR_CONFIG_PREFIX + "rss.client.send.threshold";
   public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
   public static final String RSS_HEARTBEAT_INTERVAL =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
   public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE =
       RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE;
   public static final String RSS_HEARTBEAT_TIMEOUT =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
-  public static final String RSS_ASSIGNMENT_PREFIX =
-      MR_RSS_CONFIG_PREFIX + "rss.assignment.partition.";
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
+  public static final String RSS_ASSIGNMENT_PREFIX = MR_CONFIG_PREFIX + 
"rss.assignment.partition.";
   public static final String RSS_CLIENT_BATCH_TRIGGER_NUM =
-      MR_RSS_CONFIG_PREFIX + "rss.client.batch.trigger.num";
+      MR_CONFIG_PREFIX + "rss.client.batch.trigger.num";
   public static final int RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM = 50;
   public static final String RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD =
-      MR_RSS_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold";
+      MR_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold";
   public static final String RSS_WRITER_BUFFER_SIZE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE;
   public static final long RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE = 1024 * 1024 
* 14;
   public static final double RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD = 
0.9f;
   public static final String RSS_CLIENT_MEMORY_THRESHOLD =
-      MR_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
+      MR_CONFIG_PREFIX + "rss.client.memory.threshold";
   public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
   public static final String RSS_CLIENT_SEND_CHECK_INTERVAL_MS =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS;
   public static final long RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE;
   public static final String RSS_CLIENT_SEND_CHECK_TIMEOUT_MS =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS;
   public static final long RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE;
-  public static final String RSS_CLIENT_BITMAP_NUM = MR_RSS_CONFIG_PREFIX + 
"rss.client.bitmap.num";
+  public static final String RSS_CLIENT_BITMAP_NUM = MR_CONFIG_PREFIX + 
"rss.client.bitmap.num";
   public static final int RSS_CLIENT_DEFAULT_BITMAP_NUM = 1;
   public static final String RSS_CLIENT_MAX_SEGMENT_SIZE =
-      MR_RSS_CONFIG_PREFIX + "rss.client.max.buffer.size";
+      MR_CONFIG_PREFIX + "rss.client.max.buffer.size";
   public static final long RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE = 3 * 1024;
-  public static final String RSS_STORAGE_TYPE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
+  public static final String RSS_STORAGE_TYPE = MR_CONFIG_PREFIX + 
RssClientConfig.RSS_STORAGE_TYPE;
 
   public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED =
-      MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.enable";
+      MR_CONFIG_PREFIX + "rss.reduce.remote.spill.enable";
   public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false;
   public static final String RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC =
-      MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc";
+      MR_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc";
   public static final int RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT = 1;
   public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION =
-      MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.replication";
+      MR_CONFIG_PREFIX + "rss.reduce.remote.spill.replication";
   public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1;
   public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES =
-      MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.retries";
+      MR_CONFIG_PREFIX + "rss.reduce.remote.spill.retries";
   public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
 
   public static final String RSS_PARTITION_NUM_PER_RANGE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
   public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
       RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE;
   public static final String RSS_REMOTE_STORAGE_PATH =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
-  public static final String RSS_REMOTE_STORAGE_CONF =
-      MR_RSS_CONFIG_PREFIX + "rss.remote.storage.conf";
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+  public static final String RSS_REMOTE_STORAGE_CONF = MR_CONFIG_PREFIX + 
"rss.remote.storage.conf";
   public static final String RSS_INDEX_READ_LIMIT =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT;
   public static final int RSS_INDEX_READ_LIMIT_DEFAULT_VALUE =
       RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE;
   public static final String RSS_CLIENT_READ_BUFFER_SIZE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE;
 
   // When the size of read buffer reaches the half of JVM region (i.e., 32m),
   // it will incur humongous allocation, so we set it to 14m.
@@ -148,16 +146,16 @@ public class RssMRConfig {
       RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE;
 
   public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
   public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE;
   public static final String RSS_ACCESS_TIMEOUT_MS =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
   public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE =
       RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;
 
   public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
 
   public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
       RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
@@ -165,27 +163,27 @@ public class RssMRConfig {
       
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;
 
   public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
-      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
   public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE;
   public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
   public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
       RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;
 
   public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
-      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
+      MR_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
   public static final boolean 
RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;
 
   public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
-      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
+      MR_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
 
   public static final double 
RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE =
       
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;
 
   public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
-      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
+      MR_CONFIG_PREFIX + 
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
   public static final int 
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
       RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
 
@@ -196,16 +194,16 @@ public class RssMRConfig {
 
   // Whether enable test mode for the MR Client
   public static final String RSS_TEST_MODE_ENABLE =
-      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
 
-  public static RssConf toRssConf(JobConf jobConf) {
+  public static RssConf toRssConf(Configuration jobConf) {
     RssConf rssConf = new RssConf();
     for (Map.Entry<String, String> entry : jobConf) {
       String key = entry.getKey();
-      if (!key.startsWith(MR_RSS_CONFIG_PREFIX)) {
+      if (!key.startsWith(MR_CONFIG_PREFIX)) {
         continue;
       }
-      key = key.substring(MR_RSS_CONFIG_PREFIX.length());
+      key = key.substring(MR_CONFIG_PREFIX.length());
       rssConf.setString(key, entry.getValue());
     }
     return rssConf;
diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 31ce99dc..67ca72b5 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.mapreduce;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -128,7 +131,7 @@ public class RssMRUtils {
     return client;
   }
 
-  public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int 
reduceID) {
+  public static Set<ShuffleServerInfo> getAssignedServers(Configuration 
jobConf, int reduceID) {
     String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX + 
String.valueOf(reduceID));
     String[] splitServers = servers.split(",");
     Set<ShuffleServerInfo> assignServers = Sets.newHashSet();
@@ -142,7 +145,41 @@ public class RssMRUtils {
     return containerId.getApplicationAttemptId();
   }
 
-  public static void applyDynamicClientConf(JobConf jobConf, Map<String, 
String> confItems) {
+  public static void applyClientConf(Configuration jobConf, JobConf mrJobConf) 
{
+
+    if (jobConf == null) {
+      LOG.warn("Job conf is null");
+      return;
+    }
+
+    if (mrJobConf == null) {
+      LOG.warn("Empty conf items");
+      return;
+    }
+
+    Iterator<Map.Entry<String, String>> iterator = mrJobConf.iterator();
+    Map<String, String> confItems = new HashMap<>();
+
+    while (iterator.hasNext()) {
+      Map.Entry<String, String> entry = iterator.next();
+      String key = entry.getKey();
+      if (!key.startsWith(RssMRConfig.MR_RSS_CONFIG_PREFIX)) {
+        continue;
+      }
+      confItems.put(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, String> kv : confItems.entrySet()) {
+      String mrConfKey = kv.getKey();
+      String mrConfVal = kv.getValue();
+      if (StringUtils.isEmpty(jobConf.get(mrConfKey, ""))) {
+        LOG.warn("Use conf client conf {} = {}", mrConfKey, mrConfVal);
+        jobConf.set(mrConfKey, mrConfVal);
+      }
+    }
+  }
+
+  public static void applyDynamicClientConf(Configuration jobConf, Map<String, 
String> confItems) {
     if (jobConf == null) {
       LOG.warn("Job conf is null");
       return;
@@ -155,8 +192,8 @@ public class RssMRUtils {
 
     for (Map.Entry<String, String> kv : confItems.entrySet()) {
       String mrConfKey = kv.getKey();
-      if (!mrConfKey.startsWith(RssMRConfig.MR_RSS_CONFIG_PREFIX)) {
-        mrConfKey = RssMRConfig.MR_RSS_CONFIG_PREFIX + mrConfKey;
+      if (!mrConfKey.startsWith(RssMRConfig.MR_CONFIG_PREFIX)) {
+        mrConfKey = RssMRConfig.MR_CONFIG_PREFIX + mrConfKey;
       }
       String mrConfVal = kv.getValue();
       if (StringUtils.isEmpty(jobConf.get(mrConfKey, ""))
@@ -167,31 +204,28 @@ public class RssMRUtils {
     }
   }
 
-  public static int getInt(JobConf rssJobConf, JobConf mrJobCOnf, String key, 
int defaultValue) {
-    return rssJobConf.getInt(key, mrJobCOnf.getInt(key, defaultValue));
+  public static int getInt(Configuration rssJobConf, String key, int 
defaultValue) {
+    return rssJobConf.getInt(key, defaultValue);
   }
 
-  public static long getLong(JobConf rssJobConf, JobConf mrJobConf, String 
key, long defaultValue) {
-    return rssJobConf.getLong(key, mrJobConf.getLong(key, defaultValue));
+  public static long getLong(Configuration rssJobConf, String key, long 
defaultValue) {
+    return rssJobConf.getLong(key, defaultValue);
   }
 
-  public static boolean getBoolean(
-      JobConf rssJobConf, JobConf mrJobConf, String key, boolean defaultValue) 
{
-    return rssJobConf.getBoolean(key, mrJobConf.getBoolean(key, defaultValue));
+  public static boolean getBoolean(Configuration rssJobConf, String key, 
boolean defaultValue) {
+    return rssJobConf.getBoolean(key, defaultValue);
   }
 
-  public static double getDouble(
-      JobConf rssJobConf, JobConf mrJobConf, String key, double defaultValue) {
-    return rssJobConf.getDouble(key, mrJobConf.getDouble(key, defaultValue));
+  public static double getDouble(Configuration rssJobConf, String key, double 
defaultValue) {
+    return rssJobConf.getDouble(key, defaultValue);
   }
 
-  public static String getString(JobConf rssJobConf, JobConf mrJobConf, String 
key) {
-    return rssJobConf.get(key, mrJobConf.get(key));
+  public static String getString(Configuration rssJobConf, String key) {
+    return rssJobConf.get(key, "");
   }
 
-  public static String getString(
-      JobConf rssJobConf, JobConf mrJobConf, String key, String defaultValue) {
-    return rssJobConf.get(key, mrJobConf.get(key, defaultValue));
+  public static String getString(Configuration rssJobConf, String key, String 
defaultValue) {
+    return rssJobConf.get(key, defaultValue);
   }
 
   public static long getBlockId(long partitionId, long taskAttemptId, int 
nextSeqNo) {
@@ -283,23 +317,20 @@ public class RssMRUtils {
     return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
 
-  public static void validateRssClientConf(JobConf rssJobConf, JobConf 
mrJobConf) {
+  public static void validateRssClientConf(Configuration rssJobConf) {
     int retryMax =
         getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_RETRY_MAX,
             RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
     long retryIntervalMax =
         getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
             RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     long sendCheckTimeout =
         getLong(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
             RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
     if (retryIntervalMax * retryMax > sendCheckTimeout) {
diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 6231000c..47d06154 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
@@ -56,7 +57,7 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
 
   private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
   private JobConf mrJobConf;
-  private JobConf rssJobConf;
+  private Configuration rssJobConf;
   private Reporter reporter;
   private ShuffleClientMetrics metrics;
   private TaskUmbilicalProtocol umbilical;
@@ -102,33 +103,26 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
     this.replicaWrite =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_DATA_REPLICA_WRITE,
             RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.replicaRead =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_DATA_REPLICA_READ,
             RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.replica =
         RssMRUtils.getInt(
-            rssJobConf,
-            mrJobConf,
-            RssMRConfig.RSS_DATA_REPLICA,
-            RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+            rssJobConf, RssMRConfig.RSS_DATA_REPLICA, 
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
 
     this.partitionNum = mrJobConf.getNumReduceTasks();
     this.partitionNumPerRange =
         RssMRUtils.getInt(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
             RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
-    this.basePath =
-        RssMRUtils.getString(rssJobConf, mrJobConf, 
RssMRConfig.RSS_REMOTE_STORAGE_PATH);
+    this.basePath = RssMRUtils.getString(rssJobConf, 
RssMRConfig.RSS_REMOTE_STORAGE_PATH);
     String remoteStorageConf =
-        RssMRUtils.getString(rssJobConf, mrJobConf, 
RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
+        RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, 
"");
     this.remoteStorageInfo = new RemoteStorageInfo(basePath, 
remoteStorageConf);
     this.merger = createMergeManager(context);
   }
@@ -137,7 +131,6 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
     boolean useRemoteSpill =
         RssMRUtils.getBoolean(
             rssJobConf,
-            mrJobConf,
             RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
             RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT);
     if (useRemoteSpill) {
@@ -146,13 +139,11 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
       int replication =
           RssMRUtils.getInt(
               rssJobConf,
-              mrJobConf,
               RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
               RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
       int retries =
           RssMRUtils.getInt(
               rssJobConf,
-              mrJobConf,
               RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
               RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
       return new RssRemoteMergeManagerImpl(
diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index d3fe6b09..a631696c 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -143,19 +144,6 @@ public class RssMRAppMaster extends MRAppMaster {
       LOG.info("Registering coordinators {}", coordinators);
       client.registerCoordinators(coordinators);
 
-      // Get the configured server assignment tags and it will also add 
default shuffle version tag.
-      Set<String> assignmentTags = new HashSet<>();
-      String rawTags = conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
-      if (StringUtils.isNotEmpty(rawTags)) {
-        rawTags = rawTags.trim();
-        assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
-      }
-      assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
-      String clientType =
-          conf.get(RssMRConfig.RSS_CLIENT_TYPE, 
RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
-      ClientUtils.validateClientType(clientType);
-      assignmentTags.add(clientType);
-
       final ScheduledExecutorService scheduledExecutorService =
           Executors.newSingleThreadScheduledExecutor(
               new ThreadFactory() {
@@ -167,9 +155,13 @@ public class RssMRAppMaster extends MRAppMaster {
                 }
               });
 
-      JobConf extraConf = new JobConf();
+      // set loadDefaults to false, rss_conf.xml should only contain conf of 
RSS,
+      // Hadoop conf is not necessary.
+      Configuration extraConf = new JobConf(false);
       extraConf.clear();
 
+      RssMRUtils.applyClientConf(extraConf, conf);
+
       // get remote storage from coordinator if necessary
       boolean dynamicConfEnabled =
           conf.getBoolean(
@@ -186,21 +178,33 @@ public class RssMRAppMaster extends MRAppMaster {
         RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf);
       }
 
-      String storageType = RssMRUtils.getString(extraConf, conf, 
RssMRConfig.RSS_STORAGE_TYPE);
-      boolean testMode =
-          RssMRUtils.getBoolean(extraConf, conf, 
RssMRConfig.RSS_TEST_MODE_ENABLE, false);
+      // Get the configured server assignment tags and it will also add 
default shuffle version tag.
+      Set<String> assignmentTags = new HashSet<>();
+      String rawTags = conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
+      if (StringUtils.isNotEmpty(rawTags)) {
+        rawTags = rawTags.trim();
+        assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
+      }
+      assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
+      String clientType =
+          extraConf.get(RssMRConfig.RSS_CLIENT_TYPE, 
RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+      ClientUtils.validateClientType(clientType);
+      assignmentTags.add(clientType);
+
+      String storageType = RssMRUtils.getString(extraConf, 
RssMRConfig.RSS_STORAGE_TYPE);
+      boolean testMode = RssMRUtils.getBoolean(extraConf, 
RssMRConfig.RSS_TEST_MODE_ENABLE, false);
       ClientUtils.validateTestModeConf(testMode, storageType);
       ApplicationAttemptId applicationAttemptId = 
RssMRUtils.getApplicationAttemptId();
       String appId = applicationAttemptId.toString();
       RemoteStorageInfo defaultRemoteStorage =
-          new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, 
""));
+          new 
RemoteStorageInfo(extraConf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
       RemoteStorageInfo remoteStorage =
           ClientUtils.fetchRemoteStorage(
               appId, defaultRemoteStorage, dynamicConfEnabled, storageType, 
client);
       // set the remote storage with actual value
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, 
remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, 
remoteStorage.getConfString());
-      RssMRUtils.validateRssClientConf(extraConf, conf);
+      RssMRUtils.validateRssClientConf(extraConf);
       // When containers have disk with very limited space, reduce is allowed 
to spill data to hdfs
       if (conf.getBoolean(
           RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
@@ -421,7 +425,7 @@ public class RssMRAppMaster extends MRAppMaster {
     }
   }
 
-  static void writeExtraConf(JobConf conf, JobConf extraConf) {
+  static void writeExtraConf(JobConf conf, Configuration extraConf) {
     try {
       FileSystem fs = new Cluster(conf).getFileSystem();
       String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index 38469273..cb5c2c65 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -246,12 +246,12 @@ public class RssMRUtilsTest {
     JobConf rssJobConf = new JobConf();
     rssJobConf.setInt("mapreduce.job.maps", 500);
     rssJobConf.setInt("mapreduce.job.reduces", 20);
-    RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+    RssMRUtils.validateRssClientConf(rssJobConf);
     rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX, 5);
     rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
     rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
     try {
-      RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+      RssMRUtils.validateRssClientConf(rssJobConf);
       fail(EXPECTED_EXCEPTION_MESSAGE);
     } catch (IllegalArgumentException e) {
       assertTrue(e.getMessage().contains("should not bigger than"));
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
similarity index 64%
copy from 
integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
copy to 
integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
index 4dea5f9f..ab1b9de1 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
@@ -17,38 +17,36 @@
 
 package org.apache.uniffle.test;
 
+import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.LargeSorter;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.storage.util.StorageType;
 
-public class LargeSorterTest extends MRIntegrationTestBase {
+public class DynamicConfTest extends MRIntegrationTestBase {
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = Maps.newHashMap();
-    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
+    MRIntegrationTestBase.setupServers(DynamicConfTest.getDynamicConf());
+  }
+
+  protected static Map<String, String> getDynamicConf() {
+    Map<String, String> dynamicConf = new HashMap<>();
+    dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + 
"rss/test");
     dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+    return dynamicConf;
   }
 
   @Test
-  public void largeSorterTest() throws Exception {
+  public void dynamicConfTest() throws Exception {
     run();
   }
 
@@ -56,9 +54,6 @@ public class LargeSorterTest extends MRIntegrationTestBase {
   protected void updateRssConfiguration(Configuration jobConf) {
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
     jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
-    jobConf.set(
-        MRJobConfig.MR_AM_COMMAND_OPTS,
-        "-XX:+TraceClassLoading org.apache.uniffle.test.FailoverAppMaster");
   }
 
   @Override
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
 b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
similarity index 61%
copy from 
integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
copy to 
integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
index 4dea5f9f..892b9e19 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
@@ -17,48 +17,42 @@
 
 package org.apache.uniffle.test;
 
+import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.LargeSorter;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.storage.util.StorageType;
 
-public class LargeSorterTest extends MRIntegrationTestBase {
+public class HadoopConfTest extends MRIntegrationTestBase {
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = Maps.newHashMap();
-    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
-    dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    MRIntegrationTestBase.setupServers(HadoopConfTest.getDynamicConf());
+  }
+
+  protected static Map<String, String> getDynamicConf() {
+    return new HashMap<>();
   }
 
   @Test
-  public void largeSorterTest() throws Exception {
+  public void hadoopConfTest() throws Exception {
     run();
   }
 
   @Override
   protected void updateRssConfiguration(Configuration jobConf) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+    jobConf.set(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
     jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
-    jobConf.set(
-        MRJobConfig.MR_AM_COMMAND_OPTS,
-        "-XX:+TraceClassLoading org.apache.uniffle.test.FailoverAppMaster");
   }
 
   @Override
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
index 4dea5f9f..be7ec837 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.uniffle.test;
 
-import java.util.Map;
-
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -28,23 +25,13 @@ import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
+import org.apache.uniffle.common.ClientType;
 
 public class LargeSorterTest extends MRIntegrationTestBase {
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = Maps.newHashMap();
-    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
-    dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
   @Test
@@ -54,6 +41,7 @@ public class LargeSorterTest extends MRIntegrationTestBase {
 
   @Override
   protected void updateRssConfiguration(Configuration jobConf) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
     jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
     jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
     jobConf.set(
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 4f5905d5..02cce678 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -21,7 +21,9 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +47,9 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 
 import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -62,7 +67,7 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     }
   }
 
-  private static Path TEST_ROOT_DIR =
+  private static final Path TEST_ROOT_DIR =
       localFs.makeQualified(new Path("target", TestMRJobs.class.getName() + 
"-tmpDir"));
   static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
   private static final String OUTPUT_ROOT_DIR = "/tmp/" + 
TestMRJobs.class.getSimpleName();
@@ -176,7 +181,6 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
             + ","
             + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
     jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
-    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
     updateRssConfiguration(jobConf);
     runMRApp(jobConf, getTestTool(), getTestArgs());
   }
@@ -185,7 +189,25 @@ public class MRIntegrationTestBase extends 
IntegrationTestBase {
     return new String[0];
   }
 
-  protected void updateRssConfiguration(Configuration jobConf) {}
+  protected static void setupServers(Map<String, String> dynamicConf) throws 
Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    addDynamicConf(coordinatorConf, dynamicConf);
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  protected static Map<String, String> getDynamicConf() {
+    Map<String, String> dynamicConf = new HashMap<>();
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
+    dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    return dynamicConf;
+  }
+
+  protected void updateRssConfiguration(Configuration jobConf) {
+    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+  }
 
   private void runMRApp(Configuration conf, Tool tool, String[] args) throws 
Exception {
     assertEquals(0, ToolRunner.run(conf, tool, args), 
tool.getClass().getName() + " failed");
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
index 2ead1c77..6e0bf5dc 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.uniffle.test;
 
-import java.util.Map;
 import java.util.Random;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SecondarySort;
@@ -32,30 +30,17 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
 public class SecondarySortTest extends MRIntegrationTestBase {
 
   String inputPath = "secondary_sort_input";
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = Maps.newHashMap();
-    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
-    dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
   @Test
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
index 48c4bab8..2ba76f5a 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
@@ -18,11 +18,9 @@
 package org.apache.uniffle.test;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -32,15 +30,10 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.WordCount;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RssMRConfig;
 import org.apache.hadoop.util.Tool;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
 public class WordCountTest extends MRIntegrationTestBase {
 
   String inputPath = "word_count_input";
@@ -50,15 +43,7 @@ public class WordCountTest extends MRIntegrationTestBase {
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    Map<String, String> dynamicConf = Maps.newHashMap();
-    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
-    dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
   }
 
   @Test

Reply via email to