This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9b30491c [#1051] fix(mr): Ensure configurations in both
mapred-site.xml and dynamic_client.conf take effect. (#1112)
9b30491c is described below
commit 9b30491c134d38694c2e292c15bc602fda6967a3
Author: SevenAddSix <[email protected]>
AuthorDate: Thu Aug 17 19:53:25 2023 +0800
[#1051] fix(mr): Ensure configurations in both mapred-site.xml and
dynamic_client.conf take effect. (#1112)
### What changes were proposed in this pull request?
Ensure configurations in both mapred-site.xml and dynamic_client.conf take
effect.
### Why are the changes needed?
Fix: #1051
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in our cluster and verified with unit tests.
Co-authored-by: qijiale <[email protected]>
---
.../hadoop/mapred/RssMapOutputCollector.java | 17 +---
.../org/apache/hadoop/mapreduce/RssMRConfig.java | 104 ++++++++++-----------
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 77 ++++++++++-----
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 19 +---
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 44 +++++----
.../apache/hadoop/mapreduce/RssMRUtilsTest.java | 4 +-
.../{LargeSorterTest.java => DynamicConfTest.java} | 29 +++---
.../{LargeSorterTest.java => HadoopConfTest.java} | 30 +++---
.../org/apache/uniffle/test/LargeSorterTest.java | 18 +---
.../apache/uniffle/test/MRIntegrationTestBase.java | 28 +++++-
.../org/apache/uniffle/test/SecondarySortTest.java | 17 +---
.../org/apache/uniffle/test/WordCountTest.java | 17 +---
12 files changed, 194 insertions(+), 210 deletions(-)
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 06351981..1066960c 100644
---
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.JobContext;
@@ -67,11 +68,10 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
}
partitions = mrJobConf.getNumReduceTasks();
MapTask mapTask = context.getMapTask();
- JobConf rssJobConf = new JobConf(RssMRConfig.RSS_CONF_FILE);
+ Configuration rssJobConf = new JobConf(RssMRConfig.RSS_CONF_FILE);
double sortThreshold =
RssMRUtils.getDouble(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
if (sortThreshold <= 0 || Double.compare(sortThreshold, 1.0) > 0) {
@@ -81,14 +81,12 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
int batch =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
RawComparator<K> comparator = mrJobConf.getOutputKeyComparator();
double memoryThreshold =
RssMRUtils.getDouble(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
ApplicationAttemptId applicationAttemptId =
RssMRUtils.getApplicationAttemptId();
@@ -99,30 +97,26 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
double sendThreshold =
RssMRUtils.getDouble(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
long sendCheckInterval =
RssMRUtils.getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);
long sendCheckTimeout =
RssMRUtils.getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
int bitmapSplitNum =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_BITMAP_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
int numMaps = mrJobConf.getNumMapTasks();
- String storageType = RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_STORAGE_TYPE);
+ String storageType = RssMRUtils.getString(rssJobConf,
RssMRConfig.RSS_STORAGE_TYPE);
if (StringUtils.isEmpty(storageType)) {
throw new RssException("storage type mustn't be empty");
}
@@ -133,19 +127,16 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
long maxSegmentSize =
RssMRUtils.getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE,
RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE);
int sendThreadNum =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
long maxBufferSize =
RssMRUtils.getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
@@ -177,7 +168,7 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
RssMRConfig.toRssConf(rssJobConf));
}
- private Map<Integer, List<ShuffleServerInfo>> createAssignmentMap(JobConf
jobConf) {
+ private Map<Integer, List<ShuffleServerInfo>>
createAssignmentMap(Configuration jobConf) {
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
Maps.newHashMap();
for (int i = 0; i < partitions; i++) {
String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX + i);
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 251c6204..004aa4bc 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -21,126 +21,124 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.config.RssConf;
public class RssMRConfig {
- public static final String MR_RSS_CONFIG_PREFIX = "mapreduce.";
+ public static final String MR_CONFIG_PREFIX = "mapreduce.";
+
+ public static final String MR_RSS_CONFIG_PREFIX = "mapreduce.rss.";
+
public static final String RSS_CLIENT_HEARTBEAT_THREAD_NUM =
- MR_RSS_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
+ MR_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
public static final int RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE = 4;
- public static final String RSS_CLIENT_TYPE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE;
+ public static final String RSS_CLIENT_TYPE = MR_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_TYPE;
public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE;
public static final String RSS_CLIENT_RETRY_MAX =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE;
public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE;
public static final String RSS_COORDINATOR_QUORUM =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
- public static final String RSS_DATA_REPLICA =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
+ public static final String RSS_DATA_REPLICA = MR_CONFIG_PREFIX +
RssClientConfig.RSS_DATA_REPLICA;
public static final int RSS_DATA_REPLICA_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE;
public static final String RSS_DATA_REPLICA_WRITE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
public static final int RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE;
public static final String RSS_DATA_REPLICA_READ =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
public static final String RSS_DATA_TRANSFER_POOL_SIZE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
public static final String RSS_DATA_COMMIT_POOL_SIZE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
public static final String RSS_CLIENT_SEND_THREAD_NUM =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM;
public static final String RSS_CLIENT_SEND_THRESHOLD =
- MR_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
+ MR_CONFIG_PREFIX + "rss.client.send.threshold";
public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
public static final String RSS_HEARTBEAT_INTERVAL =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE =
RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE;
public static final String RSS_HEARTBEAT_TIMEOUT =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
- public static final String RSS_ASSIGNMENT_PREFIX =
- MR_RSS_CONFIG_PREFIX + "rss.assignment.partition.";
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
+ public static final String RSS_ASSIGNMENT_PREFIX = MR_CONFIG_PREFIX +
"rss.assignment.partition.";
public static final String RSS_CLIENT_BATCH_TRIGGER_NUM =
- MR_RSS_CONFIG_PREFIX + "rss.client.batch.trigger.num";
+ MR_CONFIG_PREFIX + "rss.client.batch.trigger.num";
public static final int RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM = 50;
public static final String RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD =
- MR_RSS_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold";
+ MR_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold";
public static final String RSS_WRITER_BUFFER_SIZE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE;
public static final long RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE = 1024 * 1024
* 14;
public static final double RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD =
0.9f;
public static final String RSS_CLIENT_MEMORY_THRESHOLD =
- MR_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
+ MR_CONFIG_PREFIX + "rss.client.memory.threshold";
public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
public static final String RSS_CLIENT_SEND_CHECK_INTERVAL_MS =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS;
public static final long RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE;
public static final String RSS_CLIENT_SEND_CHECK_TIMEOUT_MS =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS;
public static final long RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE;
- public static final String RSS_CLIENT_BITMAP_NUM = MR_RSS_CONFIG_PREFIX +
"rss.client.bitmap.num";
+ public static final String RSS_CLIENT_BITMAP_NUM = MR_CONFIG_PREFIX +
"rss.client.bitmap.num";
public static final int RSS_CLIENT_DEFAULT_BITMAP_NUM = 1;
public static final String RSS_CLIENT_MAX_SEGMENT_SIZE =
- MR_RSS_CONFIG_PREFIX + "rss.client.max.buffer.size";
+ MR_CONFIG_PREFIX + "rss.client.max.buffer.size";
public static final long RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE = 3 * 1024;
- public static final String RSS_STORAGE_TYPE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
+ public static final String RSS_STORAGE_TYPE = MR_CONFIG_PREFIX +
RssClientConfig.RSS_STORAGE_TYPE;
public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED =
- MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.enable";
+ MR_CONFIG_PREFIX + "rss.reduce.remote.spill.enable";
public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false;
public static final String RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC =
- MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc";
+ MR_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc";
public static final int RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT = 1;
public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION =
- MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.replication";
+ MR_CONFIG_PREFIX + "rss.reduce.remote.spill.replication";
public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1;
public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES =
- MR_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.retries";
+ MR_CONFIG_PREFIX + "rss.reduce.remote.spill.retries";
public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
public static final String RSS_PARTITION_NUM_PER_RANGE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE;
public static final String RSS_REMOTE_STORAGE_PATH =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
- public static final String RSS_REMOTE_STORAGE_CONF =
- MR_RSS_CONFIG_PREFIX + "rss.remote.storage.conf";
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+ public static final String RSS_REMOTE_STORAGE_CONF = MR_CONFIG_PREFIX +
"rss.remote.storage.conf";
public static final String RSS_INDEX_READ_LIMIT =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT;
public static final int RSS_INDEX_READ_LIMIT_DEFAULT_VALUE =
RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE;
public static final String RSS_CLIENT_READ_BUFFER_SIZE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE;
// When the size of read buffer reaches the half of JVM region (i.e., 32m),
// it will incur humongous allocation, so we set it to 14m.
@@ -148,16 +146,16 @@ public class RssMRConfig {
RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE;
public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE;
public static final String RSS_ACCESS_TIMEOUT_MS =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE =
RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;
public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
@@ -165,27 +163,27 @@ public class RssMRConfig {
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;
public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
+ MR_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
public static final boolean
RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;
public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
+ MR_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
public static final double
RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;
public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
- MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
+ MR_CONFIG_PREFIX +
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
public static final int
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
@@ -196,16 +194,16 @@ public class RssMRConfig {
// Whether enable test mode for the MR Client
public static final String RSS_TEST_MODE_ENABLE =
- MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
- public static RssConf toRssConf(JobConf jobConf) {
+ public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
String key = entry.getKey();
- if (!key.startsWith(MR_RSS_CONFIG_PREFIX)) {
+ if (!key.startsWith(MR_CONFIG_PREFIX)) {
continue;
}
- key = key.substring(MR_RSS_CONFIG_PREFIX.length());
+ key = key.substring(MR_CONFIG_PREFIX.length());
rssConf.setString(key, entry.getValue());
}
return rssConf;
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 31ce99dc..67ca72b5 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.mapreduce;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -128,7 +131,7 @@ public class RssMRUtils {
return client;
}
- public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int
reduceID) {
+ public static Set<ShuffleServerInfo> getAssignedServers(Configuration
jobConf, int reduceID) {
String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX +
String.valueOf(reduceID));
String[] splitServers = servers.split(",");
Set<ShuffleServerInfo> assignServers = Sets.newHashSet();
@@ -142,7 +145,41 @@ public class RssMRUtils {
return containerId.getApplicationAttemptId();
}
- public static void applyDynamicClientConf(JobConf jobConf, Map<String,
String> confItems) {
+ public static void applyClientConf(Configuration jobConf, JobConf mrJobConf)
{
+
+ if (jobConf == null) {
+ LOG.warn("Job conf is null");
+ return;
+ }
+
+ if (mrJobConf == null) {
+ LOG.warn("Empty conf items");
+ return;
+ }
+
+ Iterator<Map.Entry<String, String>> iterator = mrJobConf.iterator();
+ Map<String, String> confItems = new HashMap<>();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, String> entry = iterator.next();
+ String key = entry.getKey();
+ if (!key.startsWith(RssMRConfig.MR_RSS_CONFIG_PREFIX)) {
+ continue;
+ }
+ confItems.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, String> kv : confItems.entrySet()) {
+ String mrConfKey = kv.getKey();
+ String mrConfVal = kv.getValue();
+ if (StringUtils.isEmpty(jobConf.get(mrConfKey, ""))) {
+ LOG.warn("Use conf client conf {} = {}", mrConfKey, mrConfVal);
+ jobConf.set(mrConfKey, mrConfVal);
+ }
+ }
+ }
+
+ public static void applyDynamicClientConf(Configuration jobConf, Map<String,
String> confItems) {
if (jobConf == null) {
LOG.warn("Job conf is null");
return;
@@ -155,8 +192,8 @@ public class RssMRUtils {
for (Map.Entry<String, String> kv : confItems.entrySet()) {
String mrConfKey = kv.getKey();
- if (!mrConfKey.startsWith(RssMRConfig.MR_RSS_CONFIG_PREFIX)) {
- mrConfKey = RssMRConfig.MR_RSS_CONFIG_PREFIX + mrConfKey;
+ if (!mrConfKey.startsWith(RssMRConfig.MR_CONFIG_PREFIX)) {
+ mrConfKey = RssMRConfig.MR_CONFIG_PREFIX + mrConfKey;
}
String mrConfVal = kv.getValue();
if (StringUtils.isEmpty(jobConf.get(mrConfKey, ""))
@@ -167,31 +204,28 @@ public class RssMRUtils {
}
}
- public static int getInt(JobConf rssJobConf, JobConf mrJobCOnf, String key,
int defaultValue) {
- return rssJobConf.getInt(key, mrJobCOnf.getInt(key, defaultValue));
+ public static int getInt(Configuration rssJobConf, String key, int
defaultValue) {
+ return rssJobConf.getInt(key, defaultValue);
}
- public static long getLong(JobConf rssJobConf, JobConf mrJobConf, String
key, long defaultValue) {
- return rssJobConf.getLong(key, mrJobConf.getLong(key, defaultValue));
+ public static long getLong(Configuration rssJobConf, String key, long
defaultValue) {
+ return rssJobConf.getLong(key, defaultValue);
}
- public static boolean getBoolean(
- JobConf rssJobConf, JobConf mrJobConf, String key, boolean defaultValue)
{
- return rssJobConf.getBoolean(key, mrJobConf.getBoolean(key, defaultValue));
+ public static boolean getBoolean(Configuration rssJobConf, String key,
boolean defaultValue) {
+ return rssJobConf.getBoolean(key, defaultValue);
}
- public static double getDouble(
- JobConf rssJobConf, JobConf mrJobConf, String key, double defaultValue) {
- return rssJobConf.getDouble(key, mrJobConf.getDouble(key, defaultValue));
+ public static double getDouble(Configuration rssJobConf, String key, double
defaultValue) {
+ return rssJobConf.getDouble(key, defaultValue);
}
- public static String getString(JobConf rssJobConf, JobConf mrJobConf, String
key) {
- return rssJobConf.get(key, mrJobConf.get(key));
+ public static String getString(Configuration rssJobConf, String key) {
+ return rssJobConf.get(key, "");
}
- public static String getString(
- JobConf rssJobConf, JobConf mrJobConf, String key, String defaultValue) {
- return rssJobConf.get(key, mrJobConf.get(key, defaultValue));
+ public static String getString(Configuration rssJobConf, String key, String
defaultValue) {
+ return rssJobConf.get(key, defaultValue);
}
public static long getBlockId(long partitionId, long taskAttemptId, int
nextSeqNo) {
@@ -283,23 +317,20 @@ public class RssMRUtils {
return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
}
- public static void validateRssClientConf(JobConf rssJobConf, JobConf
mrJobConf) {
+ public static void validateRssClientConf(Configuration rssJobConf) {
int retryMax =
getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_RETRY_MAX,
RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
long retryIntervalMax =
getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
long sendCheckTimeout =
getLong(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
if (retryIntervalMax * retryMax > sendCheckTimeout) {
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 6231000c..47d06154 100644
---
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
@@ -56,7 +57,7 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
private JobConf mrJobConf;
- private JobConf rssJobConf;
+ private Configuration rssJobConf;
private Reporter reporter;
private ShuffleClientMetrics metrics;
private TaskUmbilicalProtocol umbilical;
@@ -102,33 +103,26 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
this.replicaWrite =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_DATA_REPLICA_WRITE,
RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
this.replicaRead =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_DATA_REPLICA_READ,
RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.replica =
RssMRUtils.getInt(
- rssJobConf,
- mrJobConf,
- RssMRConfig.RSS_DATA_REPLICA,
- RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+ rssJobConf, RssMRConfig.RSS_DATA_REPLICA,
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
this.partitionNum = mrJobConf.getNumReduceTasks();
this.partitionNumPerRange =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
- this.basePath =
- RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_REMOTE_STORAGE_PATH);
+ this.basePath = RssMRUtils.getString(rssJobConf,
RssMRConfig.RSS_REMOTE_STORAGE_PATH);
String remoteStorageConf =
- RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
+ RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF,
"");
this.remoteStorageInfo = new RemoteStorageInfo(basePath,
remoteStorageConf);
this.merger = createMergeManager(context);
}
@@ -137,7 +131,6 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
boolean useRemoteSpill =
RssMRUtils.getBoolean(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT);
if (useRemoteSpill) {
@@ -146,13 +139,11 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
int replication =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
int retries =
RssMRUtils.getInt(
rssJobConf,
- mrJobConf,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
return new RssRemoteMergeManagerImpl(
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index d3fe6b09..a631696c 100644
---
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -143,19 +144,6 @@ public class RssMRAppMaster extends MRAppMaster {
LOG.info("Registering coordinators {}", coordinators);
client.registerCoordinators(coordinators);
- // Get the configured server assignment tags and it will also add
default shuffle version tag.
- Set<String> assignmentTags = new HashSet<>();
- String rawTags = conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
- if (StringUtils.isNotEmpty(rawTags)) {
- rawTags = rawTags.trim();
- assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
- }
- assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
- String clientType =
- conf.get(RssMRConfig.RSS_CLIENT_TYPE,
RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
- ClientUtils.validateClientType(clientType);
- assignmentTags.add(clientType);
-
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@@ -167,9 +155,13 @@ public class RssMRAppMaster extends MRAppMaster {
}
});
- JobConf extraConf = new JobConf();
+ // set loadDefaults to false, rss_conf.xml should only contain conf of
RSS,
+ // Hadoop conf is not necessary.
+ Configuration extraConf = new JobConf(false);
extraConf.clear();
+ RssMRUtils.applyClientConf(extraConf, conf);
+
// get remote storage from coordinator if necessary
boolean dynamicConfEnabled =
conf.getBoolean(
@@ -186,21 +178,33 @@ public class RssMRAppMaster extends MRAppMaster {
RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf);
}
- String storageType = RssMRUtils.getString(extraConf, conf,
RssMRConfig.RSS_STORAGE_TYPE);
- boolean testMode =
- RssMRUtils.getBoolean(extraConf, conf,
RssMRConfig.RSS_TEST_MODE_ENABLE, false);
+ // Get the configured server assignment tags and it will also add
default shuffle version tag.
+ Set<String> assignmentTags = new HashSet<>();
+ String rawTags = conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
+ if (StringUtils.isNotEmpty(rawTags)) {
+ rawTags = rawTags.trim();
+ assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
+ }
+ assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
+ String clientType =
+ extraConf.get(RssMRConfig.RSS_CLIENT_TYPE,
RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+ ClientUtils.validateClientType(clientType);
+ assignmentTags.add(clientType);
+
+ String storageType = RssMRUtils.getString(extraConf,
RssMRConfig.RSS_STORAGE_TYPE);
+ boolean testMode = RssMRUtils.getBoolean(extraConf,
RssMRConfig.RSS_TEST_MODE_ENABLE, false);
ClientUtils.validateTestModeConf(testMode, storageType);
ApplicationAttemptId applicationAttemptId =
RssMRUtils.getApplicationAttemptId();
String appId = applicationAttemptId.toString();
RemoteStorageInfo defaultRemoteStorage =
- new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH,
""));
+ new
RemoteStorageInfo(extraConf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
client);
// set the remote storage with actual value
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
- RssMRUtils.validateRssClientConf(extraConf, conf);
+ RssMRUtils.validateRssClientConf(extraConf);
// When containers have disk with very limited space, reduce is allowed to spill data to hdfs
if (conf.getBoolean(
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
@@ -421,7 +425,7 @@ public class RssMRAppMaster extends MRAppMaster {
}
}
- static void writeExtraConf(JobConf conf, JobConf extraConf) {
+ static void writeExtraConf(JobConf conf, Configuration extraConf) {
try {
FileSystem fs = new Cluster(conf).getFileSystem();
String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index 38469273..cb5c2c65 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -246,12 +246,12 @@ public class RssMRUtilsTest {
JobConf rssJobConf = new JobConf();
rssJobConf.setInt("mapreduce.job.maps", 500);
rssJobConf.setInt("mapreduce.job.reduces", 20);
- RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+ RssMRUtils.validateRssClientConf(rssJobConf);
rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX, 5);
rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
try {
- RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+ RssMRUtils.validateRssClientConf(rssJobConf);
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("should not bigger than"));
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
similarity index 64%
copy from
integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
copy to
integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
index 4dea5f9f..ab1b9de1 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
@@ -17,38 +17,36 @@
package org.apache.uniffle.test;
+import java.util.HashMap;
import java.util.Map;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.LargeSorter;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
-public class LargeSorterTest extends MRIntegrationTestBase {
+public class DynamicConfTest extends MRIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = Maps.newHashMap();
- dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
+ MRIntegrationTestBase.setupServers(DynamicConfTest.getDynamicConf());
+ }
+
+ protected static Map<String, String> getDynamicConf() {
+ Map<String, String> dynamicConf = new HashMap<>();
+ dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- createShuffleServer(shuffleServerConf);
- startServers();
+ dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ return dynamicConf;
}
@Test
- public void largeSorterTest() throws Exception {
+ public void dynamicConfTest() throws Exception {
run();
}
@@ -56,9 +54,6 @@ public class LargeSorterTest extends MRIntegrationTestBase {
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
- jobConf.set(
- MRJobConfig.MR_AM_COMMAND_OPTS,
- "-XX:+TraceClassLoading org.apache.uniffle.test.FailoverAppMaster");
}
@Override
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
similarity index 61%
copy from
integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
copy to
integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
index 4dea5f9f..892b9e19 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
@@ -17,48 +17,42 @@
package org.apache.uniffle.test;
+import java.util.HashMap;
import java.util.Map;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.LargeSorter;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
-public class LargeSorterTest extends MRIntegrationTestBase {
+public class HadoopConfTest extends MRIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = Maps.newHashMap();
- dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
- dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- createShuffleServer(shuffleServerConf);
- startServers();
+ MRIntegrationTestBase.setupServers(HadoopConfTest.getDynamicConf());
+ }
+
+ protected static Map<String, String> getDynamicConf() {
+ return new HashMap<>();
}
@Test
- public void largeSorterTest() throws Exception {
+ public void hadoopConfTest() throws Exception {
run();
}
@Override
protected void updateRssConfiguration(Configuration jobConf) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ jobConf.set(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+ jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
- jobConf.set(
- MRJobConfig.MR_AM_COMMAND_OPTS,
- "-XX:+TraceClassLoading org.apache.uniffle.test.FailoverAppMaster");
}
@Override
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
index 4dea5f9f..be7ec837 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
@@ -17,9 +17,6 @@
package org.apache.uniffle.test;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.LargeSorter;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -28,23 +25,13 @@ import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
+import org.apache.uniffle.common.ClientType;
public class LargeSorterTest extends MRIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = Maps.newHashMap();
- dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
- dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- createShuffleServer(shuffleServerConf);
- startServers();
+ MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
@Test
@@ -54,6 +41,7 @@ public class LargeSorterTest extends MRIntegrationTestBase {
@Override
protected void updateRssConfiguration(Configuration jobConf) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
jobConf.set(
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 4f5905d5..02cce678 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -21,7 +21,9 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +47,9 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -62,7 +67,7 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
}
}
- private static Path TEST_ROOT_DIR =
+ private static final Path TEST_ROOT_DIR =
localFs.makeQualified(new Path("target", TestMRJobs.class.getName() + "-tmpDir"));
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
private static final String OUTPUT_ROOT_DIR = "/tmp/" + TestMRJobs.class.getSimpleName();
@@ -176,7 +181,6 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
+ ","
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
- jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
updateRssConfiguration(jobConf);
runMRApp(jobConf, getTestTool(), getTestArgs());
}
@@ -185,7 +189,25 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
return new String[0];
}
- protected void updateRssConfiguration(Configuration jobConf) {}
+ protected static void setupServers(Map<String, String> dynamicConf) throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ addDynamicConf(coordinatorConf, dynamicConf);
+ createCoordinatorServer(coordinatorConf);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ protected static Map<String, String> getDynamicConf() {
+ Map<String, String> dynamicConf = new HashMap<>();
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
+ dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+ return dynamicConf;
+ }
+
+ protected void updateRssConfiguration(Configuration jobConf) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ }
private void runMRApp(Configuration conf, Tool tool, String[] args) throws
Exception {
assertEquals(0, ToolRunner.run(conf, tool, args), tool.getClass().getName() + " failed");
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
index 2ead1c77..6e0bf5dc 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
@@ -17,10 +17,8 @@
package org.apache.uniffle.test;
-import java.util.Map;
import java.util.Random;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SecondarySort;
@@ -32,30 +30,17 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
public class SecondarySortTest extends MRIntegrationTestBase {
String inputPath = "secondary_sort_input";
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = Maps.newHashMap();
- dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
- dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- createShuffleServer(shuffleServerConf);
- startServers();
+ MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
@Test
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
index 48c4bab8..2ba76f5a 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
@@ -18,11 +18,9 @@
package org.apache.uniffle.test;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -32,15 +30,10 @@ import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
public class WordCountTest extends MRIntegrationTestBase {
String inputPath = "word_count_input";
@@ -50,15 +43,7 @@ public class WordCountTest extends MRIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- Map<String, String> dynamicConf = Maps.newHashMap();
- dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
- dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- createShuffleServer(shuffleServerConf);
- startServers();
+ MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
@Test