This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ce102ce9f [#1364] feat(client): introduce option to control whether to 
use local hadoop conf (#1370)
ce102ce9f is described below

commit ce102ce9fb31ce976a6268ca39467fac45c8507c
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Dec 18 10:00:03 2023 +0800

    [#1364] feat(client): introduce option to control whether to use local 
hadoop conf (#1370)
    
    ### What changes were proposed in this pull request?
    
    Introduce the config to control whether to use local hadoop conf for remote 
storage by default
    
    ### Why are the changes needed?
    
    I want to deploy one uniffle cluster to serve Spark jobs in multiple 
Hadoop clusters. So I hope the remote storage config could be loaded from the 
default hdfs-site.xml in the classpath.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Internal tests.
---
 .../shuffle/manager/RssShuffleManagerBase.java     | 28 ++++++++++++++++++++++
 .../apache/spark/shuffle/RssShuffleManager.java    |  3 +--
 .../apache/spark/shuffle/RssShuffleManager.java    |  3 +--
 .../uniffle/common/config/RssClientConf.java       |  8 +++++++
 docs/client_guide/spark_client_guide.md            | 15 ++++++------
 5 files changed, 46 insertions(+), 11 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 01ffbdeff..b0836f9d1 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -19,21 +19,30 @@ package org.apache.uniffle.shuffle.manager;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.MapOutputTracker;
 import org.apache.spark.MapOutputTrackerMaster;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.SparkException;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.apache.spark.shuffle.RssSparkShuffleUtils;
 import org.apache.spark.shuffle.ShuffleManager;
 import org.apache.spark.shuffle.SparkVersionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
+
 public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterface, ShuffleManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(RssShuffleManagerBase.class);
   private AtomicBoolean isInitialized = new AtomicBoolean(false);
@@ -151,4 +160,23 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
         
Optional.ofNullable(SparkEnv.get()).map(SparkEnv::mapOutputTracker).orElse(null);
     return tracker instanceof MapOutputTrackerMaster ? 
(MapOutputTrackerMaster) tracker : null;
   }
+
+  private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+    Map<String, String> confItems = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : conf) {
+      confItems.put(entry.getKey(), entry.getValue());
+    }
+    return confItems;
+  }
+
+  protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+    Map<String, String> confItems = Maps.newHashMap();
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
+      confItems = parseRemoteStorageConf(new Configuration(true));
+    }
+
+    return new RemoteStorageInfo(
+        sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), 
confItems);
+  }
 }
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b68a67d35..26cbd6412 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -306,8 +306,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage =
-        new 
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), 
""));
+    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             appId, defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 62a53ae5e..013a4acf1 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -410,8 +410,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage =
-        new 
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), 
""));
+    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 86eb1d950..f73268790 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -142,4 +142,12 @@ public class RssClientConf {
           .enumType(ClientType.class)
           .defaultValue(ClientType.GRPC)
           .withDescription("Supports GRPC, GRPC_NETTY");
+
+  public static final ConfigOption<Boolean> 
RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED =
+      ConfigOptions.key("rss.client.remote.storage.useLocalConfAsDefault")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "This option is only valid when the remote storage path is 
specified. If true, "
+                  + "the remote storage conf will use the client side hadoop 
configuration loaded from the classpath.");
 }
diff --git a/docs/client_guide/spark_client_guide.md 
b/docs/client_guide/spark_client_guide.md
index d2c896ac5..75e2086a3 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -78,13 +78,14 @@ Local shuffle reader as its name indicates is suitable and 
optimized for spark's
 
 The important configuration is listed as following.
 
-|Property Name|Default|Description|
-|---|---|---|
-|spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
-|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
-|spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool 
of unregistering|
-|spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when 
doing unregister to remote shuffle-servers|
-|spark.rss.client.off.heap.memory.enable|false|The client use off heap memory 
to process data|
+| Property Name                                         | Default | 
Description                                                                     
                                                                                
               |
+|-------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.writer.buffer.spill.size                    | 128m    | Buffer 
size for total partition data                                                   
                                                                                
        |
+| spark.rss.client.send.size.limit                      | 16m     | The max 
data size sent to shuffle server                                                
                                                                                
       |
+| spark.rss.client.unregister.thread.pool.size          | 10      | The max 
size of thread pool of unregistering                                            
                                                                                
       |
+| spark.rss.client.unregister.request.timeout.sec       | 10      | The max 
timeout sec when doing unregister to remote shuffle-servers                     
                                                                                
       |
+| spark.rss.client.off.heap.memory.enable               | false   | The client 
use off heap memory to process data                                             
                                                                                
    |
+| spark.rss.client.remote.storage.useLocalConfAsDefault | false   | This 
option is only valid when the remote storage path is specified. If true, the 
remote storage conf will use the client side hadoop configuration loaded from 
the classpath  |
 
 ### Adaptive Remote Shuffle Enabling 
 Currently, this feature only supports Spark. 

Reply via email to