This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a5d1ea610c [GLUTEN-8327][CORE][Part-3] Introduce the `ConfigEntry` to
gluten config (#8431)
a5d1ea610c is described below
commit a5d1ea610cd28cacfe005e32c2163b37210b9154
Author: Kaifei Yi <[email protected]>
AuthorDate: Tue Jan 7 21:59:10 2025 +0800
[GLUTEN-8327][CORE][Part-3] Introduce the `ConfigEntry` to gluten config
(#8431)
* Introduce the ConfigEntry to gluten config
* change config provider
---
.../execution/ColumnarCachedBatchSerializer.scala | 14 +-
.../org/apache/gluten/test/MockVeloxBackend.java | 3 +-
.../benchmark/ColumnarTableCacheBenchmark.scala | 15 +-
.../HashAggregateExecBaseTransformer.scala | 2 +-
.../gluten/execution/WholeStageTransformer.scala | 4 +-
.../execution/WriteFilesExecTransformer.scala | 2 +-
.../RemoveNativeWriteFilesSortAndProject.scala | 2 +-
.../SoftAffinityWithRDDInfoSuite.scala | 2 +-
.../sql/GlutenSparkSessionExtensionSuite.scala | 6 +-
.../sql/GlutenSparkSessionExtensionSuite.scala | 6 +-
.../sql/GlutenSparkSessionExtensionSuite.scala | 6 +-
.../sql/GlutenSparkSessionExtensionSuite.scala | 6 +-
.../org/apache/gluten/config/ConfigBuilder.scala | 229 ++++++++++++++
.../org/apache/gluten/config/ConfigEntry.scala | 242 ++++++++++++++
.../org/apache/gluten/config/GlutenConfig.scala | 346 +++++++++++----------
...GlutenConfigUtil.scala => ConfigProvider.scala} | 35 +--
.../spark/sql/internal/GlutenConfigUtil.scala | 14 +-
17 files changed, 706 insertions(+), 228 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 9982fa36c7..75e73f64e5 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,7 @@ import
org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer
@@ -76,9 +76,11 @@ case class CachedColumnarBatch(
* -> Convert DefaultCachedBatch to InternalRow using vanilla Spark
serializer
*/
// format: on
-class ColumnarCachedBatchSerializer extends CachedBatchSerializer with
SQLConfHelper with Logging {
+class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging
{
private lazy val rowBasedCachedBatchSerializer = new
DefaultCachedBatchSerializer
+ private def glutenConf: GlutenConfig = GlutenConfig.get
+
private def toStructType(schema: Seq[Attribute]): StructType = {
StructType(schema.map(a => StructField(a.name, a.dataType, a.nullable,
a.metadata)))
}
@@ -108,14 +110,14 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with SQLConfHe
// `convertColumnarBatchToCachedBatch`, but the inside ColumnarBatch is
not arrow-based.
// See: `InMemoryRelation.apply()`.
// So we should disallow columnar input if using vanilla Spark columnar
scan.
- val noVanillaSparkColumnarScan =
conf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
- !conf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
- conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
+ val noVanillaSparkColumnarScan =
glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
+ !glutenConf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
+ glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
schema) && noVanillaSparkColumnarScan
}
override def supportsColumnarOutput(schema: StructType): Boolean = {
- conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
+ glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
}
override def convertInternalRowToCachedBatch(
diff --git
a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
index 912aa09bcb..efb3903d23 100644
--- a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
+++ b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
@@ -17,6 +17,7 @@
package org.apache.gluten.test;
import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenConfig$;
import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
@@ -71,7 +72,7 @@ public final class MockVeloxBackend {
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
- conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS(), "0");
+
conf.set(GlutenConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(),
"0");
return conf;
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
index 10c0b01ead..0cecae47a9 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
@@ -33,11 +33,16 @@ object ColumnarTableCacheBenchmark extends
SqlBasedBenchmark {
private def doBenchmark(name: String, cardinality: Long)(f: => Unit): Unit =
{
val benchmark = new Benchmark(name, cardinality, output = output)
- val flag = if
(spark.sessionState.conf.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
- "enable"
- } else {
- "disable"
- }
+ val flag =
+ if (
+ spark.sessionState.conf
+ .getConfString(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key)
+ .toBoolean
+ ) {
+ "enable"
+ } else {
+ "disable"
+ }
benchmark.addCase(s"$flag columnar table cache", 3)(_ => f)
benchmark.run()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 46c1ad0029..35809be0be 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -53,7 +53,7 @@ abstract class HashAggregateExecBaseTransformer(
BackendsApiManager.getMetricsApiInstance.genHashAggregateTransformerMetrics(sparkContext)
protected def isCapableForStreamingAggregation: Boolean = {
- if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
+ if (!glutenConf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE))
{
return false
}
if (groupingExpressions.isEmpty) {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 5d37d53893..7a06709da0 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -271,7 +271,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
override def nodeName: String = s"WholeStageCodegenTransformer
($transformStageId)"
override def verboseStringWithOperatorId(): String = {
- val nativePlan = if
(conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
+ val nativePlan = if
(glutenConf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
s"Native Plan:\n${nativePlanString()}"
} else {
""
@@ -315,7 +315,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
def doWholeStageTransform(): WholeStageTransformContext = {
val context = generateWholeStageTransformContext()
- if (conf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
+ if
(glutenConf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
wholeStageTransformerContext = Some(context)
}
context
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 60259e00dc..1a95a96a55 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -118,7 +118,7 @@ case class WriteFilesExecTransformer(
}
private def getFinalChildOutput: Seq[Attribute] = {
- val metadataExclusionList = conf
+ val metadataExclusionList = glutenConf
.getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
.split(",")
.map(_.trim)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
index ebab261442..7fce082f8a 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec,
SparkPlan}
*/
case class RemoveNativeWriteFilesSortAndProject() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if
(!conf.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
+ if
(!GlutenConfig.get.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT))
{
return plan
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 7915282e3e..2100518e74 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -49,7 +49,7 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with
SharedSparkSession wit
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED,
"true")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")
- .set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL, "INFO")
+ .set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL.key, "INFO")
test("Soft Affinity Scheduler with duplicate reading detection") {
if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback:
Boolean): Unit = {
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED,
scanFallback)
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED,
aggFallback)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key,
scanFallback.toString)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key,
aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback:
Boolean): Unit = {
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED,
scanFallback)
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED,
aggFallback)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key,
scanFallback.toString)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key,
aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback:
Boolean): Unit = {
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED,
scanFallback)
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED,
aggFallback)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key,
scanFallback.toString)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key,
aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback:
Boolean): Unit = {
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED,
scanFallback)
-
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED,
aggFallback)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key,
scanFallback.toString)
+ session.sessionState.conf
+ .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key,
aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
diff --git
a/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala
b/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala
new file mode 100644
index 0000000000..475fe69410
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.network.util.JavaUtils
+
+import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern;
+
+object BackendType extends Enumeration {
+ type BackendType = Value
+ val COMMON, VELOX, CLICKHOUSE = Value
+}
+
+private[gluten] case class ConfigBuilder(key: String) {
+ import ConfigHelpers._
+
+ private[config] var _doc = ""
+ private[config] var _version = ""
+ private[config] var _backend = BackendType.COMMON
+ private[config] var _public = true
+ private[config] var _alternatives = List.empty[String]
+ private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
+
+ def doc(s: String): ConfigBuilder = {
+ _doc = s
+ this
+ }
+
+ def version(s: String): ConfigBuilder = {
+ _version = s
+ this
+ }
+
+ def backend(backend: BackendType.BackendType): ConfigBuilder = {
+ _backend = backend
+ this
+ }
+
+ def internal(): ConfigBuilder = {
+ _public = false
+ this
+ }
+
+ def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+ _onCreate = Option(callback)
+ this
+ }
+
+ def withAlternative(key: String): ConfigBuilder = {
+ _alternatives = _alternatives :+ key
+ this
+ }
+
+ def intConf: TypedConfigBuilder[Int] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
+ }
+
+ def longConf: TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
+ }
+
+ def doubleConf: TypedConfigBuilder[Double] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
+ }
+
+ def booleanConf: TypedConfigBuilder[Boolean] = {
+ new TypedConfigBuilder(this, toBoolean(_, key))
+ }
+
+ def stringConf: TypedConfigBuilder[String] = {
+ new TypedConfigBuilder(this, identity)
+ }
+
+ def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_,
unit))
+ }
+
+ def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_,
unit))
+ }
+
+ def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
+ val entry =
+ new ConfigEntryFallback[T](key, _doc, _version, _backend, _public,
_alternatives, fallback)
+ _onCreate.foreach(_(entry))
+ entry
+ }
+}
+
+private object ConfigHelpers {
+ def toNumber[T](s: String, converter: String => T, key: String, configType:
String): T = {
+ try {
+ converter(s.trim)
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be $configType, but
was $s")
+ }
+ }
+
+ def toBoolean(s: String, key: String): Boolean = {
+ try {
+ s.trim.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(s"$key should be boolean, but was
$s")
+ }
+ }
+
+ private val TIME_STRING_PATTERN = Pattern.compile("(-?[0-9]+)([a-z]+)?")
+
+ def timeFromString(str: String, unit: TimeUnit): Long =
JavaUtils.timeStringAs(str, unit)
+
+ def timeToString(v: Long, unit: TimeUnit): String =
s"${TimeUnit.MILLISECONDS.convert(v, unit)}ms"
+
+ def byteFromString(str: String, unit: ByteUnit): Long = {
+ val (input, multiplier) =
+ if (str.length() > 0 && str.charAt(0) == '-') {
+ (str.substring(1), -1)
+ } else {
+ (str, 1)
+ }
+ multiplier * JavaUtils.byteStringAs(input, unit)
+ }
+
+ def byteToString(v: Long, unit: ByteUnit): String = s"${unit.convertTo(v,
ByteUnit.BYTE)}b"
+}
+
+private[gluten] class TypedConfigBuilder[T](
+ val parent: ConfigBuilder,
+ val converter: String => T,
+ val stringConverter: T => String) {
+
+ def this(parent: ConfigBuilder, converter: String => T) = {
+ this(parent, converter, { v: T => v.toString })
+ }
+
+ def transform(fn: T => T): TypedConfigBuilder[T] = {
+ new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
+ }
+
+ def checkValue(validator: T => Boolean, errorMsg: String):
TypedConfigBuilder[T] = {
+ transform {
+ v =>
+ if (!validator(v)) {
+ throw new IllegalArgumentException(s"'$v' in ${parent.key} is
invalid. $errorMsg")
+ }
+ v
+ }
+ }
+
+ def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
+ transform {
+ v =>
+ if (!validValues.contains(v)) {
+ throw new IllegalArgumentException(
+ s"The value of ${parent.key} should be one of
${validValues.mkString(", ")}, " +
+ s"but was $v")
+ }
+ v
+ }
+ }
+
+ def createOptional: OptionalConfigEntry[T] = {
+ val entry = new OptionalConfigEntry[T](
+ parent.key,
+ parent._doc,
+ parent._version,
+ parent._backend,
+ parent._public,
+ parent._alternatives,
+ converter,
+ stringConverter)
+ parent._onCreate.foreach(_(entry))
+ entry
+ }
+
+ def createWithDefault(default: T): ConfigEntry[T] = {
+ assert(default != null, "Use createOptional.")
+ default match {
+ case str: String => createWithDefaultString(str)
+ case _ =>
+ val transformedDefault = converter(stringConverter(default))
+ val entry = new ConfigEntryWithDefault[T](
+ parent.key,
+ parent._doc,
+ parent._version,
+ parent._backend,
+ parent._public,
+ parent._alternatives,
+ converter,
+ stringConverter,
+ transformedDefault)
+ parent._onCreate.foreach(_(entry))
+ entry
+ }
+ }
+
+ def createWithDefaultString(default: String): ConfigEntry[T] = {
+ val entry = new ConfigEntryWithDefaultString[T](
+ parent.key,
+ parent._doc,
+ parent._version,
+ parent._backend,
+ parent._public,
+ parent._alternatives,
+ converter,
+ stringConverter,
+ default
+ )
+ parent._onCreate.foreach(_(entry))
+ entry
+ }
+}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala
b/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala
new file mode 100644
index 0000000000..5d087d3385
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.gluten.config.BackendType.BackendType
+
+import org.apache.spark.sql.internal.ConfigProvider
+
+/**
+ * An entry contains all meta information for a configuration.
+ *
+ * The code is similar to Spark's relevant config code but extended for
Gluten's use, like adding
+ * backend type, etc.
+ *
+ * @tparam T
+ * the value type
+ */
+trait ConfigEntry[T] {
+
+ /** The key for the configuration. */
+ def key: String
+
+ /** The documentation for the configuration. */
+ def doc: String
+
+ /** The gluten version when the configuration was released. */
+ def version: String
+
+ /** The backend type of the configuration. */
+ def backend: BackendType.BackendType
+
+ /**
+ * If this configuration is public to the user. If it's `false`, this
configuration is only used
+ * internally and we should not expose it to users.
+ */
+ def isPublic: Boolean
+
+ /** The alternative keys for the configuration. */
+ def alternatives: List[String]
+
+ /**
+ * How to convert a string to the value. It should throw an exception if the
string does not have
+ * the required format.
+ */
+ def valueConverter: String => T
+
+ /** How to convert a value to a string that the user can use as a valid
string value. */
+ def stringConverter: T => String
+
+ /** Read the configuration from the given ConfigProvider. */
+ def readFrom(conf: ConfigProvider): T
+
+ /** The default value of the configuration. */
+ def defaultValue: Option[T]
+
+ /** The string representation of the default value. */
+ def defaultValueString: String
+
+ final protected def readString(provider: ConfigProvider): Option[String] = {
+ alternatives.foldLeft(provider.get(key))((res, nextKey) =>
res.orElse(provider.get(nextKey)))
+ }
+
+ override def toString: String = {
+ s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
+ s"public=$isPublic, version=$version)"
+ }
+
+ ConfigEntry.registerEntry(this)
+}
+
+private[gluten] class OptionalConfigEntry[T](
+ _key: String,
+ _doc: String,
+ _version: String,
+ _backend: BackendType,
+ _isPublic: Boolean,
+ _alternatives: List[String],
+ _valueConverter: String => T,
+ _stringConverter: T => String)
+ extends ConfigEntry[Option[T]] {
+ override def key: String = _key
+
+ override def doc: String = _doc
+
+ override def version: String = _version
+
+ override def backend: BackendType = _backend
+
+ override def isPublic: Boolean = _isPublic
+
+ override def alternatives: List[String] = _alternatives
+
+ override def valueConverter: String => Option[T] = s =>
Option(_valueConverter(s))
+
+ override def stringConverter: Option[T] => String = v =>
v.map(_stringConverter).orNull
+
+ override def readFrom(conf: ConfigProvider): Option[T] =
readString(conf).map(_valueConverter)
+
+ override def defaultValue: Option[Option[T]] = None
+
+ override def defaultValueString: String = ConfigEntry.UNDEFINED
+}
+
+private[gluten] class ConfigEntryWithDefault[T](
+ _key: String,
+ _doc: String,
+ _version: String,
+ _backend: BackendType,
+ _isPublic: Boolean,
+ _alternatives: List[String],
+ _valueConverter: String => T,
+ _stringConverter: T => String,
+ _defaultVal: T)
+ extends ConfigEntry[T] {
+ override def key: String = _key
+
+ override def doc: String = _doc
+
+ override def version: String = _version
+
+ override def backend: BackendType = _backend
+
+ override def isPublic: Boolean = _isPublic
+
+ override def alternatives: List[String] = _alternatives
+
+ override def valueConverter: String => T = _valueConverter
+
+ override def stringConverter: T => String = _stringConverter
+
+ override def readFrom(conf: ConfigProvider): T = {
+ readString(conf).map(valueConverter).getOrElse(_defaultVal)
+ }
+
+ override def defaultValue: Option[T] = Option(_defaultVal)
+
+ override def defaultValueString: String = stringConverter(_defaultVal)
+}
+
+private[gluten] class ConfigEntryWithDefaultString[T](
+ _key: String,
+ _doc: String,
+ _version: String,
+ _backend: BackendType,
+ _isPublic: Boolean,
+ _alternatives: List[String],
+ _valueConverter: String => T,
+ _stringConverter: T => String,
+ _defaultVal: String)
+ extends ConfigEntry[T] {
+ override def key: String = _key
+
+ override def doc: String = _doc
+
+ override def version: String = _version
+
+ override def backend: BackendType = _backend
+
+ override def isPublic: Boolean = _isPublic
+
+ override def alternatives: List[String] = _alternatives
+
+ override def valueConverter: String => T = _valueConverter
+
+ override def stringConverter: T => String = _stringConverter
+
+ override def readFrom(conf: ConfigProvider): T = {
+ val value = readString(conf).getOrElse(_defaultVal)
+ valueConverter(value)
+ }
+
+ override def defaultValue: Option[T] = Some(valueConverter(_defaultVal))
+
+ override def defaultValueString: String = _defaultVal
+}
+
+private[gluten] class ConfigEntryFallback[T](
+ _key: String,
+ _doc: String,
+ _version: String,
+ _backend: BackendType,
+ _isPublic: Boolean,
+ _alternatives: List[String],
+ fallback: ConfigEntry[T])
+ extends ConfigEntry[T] {
+ override def key: String = _key
+
+ override def doc: String = _doc
+
+ override def version: String = _version
+
+ override def backend: BackendType = _backend
+
+ override def isPublic: Boolean = _isPublic
+
+ override def alternatives: List[String] = _alternatives
+
+ override def valueConverter: String => T = fallback.valueConverter
+
+ override def stringConverter: T => String = fallback.stringConverter
+
+ override def readFrom(conf: ConfigProvider): T = {
+ readString(conf).map(valueConverter).getOrElse(fallback.readFrom(conf))
+ }
+
+ override def defaultValue: Option[T] = fallback.defaultValue
+
+ override def defaultValueString: String = fallback.defaultValueString
+}
+
+object ConfigEntry {
+
+ val UNDEFINED = "<undefined>"
+
+ private val knownConfigs =
+ new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+ private def registerEntry(entry: ConfigEntry[_]): Unit = {
+ val existing = knownConfigs.putIfAbsent(entry.key, entry)
+ require(existing == null, s"Config entry ${entry.key} already registered!")
+ }
+
+ def containsEntry(entry: ConfigEntry[_]): Boolean = {
+ Option(knownConfigs.get(entry.key)).isDefined
+ }
+
+ def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index cd01b1a42f..cee4ea7d16 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.config
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
import com.google.common.collect.ImmutableList
import org.apache.hadoop.security.UserGroupInformation
@@ -37,97 +37,104 @@ case class GlutenNumaBindingInfo(
class GlutenConfig(conf: SQLConf) extends Logging {
import GlutenConfig._
+ private lazy val configProvider = new SQLConfProvider(conf)
+
+ def getConf[T](entry: ConfigEntry[T]): T = {
+ require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
+ entry.readFrom(configProvider)
+ }
+
def enableAnsiMode: Boolean = conf.ansiEnabled
- def enableGluten: Boolean = conf.getConf(GLUTEN_ENABLED)
+ def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
// FIXME the option currently controls both JVM and native validation
against a Substrait plan.
- def enableNativeValidation: Boolean = conf.getConf(NATIVE_VALIDATION_ENABLED)
+ def enableNativeValidation: Boolean = getConf(NATIVE_VALIDATION_ENABLED)
- def enableColumnarBatchScan: Boolean =
conf.getConf(COLUMNAR_BATCHSCAN_ENABLED)
+ def enableColumnarBatchScan: Boolean = getConf(COLUMNAR_BATCHSCAN_ENABLED)
- def enableColumnarFileScan: Boolean = conf.getConf(COLUMNAR_FILESCAN_ENABLED)
+ def enableColumnarFileScan: Boolean = getConf(COLUMNAR_FILESCAN_ENABLED)
- def enableColumnarHiveTableScan: Boolean =
conf.getConf(COLUMNAR_HIVETABLESCAN_ENABLED)
+ def enableColumnarHiveTableScan: Boolean =
getConf(COLUMNAR_HIVETABLESCAN_ENABLED)
def enableColumnarHiveTableScanNestedColumnPruning: Boolean =
- conf.getConf(COLUMNAR_HIVETABLESCAN_NESTED_COLUMN_PRUNING_ENABLED)
+ getConf(COLUMNAR_HIVETABLESCAN_NESTED_COLUMN_PRUNING_ENABLED)
- def enableVanillaVectorizedReaders: Boolean =
conf.getConf(VANILLA_VECTORIZED_READERS_ENABLED)
+ def enableVanillaVectorizedReaders: Boolean =
getConf(VANILLA_VECTORIZED_READERS_ENABLED)
- def enableColumnarHashAgg: Boolean = conf.getConf(COLUMNAR_HASHAGG_ENABLED)
+ def enableColumnarHashAgg: Boolean = getConf(COLUMNAR_HASHAGG_ENABLED)
- def forceToUseHashAgg: Boolean = conf.getConf(COLUMNAR_FORCE_HASHAGG_ENABLED)
+ def forceToUseHashAgg: Boolean = getConf(COLUMNAR_FORCE_HASHAGG_ENABLED)
- def mergeTwoPhasesAggEnabled: Boolean =
conf.getConf(MERGE_TWO_PHASES_ENABLED)
+ def mergeTwoPhasesAggEnabled: Boolean = getConf(MERGE_TWO_PHASES_ENABLED)
- def enableColumnarProject: Boolean = conf.getConf(COLUMNAR_PROJECT_ENABLED)
+ def enableColumnarProject: Boolean = getConf(COLUMNAR_PROJECT_ENABLED)
- def enableColumnarFilter: Boolean = conf.getConf(COLUMNAR_FILTER_ENABLED)
+ def enableColumnarFilter: Boolean = getConf(COLUMNAR_FILTER_ENABLED)
- def enableColumnarSort: Boolean = conf.getConf(COLUMNAR_SORT_ENABLED)
+ def enableColumnarSort: Boolean = getConf(COLUMNAR_SORT_ENABLED)
- def enableColumnarWindow: Boolean = conf.getConf(COLUMNAR_WINDOW_ENABLED)
+ def enableColumnarWindow: Boolean = getConf(COLUMNAR_WINDOW_ENABLED)
- def enableColumnarWindowGroupLimit: Boolean =
conf.getConf(COLUMNAR_WINDOW_GROUP_LIMIT_ENABLED)
+ def enableColumnarWindowGroupLimit: Boolean =
getConf(COLUMNAR_WINDOW_GROUP_LIMIT_ENABLED)
- def veloxColumnarWindowType: String =
conf.getConfString(COLUMNAR_VELOX_WINDOW_TYPE.key)
+ def veloxColumnarWindowType: String = getConf(COLUMNAR_VELOX_WINDOW_TYPE)
- def enableColumnarShuffledHashJoin: Boolean =
conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
+ def enableColumnarShuffledHashJoin: Boolean =
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
def shuffledHashJoinOptimizeBuildSide: Boolean =
- conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE)
+ getConf(COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE)
- def enableNativeColumnarToRow: Boolean =
conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
+ def enableNativeColumnarToRow: Boolean =
getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
- def forceShuffledHashJoin: Boolean =
conf.getConf(COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED)
+ def forceShuffledHashJoin: Boolean =
getConf(COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED)
- def enableColumnarSortMergeJoin: Boolean =
conf.getConf(COLUMNAR_SORTMERGEJOIN_ENABLED)
+ def enableColumnarSortMergeJoin: Boolean =
getConf(COLUMNAR_SORTMERGEJOIN_ENABLED)
- def enableColumnarUnion: Boolean = conf.getConf(COLUMNAR_UNION_ENABLED)
+ def enableColumnarUnion: Boolean = getConf(COLUMNAR_UNION_ENABLED)
- def enableNativeUnion: Boolean = conf.getConf(NATIVE_UNION_ENABLED)
+ def enableNativeUnion: Boolean = getConf(NATIVE_UNION_ENABLED)
- def enableColumnarExpand: Boolean = conf.getConf(COLUMNAR_EXPAND_ENABLED)
+ def enableColumnarExpand: Boolean = getConf(COLUMNAR_EXPAND_ENABLED)
- def enableColumnarBroadcastExchange: Boolean =
conf.getConf(COLUMNAR_BROADCAST_EXCHANGE_ENABLED)
+ def enableColumnarBroadcastExchange: Boolean =
getConf(COLUMNAR_BROADCAST_EXCHANGE_ENABLED)
- def enableColumnarBroadcastJoin: Boolean =
conf.getConf(COLUMNAR_BROADCAST_JOIN_ENABLED)
+ def enableColumnarBroadcastJoin: Boolean =
getConf(COLUMNAR_BROADCAST_JOIN_ENABLED)
- def enableColumnarSample: Boolean = conf.getConf(COLUMNAR_SAMPLE_ENABLED)
+ def enableColumnarSample: Boolean = getConf(COLUMNAR_SAMPLE_ENABLED)
- def enableColumnarArrowUDF: Boolean =
conf.getConf(COLUMNAR_ARROW_UDF_ENABLED)
+ def enableColumnarArrowUDF: Boolean = getConf(COLUMNAR_ARROW_UDF_ENABLED)
- def enableColumnarCoalesce: Boolean = conf.getConf(COLUMNAR_COALESCE_ENABLED)
+ def enableColumnarCoalesce: Boolean = getConf(COLUMNAR_COALESCE_ENABLED)
- def columnarTableCacheEnabled: Boolean =
conf.getConf(COLUMNAR_TABLE_CACHE_ENABLED)
+ def columnarTableCacheEnabled: Boolean =
getConf(COLUMNAR_TABLE_CACHE_ENABLED)
def enableRewriteDateTimestampComparison: Boolean =
- conf.getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
+ getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
def enableCollapseNestedGetJsonObject: Boolean =
- conf.getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)
+ getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)
def enableCHRewriteDateConversion: Boolean =
- conf.getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
+ getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
def enableCommonSubexpressionEliminate: Boolean =
- conf.getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
+ getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
def enableCountDistinctWithoutExpand: Boolean =
- conf.getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
+ getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
def enableExtendedColumnPruning: Boolean =
- conf.getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
+ getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
def veloxOrcScanEnabled: Boolean =
- conf.getConf(VELOX_ORC_SCAN_ENABLED)
+ getConf(VELOX_ORC_SCAN_ENABLED)
def forceOrcCharTypeScanFallbackEnabled: Boolean =
- conf.getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
+ getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
def scanFileSchemeValidationEnabled: Boolean =
- conf.getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
+ getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
// Whether to use GlutenShuffleManager (experimental).
def isUseGlutenShuffleManager: Boolean =
@@ -158,44 +165,43 @@ class GlutenConfig(conf: SQLConf) extends Logging {
.getConfString("spark.celeborn.client.spark.shuffle.writer",
GLUTEN_HASH_SHUFFLE_WRITER)
.toLowerCase(Locale.ROOT)
- def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
+ def enableColumnarShuffle: Boolean = getConf(COLUMNAR_SHUFFLE_ENABLED)
- def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
+ def enablePreferColumnar: Boolean = getConf(COLUMNAR_PREFER_ENABLED)
- def enableOneRowRelationColumnar: Boolean =
conf.getConf(COLUMNAR_ONE_ROW_RELATION_ENABLED)
+ def enableOneRowRelationColumnar: Boolean =
getConf(COLUMNAR_ONE_ROW_RELATION_ENABLED)
def physicalJoinOptimizationThrottle: Integer =
- conf.getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_THROTTLE)
+ getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_THROTTLE)
def enablePhysicalJoinOptimize: Boolean =
- conf.getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_ENABLED)
+ getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_ENABLED)
def logicalJoinOptimizationThrottle: Integer =
- conf.getConf(COLUMNAR_LOGICAL_JOIN_OPTIMIZATION_THROTTLE)
+ getConf(COLUMNAR_LOGICAL_JOIN_OPTIMIZATION_THROTTLE)
- def enableScanOnly: Boolean = conf.getConf(COLUMNAR_SCAN_ONLY_ENABLED)
+ def enableScanOnly: Boolean = getConf(COLUMNAR_SCAN_ONLY_ENABLED)
- def tmpFile: Option[String] = conf.getConf(COLUMNAR_TEMP_DIR)
+ def tmpFile: Option[String] = getConf(COLUMNAR_TEMP_DIR)
- @deprecated def broadcastCacheTimeout: Int =
conf.getConf(COLUMNAR_BROADCAST_CACHE_TIMEOUT)
+ @deprecated def broadcastCacheTimeout: Int =
getConf(COLUMNAR_BROADCAST_CACHE_TIMEOUT)
def columnarShuffleSortPartitionsThreshold: Int =
- conf.getConf(COLUMNAR_SHUFFLE_SORT_PARTITIONS_THRESHOLD)
+ getConf(COLUMNAR_SHUFFLE_SORT_PARTITIONS_THRESHOLD)
def columnarShuffleSortColumnsThreshold: Int =
- conf.getConf(COLUMNAR_SHUFFLE_SORT_COLUMNS_THRESHOLD)
+ getConf(COLUMNAR_SHUFFLE_SORT_COLUMNS_THRESHOLD)
- def columnarShuffleReallocThreshold: Double =
conf.getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
+ def columnarShuffleReallocThreshold: Double =
getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
- def columnarShuffleMergeThreshold: Double =
conf.getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
+ def columnarShuffleMergeThreshold: Double =
getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
- def columnarShuffleCodec: Option[String] =
conf.getConf(COLUMNAR_SHUFFLE_CODEC)
+ def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC)
def columnarShuffleCompressionMode: String =
- conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_MODE)
+ getConf(COLUMNAR_SHUFFLE_COMPRESSION_MODE)
- def columnarShuffleCodecBackend: Option[String] = conf
- .getConf(COLUMNAR_SHUFFLE_CODEC_BACKEND)
+ def columnarShuffleCodecBackend: Option[String] =
getConf(COLUMNAR_SHUFFLE_CODEC_BACKEND)
.filter(Set(GLUTEN_QAT_BACKEND_NAME, GLUTEN_IAA_BACKEND_NAME).contains(_))
def columnarShuffleEnableQat: Boolean =
@@ -205,54 +211,53 @@ class GlutenConfig(conf: SQLConf) extends Logging {
columnarShuffleCodecBackend.contains(GlutenConfig.GLUTEN_IAA_BACKEND_NAME)
def columnarShuffleCompressionThreshold: Int =
- conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
+ getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
def columnarShuffleReaderBufferSize: Long =
- conf.getConf(COLUMNAR_SHUFFLE_READER_BUFFER_SIZE)
+ getConf(COLUMNAR_SHUFFLE_READER_BUFFER_SIZE)
- def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+ def maxBatchSize: Int = getConf(COLUMNAR_MAX_BATCH_SIZE)
def columnarToRowMemThreshold: Long =
- conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
+ getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
- def shuffleWriterBufferSize: Int = conf
- .getConf(SHUFFLE_WRITER_BUFFER_SIZE)
+ def shuffleWriterBufferSize: Int = getConf(SHUFFLE_WRITER_BUFFER_SIZE)
.getOrElse(maxBatchSize)
- def enableColumnarLimit: Boolean = conf.getConf(COLUMNAR_LIMIT_ENABLED)
+ def enableColumnarLimit: Boolean = getConf(COLUMNAR_LIMIT_ENABLED)
- def enableColumnarGenerate: Boolean = conf.getConf(COLUMNAR_GENERATE_ENABLED)
+ def enableColumnarGenerate: Boolean = getConf(COLUMNAR_GENERATE_ENABLED)
def enableTakeOrderedAndProject: Boolean =
- conf.getConf(COLUMNAR_TAKE_ORDERED_AND_PROJECT_ENABLED)
+ getConf(COLUMNAR_TAKE_ORDERED_AND_PROJECT_ENABLED)
- def enableNativeBloomFilter: Boolean =
conf.getConf(COLUMNAR_NATIVE_BLOOMFILTER_ENABLED)
+ def enableNativeBloomFilter: Boolean =
getConf(COLUMNAR_NATIVE_BLOOMFILTER_ENABLED)
def enableNativeHyperLogLogAggregateFunction: Boolean =
- conf.getConf(COLUMNAR_NATIVE_HYPERLOGLOG_AGGREGATE_ENABLED)
+ getConf(COLUMNAR_NATIVE_HYPERLOGLOG_AGGREGATE_ENABLED)
def columnarParquetWriteBlockSize: Long =
- conf.getConf(COLUMNAR_PARQUET_WRITE_BLOCK_SIZE)
+ getConf(COLUMNAR_PARQUET_WRITE_BLOCK_SIZE)
def columnarParquetWriteBlockRows: Long =
- conf.getConf(COLUMNAR_PARQUET_WRITE_BLOCK_ROWS)
+ getConf(COLUMNAR_PARQUET_WRITE_BLOCK_ROWS)
- def wholeStageFallbackThreshold: Int =
conf.getConf(COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD)
+ def wholeStageFallbackThreshold: Int =
getConf(COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD)
- def queryFallbackThreshold: Int =
conf.getConf(COLUMNAR_QUERY_FALLBACK_THRESHOLD)
+ def queryFallbackThreshold: Int = getConf(COLUMNAR_QUERY_FALLBACK_THRESHOLD)
- def fallbackIgnoreRowToColumnar: Boolean =
conf.getConf(COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR)
+ def fallbackIgnoreRowToColumnar: Boolean =
getConf(COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR)
- def fallbackExpressionsThreshold: Int =
conf.getConf(COLUMNAR_FALLBACK_EXPRESSIONS_THRESHOLD)
+ def fallbackExpressionsThreshold: Int =
getConf(COLUMNAR_FALLBACK_EXPRESSIONS_THRESHOLD)
- def fallbackPreferColumnar: Boolean =
conf.getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
+ def fallbackPreferColumnar: Boolean =
getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
def numaBindingInfo: GlutenNumaBindingInfo = {
- val enableNumaBinding: Boolean =
conf.getConf(COLUMNAR_NUMA_BINDING_ENABLED)
+ val enableNumaBinding: Boolean = getConf(COLUMNAR_NUMA_BINDING_ENABLED)
if (!enableNumaBinding) {
GlutenNumaBindingInfo(enableNumaBinding = false)
} else {
- val tmp = conf.getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
+ val tmp = getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
if (tmp.isEmpty) {
GlutenNumaBindingInfo(enableNumaBinding = false)
} else {
@@ -264,78 +269,78 @@ class GlutenConfig(conf: SQLConf) extends Logging {
}
}
- def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)
+ def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
- def memoryBacktraceAllocation: Boolean =
conf.getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)
+ def memoryBacktraceAllocation: Boolean =
getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)
def numTaskSlotsPerExecutor: Int = {
- val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
+ val numSlots = getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
assert(numSlots > 0, s"Number of task slot not found. This should not
happen.")
numSlots
}
- def offHeapMemorySize: Long = conf.getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+ def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
- def taskOffHeapMemorySize: Long =
conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
+ def taskOffHeapMemorySize: Long =
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
- def memoryOverAcquiredRatio: Double =
conf.getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
+ def memoryOverAcquiredRatio: Double =
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
- def memoryReservationBlockSize: Long =
conf.getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
+ def memoryReservationBlockSize: Long =
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
def conservativeTaskOffHeapMemorySize: Long =
- conf.getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
+ getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
// Options used by RAS.
- def enableRas: Boolean = conf.getConf(RAS_ENABLED)
+ def enableRas: Boolean = getConf(RAS_ENABLED)
- def rasCostModel: String = conf.getConf(RAS_COST_MODEL)
+ def rasCostModel: String = getConf(RAS_COST_MODEL)
- def rasRough2SizeBytesThreshold: Long =
conf.getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
+ def rasRough2SizeBytesThreshold: Long =
getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
- def rasRough2R2cCost: Long = conf.getConf(RAS_ROUGH2_R2C_COST)
+ def rasRough2R2cCost: Long = getConf(RAS_ROUGH2_R2C_COST)
- def rasRough2VanillaCost: Long = conf.getConf(RAS_ROUGH2_VANILLA_COST)
+ def rasRough2VanillaCost: Long = getConf(RAS_ROUGH2_VANILLA_COST)
- def enableVeloxCache: Boolean = conf.getConf(COLUMNAR_VELOX_CACHE_ENABLED)
+ def enableVeloxCache: Boolean = getConf(COLUMNAR_VELOX_CACHE_ENABLED)
- def veloxMemCacheSize: Long = conf.getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
+ def veloxMemCacheSize: Long = getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
- def veloxSsdCachePath: String = conf.getConf(COLUMNAR_VELOX_SSD_CACHE_PATH)
+ def veloxSsdCachePath: String = getConf(COLUMNAR_VELOX_SSD_CACHE_PATH)
- def veloxSsdCacheSize: Long = conf.getConf(COLUMNAR_VELOX_SSD_CACHE_SIZE)
+ def veloxSsdCacheSize: Long = getConf(COLUMNAR_VELOX_SSD_CACHE_SIZE)
- def veloxSsdCacheShards: Integer =
conf.getConf(COLUMNAR_VELOX_SSD_CACHE_SHARDS)
+ def veloxSsdCacheShards: Integer = getConf(COLUMNAR_VELOX_SSD_CACHE_SHARDS)
- def veloxSsdCacheIOThreads: Integer =
conf.getConf(COLUMNAR_VELOX_SSD_CACHE_IO_THREADS)
+ def veloxSsdCacheIOThreads: Integer =
getConf(COLUMNAR_VELOX_SSD_CACHE_IO_THREADS)
- def veloxSsdODirectEnabled: Boolean =
conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
+ def veloxSsdODirectEnabled: Boolean =
getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
def veloxConnectorIOThreads: Int = {
-
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+
getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
}
- def veloxSplitPreloadPerDriver: Integer =
conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
+ def veloxSplitPreloadPerDriver: Integer =
getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
- def veloxSpillStrategy: String = conf.getConf(COLUMNAR_VELOX_SPILL_STRATEGY)
+ def veloxSpillStrategy: String = getConf(COLUMNAR_VELOX_SPILL_STRATEGY)
- def veloxMaxSpillLevel: Int = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_LEVEL)
+ def veloxMaxSpillLevel: Int = getConf(COLUMNAR_VELOX_MAX_SPILL_LEVEL)
- def veloxMaxSpillFileSize: Long =
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_FILE_SIZE)
+ def veloxMaxSpillFileSize: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_FILE_SIZE)
- def veloxSpillFileSystem: String =
conf.getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM)
+ def veloxSpillFileSystem: String = getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM)
- def veloxMaxSpillRunRows: Long =
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_RUN_ROWS)
+ def veloxMaxSpillRunRows: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_RUN_ROWS)
- def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
+ def veloxMaxSpillBytes: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
def veloxBloomFilterExpectedNumItems: Long =
- conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
+ getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
- def veloxBloomFilterNumBits: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS)
+ def veloxBloomFilterNumBits: Long =
getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS)
- def veloxBloomFilterMaxNumBits: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
+ def veloxBloomFilterMaxNumBits: Long =
getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
- def castFromVarcharAddTrimNode: Boolean =
conf.getConf(CAST_FROM_VARCHAR_ADD_TRIM_NODE)
+ def castFromVarcharAddTrimNode: Boolean =
getConf(CAST_FROM_VARCHAR_ADD_TRIM_NODE)
case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
@@ -353,62 +358,61 @@ class GlutenConfig(conf: SQLConf) extends Logging {
}
def veloxResizeBatchesShuffleInput: Boolean =
- conf.getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
+ getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
def veloxResizeBatchesShuffleInputRange: ResizeRange = {
- val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+ val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE)
val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
- val minSize = conf
- .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+ val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
.getOrElse(defaultMinSize)
ResizeRange(minSize, Int.MaxValue)
}
def chColumnarShuffleSpillThreshold: Long = {
- val threshold = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+ val threshold = getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
if (threshold == 0) {
- (conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
+ (getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
} else {
threshold
}
}
- def chColumnarMaxSortBufferSize: Long =
conf.getConf(COLUMNAR_CH_MAX_SORT_BUFFER_SIZE)
+ def chColumnarMaxSortBufferSize: Long =
getConf(COLUMNAR_CH_MAX_SORT_BUFFER_SIZE)
def chColumnarForceMemorySortShuffle: Boolean =
- conf.getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
+ getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
def cartesianProductTransformerEnabled: Boolean =
- conf.getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
+ getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
def broadcastNestedLoopJoinTransformerTransformerEnabled: Boolean =
- conf.getConf(BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED)
+ getConf(BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED)
- def transformPlanLogLevel: String = conf.getConf(TRANSFORM_PLAN_LOG_LEVEL)
+ def transformPlanLogLevel: String = getConf(TRANSFORM_PLAN_LOG_LEVEL)
- def substraitPlanLogLevel: String = conf.getConf(SUBSTRAIT_PLAN_LOG_LEVEL)
+ def substraitPlanLogLevel: String = getConf(SUBSTRAIT_PLAN_LOG_LEVEL)
- def validationLogLevel: String = conf.getConf(VALIDATION_LOG_LEVEL)
+ def validationLogLevel: String = getConf(VALIDATION_LOG_LEVEL)
- def softAffinityLogLevel: String = conf.getConf(SOFT_AFFINITY_LOG_LEVEL)
+ def softAffinityLogLevel: String = getConf(SOFT_AFFINITY_LOG_LEVEL)
// A comma-separated list of classes for the extended columnar transform rules
- def extendedColumnarTransformRules: String =
conf.getConf(EXTENDED_COLUMNAR_TRANSFORM_RULES)
+ def extendedColumnarTransformRules: String =
getConf(EXTENDED_COLUMNAR_TRANSFORM_RULES)
// A comma-separated list of classes for the extended columnar post rules
- def extendedColumnarPostRules: String =
conf.getConf(EXTENDED_COLUMNAR_POST_RULES)
+ def extendedColumnarPostRules: String = getConf(EXTENDED_COLUMNAR_POST_RULES)
- def extendedExpressionTransformer: String =
conf.getConf(EXTENDED_EXPRESSION_TRAN_CONF)
+ def extendedExpressionTransformer: String =
getConf(EXTENDED_EXPRESSION_TRAN_CONF)
def expressionBlacklist: Set[String] = {
- val blacklist = conf.getConf(EXPRESSION_BLACK_LIST)
+ val blacklist = getConf(EXPRESSION_BLACK_LIST)
val blacklistSet: Set[String] = if (blacklist.isDefined) {
blacklist.get.toLowerCase(Locale.ROOT).trim.split(",").toSet
} else {
Set.empty
}
- if (conf.getConf(FALLBACK_REGEXP_EXPRESSIONS)) {
+ if (getConf(FALLBACK_REGEXP_EXPRESSIONS)) {
val regexpList =
"rlike,regexp_replace,regexp_extract,regexp_extract_all,split"
regexpList.trim.split(",").toSet ++ blacklistSet
} else {
@@ -417,87 +421,93 @@ class GlutenConfig(conf: SQLConf) extends Logging {
}
def printStackOnValidationFailure: Boolean =
- conf.getConf(VALIDATION_PRINT_FAILURE_STACK_)
+ getConf(VALIDATION_PRINT_FAILURE_STACK_)
- def enableFallbackReport: Boolean = conf.getConf(FALLBACK_REPORTER_ENABLED)
+ def enableFallbackReport: Boolean = getConf(FALLBACK_REPORTER_ENABLED)
def enableVeloxUserExceptionStacktrace: Boolean =
- conf.getConf(COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE)
+ getConf(COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE)
def memoryUseHugePages: Boolean =
- conf.getConf(COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES)
-
- def debug: Boolean = conf.getConf(DEBUG_ENABLED)
- def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
- def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
- def benchmarkStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
- def benchmarkPartitionId: String = conf.getConf(BENCHMARK_TASK_PARTITIONID)
- def benchmarkTaskId: String = conf.getConf(BENCHMARK_TASK_TASK_ID)
- def benchmarkSaveDir: String = conf.getConf(BENCHMARK_SAVE_DIR)
- def textInputMaxBlockSize: Long = conf.getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
- def textIputEmptyAsDefault: Boolean =
conf.getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
+ getConf(COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES)
+
+ def debug: Boolean = getConf(DEBUG_ENABLED)
+ def debugKeepJniWorkspace: Boolean = getConf(DEBUG_KEEP_JNI_WORKSPACE)
+ def collectUtStats: Boolean = getConf(UT_STATISTIC)
+ def benchmarkStageId: Int = getConf(BENCHMARK_TASK_STAGEID)
+ def benchmarkPartitionId: String = getConf(BENCHMARK_TASK_PARTITIONID)
+ def benchmarkTaskId: String = getConf(BENCHMARK_TASK_TASK_ID)
+ def benchmarkSaveDir: String = getConf(BENCHMARK_SAVE_DIR)
+ def textInputMaxBlockSize: Long = getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
+ def textIputEmptyAsDefault: Boolean = getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
def enableParquetRowGroupMaxMinIndex: Boolean =
- conf.getConf(ENABLE_PARQUET_ROW_GROUP_MAX_MIN_INDEX)
+ getConf(ENABLE_PARQUET_ROW_GROUP_MAX_MIN_INDEX)
def enableVeloxFlushablePartialAggregation: Boolean =
- conf.getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
+ getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
def maxFlushableAggregationMemoryRatio: Double =
- conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
+ getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
def maxExtendedFlushableAggregationMemoryRatio: Double =
- conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
+ getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
def abandonFlushableAggregationMinPct: Int =
- conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
+ getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
def abandonFlushableAggregationMinRows: Int =
- conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
+ getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
// Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()`
instead
- def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED)
+ def enableNativeWriter: Option[Boolean] = getConf(NATIVE_WRITER_ENABLED)
- def enableNativeArrowReader: Boolean =
conf.getConf(NATIVE_ARROW_READER_ENABLED)
+ def enableNativeArrowReader: Boolean = getConf(NATIVE_ARROW_READER_ENABLED)
def directorySizeGuess: Long =
- conf.getConf(DIRECTORY_SIZE_GUESS)
+ getConf(DIRECTORY_SIZE_GUESS)
def filePreloadThreshold: Long =
- conf.getConf(FILE_PRELOAD_THRESHOLD)
+ getConf(FILE_PRELOAD_THRESHOLD)
def prefetchRowGroups: Int =
- conf.getConf(PREFETCH_ROW_GROUPS)
+ getConf(PREFETCH_ROW_GROUPS)
def loadQuantum: Long =
- conf.getConf(LOAD_QUANTUM)
+ getConf(LOAD_QUANTUM)
def maxCoalescedDistance: String =
- conf.getConf(MAX_COALESCED_DISTANCE_BYTES)
+ getConf(MAX_COALESCED_DISTANCE_BYTES)
def maxCoalescedBytes: Long =
- conf.getConf(MAX_COALESCED_BYTES)
+ getConf(MAX_COALESCED_BYTES)
def cachePrefetchMinPct: Int =
- conf.getConf(CACHE_PREFETCH_MINPCT)
+ getConf(CACHE_PREFETCH_MINPCT)
- def enableColumnarProjectCollapse: Boolean =
conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
+ def enableColumnarProjectCollapse: Boolean =
getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
- def enableColumnarPartialProject: Boolean =
conf.getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
+ def enableColumnarPartialProject: Boolean =
getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
- def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)
+ def awsSdkLogLevel: String = getConf(AWS_SDK_LOG_LEVEL)
- def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)
+ def awsS3RetryMode: String = getConf(AWS_S3_RETRY_MODE)
- def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)
+ def awsConnectionTimeout: String = getConf(AWS_S3_CONNECT_TIMEOUT)
- def enableCastAvgAggregateFunction: Boolean =
conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
+ def enableCastAvgAggregateFunction: Boolean =
getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
def dynamicOffHeapSizingEnabled: Boolean =
- conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+ getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
- def enableHiveFileFormatWriter: Boolean =
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
+ def enableHiveFileFormatWriter: Boolean =
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
- def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
+ def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
- def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
+ def enableHdfsViewfs: Boolean = getConf(HDFS_VIEWFS_ENABLED)
def enableBroadcastBuildRelationInOffheap: Boolean =
- conf.getConf(VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP)
+ getConf(VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP)
}
object GlutenConfig {
import SQLConf._
+ def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)
+
+ def buildStaticConf(key: String): ConfigBuilder = {
+ ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
+ }
+
val GLUTEN_ENABLED_BY_DEFAULT = true
val GLUTEN_ENABLED_KEY = "spark.gluten.enabled"
val GLUTEN_LIB_NAME = "spark.gluten.sql.columnar.libname"
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
b/shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
similarity index 50%
copy from
shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
copy to
shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
index 1a45572acd..8293a4a09b 100644
---
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
@@ -16,32 +16,15 @@
*/
package org.apache.spark.sql.internal
-import org.apache.spark.internal.config.ConfigReader
-
-import scala.collection.JavaConverters._
+/** A source of configuration values. */
+trait ConfigProvider {
+ def get(key: String): Option[String]
+}
-object GlutenConfigUtil {
- private def getConfString(reader: ConfigReader, key: String, value: String):
String = {
- Option(SQLConf.getConfigEntry(key))
- .map {
- _.readFrom(reader) match {
- case o: Option[_] => o.map(_.toString).getOrElse(value)
- case null => value
- case v => v.toString
- }
- }
- .getOrElse(value)
- }
+class SQLConfProvider(conf: SQLConf) extends ConfigProvider {
+ override def get(key: String): Option[String] =
Option(conf.settings.get(key))
+}
- def parseConfig(conf: Map[String, String]): Map[String, String] = {
- val reader = new
ConfigReader(conf.filter(_._1.startsWith("spark.gluten.")).asJava)
- conf.map {
- case (k, v) =>
- if (k.startsWith("spark.gluten.")) {
- (k, getConfString(reader, k, v))
- } else {
- (k, v)
- }
- }.toMap
- }
+class MapProvider(conf: Map[String, String]) extends ConfigProvider {
+ override def get(key: String): Option[String] = conf.get(key)
}
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
index 1a45572acd..87b90938a0 100644
---
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -16,15 +16,13 @@
*/
package org.apache.spark.sql.internal
-import org.apache.spark.internal.config.ConfigReader
-
-import scala.collection.JavaConverters._
+import org.apache.gluten.config._
object GlutenConfigUtil {
- private def getConfString(reader: ConfigReader, key: String, value: String):
String = {
- Option(SQLConf.getConfigEntry(key))
+ private def getConfString(configProvider: ConfigProvider, key: String,
value: String): String = {
+ Option(ConfigEntry.findEntry(key))
.map {
- _.readFrom(reader) match {
+ _.readFrom(configProvider) match {
case o: Option[_] => o.map(_.toString).getOrElse(value)
case null => value
case v => v.toString
@@ -34,11 +32,11 @@ object GlutenConfigUtil {
}
def parseConfig(conf: Map[String, String]): Map[String, String] = {
- val reader = new
ConfigReader(conf.filter(_._1.startsWith("spark.gluten.")).asJava)
+ val provider = new
MapProvider(conf.filter(_._1.startsWith("spark.gluten.")))
conf.map {
case (k, v) =>
if (k.startsWith("spark.gluten.")) {
- (k, getConfString(reader, k, v))
+ (k, getConfString(provider, k, v))
} else {
(k, v)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]