This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6663e26ff [#1416] feat(spark): support custom hadoop config in client
side (#1417)
6663e26ff is described below
commit 6663e26ff2bfe671cf36280fd1f7799c0cae763c
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 8 11:39:53 2024 +0800
[#1416] feat(spark): support custom hadoop config in client side (#1417)
### What changes were proposed in this pull request?
support custom hadoop config in client side
### Why are the changes needed?
It's necessary to support user-specified custom Hadoop config on the client side
for remote storage.
For #1416
### Does this PR introduce _any_ user-facing change?
Yes.
Adds the prefix key `spark.rss.hadoop.*` for Hadoop conf.
For Spark, a setting such as `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`
will be applied as `fs.defaultFS=hdfs://rbf-x1` to the Hadoop storage.
### How was this patch tested?
UTs
---
.../shuffle/manager/RssShuffleManagerBase.java | 15 ++++++--
.../shuffle/manager/RssShuffleManagerBaseTest.java | 42 ++++++++++++++++++++++
.../apache/spark/shuffle/RssShuffleManager.java | 2 +-
.../apache/spark/shuffle/RssShuffleManager.java | 2 +-
.../uniffle/common/config/RssClientConf.java | 9 +++++
docs/client_guide/spark_client_guide.md | 17 ++++-----
6 files changed, 75 insertions(+), 12 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b0836f9d1..b70ab6933 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import static
org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterface, ShuffleManager {
@@ -161,7 +162,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
return tracker instanceof MapOutputTrackerMaster ?
(MapOutputTrackerMaster) tracker : null;
}
- private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+ private static Map<String, String> parseRemoteStorageConf(Configuration
conf) {
Map<String, String> confItems = Maps.newHashMap();
for (Map.Entry<String, String> entry : conf) {
confItems.put(entry.getKey(), entry.getValue());
@@ -169,13 +170,23 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
return confItems;
}
- protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+ protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf
sparkConf) {
Map<String, String> confItems = Maps.newHashMap();
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
confItems = parseRemoteStorageConf(new Configuration(true));
}
+ for (String key : rssConf.getKeySet()) {
+ if (key.startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
+ String val = rssConf.getString(key, null);
+ if (val != null) {
+ String extractedKey = key.replaceFirst(HADOOP_CONFIG_KEY_PREFIX, "");
+ confItems.put(extractedKey, val);
+ }
+ }
+ }
+
return new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""),
confItems);
}
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
new file mode 100644
index 000000000..15cc7fac0
--- /dev/null
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.spark.SparkConf;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssShuffleManagerBaseTest {
+
+ @Test
+ public void testGetDefaultRemoteStorageInfo() {
+ SparkConf sparkConf = new SparkConf();
+ RemoteStorageInfo remoteStorageInfo =
+ RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+ assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+ sparkConf.set("spark.rss.hadoop.fs.defaultFs", "hdfs://rbf-xxx/foo");
+ remoteStorageInfo =
RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+ assertEquals(remoteStorageInfo.getConfItems().size(), 1);
+ assertEquals(remoteStorageInfo.getConfItems().get("fs.defaultFs"),
"hdfs://rbf-xxx/foo");
+ }
+}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 26cbd6412..fdcc167b5 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -306,7 +306,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
}
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
- RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+ RemoteStorageInfo defaultRemoteStorage =
getDefaultRemoteStorageInfo(sparkConf);
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 013a4acf1..27c8bb8ba 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -410,7 +410,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
}
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
- RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+ RemoteStorageInfo defaultRemoteStorage =
getDefaultRemoteStorageInfo(sparkConf);
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index f73268790..b8adb591b 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -25,6 +25,15 @@ import org.apache.uniffle.common.netty.IOMode;
import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
public class RssClientConf {
+ /**
+ * The prefix key for Hadoop conf. For Spark like that:
+ *
+ * <p>key: spark.rss.hadoop.fs.defaultFS val: hdfs://rbf-x1
+ *
+ * <p>The key will be extracted to the Hadoop conf key "fs.defaultFS" and
injected into the Hadoop
+ * storage configuration.
+ */
+ public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";
public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
ConfigOptions.key("rss.client.io.compression.codec")
diff --git a/docs/client_guide/spark_client_guide.md
b/docs/client_guide/spark_client_guide.md
index 75e2086a3..528da39be 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -78,14 +78,15 @@ Local shuffle reader as its name indicates is suitable and
optimized for spark's
The important configuration is listed as following.
-| Property Name | Default |
Description
|
-|-------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| spark.rss.writer.buffer.spill.size | 128m | Buffer
size for total partition data
|
-| spark.rss.client.send.size.limit | 16m | The max
data size sent to shuffle server
|
-| spark.rss.client.unregister.thread.pool.size | 10 | The max
size of thread pool of unregistering
|
-| spark.rss.client.unregister.request.timeout.sec | 10 | The max
timeout sec when doing unregister to remote shuffle-servers
|
-| spark.rss.client.off.heap.memory.enable | false | The client
use off heap memory to process data
|
-| spark.rss.client.remote.storage.useLocalConfAsDefault | false | This
option is only valid when the remote storage path is specified. If ture, the
remote storage conf will use the client side hadoop configuration loaded from
the classpath |
+| Property Name | Default |
Description
|
+|-------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.writer.buffer.spill.size | 128m | Buffer
size for total partition data
|
+| spark.rss.client.send.size.limit | 16m | The max
data size sent to shuffle server
|
+| spark.rss.client.unregister.thread.pool.size | 10 | The max
size of thread pool of unregistering
|
+| spark.rss.client.unregister.request.timeout.sec | 10 | The max
timeout sec when doing unregister to remote shuffle-servers
|
+| spark.rss.client.off.heap.memory.enable | false | The client
use off heap memory to process data
|
+| spark.rss.client.remote.storage.useLocalConfAsDefault | false | This
option is only valid when the remote storage path is specified. If true, the
remote storage conf will use the client side hadoop configuration loaded from
the classpath |
+| spark.rss.hadoop.* | - | The prefix
key for Hadoop conf. For Spark like that:
`spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as
`fs.defaultFS=hdfs://rbf-x1` for Hadoop storage |
### Adaptive Remote Shuffle Enabling
Currently, this feature only supports Spark.