This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 56b7f75f9f [GLUTEN-11245][CORE] Refactor getNativeSessionConf and getNativeBackendConf for GlutenConfig (#11246)
56b7f75f9f is described below

commit 56b7f75f9f6a8ff15acf40bc1ebccb2acc0efca2
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Dec 8 09:31:36 2025 +0800

    [GLUTEN-11245][CORE] Refactor getNativeSessionConf and getNativeBackendConf for GlutenConfig (#11246)
---
 .../apache/gluten/memory/NativeMemoryManager.scala |  3 +-
 .../scala/org/apache/gluten/runtime/Runtime.scala  |  5 +-
 .../org/apache/gluten/config/GlutenConfig.scala    | 96 +++++++++-------------
 3 files changed, 44 insertions(+), 60 deletions(-)

diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 3955756346..3559d7a213 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -53,7 +53,8 @@ object NativeMemoryManager {
       rl,
       ConfigUtil.serialize(
         GlutenConfig
-          .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+          .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
+          .asJava)
     )
     spillers.append(new Spiller() {
       override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = phase match {
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index bd5ff1c185..219eb216df 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -56,8 +56,9 @@ object Runtime {
       nmm.getHandle(),
       ConfigUtil.serialize(
         (GlutenConfig
-          .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
-          .asScala ++ extraConf.asScala).asJava)
+          .getNativeSessionConf(
+            backendName,
+            GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava)
     )
 
     private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 4145b526b9..35584ebd0b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -21,13 +21,13 @@ import org.apache.gluten.shuffle.SupportsColumnarShuffle
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
 
-import com.google.common.collect.ImmutableList
 import org.apache.hadoop.security.UserGroupInformation
 
 import java.util
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 trait ShuffleWriterType {
   val name: String
@@ -510,18 +510,13 @@ object GlutenConfig extends ConfigRegistry {
     "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
   )
 
-  /**
-   * Get dynamic configs.
-   *
-   * TODO: Improve the get native conf logic.
-   */
-  def getNativeSessionConf(
-      backendName: String,
-      conf: Map[String, String]): util.Map[String, String] = {
-    val nativeConfMap = new util.HashMap[String, String]()
-    nativeConfMap.putAll(conf.filter(e => nativeKeys.contains(e._1)).asJava)
+  /** Get dynamic configs. */
+  def getNativeSessionConf(backendName: String, conf: Map[String, String]): Map[String, String] = {
+    val nativeConfMap = mutable.Map[String, String](conf.filter {
+      case (key, _) => nativeKeys.contains(key)
+    }.toSeq: _*)
 
-    val keyWithDefault = ImmutableList.of(
+    Seq(
       (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
       (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
       (
@@ -537,19 +532,17 @@ object GlutenConfig extends ConfigRegistry {
       (SQLConf.MAP_KEY_DEDUP_POLICY.key, SQLConf.MAP_KEY_DEDUP_POLICY.defaultValueString),
       (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString),
       (SQLConf.ANSI_ENABLED.key, SQLConf.ANSI_ENABLED.defaultValueString)
+    ).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) }
+
+    Seq(
+      (SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, ByteUnit.BYTE, (v: Long) => v.toString),
+      (SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, ByteUnit.BYTE, (v: Long) => v.toString),
+      (SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB, (v: Long) => (v * 1024).toString)
     )
-    keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
-    GlutenConfigUtil.mapByteConfValue(
-      conf,
-      SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
-      ByteUnit.BYTE)(
-      v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, v.toString))
-    GlutenConfigUtil.mapByteConfValue(
-      conf,
-      SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
-      ByteUnit.BYTE)(v => nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
-    GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB)(
-      v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
+      .foreach {
+        case (k, unit, f) =>
+          GlutenConfigUtil.mapByteConfValue(conf, k, unit)(v => nativeConfMap.put(k, f(v)))
+      }
 
     conf
       .get(SQLConf.LEGACY_TIME_PARSER_POLICY.key)
@@ -566,8 +559,11 @@ object GlutenConfig extends ConfigRegistry {
     // Backend's dynamic session conf only.
     val confPrefix = prefixOf(backendName)
     conf
-      .filter(entry => entry._1.startsWith(confPrefix) && !SQLConf.isStaticConfigKey(entry._1))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+      .filter {
+        case (k, _) =>
+          k.startsWith(confPrefix) && !SQLConf.isStaticConfigKey(k)
+      }
+      .foreach { case (k, v) => nativeConfMap.put(k, v) }
 
     // Pass the latest tokens to native
     nativeConfMap.put(
@@ -580,23 +576,21 @@ object GlutenConfig extends ConfigRegistry {
       UserGroupInformation.getCurrentUser.getUserName)
 
     // return
-    nativeConfMap
+    nativeConfMap.toMap
   }
 
   /**
    * Get static and dynamic configs. Some of the config is dynamic in spark, but is static in
    * gluten, these will be used to construct HiveConnector which intends reused in velox
-   *
-   * TODO: Improve the get native conf logic.
    */
   def getNativeBackendConf(
       backendName: String,
       conf: scala.collection.Map[String, String]): util.Map[String, String] = {
 
-    val nativeConfMap = new util.HashMap[String, String]()
+    val nativeConfMap = mutable.HashMap.empty[String, String]
 
     // some configs having default values
-    val keyWithDefault = ImmutableList.of(
+    Seq(
       (SPARK_S3_CONNECTION_SSL_ENABLED, "false"),
       (SPARK_S3_PATH_STYLE_ACCESS, "true"),
       (SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
@@ -623,8 +617,7 @@ object GlutenConfig extends ConfigRegistry {
       ("spark.gluten.velox.s3UseProxyFromEnv", "false"),
       ("spark.gluten.velox.s3PayloadSigningPolicy", "Never"),
       (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
-    )
-    keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
+    ).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) }
 
     val keys = Set(
       DEBUG_ENABLED.key,
@@ -641,35 +634,24 @@ object GlutenConfig extends ConfigRegistry {
       SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
       COLUMNAR_CUDF_ENABLED.key
     )
-    nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
 
-    val confPrefix = prefixOf(backendName)
-    conf
-      .filter(_._1.startsWith(confPrefix))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+    nativeConfMap ++= conf.filter { case (k, _) => keys.contains(k) }
 
-    // put in all S3 configs
-    conf
-      .filter(_._1.startsWith(HADOOP_PREFIX + S3A_PREFIX))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
-    // handle ABFS config
-    conf
-      .filter(_._1.startsWith(HADOOP_PREFIX + ABFS_PREFIX))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
-    // put in all GCS configs
-    conf
-      .filter(_._1.startsWith(HADOOP_PREFIX + GCS_PREFIX))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
-    // put in all gluten velox configs
+    val confPrefix = prefixOf(backendName)
+    val s3Prefix = HADOOP_PREFIX + S3A_PREFIX
+    val azurePrefix = HADOOP_PREFIX + ABFS_PREFIX
+    val gsPrefix = HADOOP_PREFIX + GCS_PREFIX
+    val backendPrefix = s"spark.gluten.$backendName"
     conf
-      .filter(_._1.startsWith(s"spark.gluten.$backendName"))
-      .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+      .filter {
+        case (k, _) =>
+          k.startsWith(confPrefix) || k.startsWith(s3Prefix) || k.startsWith(azurePrefix) || k
+            .startsWith(gsPrefix) || k.startsWith(backendPrefix)
+      }
+      .foreach { case (k, v) => nativeConfMap.put(k, v) }
 
     // return
-    nativeConfMap
+    nativeConfMap.asJava
   }
 
   val GLUTEN_ENABLED = GlutenCoreConfig.GLUTEN_ENABLED


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to