This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1601f2605 [CORE] Enhance gluten config parsing (#5357)
1601f2605 is described below

commit 1601f2605865a2e94b0795b64b7506bec67860b6
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Apr 11 14:58:05 2024 +0800

    [CORE] Enhance gluten config parsing (#5357)
---
 .../gluten/backendsapi/velox/ListenerApiImpl.scala |  3 +-
 .../scala/org/apache/gluten/exec/Runtime.scala     |  3 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 85 ++++++++++------------
 .../spark/sql/internal/GlutenConfigUtil.scala      | 31 ++++++++
 4 files changed, 73 insertions(+), 49 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
index 3cca808c6..2b558a2da 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
@@ -28,6 +28,7 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
+import org.apache.spark.sql.internal.GlutenConfigUtil
 import org.apache.spark.sql.internal.StaticSQLConf
 
 import org.apache.commons.lang3.StringUtils
@@ -164,7 +165,7 @@ class ListenerApiImpl extends ListenerApi {
       loader.mapAndLoad(VeloxBackend.BACKEND_NAME, false)
     }
 
-    initializeNative(conf.getAll.toMap)
+    initializeNative(GlutenConfigUtil.parseConfig(conf.getAll.toMap))
 
     // inject backend-specific implementations to override spark classes
     // FIXME: The following set instances twice in local mode?
diff --git a/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala b/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
index 2ade10ce9..12d855c71 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.init.JniUtils
 
+import org.apache.spark.sql.internal.GlutenConfigUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.TaskResource
 
@@ -32,7 +33,7 @@ class Runtime private[exec] () extends TaskResource {
     JniUtils.toNativeConf(
       GlutenConfig.getNativeSessionConf(
         BackendsApiManager.getSettings.getBackendConfigPrefix,
-        SQLConf.get.getAllConfs))
+        GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
   )
 
   private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d2b0fd78f..9039d2b8d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -27,6 +27,7 @@ import java.util
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
 case class GlutenNumaBindingInfo(
@@ -338,31 +339,31 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableVeloxFlushablePartialAggregation: Boolean =
     conf.getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
-  def maxFlushableAggregationMemoryRatio: Option[Double] =
+  def maxFlushableAggregationMemoryRatio: Double =
     conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
-  def maxExtendedFlushableAggregationMemoryRatio: Option[Double] =
+  def maxExtendedFlushableAggregationMemoryRatio: Double =
     conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
-  def abandonFlushableAggregationMinPct: Option[Int] =
+  def abandonFlushableAggregationMinPct: Int =
     conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
-  def abandonFlushableAggregationMinRows: Option[Int] =
+  def abandonFlushableAggregationMinRows: Int =
     conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
 
   // Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead
   def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED)
 
-  def directorySizeGuess: Option[Int] =
+  def directorySizeGuess: Long =
     conf.getConf(DIRECTORY_SIZE_GUESS)
-  def filePreloadThreshold: Option[Int] =
+  def filePreloadThreshold: Long =
     conf.getConf(FILE_PRELOAD_THRESHOLD)
-  def prefetchRowGroups: Option[Int] =
+  def prefetchRowGroups: Int =
     conf.getConf(PREFETCH_ROW_GROUPS)
-  def loadQuantum: Option[Int] =
+  def loadQuantum: Long =
     conf.getConf(LOAD_QUANTUM)
-  def maxCoalescedDistanceBytes: Option[Int] =
+  def maxCoalescedDistanceBytes: Long =
     conf.getConf(MAX_COALESCED_DISTANCE_BYTES)
-  def maxCoalescedBytes: Option[Int] =
+  def maxCoalescedBytes: Long =
     conf.getConf(MAX_COALESCED_BYTES)
-  def cachePrefetchMinPct: Option[Int] =
+  def cachePrefetchMinPct: Int =
     conf.getConf(CACHE_PREFETCH_MINPCT)
 
   def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
@@ -536,7 +537,7 @@ object GlutenConfig {
       backendPrefix: String,
       conf: scala.collection.Map[String, String]): util.Map[String, String] = {
     val nativeConfMap = new util.HashMap[String, String]()
-    val keys = ImmutableList.of(
+    val keys = Set(
       GLUTEN_DEBUG_MODE,
       GLUTEN_SAVE_DIR,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
@@ -564,16 +565,11 @@ object GlutenConfig {
       SPARK_GCS_AUTH_TYPE,
       SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE
     )
-    keys.forEach(
-      k => {
-        if (conf.contains(k)) {
-          nativeConfMap.put(k, conf(k))
-        }
-      })
+    nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
 
     val keyWithDefault = ImmutableList.of(
-      (SQLConf.CASE_SENSITIVE.key, "false"),
-      (SQLConf.IGNORE_MISSING_FILES.key, "false")
+      (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
+      (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString)
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
 
@@ -631,7 +627,7 @@ object GlutenConfig {
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
 
-    val keys = ImmutableList.of(
+    val keys = Set(
       GLUTEN_DEBUG_MODE,
       // datasource config
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
@@ -641,12 +637,7 @@ object GlutenConfig {
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_OFFHEAP_ENABLED
     )
-    keys.forEach(
-      k => {
-        if (conf.contains(k)) {
-          nativeConfMap.put(k, conf(k))
-        }
-      })
+    nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
 
     conf
       .filter(_._1.startsWith(backendPrefix))
@@ -1035,8 +1026,8 @@ object GlutenConfig {
   val COLUMNAR_PARQUET_WRITE_BLOCK_SIZE =
     buildConf("spark.gluten.sql.columnar.parquet.write.blockSize")
       .internal()
-      .longConf
-      .createWithDefault(128 * 1024 * 1024)
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("128MB")
 
   val COLUMNAR_PARQUET_WRITE_BLOCK_ROWS =
     buildConf("spark.gluten.sql.native.parquet.write.blockRows")
@@ -1501,8 +1492,8 @@ object GlutenConfig {
     buildConf("spark.gluten.sql.text.input.max.block.size")
       .internal()
       .doc("the max block size for text input rows")
-      .longConf
-      .createWithDefault(8192);
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8KB");
 
   val TEXT_INPUT_EMPTY_AS_DEFAULT =
     buildConf("spark.gluten.sql.text.input.empty.as.default")
@@ -1539,7 +1530,7 @@ object GlutenConfig {
           "spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false."
       )
       .doubleConf
-      .createOptional
+      .createWithDefault(0.1)
 
   val MAX_EXTENDED_PARTIAL_AGGREGATION_MEMORY_RATIO =
     buildConf("spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio")
@@ -1551,7 +1542,7 @@ object GlutenConfig {
           "spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false."
       )
       .doubleConf
-      .createOptional
+      .createWithDefault(0.15)
 
   val ABANDON_PARTIAL_AGGREGATION_MIN_PCT =
     buildConf("spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct")
@@ -1562,7 +1553,7 @@ object GlutenConfig {
           "flushable partial aggregation is enabled. Ignored when " +
           "spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.")
       .intConf
-      .createOptional
+      .createWithDefault(90)
 
   val ABANDON_PARTIAL_AGGREGATION_MIN_ROWS =
     buildConf("spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows")
@@ -1573,7 +1564,7 @@ object GlutenConfig {
           "flushable partial aggregation is enabled. Ignored when " +
           "spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.")
       .intConf
-      .createOptional
+      .createWithDefault(100000)
 
   val ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON =
     buildConf("spark.gluten.sql.rewrite.dateTimestampComparison")
@@ -1675,50 +1666,50 @@ object GlutenConfig {
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.directorySizeGuess")
       .internal()
       .doc("Set the directory size guess for velox file scan")
-      .intConf
-      .createOptional
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("32KB")
 
   val FILE_PRELOAD_THRESHOLD =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.filePreloadThreshold")
       .internal()
       .doc("Set the file preload threshold for velox file scan")
-      .intConf
-      .createOptional
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1MB")
 
   val PREFETCH_ROW_GROUPS =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.prefetchRowGroups")
       .internal()
       .doc("Set the prefetch row groups for velox file scan")
       .intConf
-      .createOptional
+      .createWithDefault(1)
 
   val LOAD_QUANTUM =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.loadQuantum")
       .internal()
       .doc("Set the load quantum for velox file scan")
-      .intConf
-      .createOptional
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("256MB")
 
   val MAX_COALESCED_DISTANCE_BYTES =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.maxCoalescedDistanceBytes")
       .internal()
       .doc(" Set the max coalesced distance bytes for velox file scan")
-      .intConf
-      .createOptional
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1MB")
 
   val MAX_COALESCED_BYTES =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes")
       .internal()
       .doc("Set the max coalesced bytes for velox file scan")
-      .intConf
-      .createOptional
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
 
   val CACHE_PREFETCH_MINPCT =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct")
       .internal()
       .doc("Set prefetch cache min pct for velox file scan")
       .intConf
-      .createOptional
+      .createWithDefault(0)
 
   val AWS_SDK_LOG_LEVEL =
     buildConf("spark.gluten.velox.awsSdkLogLevel")
diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
new file mode 100644
index 000000000..babb446d0
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.internal.config.ConfigReader
+
+import scala.collection.JavaConverters._
+
+object GlutenConfigUtil {
+  def parseConfig(conf: Map[String, String]): Map[String, String] = {
+    val reader = new ConfigReader(conf.filter(_._1.contains("spark.gluten.")).asJava)
+    val glutenConfigEntries =
+      SQLConf.getConfigEntries().asScala.filter(e => e.key.contains("spark.gluten."))
+    val glutenConfig = glutenConfigEntries.map(e => (e.key, e.readFrom(reader).toString)).toMap
+    conf.map(e => (e._1, glutenConfig.getOrElse(e._1, e._2)))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to