This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6663e26ff [#1416] feat(spark): support custom hadoop config in client 
side (#1417)
6663e26ff is described below

commit 6663e26ff2bfe671cf36280fd1f7799c0cae763c
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 8 11:39:53 2024 +0800

    [#1416] feat(spark): support custom hadoop config in client side (#1417)
    
    ### What changes were proposed in this pull request?
    
    support custom hadoop config in client side
    
    ### Why are the changes needed?
    
    It's necessary to let users specify a custom Hadoop configuration on the 
client side for remote storage.
    
    For #1416
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    This introduces the `spark.rss.hadoop.*` prefix for passing Hadoop configuration.
    For example, setting `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1` in Spark
    applies `fs.defaultFS=hdfs://rbf-x1` to the Hadoop storage configuration.
    
    ### How was this patch tested?
    
    UTs
---
 .../shuffle/manager/RssShuffleManagerBase.java     | 15 ++++++--
 .../shuffle/manager/RssShuffleManagerBaseTest.java | 42 ++++++++++++++++++++++
 .../apache/spark/shuffle/RssShuffleManager.java    |  2 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  2 +-
 .../uniffle/common/config/RssClientConf.java       |  9 +++++
 docs/client_guide/spark_client_guide.md            | 17 ++++-----
 6 files changed, 75 insertions(+), 12 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b0836f9d1..b70ab6933 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
 import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
 
 public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterface, ShuffleManager {
@@ -161,7 +162,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     return tracker instanceof MapOutputTrackerMaster ? 
(MapOutputTrackerMaster) tracker : null;
   }
 
-  private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+  private static Map<String, String> parseRemoteStorageConf(Configuration 
conf) {
     Map<String, String> confItems = Maps.newHashMap();
     for (Map.Entry<String, String> entry : conf) {
       confItems.put(entry.getKey(), entry.getValue());
@@ -169,13 +170,23 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     return confItems;
   }
 
-  protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+  protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf 
sparkConf) {
     Map<String, String> confItems = Maps.newHashMap();
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
       confItems = parseRemoteStorageConf(new Configuration(true));
     }
 
+    for (String key : rssConf.getKeySet()) {
+      if (key.startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
+        String val = rssConf.getString(key, null);
+        if (val != null) {
+          String extractedKey = key.replaceFirst(HADOOP_CONFIG_KEY_PREFIX, "");
+          confItems.put(extractedKey, val);
+        }
+      }
+    }
+
     return new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), 
confItems);
   }
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
new file mode 100644
index 000000000..15cc7fac0
--- /dev/null
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.spark.SparkConf;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssShuffleManagerBaseTest {
+
+  @Test
+  public void testGetDefaultRemoteStorageInfo() {
+    SparkConf sparkConf = new SparkConf();
+    RemoteStorageInfo remoteStorageInfo =
+        RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+    sparkConf.set("spark.rss.hadoop.fs.defaultFs", "hdfs://rbf-xxx/foo");
+    remoteStorageInfo = 
RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+    assertEquals(remoteStorageInfo.getConfItems().size(), 1);
+    assertEquals(remoteStorageInfo.getConfItems().get("fs.defaultFs"), 
"hdfs://rbf-xxx/foo");
+  }
+}
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 26cbd6412..fdcc167b5 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -306,7 +306,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+    RemoteStorageInfo defaultRemoteStorage = 
getDefaultRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             appId, defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 013a4acf1..27c8bb8ba 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -410,7 +410,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+    RemoteStorageInfo defaultRemoteStorage = 
getDefaultRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index f73268790..b8adb591b 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -25,6 +25,15 @@ import org.apache.uniffle.common.netty.IOMode;
 import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
 
 public class RssClientConf {
+  /**
+   * The prefix key for Hadoop conf. For Spark like that:
+   *
+   * <p>key: spark.rss.hadoop.fs.defaultFS val: hdfs://rbf-x1
+   *
+   * <p>The key will be extracted to the hadoop conf: "fs.defaultFS" and 
inject this into Hadoop
+   * storage configuration.
+   */
+  public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";
 
   public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
       ConfigOptions.key("rss.client.io.compression.codec")
diff --git a/docs/client_guide/spark_client_guide.md 
b/docs/client_guide/spark_client_guide.md
index 75e2086a3..528da39be 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -78,14 +78,15 @@ Local shuffle reader as its name indicates is suitable and 
optimized for spark's
 
 The important configuration is listed as following.
 
-| Property Name                                         | Default | 
Description                                                                     
                                                                                
               |
-|-------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| spark.rss.writer.buffer.spill.size                    | 128m    | Buffer 
size for total partition data                                                   
                                                                                
        |
-| spark.rss.client.send.size.limit                      | 16m     | The max 
data size sent to shuffle server                                                
                                                                                
       |
-| spark.rss.client.unregister.thread.pool.size          | 10      | The max 
size of thread pool of unregistering                                            
                                                                                
       |
-| spark.rss.client.unregister.request.timeout.sec       | 10      | The max 
timeout sec when doing unregister to remote shuffle-servers                     
                                                                                
       |
-| spark.rss.client.off.heap.memory.enable               | false   | The client 
use off heap memory to process data                                             
                                                                                
    |
-| spark.rss.client.remote.storage.useLocalConfAsDefault | false   | This 
option is only valid when the remote storage path is specified. If ture, the 
remote storage conf will use the client side hadoop configuration loaded from 
the classpath  |
+| Property Name                                         | Default | 
Description                                                                     
                                                                                
              |
+|-------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.writer.buffer.spill.size                    | 128m    | Buffer 
size for total partition data                                                   
                                                                                
       |
+| spark.rss.client.send.size.limit                      | 16m     | The max 
data size sent to shuffle server                                                
                                                                                
      |
+| spark.rss.client.unregister.thread.pool.size          | 10      | The max 
size of thread pool of unregistering                                            
                                                                                
      |
+| spark.rss.client.unregister.request.timeout.sec       | 10      | The max 
timeout sec when doing unregister to remote shuffle-servers                     
                                                                                
      |
+| spark.rss.client.off.heap.memory.enable               | false   | The client 
use off heap memory to process data                                             
                                                                                
   |
+| spark.rss.client.remote.storage.useLocalConfAsDefault | false   | This 
option is only valid when the remote storage path is specified. If ture, the 
remote storage conf will use the client side hadoop configuration loaded from 
the classpath |
+| spark.rss.hadoop.*                                    | -       | The prefix 
key for Hadoop conf. For Spark like that: 
`spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as 
`fs.defaultFS=hdfs://rbf-x1` for Hadoop storage           |
 
 ### Adaptive Remote Shuffle Enabling 
 Currently, this feature only supports Spark. 

Reply via email to