This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1601f2605 [CORE] Enhance gluten config parsing (#5357)
1601f2605 is described below
commit 1601f2605865a2e94b0795b64b7506bec67860b6
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Apr 11 14:58:05 2024 +0800
[CORE] Enhance gluten config parsing (#5357)
---
.../gluten/backendsapi/velox/ListenerApiImpl.scala | 3 +-
.../scala/org/apache/gluten/exec/Runtime.scala | 3 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 85 ++++++++++------------
.../spark/sql/internal/GlutenConfigUtil.scala | 31 ++++++++
4 files changed, 73 insertions(+), 49 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
index 3cca808c6..2b558a2da 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/ListenerApiImpl.scala
@@ -28,6 +28,7 @@ import org.apache.gluten.vectorized.{JniLibLoader,
JniWorkspace}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
+import org.apache.spark.sql.internal.GlutenConfigUtil
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.commons.lang3.StringUtils
@@ -164,7 +165,7 @@ class ListenerApiImpl extends ListenerApi {
loader.mapAndLoad(VeloxBackend.BACKEND_NAME, false)
}
- initializeNative(conf.getAll.toMap)
+ initializeNative(GlutenConfigUtil.parseConfig(conf.getAll.toMap))
// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
diff --git a/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
b/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
index 2ade10ce9..12d855c71 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/exec/Runtime.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.init.JniUtils
+import org.apache.spark.sql.internal.GlutenConfigUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.TaskResource
@@ -32,7 +33,7 @@ class Runtime private[exec] () extends TaskResource {
JniUtils.toNativeConf(
GlutenConfig.getNativeSessionConf(
BackendsApiManager.getSettings.getBackendConfigPrefix,
- SQLConf.get.getAllConfs))
+ GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
)
private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d2b0fd78f..9039d2b8d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -27,6 +27,7 @@ import java.util
import java.util.Locale
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
case class GlutenNumaBindingInfo(
@@ -338,31 +339,31 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableVeloxFlushablePartialAggregation: Boolean =
conf.getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
- def maxFlushableAggregationMemoryRatio: Option[Double] =
+ def maxFlushableAggregationMemoryRatio: Double =
conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
- def maxExtendedFlushableAggregationMemoryRatio: Option[Double] =
+ def maxExtendedFlushableAggregationMemoryRatio: Double =
conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
- def abandonFlushableAggregationMinPct: Option[Int] =
+ def abandonFlushableAggregationMinPct: Int =
conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
- def abandonFlushableAggregationMinRows: Option[Int] =
+ def abandonFlushableAggregationMinRows: Int =
conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
// Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead
def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED)
- def directorySizeGuess: Option[Int] =
+ def directorySizeGuess: Long =
conf.getConf(DIRECTORY_SIZE_GUESS)
- def filePreloadThreshold: Option[Int] =
+ def filePreloadThreshold: Long =
conf.getConf(FILE_PRELOAD_THRESHOLD)
- def prefetchRowGroups: Option[Int] =
+ def prefetchRowGroups: Int =
conf.getConf(PREFETCH_ROW_GROUPS)
- def loadQuantum: Option[Int] =
+ def loadQuantum: Long =
conf.getConf(LOAD_QUANTUM)
- def maxCoalescedDistanceBytes: Option[Int] =
+ def maxCoalescedDistanceBytes: Long =
conf.getConf(MAX_COALESCED_DISTANCE_BYTES)
- def maxCoalescedBytes: Option[Int] =
+ def maxCoalescedBytes: Long =
conf.getConf(MAX_COALESCED_BYTES)
- def cachePrefetchMinPct: Option[Int] =
+ def cachePrefetchMinPct: Int =
conf.getConf(CACHE_PREFETCH_MINPCT)
def enableColumnarProjectCollapse: Boolean =
conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
@@ -536,7 +537,7 @@ object GlutenConfig {
backendPrefix: String,
conf: scala.collection.Map[String, String]): util.Map[String, String] = {
val nativeConfMap = new util.HashMap[String, String]()
- val keys = ImmutableList.of(
+ val keys = Set(
GLUTEN_DEBUG_MODE,
GLUTEN_SAVE_DIR,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
@@ -564,16 +565,11 @@ object GlutenConfig {
SPARK_GCS_AUTH_TYPE,
SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE
)
- keys.forEach(
- k => {
- if (conf.contains(k)) {
- nativeConfMap.put(k, conf(k))
- }
- })
+ nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
val keyWithDefault = ImmutableList.of(
- (SQLConf.CASE_SENSITIVE.key, "false"),
- (SQLConf.IGNORE_MISSING_FILES.key, "false")
+ (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
+      (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString)
)
    keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
@@ -631,7 +627,7 @@ object GlutenConfig {
)
    keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
- val keys = ImmutableList.of(
+ val keys = Set(
GLUTEN_DEBUG_MODE,
// datasource config
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
@@ -641,12 +637,7 @@ object GlutenConfig {
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_ENABLED
)
- keys.forEach(
- k => {
- if (conf.contains(k)) {
- nativeConfMap.put(k, conf(k))
- }
- })
+ nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
conf
.filter(_._1.startsWith(backendPrefix))
@@ -1035,8 +1026,8 @@ object GlutenConfig {
val COLUMNAR_PARQUET_WRITE_BLOCK_SIZE =
buildConf("spark.gluten.sql.columnar.parquet.write.blockSize")
.internal()
- .longConf
- .createWithDefault(128 * 1024 * 1024)
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("128MB")
val COLUMNAR_PARQUET_WRITE_BLOCK_ROWS =
buildConf("spark.gluten.sql.native.parquet.write.blockRows")
@@ -1501,8 +1492,8 @@ object GlutenConfig {
buildConf("spark.gluten.sql.text.input.max.block.size")
.internal()
.doc("the max block size for text input rows")
- .longConf
- .createWithDefault(8192);
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8KB");
val TEXT_INPUT_EMPTY_AS_DEFAULT =
buildConf("spark.gluten.sql.text.input.empty.as.default")
@@ -1539,7 +1530,7 @@ object GlutenConfig {
"spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false."
)
.doubleConf
- .createOptional
+ .createWithDefault(0.1)
val MAX_EXTENDED_PARTIAL_AGGREGATION_MEMORY_RATIO =
buildConf("spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio")
@@ -1551,7 +1542,7 @@ object GlutenConfig {
"spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false."
)
.doubleConf
- .createOptional
+ .createWithDefault(0.15)
val ABANDON_PARTIAL_AGGREGATION_MIN_PCT =
buildConf("spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct")
@@ -1562,7 +1553,7 @@ object GlutenConfig {
"flushable partial aggregation is enabled. Ignored when " +
"spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.")
.intConf
- .createOptional
+ .createWithDefault(90)
val ABANDON_PARTIAL_AGGREGATION_MIN_ROWS =
buildConf("spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows")
@@ -1573,7 +1564,7 @@ object GlutenConfig {
"flushable partial aggregation is enabled. Ignored when " +
"spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.")
.intConf
- .createOptional
+ .createWithDefault(100000)
val ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON =
buildConf("spark.gluten.sql.rewrite.dateTimestampComparison")
@@ -1675,50 +1666,50 @@ object GlutenConfig {
buildStaticConf("spark.gluten.sql.columnar.backend.velox.directorySizeGuess")
.internal()
.doc("Set the directory size guess for velox file scan")
- .intConf
- .createOptional
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("32KB")
val FILE_PRELOAD_THRESHOLD =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.filePreloadThreshold")
.internal()
.doc("Set the file preload threshold for velox file scan")
- .intConf
- .createOptional
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("1MB")
val PREFETCH_ROW_GROUPS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.prefetchRowGroups")
.internal()
.doc("Set the prefetch row groups for velox file scan")
.intConf
- .createOptional
+ .createWithDefault(1)
val LOAD_QUANTUM =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.loadQuantum")
.internal()
.doc("Set the load quantum for velox file scan")
- .intConf
- .createOptional
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("256MB")
val MAX_COALESCED_DISTANCE_BYTES =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.maxCoalescedDistanceBytes")
.internal()
.doc(" Set the max coalesced distance bytes for velox file scan")
- .intConf
- .createOptional
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("1MB")
val MAX_COALESCED_BYTES =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes")
.internal()
.doc("Set the max coalesced bytes for velox file scan")
- .intConf
- .createOptional
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64MB")
val CACHE_PREFETCH_MINPCT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct")
.internal()
.doc("Set prefetch cache min pct for velox file scan")
.intConf
- .createOptional
+ .createWithDefault(0)
val AWS_SDK_LOG_LEVEL =
buildConf("spark.gluten.velox.awsSdkLogLevel")
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
new file mode 100644
index 000000000..babb446d0
--- /dev/null
+++
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.internal.config.ConfigReader
+
+import scala.collection.JavaConverters._
+
+object GlutenConfigUtil {
+ def parseConfig(conf: Map[String, String]): Map[String, String] = {
+    val reader = new ConfigReader(conf.filter(_._1.contains("spark.gluten.")).asJava)
+    val glutenConfigEntries =
+      SQLConf.getConfigEntries().asScala.filter(e => e.key.contains("spark.gluten."))
+    val glutenConfig = glutenConfigEntries.map(e => (e.key, e.readFrom(reader).toString)).toMap
+    conf.map(e => (e._1, glutenConfig.getOrElse(e._1, e._2)))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]