This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 56b7f75f9f [GLUTEN-11245][CORE] Refactor getNativeSessionConf and
getNativeBackendConf for GlutenConfig (#11246)
56b7f75f9f is described below
commit 56b7f75f9f6a8ff15acf40bc1ebccb2acc0efca2
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Dec 8 09:31:36 2025 +0800
[GLUTEN-11245][CORE] Refactor getNativeSessionConf and getNativeBackendConf
for GlutenConfig (#11246)
---
.../apache/gluten/memory/NativeMemoryManager.scala | 3 +-
.../scala/org/apache/gluten/runtime/Runtime.scala | 5 +-
.../org/apache/gluten/config/GlutenConfig.scala | 96 +++++++++-------------
3 files changed, 44 insertions(+), 60 deletions(-)
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 3955756346..3559d7a213 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -53,7 +53,8 @@ object NativeMemoryManager {
rl,
ConfigUtil.serialize(
GlutenConfig
- .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+ .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
+ .asJava)
)
spillers.append(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long = phase match {
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index bd5ff1c185..219eb216df 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -56,8 +56,9 @@ object Runtime {
nmm.getHandle(),
ConfigUtil.serialize(
(GlutenConfig
- .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs))
- .asScala ++ extraConf.asScala).asJava)
+ .getNativeSessionConf(
+ backendName,
+ GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++
extraConf.asScala).asJava)
)
private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 4145b526b9..35584ebd0b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -21,13 +21,13 @@ import org.apache.gluten.shuffle.SupportsColumnarShuffle
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
-import com.google.common.collect.ImmutableList
import org.apache.hadoop.security.UserGroupInformation
import java.util
import java.util.Locale
import scala.collection.JavaConverters._
+import scala.collection.mutable
trait ShuffleWriterType {
val name: String
@@ -510,18 +510,13 @@ object GlutenConfig extends ConfigRegistry {
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
)
- /**
- * Get dynamic configs.
- *
- * TODO: Improve the get native conf logic.
- */
- def getNativeSessionConf(
- backendName: String,
- conf: Map[String, String]): util.Map[String, String] = {
- val nativeConfMap = new util.HashMap[String, String]()
- nativeConfMap.putAll(conf.filter(e => nativeKeys.contains(e._1)).asJava)
+ /** Get dynamic configs. */
+ def getNativeSessionConf(backendName: String, conf: Map[String, String]):
Map[String, String] = {
+ val nativeConfMap = mutable.Map[String, String](conf.filter {
+ case (key, _) => nativeKeys.contains(key)
+ }.toSeq: _*)
- val keyWithDefault = ImmutableList.of(
+ Seq(
(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
(SQLConf.IGNORE_MISSING_FILES.key,
SQLConf.IGNORE_MISSING_FILES.defaultValueString),
(
@@ -537,19 +532,17 @@ object GlutenConfig extends ConfigRegistry {
(SQLConf.MAP_KEY_DEDUP_POLICY.key,
SQLConf.MAP_KEY_DEDUP_POLICY.defaultValueString),
(SQLConf.SESSION_LOCAL_TIMEZONE.key,
SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString),
(SQLConf.ANSI_ENABLED.key, SQLConf.ANSI_ENABLED.defaultValueString)
+ ).foreach { case (k, defaultValue) => nativeConfMap.put(k,
conf.getOrElse(k, defaultValue)) }
+
+ Seq(
+ (SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, ByteUnit.BYTE, (v: Long)
=> v.toString),
+ (SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, ByteUnit.BYTE, (v: Long) =>
v.toString),
+ (SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB, (v: Long) => (v *
1024).toString)
)
- keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1,
e._2)))
- GlutenConfigUtil.mapByteConfValue(
- conf,
- SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
- ByteUnit.BYTE)(
- v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
v.toString))
- GlutenConfigUtil.mapByteConfValue(
- conf,
- SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
- ByteUnit.BYTE)(v =>
nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
- GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER,
ByteUnit.KiB)(
- v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
+ .foreach {
+ case (k, unit, f) =>
+ GlutenConfigUtil.mapByteConfValue(conf, k, unit)(v =>
nativeConfMap.put(k, f(v)))
+ }
conf
.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key)
@@ -566,8 +559,11 @@ object GlutenConfig extends ConfigRegistry {
// Backend's dynamic session conf only.
val confPrefix = prefixOf(backendName)
conf
- .filter(entry => entry._1.startsWith(confPrefix) &&
!SQLConf.isStaticConfigKey(entry._1))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+ .filter {
+ case (k, _) =>
+ k.startsWith(confPrefix) && !SQLConf.isStaticConfigKey(k)
+ }
+ .foreach { case (k, v) => nativeConfMap.put(k, v) }
// Pass the latest tokens to native
nativeConfMap.put(
@@ -580,23 +576,21 @@ object GlutenConfig extends ConfigRegistry {
UserGroupInformation.getCurrentUser.getUserName)
// return
- nativeConfMap
+ nativeConfMap.toMap
}
/**
* Get static and dynamic configs. Some of the configs are dynamic in Spark
but static in
* Gluten; these will be used to construct the HiveConnector, which is
intended to be reused in Velox
- *
- * TODO: Improve the get native conf logic.
*/
def getNativeBackendConf(
backendName: String,
conf: scala.collection.Map[String, String]): util.Map[String, String] = {
- val nativeConfMap = new util.HashMap[String, String]()
+ val nativeConfMap = mutable.HashMap.empty[String, String]
// some configs having default values
- val keyWithDefault = ImmutableList.of(
+ Seq(
(SPARK_S3_CONNECTION_SSL_ENABLED, "false"),
(SPARK_S3_PATH_STYLE_ACCESS, "true"),
(SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
@@ -623,8 +617,7 @@ object GlutenConfig extends ConfigRegistry {
("spark.gluten.velox.s3UseProxyFromEnv", "false"),
("spark.gluten.velox.s3PayloadSigningPolicy", "Never"),
(SQLConf.SESSION_LOCAL_TIMEZONE.key,
SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
- )
- keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1,
e._2)))
+ ).foreach { case (k, defaultValue) => nativeConfMap.put(k,
conf.getOrElse(k, defaultValue)) }
val keys = Set(
DEBUG_ENABLED.key,
@@ -641,35 +634,24 @@ object GlutenConfig extends ConfigRegistry {
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
COLUMNAR_CUDF_ENABLED.key
)
- nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
- val confPrefix = prefixOf(backendName)
- conf
- .filter(_._1.startsWith(confPrefix))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+ nativeConfMap ++= conf.filter { case (k, _) => keys.contains(k) }
- // put in all S3 configs
- conf
- .filter(_._1.startsWith(HADOOP_PREFIX + S3A_PREFIX))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
- // handle ABFS config
- conf
- .filter(_._1.startsWith(HADOOP_PREFIX + ABFS_PREFIX))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
- // put in all GCS configs
- conf
- .filter(_._1.startsWith(HADOOP_PREFIX + GCS_PREFIX))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
-
- // put in all gluten velox configs
+ val confPrefix = prefixOf(backendName)
+ val s3Prefix = HADOOP_PREFIX + S3A_PREFIX
+ val azurePrefix = HADOOP_PREFIX + ABFS_PREFIX
+ val gsPrefix = HADOOP_PREFIX + GCS_PREFIX
+ val backendPrefix = s"spark.gluten.$backendName"
conf
- .filter(_._1.startsWith(s"spark.gluten.$backendName"))
- .foreach(entry => nativeConfMap.put(entry._1, entry._2))
+ .filter {
+ case (k, _) =>
+ k.startsWith(confPrefix) || k.startsWith(s3Prefix) ||
k.startsWith(azurePrefix) || k
+ .startsWith(gsPrefix) || k.startsWith(backendPrefix)
+ }
+ .foreach { case (k, v) => nativeConfMap.put(k, v) }
// return
- nativeConfMap
+ nativeConfMap.asJava
}
val GLUTEN_ENABLED = GlutenCoreConfig.GLUTEN_ENABLED
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]