This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new bd98c4447 [#1024] improvement(tez): Optimize user switch to shuffle
mode local/remote. (#1397)
bd98c4447 is described below
commit bd98c4447523cb2c6a6269206ebb8ba6ad3567ab
Author: Qing <[email protected]>
AuthorDate: Wed Jan 3 18:01:18 2024 +0800
[#1024] improvement(tez): Optimize user switch to shuffle mode
local/remote. (#1397)
### What changes were proposed in this pull request?
Currently, users need to set the parameter tez.am.launch.cmd-opts to
switch between local shuffle and remote shuffle, but this parameter is
difficult for users to understand.
We need a simpler, easier-to-understand parameter, such as
tez.shuffle.mode = local/remote
### Why are the changes needed?
> [Improvement] [tez]:Optimize user switch to shuffle mode local/remote.
#1024
Fix: https://github.com/apache/incubator-uniffle/issues/1024
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test or cluster prd test
---
.../java/org/apache/tez/common/RssTezConfig.java | 2 +
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 43 ++++++++++++++++++----
docs/client_guide/tez_client_guide.md | 1 +
3 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index bcbb8a95d..3c8cde330 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -222,6 +222,8 @@ public class RssTezConfig {
public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
public static final String RSS_REMOTE_SPILL_STORAGE_PATH =
TEZ_RSS_CONFIG_PREFIX + "rss.remote.spill.storage.path";
+ public static final String RSS_SHUFFLE_MODE = TEZ_RSS_CONFIG_PREFIX +
"shuffle.mode";
+ public static final String DEFAULT_RSS_SHUFFLE_MODE = "remote";
public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 55ee72069..c2c31aedb 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -325,7 +325,15 @@ public class RssDAGAppMaster extends DAGAppMaster {
try {
// We use trick way to introduce RssDAGAppMaster by the config
tez.am.launch.cmd-opts.
// It means some property which is set by command line will be ingored,
so we must reload it.
+ Configuration conf = new Configuration(new YarnConfiguration());
+ DAGProtos.ConfigurationProto confProto =
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(
+ System.getenv(ApplicationConstants.Environment.PWD.name()));
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(conf,
confProto.getConfKeyValuesList());
+
boolean sessionModeCliOption = false;
+ boolean rollBackToLocalShuffle = false;
+ String[] rollBackRemainingArgs = null;
for (int i = 0; i < args.length; i++) {
if (args[i].startsWith("-D")) {
String[] property = args[i].split("=");
@@ -337,10 +345,24 @@ public class RssDAGAppMaster extends DAGAppMaster {
} else if (args[i].contains("--session") || args[i].contains("-s")) {
sessionModeCliOption = true;
}
+ if (args[i].contains(DAGAppMaster.class.getName()) &&
isLocalShuffleMode(conf)) {
+ rollBackToLocalShuffle = true;
+ rollBackRemainingArgs = Arrays.copyOfRange(args, i + 1, args.length);
+ }
}
+
// Load the log4j config is only init in static code block of
LogManager, so we must
// reconfigure.
reconfigureLog4j();
+ // if set tez.shuffle.mode = local then degenerates to the native way.
+ if (rollBackToLocalShuffle) {
+ // rollback to local shuffle mode.
+ LOG.info(
+ "Rollback to local shuffle mode, since tez.shuffle.mode = {}",
+ conf.get(RssTezConfig.RSS_SHUFFLE_MODE,
RssTezConfig.DEFAULT_RSS_SHUFFLE_MODE));
+ DAGAppMaster.main(rollBackRemainingArgs);
+ return;
+ }
// Install the tez class loader, which can be used add new resources
TezClassLoader.setupTezClassLoader();
@@ -382,13 +404,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
+ ", logDirs="
+
System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
- Configuration conf = new Configuration(new YarnConfiguration());
-
- DAGProtos.ConfigurationProto confProto =
- TezUtilsInternal.readUserSpecifiedTezConfiguration(
- System.getenv(ApplicationConstants.Environment.PWD.name()));
- TezUtilsInternal.addUserSpecifiedTezConfiguration(conf,
confProto.getConfKeyValuesList());
-
AMPluginDescriptorProto amPluginDescriptorProto = null;
if (confProto.hasAmPluginDescriptor()) {
amPluginDescriptorProto = confProto.getAmPluginDescriptor();
@@ -453,6 +468,20 @@ public class RssDAGAppMaster extends DAGAppMaster {
}
}
+ /**
+  * Decides whether the application master should fall back to local shuffle.
+  *
+  * <p>Reads {@link RssTezConfig#RSS_SHUFFLE_MODE} from {@code conf}, using
+  * {@link RssTezConfig#DEFAULT_RSS_SHUFFLE_MODE} when the key is not set. The value is matched
+  * case-sensitively against the two supported modes.
+  *
+  * @param conf configuration to read the shuffle mode from
+  * @return {@code true} if the mode is {@code "local"}, {@code false} if it is {@code "remote"}
+  * @throws RssException if the configured mode is neither {@code "local"} nor {@code "remote"}
+  */
+ private static boolean isLocalShuffleMode(Configuration conf) {
+   String shuffleMode =
+       conf.get(RssTezConfig.RSS_SHUFFLE_MODE, RssTezConfig.DEFAULT_RSS_SHUFFLE_MODE);
+   switch (shuffleMode) {
+     case "remote":
+       return false;
+     case "local":
+       return true;
+     default:
+       // Keep a space before the offending value so it is readable in the error message.
+       throw new RssException(
+           "Unsupported shuffle mode " + shuffleMode + ", ensure that it is set to local/remote.");
+   }
+ }
+
static void mayCloseTezSlowStart(Configuration conf) {
if (!conf.getBoolean(
RssTezConfig.RSS_AM_SLOW_START_ENABLE,
RssTezConfig.RSS_AM_SLOW_START_ENABLE_DEFAULT)) {
diff --git a/docs/client_guide/tez_client_guide.md
b/docs/client_guide/tez_client_guide.md
index e1a7aa483..6dd2156b6 100644
--- a/docs/client_guide/tez_client_guide.md
+++ b/docs/client_guide/tez_client_guide.md
@@ -45,6 +45,7 @@ Note that the RssDAGAppMaster will automatically disable slow
start (i.e., `tez.
| tez.rss.client.max.buffer.size | 3k | The max buffer size in map side.
Control the size of each segment(WrappedBuffer) in the buffer. |
| tez.rss.client.batch.trigger.num | 50 | The max batch of buffers to send
data in map side. Affect the number of blocks sent to the server in each batch,
and may affect rss_worker_used_buffer_size |
| tez.rss.client.send.thread.num | 5 | The thread pool size for the client to
send data to the server. |
+| tez.shuffle.mode | remote | Selects the shuffle implementation: 'remote'
(the default) uses Remote Shuffle; set it to 'local' to fall back to local
shuffle. |
### Remote Spill (Experimental)