This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ce102ce9f [#1364] feat(client): introduce option to control whether to
use local hadoop conf (#1370)
ce102ce9f is described below
commit ce102ce9fb31ce976a6268ca39467fac45c8507c
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Dec 18 10:00:03 2023 +0800
[#1364] feat(client): introduce option to control whether to use local
hadoop conf (#1370)
### What changes were proposed in this pull request?
Introduce the config to control whether to use local hadoop conf for remote
storage by default
### Why are the changes needed?
I want to deploy one Uniffle cluster to serve Spark jobs in multiple
Hadoop clusters, so I hope the remote storage configuration can be loaded from
the default hdfs-site.xml in the classpath.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Internal tests.
---
.../shuffle/manager/RssShuffleManagerBase.java | 28 ++++++++++++++++++++++
.../apache/spark/shuffle/RssShuffleManager.java | 3 +--
.../apache/spark/shuffle/RssShuffleManager.java | 3 +--
.../uniffle/common/config/RssClientConf.java | 8 +++++++
docs/client_guide/spark_client_guide.md | 15 ++++++------
5 files changed, 46 insertions(+), 11 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 01ffbdeff..b0836f9d1 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -19,21 +19,30 @@ package org.apache.uniffle.shuffle.manager;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.MapOutputTrackerMaster;
+import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkException;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.SparkVersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
+
public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterface, ShuffleManager {
private static final Logger LOG =
LoggerFactory.getLogger(RssShuffleManagerBase.class);
private AtomicBoolean isInitialized = new AtomicBoolean(false);
@@ -151,4 +160,23 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
Optional.ofNullable(SparkEnv.get()).map(SparkEnv::mapOutputTracker).orElse(null);
return tracker instanceof MapOutputTrackerMaster ?
(MapOutputTrackerMaster) tracker : null;
}
+
+ private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+ Map<String, String> confItems = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : conf) {
+ confItems.put(entry.getKey(), entry.getValue());
+ }
+ return confItems;
+ }
+
+ protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+ Map<String, String> confItems = Maps.newHashMap();
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
+ confItems = parseRemoteStorageConf(new Configuration(true));
+ }
+
+ return new RemoteStorageInfo(
+ sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""),
confItems);
+ }
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b68a67d35..26cbd6412 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -306,8 +306,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
}
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
- RemoteStorageInfo defaultRemoteStorage =
- new
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(),
""));
+ RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 62a53ae5e..013a4acf1 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -410,8 +410,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
}
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
- RemoteStorageInfo defaultRemoteStorage =
- new
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(),
""));
+ RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 86eb1d950..f73268790 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -142,4 +142,12 @@ public class RssClientConf {
.enumType(ClientType.class)
.defaultValue(ClientType.GRPC)
.withDescription("Supports GRPC, GRPC_NETTY");
+
+ public static final ConfigOption<Boolean>
RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED =
+ ConfigOptions.key("rss.client.remote.storage.useLocalConfAsDefault")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "This option is only valid when the remote storage path is
specified. If true, "
+ + "the remote storage conf will use the client side hadoop
configuration loaded from the classpath.");
}
diff --git a/docs/client_guide/spark_client_guide.md
b/docs/client_guide/spark_client_guide.md
index d2c896ac5..75e2086a3 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -78,13 +78,14 @@ Local shuffle reader as its name indicates is suitable and
optimized for spark's
The important configuration is listed as following.
-|Property Name|Default|Description|
-|---|---|---|
-|spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
-|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
-|spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool
of unregistering|
-|spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when
doing unregister to remote shuffle-servers|
-|spark.rss.client.off.heap.memory.enable|false|The client use off heap memory
to process data|
+| Property Name | Default |
Description
|
+|-------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.writer.buffer.spill.size | 128m | Buffer
size for total partition data
|
+| spark.rss.client.send.size.limit | 16m | The max
data size sent to shuffle server
|
+| spark.rss.client.unregister.thread.pool.size | 10 | The max
size of thread pool of unregistering
|
+| spark.rss.client.unregister.request.timeout.sec | 10 | The max
timeout sec when doing unregister to remote shuffle-servers
|
+| spark.rss.client.off.heap.memory.enable | false | The client
use off heap memory to process data
|
+| spark.rss.client.remote.storage.useLocalConfAsDefault | false | This
option is only valid when the remote storage path is specified. If true, the
remote storage conf will use the client side hadoop configuration loaded from
the classpath |
### Adaptive Remote Shuffle Enabling
Currently, this feature only supports Spark.