This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9b25ced222 [VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number (#8684)
9b25ced222 is described below
commit 9b25ced22267181679dc6aced69bb13cec404bd3
Author: Rex(Hui) An <[email protected]>
AuthorDate: Wed Feb 19 17:38:25 2025 +0800
[VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number (#8684)
---
.../spark/sql/internal/GlutenConfigUtilSuite.scala | 36 ++++++++--------------
.../org/apache/gluten/config/GlutenConfig.scala | 34 +++++++++-----------
.../spark/sql/internal/GlutenConfigUtil.scala | 7 +++++
3 files changed, 34 insertions(+), 43 deletions(-)
diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
similarity index 52%
copy from shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
copy to gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
index 87b90938a0..f3ddabcd47 100644
--- a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/internal/GlutenConfigUtilSuite.scala
@@ -16,30 +16,20 @@
*/
package org.apache.spark.sql.internal
-import org.apache.gluten.config._
+import org.apache.spark.network.util.ByteUnit
-object GlutenConfigUtil {
- private def getConfString(configProvider: ConfigProvider, key: String, value: String): String = {
- Option(ConfigEntry.findEntry(key))
- .map {
- _.readFrom(configProvider) match {
- case o: Option[_] => o.map(_.toString).getOrElse(value)
- case null => value
- case v => v.toString
- }
- }
- .getOrElse(value)
- }
+import org.scalatest.funsuite.AnyFunSuite
+
+class GlutenConfigUtilSuite extends AnyFunSuite {
+
+ test("mapByteConfValue should return correct value") {
+ val conf = Map(
+ "spark.unsafe.sorter.spill.reader.buffer.size" -> "2m"
+ )
- def parseConfig(conf: Map[String, String]): Map[String, String] = {
- val provider = new MapProvider(conf.filter(_._1.startsWith("spark.gluten.")))
- conf.map {
- case (k, v) =>
- if (k.startsWith("spark.gluten.")) {
- (k, getConfString(provider, k, v))
- } else {
- (k, v)
- }
- }.toMap
+ GlutenConfigUtil.mapByteConfValue(
+ conf,
+ "spark.unsafe.sorter.spill.reader.buffer.size",
+ ByteUnit.BYTE)(v => assert(2097152L.equals(v)))
}
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6797639b23..3b86959631 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -17,8 +17,8 @@
package org.apache.gluten.config
import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, SQLConfProvider}
import com.google.common.collect.ImmutableList
import org.apache.hadoop.security.UserGroupInformation
@@ -428,9 +428,7 @@ object GlutenConfig {
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
"spark.unsafe.sorter.spill.reader.buffer.size"
- val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE =
"spark.shuffle.spill.diskWriteBufferSize"
- val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
@@ -449,7 +447,7 @@ object GlutenConfig {
*/
def getNativeSessionConf(
backendName: String,
- conf: scala.collection.Map[String, String]): util.Map[String, String] = {
+ conf: Map[String, String]): util.Map[String, String] = {
val nativeConfMap = new util.HashMap[String, String]()
val keys = Set(
DEBUG_ENABLED.key,
@@ -504,24 +502,20 @@ object GlutenConfig {
(
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
- (
- SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
- SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
- (
- SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
- SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
(SPARK_SHUFFLE_SPILL_COMPRESS,
SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1,
e._2)))
-
- conf
- .get(SPARK_SHUFFLE_FILE_BUFFER)
- .foreach(
- v =>
- nativeConfMap
- .put(
- SPARK_SHUFFLE_FILE_BUFFER,
- (JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024).toString))
+ GlutenConfigUtil.mapByteConfValue(
+ conf,
+ SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+ ByteUnit.BYTE)(
+ v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, v.toString))
+ GlutenConfigUtil.mapByteConfValue(
+ conf,
+ SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+ ByteUnit.BYTE)(v => nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
+ GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB)(
+ v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
conf
.get(LEGACY_TIME_PARSER_POLICY.key)
diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
index 87b90938a0..d415378c70 100644
--- a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.internal
import org.apache.gluten.config._
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
object GlutenConfigUtil {
private def getConfString(configProvider: ConfigProvider, key: String, value: String): String = {
Option(ConfigEntry.findEntry(key))
@@ -42,4 +44,9 @@ object GlutenConfigUtil {
}
}.toMap
}
+
+ def mapByteConfValue(conf: Map[String, String], key: String, unit: ByteUnit)(
+ f: Long => Unit): Unit = {
+ conf.get(key).foreach(v => f(JavaUtils.byteStringAs(v, unit)))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]