This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b25ced222 [VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number (#8684)
9b25ced222 is described below

commit 9b25ced22267181679dc6aced69bb13cec404bd3
Author: Rex(Hui) An <[email protected]>
AuthorDate: Wed Feb 19 17:38:25 2025 +0800

    [VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number (#8684)
---
 .../spark/sql/internal/GlutenConfigUtilSuite.scala | 36 ++++++++--------------
 .../org/apache/gluten/config/GlutenConfig.scala    | 34 +++++++++-----------
 .../spark/sql/internal/GlutenConfigUtil.scala      |  7 +++++
 3 files changed, 34 insertions(+), 43 deletions(-)

diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
similarity index 52%
copy from shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
copy to gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
index 87b90938a0..f3ddabcd47 100644
--- a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
@@ -16,30 +16,20 @@
  */
 package org.apache.spark.sql.internal
 
-import org.apache.gluten.config._
+import org.apache.spark.network.util.ByteUnit
 
-object GlutenConfigUtil {
-  private def getConfString(configProvider: ConfigProvider, key: String, 
value: String): String = {
-    Option(ConfigEntry.findEntry(key))
-      .map {
-        _.readFrom(configProvider) match {
-          case o: Option[_] => o.map(_.toString).getOrElse(value)
-          case null => value
-          case v => v.toString
-        }
-      }
-      .getOrElse(value)
-  }
+import org.scalatest.funsuite.AnyFunSuite
+
+class GlutenConfigUtilSuite extends AnyFunSuite {
+
+  test("mapByteConfValue should return correct value") {
+    val conf = Map(
+      "spark.unsafe.sorter.spill.reader.buffer.size" -> "2m"
+    )
 
-  def parseConfig(conf: Map[String, String]): Map[String, String] = {
-    val provider = new 
MapProvider(conf.filter(_._1.startsWith("spark.gluten.")))
-    conf.map {
-      case (k, v) =>
-        if (k.startsWith("spark.gluten.")) {
-          (k, getConfString(provider, k, v))
-        } else {
-          (k, v)
-        }
-    }.toMap
+    GlutenConfigUtil.mapByteConfValue(
+      conf,
+      "spark.unsafe.sorter.spill.reader.buffer.size",
+      ByteUnit.BYTE)(v => assert(2097152L.equals(v)))
   }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6797639b23..3b86959631 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -17,8 +17,8 @@
 package org.apache.gluten.config
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, 
SQLConfProvider}
 
 import com.google.common.collect.ImmutableList
 import org.apache.hadoop.security.UserGroupInformation
@@ -428,9 +428,7 @@ object GlutenConfig {
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
   val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
   val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = 
"spark.unsafe.sorter.spill.reader.buffer.size"
-  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
   val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = 
"spark.shuffle.spill.diskWriteBufferSize"
-  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
   val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
   val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
 
@@ -449,7 +447,7 @@ object GlutenConfig {
    */
   def getNativeSessionConf(
       backendName: String,
-      conf: scala.collection.Map[String, String]): util.Map[String, String] = {
+      conf: Map[String, String]): util.Map[String, String] = {
     val nativeConfMap = new util.HashMap[String, String]()
     val keys = Set(
       DEBUG_ENABLED.key,
@@ -504,24 +502,20 @@ object GlutenConfig {
       (
         GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
         GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
-      (
-        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
-        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
-      (
-        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
-        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
       (SPARK_SHUFFLE_SPILL_COMPRESS, 
SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, 
e._2)))
-
-    conf
-      .get(SPARK_SHUFFLE_FILE_BUFFER)
-      .foreach(
-        v =>
-          nativeConfMap
-            .put(
-              SPARK_SHUFFLE_FILE_BUFFER,
-              (JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024).toString))
+    GlutenConfigUtil.mapByteConfValue(
+      conf,
+      SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+      ByteUnit.BYTE)(
+      v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, 
v.toString))
+    GlutenConfigUtil.mapByteConfValue(
+      conf,
+      SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+      ByteUnit.BYTE)(v => 
nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
+    GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, 
ByteUnit.KiB)(
+      v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
 
     conf
       .get(LEGACY_TIME_PARSER_POLICY.key)
diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
index 87b90938a0..d415378c70 100644
--- a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.internal
 
 import org.apache.gluten.config._
 
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
 object GlutenConfigUtil {
   private def getConfString(configProvider: ConfigProvider, key: String, 
value: String): String = {
     Option(ConfigEntry.findEntry(key))
@@ -42,4 +44,9 @@ object GlutenConfigUtil {
         }
     }.toMap
   }
+
+  def mapByteConfValue(conf: Map[String, String], key: String, unit: ByteUnit)(
+      f: Long => Unit): Unit = {
+    conf.get(key).foreach(v => f(JavaUtils.byteStringAs(v, unit)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to