This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fa4fa941b3 Add RichSparkConf to simplify the interoperations with
gluten config entries (#9876)
fa4fa941b3 is described below
commit fa4fa941b3a538d318a28775e40d81c670994a8f
Author: Kent Yao <[email protected]>
AuthorDate: Fri Jun 6 11:08:23 2025 +0800
Add RichSparkConf to simplify the interoperations with gluten config
entries (#9876)
* Add a RichSparkConf to simplify interoperations with gluten config entries
* Add a RichSparkConf to simplify interoperations with gluten config entries
* Add a RichSparkConf to simplify interoperations with gluten config entries
* Add a RichSparkConf to simplify interoperations with gluten config entries
* Add a RichSparkConf to simplify interoperations with gluten config entries
---
.../backendsapi/velox/VeloxListenerApi.scala | 29 +++++------
.../apache/gluten/utils/SharedLibraryLoader.scala | 9 ++--
.../scala/org/apache/gluten/GlutenPlugin.scala | 56 ++++++++--------------
.../org/apache/gluten/expression/UDFMappings.scala | 7 +--
.../spark/sql/internal/SparkConfigUtil.scala | 37 +++++++++++++-
5 files changed, 74 insertions(+), 64 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 778d79ab8b..a5c16bf880 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -44,6 +44,7 @@ import
org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import
org.apache.spark.sql.execution.datasources.velox.{VeloxParquetWriterInjects,
VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
+import org.apache.spark.sql.internal.SparkConfigUtil._
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil,
SparkShutdownManagerUtil}
import org.apache.commons.lang3.StringUtils
@@ -60,18 +61,15 @@ class VeloxListenerApi extends ListenerApi with Logging {
// When the Velox cache is enabled, the Velox file handle cache should
also be enabled.
// Otherwise, a 'reference id not found' error may occur.
if (
- conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
- !conf.getBoolean(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, false)
+ conf.get(COLUMNAR_VELOX_CACHE_ENABLED) &&
+ !conf.get(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED)
) {
throw new IllegalArgumentException(
s"${COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
s"${COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled
together.")
}
- if (
- conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
- conf.getSizeAsBytes(LOAD_QUANTUM.key, LOAD_QUANTUM.defaultValueString) >
8 * 1024 * 1024
- ) {
+ if (conf.get(COLUMNAR_VELOX_CACHE_ENABLED) && conf.get(LOAD_QUANTUM) > 8 *
1024 * 1024) {
throw new IllegalArgumentException(
s"Velox currently only support up to 8MB load quantum size " +
s"on SSD cache enabled by ${COLUMNAR_VELOX_CACHE_ENABLED.key}, " +
@@ -101,13 +99,11 @@ class VeloxListenerApi extends ListenerApi with Logging {
s" the recommended size
${ByteUnit.BYTE.toMiB(desiredOverheadSize)}MiB." +
s" This may cause OOM.")
}
- conf.set(GlutenConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
overheadSize.toString)
+ conf.set(GlutenConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES, overheadSize)
// Sql table cache serializer.
- if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key,
defaultValue = false)) {
- conf.set(
- StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
- classOf[ColumnarCachedBatchSerializer].getName)
+ if (conf.get(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
+ conf.set(StaticSQLConf.SPARK_CACHE_SERIALIZER,
classOf[ColumnarCachedBatchSerializer].getName)
}
// Static initializers for driver.
@@ -155,8 +151,8 @@ class VeloxListenerApi extends ListenerApi with Logging {
private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
// Sets this configuration only once, since not undoable.
// DebugInstance should be created first.
- if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key,
defaultValue = false)) {
- val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key)
+ if (conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE)) {
+ val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR)
JniWorkspace.enableDebug(debugDir)
} else {
JniWorkspace.initializeDefault(
@@ -203,11 +199,11 @@ class VeloxListenerApi extends ListenerApi with Logging {
SharedLibraryLoader.load(conf, loader)
// Load backend libraries.
- val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH.key, StringUtils.EMPTY)
+ val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH)
if (StringUtils.isNotBlank(libPath)) { // Path based load. Ignore all
other loadees.
JniLibLoader.loadFromPath(libPath)
} else {
- val baseLibName = conf.get(GlutenConfig.GLUTEN_LIB_NAME.key, "gluten")
+ val baseLibName = conf.get(GlutenConfig.GLUTEN_LIB_NAME)
loader.load(s"$platformLibDir/${System.mapLibraryName(baseLibName)}")
loader.load(s"$platformLibDir/${System.mapLibraryName(VeloxBackend.BACKEND_NAME)}")
}
@@ -225,8 +221,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
}
private def addIfNeedMemoryDumpShutdownHook(conf: SparkConf): Unit = {
- val memoryDumpOnExit =
- conf.get(MEMORY_DUMP_ON_EXIT.key,
MEMORY_DUMP_ON_EXIT.defaultValueString).toBoolean
+ val memoryDumpOnExit = conf.get(MEMORY_DUMP_ON_EXIT)
if (memoryDumpOnExit) {
SparkShutdownManagerUtil.addHook(
() => {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
index 3632202de9..109f3f014a 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.exception.GlutenException
import org.apache.gluten.jni.JniLibLoader
import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SparkConfigUtil._
import scala.sys.process._
@@ -30,9 +31,7 @@ trait SharedLibraryLoader {
object SharedLibraryLoader {
def load(conf: SparkConf, jni: JniLibLoader): Unit = {
- val shouldLoad = conf.getBoolean(
- GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR.key,
- GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR.defaultValue.get)
+ val shouldLoad = conf.get(GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR)
if (!shouldLoad) {
return
}
@@ -54,9 +53,9 @@ object SharedLibraryLoader {
}
private def find(conf: SparkConf): SharedLibraryLoader = {
- val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS.key)
+ val systemName = conf.get(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
- val systemVersion =
conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION.key)
+ val systemVersion = conf.get(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION.key} must be specified
when specifies the " +
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 611e4e398c..95bebac5ff 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -33,7 +33,8 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.softaffinity.SoftAffinityListener
import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener,
GlutenUIUtils}
-import org.apache.spark.sql.internal.{SparkConfigUtil, SQLConf}
+import org.apache.spark.sql.internal.SparkConfigUtil._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
import org.apache.spark.task.TaskResources
import org.apache.spark.util.SparkResourceUtil
@@ -62,11 +63,7 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// Register Gluten listeners
GlutenSQLAppStatusListener.register(sc)
- if (
- conf.getBoolean(
- GLUTEN_SOFT_AFFINITY_ENABLED.key,
- GLUTEN_SOFT_AFFINITY_ENABLED.defaultValue.get)
- ) {
+ if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
SoftAffinityListener.register(sc)
}
@@ -137,19 +134,13 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
}
private def checkOffHeapSettings(conf: SparkConf): Unit = {
- if (
- conf.getBoolean(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
- ) {
+ if (conf.get(DYNAMIC_OFFHEAP_SIZING_ENABLED)) {
// When dynamic off-heap sizing is enabled, off-heap mode is not
strictly required to be
// enabled. Skip the check.
return
}
- if (
- conf.getBoolean(COLUMNAR_MEMORY_UNTRACKED.key,
COLUMNAR_MEMORY_UNTRACKED.defaultValue.get)
- ) {
+ if (conf.get(COLUMNAR_MEMORY_UNTRACKED)) {
// When untracked memory mode is enabled, off-heap mode is not strictly
required to be
// enabled. Skip the check.
return
@@ -169,22 +160,19 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
private def setPredefinedConfigs(conf: SparkConf): Unit = {
// Spark SQL extensions
- val extensionSeq =
- SparkConfigUtil.getEntryValue(conf,
SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
+ val extensionSeq = conf.get(SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
if
(!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
{
conf.set(
- SPARK_SESSION_EXTENSIONS.key,
- (extensionSeq :+
GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME).mkString(","))
+ SPARK_SESSION_EXTENSIONS,
+ Some(extensionSeq :+
GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
}
// adaptive custom cost evaluator class
- val enableGlutenCostEvaluator = conf.getBoolean(
- GlutenConfig.COST_EVALUATOR_ENABLED.key,
- GlutenConfig.COST_EVALUATOR_ENABLED.defaultValue.get)
+ val enableGlutenCostEvaluator =
conf.get(GlutenConfig.COST_EVALUATOR_ENABLED)
if (enableGlutenCostEvaluator) {
conf.set(
- SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key,
- classOf[GlutenCostEvaluator].getName)
+ SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS,
+ Some(classOf[GlutenCostEvaluator].getName))
}
// check memory off-heap enabled and size.
@@ -194,39 +182,33 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
// Set off-heap size in bytes.
- conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString)
+ conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES, offHeapSize)
// Set off-heap size in bytes per task.
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
- conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
+ conf.set(NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)
val offHeapPerTask = offHeapSize / taskSlots
- conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, offHeapPerTask.toString)
+ conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES, offHeapPerTask)
// Pessimistic off-heap sizes, with the assumption that all non-borrowable
storage memory
// determined by spark.memory.storageFraction was used.
val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
- conf.set(
- COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
- conservativeOffHeapPerTask.toString)
+ conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES,
conservativeOffHeapPerTask)
// Disable vanilla columnar readers, to prevent columnar-to-columnar
conversions.
// FIXME: Do we still need this trick since
// https://github.com/apache/incubator-gluten/pull/1931 was merged?
- if (
- !conf.getBoolean(
- VANILLA_VECTORIZED_READERS_ENABLED.key,
- VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
- ) {
+ if (!conf.get(VANILLA_VECTORIZED_READERS_ENABLED)) {
// FIXME Hongze 22/12/06
// BatchScan.scala in shim was not always loaded by class loader.
// The file should be removed and the "ClassCastException" issue caused
by
// spark.sql.<format>.enableVectorizedReader=true should be fixed in
another way.
// Before the issue is fixed we force the use of vanilla row reader by
using
// the following statement.
- conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
- conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
- conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false")
+ conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, false)
+ conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED, false)
+ conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED, false)
}
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/UDFMappings.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/UDFMappings.scala
index 568688c0bd..baaa6b2d64 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/UDFMappings.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/UDFMappings.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SparkConfigUtil._
import org.apache.commons.lang3.StringUtils
@@ -58,19 +59,19 @@ object UDFMappings extends Logging {
}
def loadFromSparkConf(conf: SparkConf): Unit = {
- val strHiveUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_HIVE_UDFS.key, "")
+ val strHiveUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_HIVE_UDFS)
if (!StringUtils.isBlank(strHiveUDFs)) {
parseStringToMap(strHiveUDFs, hiveUDFMap)
logDebug(s"loaded hive udf mappings:${hiveUDFMap.toString}")
}
- val strPythonUDFs =
conf.get(GlutenConfig.GLUTEN_SUPPORTED_PYTHON_UDFS.key, "")
+ val strPythonUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_PYTHON_UDFS)
if (!StringUtils.isBlank(strPythonUDFs)) {
parseStringToMap(strPythonUDFs, pythonUDFMap)
logDebug(s"loaded python udf mappings:${pythonUDFMap.toString}")
}
- val strScalaUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_SCALA_UDFS.key,
"")
+ val strScalaUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_SCALA_UDFS)
if (!StringUtils.isBlank(strScalaUDFs)) {
parseStringToMap(strScalaUDFs, scalaUDFMap)
logDebug(s"loaded scala udf mappings:${scalaUDFMap.toString}")
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala
b/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala
index 945174073e..9b05aa75d6 100644
---
a/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala
@@ -16,11 +16,44 @@
*/
package org.apache.spark.sql.internal
+import org.apache.gluten.config.ConfigEntry
+
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry => SparkConfigEntry}
object SparkConfigUtil {
- def getEntryValue[T](conf: SparkConf, entry: ConfigEntry[T]): T = {
+
+ implicit class RichSparkConf(val conf: SparkConf) {
+ def get[T](entry: SparkConfigEntry[T]): T = {
+ SparkConfigUtil.get(conf, entry)
+ }
+
+ def get[T](entry: ConfigEntry[T]): T = {
+ SparkConfigUtil.get(conf, entry)
+ }
+
+ def set[T](entry: SparkConfigEntry[T], value: T): SparkConf = {
+ SparkConfigUtil.set(conf, entry, value)
+ }
+
+ def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
+ SparkConfigUtil.set(conf, entry, value)
+ }
+ }
+
+ def get[T](conf: SparkConf, entry: SparkConfigEntry[T]): T = {
conf.get(entry)
}
+
+ def get[T](conf: SparkConf, entry: ConfigEntry[T]): T = {
+ entry.valueConverter(conf.get(entry.key, entry.defaultValueString))
+ }
+
+ def set[T](conf: SparkConf, entry: SparkConfigEntry[T], value: T): SparkConf
= {
+ conf.set(entry, value)
+ }
+
+ def set[T](conf: SparkConf, entry: ConfigEntry[T], value: T): SparkConf = {
+ conf.set(entry.key, entry.stringConverter(value))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]