This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a5d1ea610c [GLUTEN-8327][CORE][Part-3] Introduce the `ConfigEntry` to 
gluten config (#8431)
a5d1ea610c is described below

commit a5d1ea610cd28cacfe005e32c2163b37210b9154
Author: Kaifei Yi <[email protected]>
AuthorDate: Tue Jan 7 21:59:10 2025 +0800

    [GLUTEN-8327][CORE][Part-3] Introduce the `ConfigEntry` to gluten config 
(#8431)
    
    * Introduce the ConfigEntry to gluten config
    
    * change config provider
---
 .../execution/ColumnarCachedBatchSerializer.scala  |  14 +-
 .../org/apache/gluten/test/MockVeloxBackend.java   |   3 +-
 .../benchmark/ColumnarTableCacheBenchmark.scala    |  15 +-
 .../HashAggregateExecBaseTransformer.scala         |   2 +-
 .../gluten/execution/WholeStageTransformer.scala   |   4 +-
 .../execution/WriteFilesExecTransformer.scala      |   2 +-
 .../RemoveNativeWriteFilesSortAndProject.scala     |   2 +-
 .../SoftAffinityWithRDDInfoSuite.scala             |   2 +-
 .../sql/GlutenSparkSessionExtensionSuite.scala     |   6 +-
 .../sql/GlutenSparkSessionExtensionSuite.scala     |   6 +-
 .../sql/GlutenSparkSessionExtensionSuite.scala     |   6 +-
 .../sql/GlutenSparkSessionExtensionSuite.scala     |   6 +-
 .../org/apache/gluten/config/ConfigBuilder.scala   | 229 ++++++++++++++
 .../org/apache/gluten/config/ConfigEntry.scala     | 242 ++++++++++++++
 .../org/apache/gluten/config/GlutenConfig.scala    | 346 +++++++++++----------
 ...GlutenConfigUtil.scala => ConfigProvider.scala} |  35 +--
 .../spark/sql/internal/GlutenConfigUtil.scala      |  14 +-
 17 files changed, 706 insertions(+), 228 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 9982fa36c7..75e73f64e5 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,7 @@ import 
org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
 import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer
@@ -76,9 +76,11 @@ case class CachedColumnarBatch(
  *     -> Convert DefaultCachedBatch to InternalRow using vanilla Spark 
serializer
  */
 // format: on
-class ColumnarCachedBatchSerializer extends CachedBatchSerializer with 
SQLConfHelper with Logging {
+class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging 
{
   private lazy val rowBasedCachedBatchSerializer = new 
DefaultCachedBatchSerializer
 
+  private def glutenConf: GlutenConfig = GlutenConfig.get
+
   private def toStructType(schema: Seq[Attribute]): StructType = {
     StructType(schema.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata)))
   }
@@ -108,14 +110,14 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with SQLConfHe
     // `convertColumnarBatchToCachedBatch`, but the inside ColumnarBatch is 
not arrow-based.
     // See: `InMemoryRelation.apply()`.
     // So we should disallow columnar input if using vanilla Spark columnar 
scan.
-    val noVanillaSparkColumnarScan = 
conf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
-      !conf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
-    conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
+    val noVanillaSparkColumnarScan = 
glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
+      !glutenConf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
+    glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
       schema) && noVanillaSparkColumnarScan
   }
 
   override def supportsColumnarOutput(schema: StructType): Boolean = {
-    conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
+    glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
   }
 
   override def convertInternalRowToCachedBatch(
diff --git 
a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java 
b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
index 912aa09bcb..efb3903d23 100644
--- a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
+++ b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.test;
 
 import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenConfig$;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.spark.SparkConf;
@@ -71,7 +72,7 @@ public final class MockVeloxBackend {
   private static SparkConf newSparkConf() {
     final SparkConf conf = new SparkConf();
     conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
-    conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS(), "0");
+    
conf.set(GlutenConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), 
"0");
     return conf;
   }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
index 10c0b01ead..0cecae47a9 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCacheBenchmark.scala
@@ -33,11 +33,16 @@ object ColumnarTableCacheBenchmark extends 
SqlBasedBenchmark {
 
   private def doBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = 
{
     val benchmark = new Benchmark(name, cardinality, output = output)
-    val flag = if 
(spark.sessionState.conf.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
-      "enable"
-    } else {
-      "disable"
-    }
+    val flag =
+      if (
+        spark.sessionState.conf
+          .getConfString(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key)
+          .toBoolean
+      ) {
+        "enable"
+      } else {
+        "disable"
+      }
     benchmark.addCase(s"$flag columnar table cache", 3)(_ => f)
     benchmark.run()
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 46c1ad0029..35809be0be 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -53,7 +53,7 @@ abstract class HashAggregateExecBaseTransformer(
     
BackendsApiManager.getMetricsApiInstance.genHashAggregateTransformerMetrics(sparkContext)
 
   protected def isCapableForStreamingAggregation: Boolean = {
-    if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
+    if (!glutenConf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) 
{
       return false
     }
     if (groupingExpressions.isEmpty) {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 5d37d53893..7a06709da0 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -271,7 +271,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
   override def nodeName: String = s"WholeStageCodegenTransformer 
($transformStageId)"
 
   override def verboseStringWithOperatorId(): String = {
-    val nativePlan = if 
(conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
+    val nativePlan = if 
(glutenConf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
       s"Native Plan:\n${nativePlanString()}"
     } else {
       ""
@@ -315,7 +315,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
   def doWholeStageTransform(): WholeStageTransformContext = {
     val context = generateWholeStageTransformContext()
-    if (conf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
+    if 
(glutenConf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
       wholeStageTransformerContext = Some(context)
     }
     context
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 60259e00dc..1a95a96a55 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -118,7 +118,7 @@ case class WriteFilesExecTransformer(
   }
 
   private def getFinalChildOutput: Seq[Attribute] = {
-    val metadataExclusionList = conf
+    val metadataExclusionList = glutenConf
       .getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
       .split(",")
       .map(_.trim)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
index ebab261442..7fce082f8a 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, 
SparkPlan}
  */
 case class RemoveNativeWriteFilesSortAndProject() extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
-    if 
(!conf.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
+    if 
(!GlutenConfig.get.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT))
 {
       return plan
     }
 
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 7915282e3e..2100518e74 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -49,7 +49,7 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with 
SharedSparkSession wit
     .set(GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED, 
"true")
     .set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
     .set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")
-    .set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL, "INFO")
+    .set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL.key, "INFO")
 
   test("Soft Affinity Scheduler with duplicate reading detection") {
     if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
         try {
           
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
           def testWithFallbackSettings(scanFallback: Boolean, aggFallback: 
Boolean): Unit = {
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, 
scanFallback)
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, 
aggFallback)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, 
scanFallback.toString)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, 
aggFallback.toString)
             val df = session.sql("SELECT max(id) FROM a")
             val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
               session,
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
         try {
           
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
           def testWithFallbackSettings(scanFallback: Boolean, aggFallback: 
Boolean): Unit = {
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, 
scanFallback)
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, 
aggFallback)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, 
scanFallback.toString)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, 
aggFallback.toString)
             val df = session.sql("SELECT max(id) FROM a")
             val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
               session,
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
         try {
           
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
           def testWithFallbackSettings(scanFallback: Boolean, aggFallback: 
Boolean): Unit = {
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, 
scanFallback)
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, 
aggFallback)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, 
scanFallback.toString)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, 
aggFallback.toString)
             val df = session.sql("SELECT max(id) FROM a")
             val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
               session,
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
index c5d528ddec..ae9b3901af 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSparkSessionExtensionSuite.scala
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
         try {
           
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
           def testWithFallbackSettings(scanFallback: Boolean, aggFallback: 
Boolean): Unit = {
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, 
scanFallback)
-            
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, 
aggFallback)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, 
scanFallback.toString)
+            session.sessionState.conf
+              .setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, 
aggFallback.toString)
             val df = session.sql("SELECT max(id) FROM a")
             val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
               session,
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala
new file mode 100644
index 0000000000..475fe69410
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/config/ConfigBuilder.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.network.util.JavaUtils
+
+import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern;
+
+object BackendType extends Enumeration {
+  type BackendType = Value
+  val COMMON, VELOX, CLICKHOUSE = Value
+}
+
+private[gluten] case class ConfigBuilder(key: String) {
+  import ConfigHelpers._
+
+  private[config] var _doc = ""
+  private[config] var _version = ""
+  private[config] var _backend = BackendType.COMMON
+  private[config] var _public = true
+  private[config] var _alternatives = List.empty[String]
+  private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
+
+  def doc(s: String): ConfigBuilder = {
+    _doc = s
+    this
+  }
+
+  def version(s: String): ConfigBuilder = {
+    _version = s
+    this
+  }
+
+  def backend(backend: BackendType.BackendType): ConfigBuilder = {
+    _backend = backend
+    this
+  }
+
+  def internal(): ConfigBuilder = {
+    _public = false
+    this
+  }
+
+  def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+    _onCreate = Option(callback)
+    this
+  }
+
+  def withAlternative(key: String): ConfigBuilder = {
+    _alternatives = _alternatives :+ key
+    this
+  }
+
+  def intConf: TypedConfigBuilder[Int] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
+  }
+
+  def longConf: TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
+  }
+
+  def doubleConf: TypedConfigBuilder[Double] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
+  }
+
+  def booleanConf: TypedConfigBuilder[Boolean] = {
+    new TypedConfigBuilder(this, toBoolean(_, key))
+  }
+
+  def stringConf: TypedConfigBuilder[String] = {
+    new TypedConfigBuilder(this, identity)
+  }
+
+  def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, 
unit))
+  }
+
+  def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, 
unit))
+  }
+
+  def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
+    val entry =
+      new ConfigEntryFallback[T](key, _doc, _version, _backend, _public, 
_alternatives, fallback)
+    _onCreate.foreach(_(entry))
+    entry
+  }
+}
+
+private object ConfigHelpers {
+  def toNumber[T](s: String, converter: String => T, key: String, configType: 
String): T = {
+    try {
+      converter(s.trim)
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalArgumentException(s"$key should be $configType, but 
was $s")
+    }
+  }
+
+  def toBoolean(s: String, key: String): Boolean = {
+    try {
+      s.trim.toBoolean
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new IllegalArgumentException(s"$key should be boolean, but was 
$s")
+    }
+  }
+
+  private val TIME_STRING_PATTERN = Pattern.compile("(-?[0-9]+)([a-z]+)?")
+
+  def timeFromString(str: String, unit: TimeUnit): Long = 
JavaUtils.timeStringAs(str, unit)
+
+  def timeToString(v: Long, unit: TimeUnit): String = 
s"${TimeUnit.MILLISECONDS.convert(v, unit)}ms"
+
+  def byteFromString(str: String, unit: ByteUnit): Long = {
+    val (input, multiplier) =
+      if (str.length() > 0 && str.charAt(0) == '-') {
+        (str.substring(1), -1)
+      } else {
+        (str, 1)
+      }
+    multiplier * JavaUtils.byteStringAs(input, unit)
+  }
+
+  def byteToString(v: Long, unit: ByteUnit): String = s"${unit.convertTo(v, 
ByteUnit.BYTE)}b"
+}
+
+private[gluten] class TypedConfigBuilder[T](
+    val parent: ConfigBuilder,
+    val converter: String => T,
+    val stringConverter: T => String) {
+
+  def this(parent: ConfigBuilder, converter: String => T) = {
+    this(parent, converter, { v: T => v.toString })
+  }
+
+  def transform(fn: T => T): TypedConfigBuilder[T] = {
+    new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
+  }
+
+  def checkValue(validator: T => Boolean, errorMsg: String): 
TypedConfigBuilder[T] = {
+    transform {
+      v =>
+        if (!validator(v)) {
+          throw new IllegalArgumentException(s"'$v' in ${parent.key} is 
invalid. $errorMsg")
+        }
+        v
+    }
+  }
+
+  def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
+    transform {
+      v =>
+        if (!validValues.contains(v)) {
+          throw new IllegalArgumentException(
+            s"The value of ${parent.key} should be one of 
${validValues.mkString(", ")}, " +
+              s"but was $v")
+        }
+        v
+    }
+  }
+
+  def createOptional: OptionalConfigEntry[T] = {
+    val entry = new OptionalConfigEntry[T](
+      parent.key,
+      parent._doc,
+      parent._version,
+      parent._backend,
+      parent._public,
+      parent._alternatives,
+      converter,
+      stringConverter)
+    parent._onCreate.foreach(_(entry))
+    entry
+  }
+
+  def createWithDefault(default: T): ConfigEntry[T] = {
+    assert(default != null, "Use createOptional.")
+    default match {
+      case str: String => createWithDefaultString(str)
+      case _ =>
+        val transformedDefault = converter(stringConverter(default))
+        val entry = new ConfigEntryWithDefault[T](
+          parent.key,
+          parent._doc,
+          parent._version,
+          parent._backend,
+          parent._public,
+          parent._alternatives,
+          converter,
+          stringConverter,
+          transformedDefault)
+        parent._onCreate.foreach(_(entry))
+        entry
+    }
+  }
+
+  def createWithDefaultString(default: String): ConfigEntry[T] = {
+    val entry = new ConfigEntryWithDefaultString[T](
+      parent.key,
+      parent._doc,
+      parent._version,
+      parent._backend,
+      parent._public,
+      parent._alternatives,
+      converter,
+      stringConverter,
+      default
+    )
+    parent._onCreate.foreach(_(entry))
+    entry
+  }
+}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala
new file mode 100644
index 0000000000..5d087d3385
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/config/ConfigEntry.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.gluten.config.BackendType.BackendType
+
+import org.apache.spark.sql.internal.ConfigProvider
+
+/**
+ * An entry contains all meta information for a configuration.
+ *
+ * The code is similar to Spark's relevant config code but extended for 
Gluten's use, like adding
+ * backend type, etc.
+ *
+ * @tparam T
+ *   the value type
+ */
+trait ConfigEntry[T] {
+
+  /** The key for the configuration. */
+  def key: String
+
+  /** The documentation for the configuration. */
+  def doc: String
+
+  /** The Gluten version when the configuration was released. */
+  def version: String
+
+  /** The backend type of the configuration. */
+  def backend: BackendType.BackendType
+
+  /**
+   * Whether this configuration is public to the user. If it's `false`, this 
configuration is only used
+   * internally and we should not expose it to users.
+   */
+  def isPublic: Boolean
+
+  /** The alternative keys for the configuration. */
+  def alternatives: List[String]
+
+  /**
+   * How to convert a string to the value. It should throw an exception if the 
string does not have
+   * the required format.
+   */
+  def valueConverter: String => T
+
+  /** How to convert a value to a string that the user can use as a valid 
string value. */
+  def stringConverter: T => String
+
+  /** Read the configuration from the given ConfigProvider. */
+  def readFrom(conf: ConfigProvider): T
+
+  /** The default value of the configuration. */
+  def defaultValue: Option[T]
+
+  /** The string representation of the default value. */
+  def defaultValueString: String
+
+  final protected def readString(provider: ConfigProvider): Option[String] = {
+    alternatives.foldLeft(provider.get(key))((res, nextKey) => 
res.orElse(provider.get(nextKey)))
+  }
+
+  override def toString: String = {
+    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
+      s"public=$isPublic, version=$version)"
+  }
+
+  ConfigEntry.registerEntry(this)
+}
+
+private[gluten] class OptionalConfigEntry[T](
+    _key: String,
+    _doc: String,
+    _version: String,
+    _backend: BackendType,
+    _isPublic: Boolean,
+    _alternatives: List[String],
+    _valueConverter: String => T,
+    _stringConverter: T => String)
+  extends ConfigEntry[Option[T]] {
+  override def key: String = _key
+
+  override def doc: String = _doc
+
+  override def version: String = _version
+
+  override def backend: BackendType = _backend
+
+  override def isPublic: Boolean = _isPublic
+
+  override def alternatives: List[String] = _alternatives
+
+  override def valueConverter: String => Option[T] = s => 
Option(_valueConverter(s))
+
+  override def stringConverter: Option[T] => String = v => 
v.map(_stringConverter).orNull
+
+  override def readFrom(conf: ConfigProvider): Option[T] = 
readString(conf).map(_valueConverter)
+
+  override def defaultValue: Option[Option[T]] = None
+
+  override def defaultValueString: String = ConfigEntry.UNDEFINED
+}
+
+private[gluten] class ConfigEntryWithDefault[T](
+    _key: String,
+    _doc: String,
+    _version: String,
+    _backend: BackendType,
+    _isPublic: Boolean,
+    _alternatives: List[String],
+    _valueConverter: String => T,
+    _stringConverter: T => String,
+    _defaultVal: T)
+  extends ConfigEntry[T] {
+  override def key: String = _key
+
+  override def doc: String = _doc
+
+  override def version: String = _version
+
+  override def backend: BackendType = _backend
+
+  override def isPublic: Boolean = _isPublic
+
+  override def alternatives: List[String] = _alternatives
+
+  override def valueConverter: String => T = _valueConverter
+
+  override def stringConverter: T => String = _stringConverter
+
+  override def readFrom(conf: ConfigProvider): T = {
+    readString(conf).map(valueConverter).getOrElse(_defaultVal)
+  }
+
+  override def defaultValue: Option[T] = Option(_defaultVal)
+
+  override def defaultValueString: String = stringConverter(_defaultVal)
+}
+
+private[gluten] class ConfigEntryWithDefaultString[T](
+    _key: String,
+    _doc: String,
+    _version: String,
+    _backend: BackendType,
+    _isPublic: Boolean,
+    _alternatives: List[String],
+    _valueConverter: String => T,
+    _stringConverter: T => String,
+    _defaultVal: String)
+  extends ConfigEntry[T] {
+  override def key: String = _key
+
+  override def doc: String = _doc
+
+  override def version: String = _version
+
+  override def backend: BackendType = _backend
+
+  override def isPublic: Boolean = _isPublic
+
+  override def alternatives: List[String] = _alternatives
+
+  override def valueConverter: String => T = _valueConverter
+
+  override def stringConverter: T => String = _stringConverter
+
+  override def readFrom(conf: ConfigProvider): T = {
+    val value = readString(conf).getOrElse(_defaultVal)
+    valueConverter(value)
+  }
+
+  override def defaultValue: Option[T] = Some(valueConverter(_defaultVal))
+
+  override def defaultValueString: String = _defaultVal
+}
+
+private[gluten] class ConfigEntryFallback[T](
+    _key: String,
+    _doc: String,
+    _version: String,
+    _backend: BackendType,
+    _isPublic: Boolean,
+    _alternatives: List[String],
+    fallback: ConfigEntry[T])
+  extends ConfigEntry[T] {
+  override def key: String = _key
+
+  override def doc: String = _doc
+
+  override def version: String = _version
+
+  override def backend: BackendType = _backend
+
+  override def isPublic: Boolean = _isPublic
+
+  override def alternatives: List[String] = _alternatives
+
+  override def valueConverter: String => T = fallback.valueConverter
+
+  override def stringConverter: T => String = fallback.stringConverter
+
+  override def readFrom(conf: ConfigProvider): T = {
+    readString(conf).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  }
+
+  override def defaultValue: Option[T] = fallback.defaultValue
+
+  override def defaultValueString: String = fallback.defaultValueString
+}
+
+object ConfigEntry {
+
+  val UNDEFINED = "<undefined>"
+
+  private val knownConfigs =
+    new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def containsEntry(entry: ConfigEntry[_]): Boolean = {
+    Option(knownConfigs.get(entry.key)).isDefined
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index cd01b1a42f..cee4ea7d16 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.config
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
 
 import com.google.common.collect.ImmutableList
 import org.apache.hadoop.security.UserGroupInformation
@@ -37,97 +37,104 @@ case class GlutenNumaBindingInfo(
 class GlutenConfig(conf: SQLConf) extends Logging {
   import GlutenConfig._
 
+  private lazy val configProvider = new SQLConfProvider(conf)
+
+  /**
+   * Reads the value of `entry` through `configProvider`.
+   *
+   * The `require` fails with an IllegalArgumentException ("... is not registered") when
+   * `ConfigEntry.containsEntry(entry)` is false.
+   */
+  def getConf[T](entry: ConfigEntry[T]): T = {
+    require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
+    entry.readFrom(configProvider)
+  }
+
   def enableAnsiMode: Boolean = conf.ansiEnabled
 
-  def enableGluten: Boolean = conf.getConf(GLUTEN_ENABLED)
+  def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
 
   // FIXME the option currently controls both JVM and native validation against a Substrait plan.
-  def enableNativeValidation: Boolean = conf.getConf(NATIVE_VALIDATION_ENABLED)
+  def enableNativeValidation: Boolean = getConf(NATIVE_VALIDATION_ENABLED)
 
-  def enableColumnarBatchScan: Boolean = 
conf.getConf(COLUMNAR_BATCHSCAN_ENABLED)
+  def enableColumnarBatchScan: Boolean = getConf(COLUMNAR_BATCHSCAN_ENABLED)
 
-  def enableColumnarFileScan: Boolean = conf.getConf(COLUMNAR_FILESCAN_ENABLED)
+  def enableColumnarFileScan: Boolean = getConf(COLUMNAR_FILESCAN_ENABLED)
 
-  def enableColumnarHiveTableScan: Boolean = 
conf.getConf(COLUMNAR_HIVETABLESCAN_ENABLED)
+  def enableColumnarHiveTableScan: Boolean = 
getConf(COLUMNAR_HIVETABLESCAN_ENABLED)
 
   def enableColumnarHiveTableScanNestedColumnPruning: Boolean =
-    conf.getConf(COLUMNAR_HIVETABLESCAN_NESTED_COLUMN_PRUNING_ENABLED)
+    getConf(COLUMNAR_HIVETABLESCAN_NESTED_COLUMN_PRUNING_ENABLED)
 
-  def enableVanillaVectorizedReaders: Boolean = 
conf.getConf(VANILLA_VECTORIZED_READERS_ENABLED)
+  def enableVanillaVectorizedReaders: Boolean = 
getConf(VANILLA_VECTORIZED_READERS_ENABLED)
 
-  def enableColumnarHashAgg: Boolean = conf.getConf(COLUMNAR_HASHAGG_ENABLED)
+  def enableColumnarHashAgg: Boolean = getConf(COLUMNAR_HASHAGG_ENABLED)
 
-  def forceToUseHashAgg: Boolean = conf.getConf(COLUMNAR_FORCE_HASHAGG_ENABLED)
+  def forceToUseHashAgg: Boolean = getConf(COLUMNAR_FORCE_HASHAGG_ENABLED)
 
-  def mergeTwoPhasesAggEnabled: Boolean = 
conf.getConf(MERGE_TWO_PHASES_ENABLED)
+  def mergeTwoPhasesAggEnabled: Boolean = getConf(MERGE_TWO_PHASES_ENABLED)
 
-  def enableColumnarProject: Boolean = conf.getConf(COLUMNAR_PROJECT_ENABLED)
+  def enableColumnarProject: Boolean = getConf(COLUMNAR_PROJECT_ENABLED)
 
-  def enableColumnarFilter: Boolean = conf.getConf(COLUMNAR_FILTER_ENABLED)
+  def enableColumnarFilter: Boolean = getConf(COLUMNAR_FILTER_ENABLED)
 
-  def enableColumnarSort: Boolean = conf.getConf(COLUMNAR_SORT_ENABLED)
+  def enableColumnarSort: Boolean = getConf(COLUMNAR_SORT_ENABLED)
 
-  def enableColumnarWindow: Boolean = conf.getConf(COLUMNAR_WINDOW_ENABLED)
+  def enableColumnarWindow: Boolean = getConf(COLUMNAR_WINDOW_ENABLED)
 
-  def enableColumnarWindowGroupLimit: Boolean = 
conf.getConf(COLUMNAR_WINDOW_GROUP_LIMIT_ENABLED)
+  def enableColumnarWindowGroupLimit: Boolean = 
getConf(COLUMNAR_WINDOW_GROUP_LIMIT_ENABLED)
 
-  def veloxColumnarWindowType: String = 
conf.getConfString(COLUMNAR_VELOX_WINDOW_TYPE.key)
+  def veloxColumnarWindowType: String = getConf(COLUMNAR_VELOX_WINDOW_TYPE)
 
-  def enableColumnarShuffledHashJoin: Boolean = 
conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
+  def enableColumnarShuffledHashJoin: Boolean = 
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
   def shuffledHashJoinOptimizeBuildSide: Boolean =
-    conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE)
+    getConf(COLUMNAR_SHUFFLED_HASH_JOIN_OPTIMIZE_BUILD_SIDE)
 
-  def enableNativeColumnarToRow: Boolean = 
conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
+  def enableNativeColumnarToRow: Boolean = 
getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
 
-  def forceShuffledHashJoin: Boolean = 
conf.getConf(COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED)
+  def forceShuffledHashJoin: Boolean = 
getConf(COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED)
 
-  def enableColumnarSortMergeJoin: Boolean = 
conf.getConf(COLUMNAR_SORTMERGEJOIN_ENABLED)
+  def enableColumnarSortMergeJoin: Boolean = 
getConf(COLUMNAR_SORTMERGEJOIN_ENABLED)
 
-  def enableColumnarUnion: Boolean = conf.getConf(COLUMNAR_UNION_ENABLED)
+  def enableColumnarUnion: Boolean = getConf(COLUMNAR_UNION_ENABLED)
 
-  def enableNativeUnion: Boolean = conf.getConf(NATIVE_UNION_ENABLED)
+  def enableNativeUnion: Boolean = getConf(NATIVE_UNION_ENABLED)
 
-  def enableColumnarExpand: Boolean = conf.getConf(COLUMNAR_EXPAND_ENABLED)
+  def enableColumnarExpand: Boolean = getConf(COLUMNAR_EXPAND_ENABLED)
 
-  def enableColumnarBroadcastExchange: Boolean = 
conf.getConf(COLUMNAR_BROADCAST_EXCHANGE_ENABLED)
+  def enableColumnarBroadcastExchange: Boolean = 
getConf(COLUMNAR_BROADCAST_EXCHANGE_ENABLED)
 
-  def enableColumnarBroadcastJoin: Boolean = 
conf.getConf(COLUMNAR_BROADCAST_JOIN_ENABLED)
+  def enableColumnarBroadcastJoin: Boolean = 
getConf(COLUMNAR_BROADCAST_JOIN_ENABLED)
 
-  def enableColumnarSample: Boolean = conf.getConf(COLUMNAR_SAMPLE_ENABLED)
+  def enableColumnarSample: Boolean = getConf(COLUMNAR_SAMPLE_ENABLED)
 
-  def enableColumnarArrowUDF: Boolean = 
conf.getConf(COLUMNAR_ARROW_UDF_ENABLED)
+  def enableColumnarArrowUDF: Boolean = getConf(COLUMNAR_ARROW_UDF_ENABLED)
 
-  def enableColumnarCoalesce: Boolean = conf.getConf(COLUMNAR_COALESCE_ENABLED)
+  def enableColumnarCoalesce: Boolean = getConf(COLUMNAR_COALESCE_ENABLED)
 
-  def columnarTableCacheEnabled: Boolean = 
conf.getConf(COLUMNAR_TABLE_CACHE_ENABLED)
+  def columnarTableCacheEnabled: Boolean = 
getConf(COLUMNAR_TABLE_CACHE_ENABLED)
 
   def enableRewriteDateTimestampComparison: Boolean =
-    conf.getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
+    getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
 
   def enableCollapseNestedGetJsonObject: Boolean =
-    conf.getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)
+    getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)
 
   def enableCHRewriteDateConversion: Boolean =
-    conf.getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
+    getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
 
   def enableCommonSubexpressionEliminate: Boolean =
-    conf.getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
+    getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
 
   def enableCountDistinctWithoutExpand: Boolean =
-    conf.getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
+    getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
 
   def enableExtendedColumnPruning: Boolean =
-    conf.getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
+    getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
 
   def veloxOrcScanEnabled: Boolean =
-    conf.getConf(VELOX_ORC_SCAN_ENABLED)
+    getConf(VELOX_ORC_SCAN_ENABLED)
 
   def forceOrcCharTypeScanFallbackEnabled: Boolean =
-    conf.getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
+    getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
 
   def scanFileSchemeValidationEnabled: Boolean =
-    conf.getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
+    getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
 
   // Whether to use GlutenShuffleManager (experimental).
   def isUseGlutenShuffleManager: Boolean =
@@ -158,44 +165,43 @@ class GlutenConfig(conf: SQLConf) extends Logging {
       .getConfString("spark.celeborn.client.spark.shuffle.writer", 
GLUTEN_HASH_SHUFFLE_WRITER)
       .toLowerCase(Locale.ROOT)
 
-  def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
+  def enableColumnarShuffle: Boolean = getConf(COLUMNAR_SHUFFLE_ENABLED)
 
-  def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
+  def enablePreferColumnar: Boolean = getConf(COLUMNAR_PREFER_ENABLED)
 
-  def enableOneRowRelationColumnar: Boolean = 
conf.getConf(COLUMNAR_ONE_ROW_RELATION_ENABLED)
+  def enableOneRowRelationColumnar: Boolean = 
getConf(COLUMNAR_ONE_ROW_RELATION_ENABLED)
 
   def physicalJoinOptimizationThrottle: Integer =
-    conf.getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_THROTTLE)
+    getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_THROTTLE)
 
   def enablePhysicalJoinOptimize: Boolean =
-    conf.getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_ENABLED)
+    getConf(COLUMNAR_PHYSICAL_JOIN_OPTIMIZATION_ENABLED)
 
   def logicalJoinOptimizationThrottle: Integer =
-    conf.getConf(COLUMNAR_LOGICAL_JOIN_OPTIMIZATION_THROTTLE)
+    getConf(COLUMNAR_LOGICAL_JOIN_OPTIMIZATION_THROTTLE)
 
-  def enableScanOnly: Boolean = conf.getConf(COLUMNAR_SCAN_ONLY_ENABLED)
+  def enableScanOnly: Boolean = getConf(COLUMNAR_SCAN_ONLY_ENABLED)
 
-  def tmpFile: Option[String] = conf.getConf(COLUMNAR_TEMP_DIR)
+  def tmpFile: Option[String] = getConf(COLUMNAR_TEMP_DIR)
 
-  @deprecated def broadcastCacheTimeout: Int = 
conf.getConf(COLUMNAR_BROADCAST_CACHE_TIMEOUT)
+  @deprecated def broadcastCacheTimeout: Int = 
getConf(COLUMNAR_BROADCAST_CACHE_TIMEOUT)
 
   def columnarShuffleSortPartitionsThreshold: Int =
-    conf.getConf(COLUMNAR_SHUFFLE_SORT_PARTITIONS_THRESHOLD)
+    getConf(COLUMNAR_SHUFFLE_SORT_PARTITIONS_THRESHOLD)
 
   def columnarShuffleSortColumnsThreshold: Int =
-    conf.getConf(COLUMNAR_SHUFFLE_SORT_COLUMNS_THRESHOLD)
+    getConf(COLUMNAR_SHUFFLE_SORT_COLUMNS_THRESHOLD)
 
-  def columnarShuffleReallocThreshold: Double = 
conf.getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
+  def columnarShuffleReallocThreshold: Double = 
getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
 
-  def columnarShuffleMergeThreshold: Double = 
conf.getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
+  def columnarShuffleMergeThreshold: Double = 
getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
 
-  def columnarShuffleCodec: Option[String] = 
conf.getConf(COLUMNAR_SHUFFLE_CODEC)
+  def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC)
 
   def columnarShuffleCompressionMode: String =
-    conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_MODE)
+    getConf(COLUMNAR_SHUFFLE_COMPRESSION_MODE)
 
-  def columnarShuffleCodecBackend: Option[String] = conf
-    .getConf(COLUMNAR_SHUFFLE_CODEC_BACKEND)
+  def columnarShuffleCodecBackend: Option[String] = 
getConf(COLUMNAR_SHUFFLE_CODEC_BACKEND)
     .filter(Set(GLUTEN_QAT_BACKEND_NAME, GLUTEN_IAA_BACKEND_NAME).contains(_))
 
   def columnarShuffleEnableQat: Boolean =
@@ -205,54 +211,53 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     columnarShuffleCodecBackend.contains(GlutenConfig.GLUTEN_IAA_BACKEND_NAME)
 
   def columnarShuffleCompressionThreshold: Int =
-    conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
+    getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
 
   def columnarShuffleReaderBufferSize: Long =
-    conf.getConf(COLUMNAR_SHUFFLE_READER_BUFFER_SIZE)
+    getConf(COLUMNAR_SHUFFLE_READER_BUFFER_SIZE)
 
-  def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+  def maxBatchSize: Int = getConf(COLUMNAR_MAX_BATCH_SIZE)
 
   def columnarToRowMemThreshold: Long =
-    conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
+    getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
 
-  def shuffleWriterBufferSize: Int = conf
-    .getConf(SHUFFLE_WRITER_BUFFER_SIZE)
+  def shuffleWriterBufferSize: Int = getConf(SHUFFLE_WRITER_BUFFER_SIZE)
     .getOrElse(maxBatchSize)
 
-  def enableColumnarLimit: Boolean = conf.getConf(COLUMNAR_LIMIT_ENABLED)
+  def enableColumnarLimit: Boolean = getConf(COLUMNAR_LIMIT_ENABLED)
 
-  def enableColumnarGenerate: Boolean = conf.getConf(COLUMNAR_GENERATE_ENABLED)
+  def enableColumnarGenerate: Boolean = getConf(COLUMNAR_GENERATE_ENABLED)
 
   def enableTakeOrderedAndProject: Boolean =
-    conf.getConf(COLUMNAR_TAKE_ORDERED_AND_PROJECT_ENABLED)
+    getConf(COLUMNAR_TAKE_ORDERED_AND_PROJECT_ENABLED)
 
-  def enableNativeBloomFilter: Boolean = 
conf.getConf(COLUMNAR_NATIVE_BLOOMFILTER_ENABLED)
+  def enableNativeBloomFilter: Boolean = 
getConf(COLUMNAR_NATIVE_BLOOMFILTER_ENABLED)
 
   def enableNativeHyperLogLogAggregateFunction: Boolean =
-    conf.getConf(COLUMNAR_NATIVE_HYPERLOGLOG_AGGREGATE_ENABLED)
+    getConf(COLUMNAR_NATIVE_HYPERLOGLOG_AGGREGATE_ENABLED)
 
   def columnarParquetWriteBlockSize: Long =
-    conf.getConf(COLUMNAR_PARQUET_WRITE_BLOCK_SIZE)
+    getConf(COLUMNAR_PARQUET_WRITE_BLOCK_SIZE)
 
   def columnarParquetWriteBlockRows: Long =
-    conf.getConf(COLUMNAR_PARQUET_WRITE_BLOCK_ROWS)
+    getConf(COLUMNAR_PARQUET_WRITE_BLOCK_ROWS)
 
-  def wholeStageFallbackThreshold: Int = 
conf.getConf(COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD)
+  def wholeStageFallbackThreshold: Int = 
getConf(COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD)
 
-  def queryFallbackThreshold: Int = 
conf.getConf(COLUMNAR_QUERY_FALLBACK_THRESHOLD)
+  def queryFallbackThreshold: Int = getConf(COLUMNAR_QUERY_FALLBACK_THRESHOLD)
 
-  def fallbackIgnoreRowToColumnar: Boolean = 
conf.getConf(COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR)
+  def fallbackIgnoreRowToColumnar: Boolean = 
getConf(COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR)
 
-  def fallbackExpressionsThreshold: Int = 
conf.getConf(COLUMNAR_FALLBACK_EXPRESSIONS_THRESHOLD)
+  def fallbackExpressionsThreshold: Int = 
getConf(COLUMNAR_FALLBACK_EXPRESSIONS_THRESHOLD)
 
-  def fallbackPreferColumnar: Boolean = 
conf.getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
+  def fallbackPreferColumnar: Boolean = 
getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
 
   def numaBindingInfo: GlutenNumaBindingInfo = {
-    val enableNumaBinding: Boolean = 
conf.getConf(COLUMNAR_NUMA_BINDING_ENABLED)
+    val enableNumaBinding: Boolean = getConf(COLUMNAR_NUMA_BINDING_ENABLED)
     if (!enableNumaBinding) {
       GlutenNumaBindingInfo(enableNumaBinding = false)
     } else {
-      val tmp = conf.getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
+      val tmp = getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
       if (tmp.isEmpty) {
         GlutenNumaBindingInfo(enableNumaBinding = false)
       } else {
@@ -264,78 +269,78 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     }
   }
 
-  def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)
+  def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
 
-  def memoryBacktraceAllocation: Boolean = 
conf.getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)
+  def memoryBacktraceAllocation: Boolean = 
getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)
 
   def numTaskSlotsPerExecutor: Int = {
-    val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
+    val numSlots = getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
     assert(numSlots > 0, s"Number of task slot not found. This should not 
happen.")
     numSlots
   }
 
-  def offHeapMemorySize: Long = conf.getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+  def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
 
-  def taskOffHeapMemorySize: Long = 
conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
+  def taskOffHeapMemorySize: Long = 
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
 
-  def memoryOverAcquiredRatio: Double = 
conf.getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
+  def memoryOverAcquiredRatio: Double = 
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
 
-  def memoryReservationBlockSize: Long = 
conf.getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
+  def memoryReservationBlockSize: Long = 
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
 
   def conservativeTaskOffHeapMemorySize: Long =
-    conf.getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
+    getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
 
   // Options used by RAS.
-  def enableRas: Boolean = conf.getConf(RAS_ENABLED)
+  def enableRas: Boolean = getConf(RAS_ENABLED)
 
-  def rasCostModel: String = conf.getConf(RAS_COST_MODEL)
+  def rasCostModel: String = getConf(RAS_COST_MODEL)
 
-  def rasRough2SizeBytesThreshold: Long = 
conf.getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
+  def rasRough2SizeBytesThreshold: Long = 
getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
 
-  def rasRough2R2cCost: Long = conf.getConf(RAS_ROUGH2_R2C_COST)
+  def rasRough2R2cCost: Long = getConf(RAS_ROUGH2_R2C_COST)
 
-  def rasRough2VanillaCost: Long = conf.getConf(RAS_ROUGH2_VANILLA_COST)
+  def rasRough2VanillaCost: Long = getConf(RAS_ROUGH2_VANILLA_COST)
 
-  def enableVeloxCache: Boolean = conf.getConf(COLUMNAR_VELOX_CACHE_ENABLED)
+  def enableVeloxCache: Boolean = getConf(COLUMNAR_VELOX_CACHE_ENABLED)
 
-  def veloxMemCacheSize: Long = conf.getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
+  def veloxMemCacheSize: Long = getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
 
-  def veloxSsdCachePath: String = conf.getConf(COLUMNAR_VELOX_SSD_CACHE_PATH)
+  def veloxSsdCachePath: String = getConf(COLUMNAR_VELOX_SSD_CACHE_PATH)
 
-  def veloxSsdCacheSize: Long = conf.getConf(COLUMNAR_VELOX_SSD_CACHE_SIZE)
+  def veloxSsdCacheSize: Long = getConf(COLUMNAR_VELOX_SSD_CACHE_SIZE)
 
-  def veloxSsdCacheShards: Integer = 
conf.getConf(COLUMNAR_VELOX_SSD_CACHE_SHARDS)
+  def veloxSsdCacheShards: Integer = getConf(COLUMNAR_VELOX_SSD_CACHE_SHARDS)
 
-  def veloxSsdCacheIOThreads: Integer = 
conf.getConf(COLUMNAR_VELOX_SSD_CACHE_IO_THREADS)
+  def veloxSsdCacheIOThreads: Integer = 
getConf(COLUMNAR_VELOX_SSD_CACHE_IO_THREADS)
 
-  def veloxSsdODirectEnabled: Boolean = 
conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
+  def veloxSsdODirectEnabled: Boolean = 
getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
 
   def veloxConnectorIOThreads: Int = {
-    
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+    
getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
   }
 
-  def veloxSplitPreloadPerDriver: Integer = 
conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
+  def veloxSplitPreloadPerDriver: Integer = 
getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
 
-  def veloxSpillStrategy: String = conf.getConf(COLUMNAR_VELOX_SPILL_STRATEGY)
+  def veloxSpillStrategy: String = getConf(COLUMNAR_VELOX_SPILL_STRATEGY)
 
-  def veloxMaxSpillLevel: Int = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_LEVEL)
+  def veloxMaxSpillLevel: Int = getConf(COLUMNAR_VELOX_MAX_SPILL_LEVEL)
 
-  def veloxMaxSpillFileSize: Long = 
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_FILE_SIZE)
+  def veloxMaxSpillFileSize: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_FILE_SIZE)
 
-  def veloxSpillFileSystem: String = 
conf.getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM)
+  def veloxSpillFileSystem: String = getConf(COLUMNAR_VELOX_SPILL_FILE_SYSTEM)
 
-  def veloxMaxSpillRunRows: Long = 
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_RUN_ROWS)
+  def veloxMaxSpillRunRows: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_RUN_ROWS)
 
-  def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
+  def veloxMaxSpillBytes: Long = getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
 
   def veloxBloomFilterExpectedNumItems: Long =
-    conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
+    getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
 
-  def veloxBloomFilterNumBits: Long = 
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS)
+  def veloxBloomFilterNumBits: Long = 
getConf(COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS)
 
-  def veloxBloomFilterMaxNumBits: Long = 
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
+  def veloxBloomFilterMaxNumBits: Long = 
getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
 
-  def castFromVarcharAddTrimNode: Boolean = 
conf.getConf(CAST_FROM_VARCHAR_ADD_TRIM_NODE)
+  def castFromVarcharAddTrimNode: Boolean = 
getConf(CAST_FROM_VARCHAR_ADD_TRIM_NODE)
 
   case class ResizeRange(min: Int, max: Int) {
     assert(max >= min)
@@ -353,62 +358,61 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   }
 
   def veloxResizeBatchesShuffleInput: Boolean =
-    conf.getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
+    getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
 
   def veloxResizeBatchesShuffleInputRange: ResizeRange = {
-    val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+    val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE)
     val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
-    val minSize = conf
-      .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+    val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
       .getOrElse(defaultMinSize)
     ResizeRange(minSize, Int.MaxValue)
   }
 
   def chColumnarShuffleSpillThreshold: Long = {
-    val threshold = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+    val threshold = getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
     if (threshold == 0) {
-      (conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
+      (getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
     } else {
       threshold
     }
   }
 
-  def chColumnarMaxSortBufferSize: Long = 
conf.getConf(COLUMNAR_CH_MAX_SORT_BUFFER_SIZE)
+  def chColumnarMaxSortBufferSize: Long = 
getConf(COLUMNAR_CH_MAX_SORT_BUFFER_SIZE)
 
   def chColumnarForceMemorySortShuffle: Boolean =
-    conf.getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
+    getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
 
   def cartesianProductTransformerEnabled: Boolean =
-    conf.getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
+    getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
 
   def broadcastNestedLoopJoinTransformerTransformerEnabled: Boolean =
-    conf.getConf(BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED)
+    getConf(BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED)
 
-  def transformPlanLogLevel: String = conf.getConf(TRANSFORM_PLAN_LOG_LEVEL)
+  def transformPlanLogLevel: String = getConf(TRANSFORM_PLAN_LOG_LEVEL)
 
-  def substraitPlanLogLevel: String = conf.getConf(SUBSTRAIT_PLAN_LOG_LEVEL)
+  def substraitPlanLogLevel: String = getConf(SUBSTRAIT_PLAN_LOG_LEVEL)
 
-  def validationLogLevel: String = conf.getConf(VALIDATION_LOG_LEVEL)
+  def validationLogLevel: String = getConf(VALIDATION_LOG_LEVEL)
 
-  def softAffinityLogLevel: String = conf.getConf(SOFT_AFFINITY_LOG_LEVEL)
+  def softAffinityLogLevel: String = getConf(SOFT_AFFINITY_LOG_LEVEL)
 
   // A comma-separated list of classes for the extended columnar pre rules
-  def extendedColumnarTransformRules: String = 
conf.getConf(EXTENDED_COLUMNAR_TRANSFORM_RULES)
+  def extendedColumnarTransformRules: String = 
getConf(EXTENDED_COLUMNAR_TRANSFORM_RULES)
 
   // A comma-separated list of classes for the extended columnar post rules
-  def extendedColumnarPostRules: String = 
conf.getConf(EXTENDED_COLUMNAR_POST_RULES)
+  def extendedColumnarPostRules: String = getConf(EXTENDED_COLUMNAR_POST_RULES)
 
-  def extendedExpressionTransformer: String = 
conf.getConf(EXTENDED_EXPRESSION_TRAN_CONF)
+  def extendedExpressionTransformer: String = 
getConf(EXTENDED_EXPRESSION_TRAN_CONF)
 
   def expressionBlacklist: Set[String] = {
-    val blacklist = conf.getConf(EXPRESSION_BLACK_LIST)
+    val blacklist = getConf(EXPRESSION_BLACK_LIST)
     val blacklistSet: Set[String] = if (blacklist.isDefined) {
       blacklist.get.toLowerCase(Locale.ROOT).trim.split(",").toSet
     } else {
       Set.empty
     }
 
-    if (conf.getConf(FALLBACK_REGEXP_EXPRESSIONS)) {
+    if (getConf(FALLBACK_REGEXP_EXPRESSIONS)) {
       val regexpList = 
"rlike,regexp_replace,regexp_extract,regexp_extract_all,split"
       regexpList.trim.split(",").toSet ++ blacklistSet
     } else {
@@ -417,87 +421,93 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   }
 
   def printStackOnValidationFailure: Boolean =
-    conf.getConf(VALIDATION_PRINT_FAILURE_STACK_)
+    getConf(VALIDATION_PRINT_FAILURE_STACK_)
 
-  def enableFallbackReport: Boolean = conf.getConf(FALLBACK_REPORTER_ENABLED)
+  def enableFallbackReport: Boolean = getConf(FALLBACK_REPORTER_ENABLED)
 
   def enableVeloxUserExceptionStacktrace: Boolean =
-    conf.getConf(COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE)
+    getConf(COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE)
 
   def memoryUseHugePages: Boolean =
-    conf.getConf(COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES)
-
-  def debug: Boolean = conf.getConf(DEBUG_ENABLED)
-  def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
-  def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
-  def benchmarkStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
-  def benchmarkPartitionId: String = conf.getConf(BENCHMARK_TASK_PARTITIONID)
-  def benchmarkTaskId: String = conf.getConf(BENCHMARK_TASK_TASK_ID)
-  def benchmarkSaveDir: String = conf.getConf(BENCHMARK_SAVE_DIR)
-  def textInputMaxBlockSize: Long = conf.getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
-  def textIputEmptyAsDefault: Boolean = 
conf.getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
+    getConf(COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES)
+
+  def debug: Boolean = getConf(DEBUG_ENABLED)
+  def debugKeepJniWorkspace: Boolean = getConf(DEBUG_KEEP_JNI_WORKSPACE)
+  def collectUtStats: Boolean = getConf(UT_STATISTIC)
+  def benchmarkStageId: Int = getConf(BENCHMARK_TASK_STAGEID)
+  def benchmarkPartitionId: String = getConf(BENCHMARK_TASK_PARTITIONID)
+  def benchmarkTaskId: String = getConf(BENCHMARK_TASK_TASK_ID)
+  def benchmarkSaveDir: String = getConf(BENCHMARK_SAVE_DIR)
+  def textInputMaxBlockSize: Long = getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
+  def textIputEmptyAsDefault: Boolean = getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
   def enableParquetRowGroupMaxMinIndex: Boolean =
-    conf.getConf(ENABLE_PARQUET_ROW_GROUP_MAX_MIN_INDEX)
+    getConf(ENABLE_PARQUET_ROW_GROUP_MAX_MIN_INDEX)
 
   def enableVeloxFlushablePartialAggregation: Boolean =
-    conf.getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
+    getConf(VELOX_FLUSHABLE_PARTIAL_AGGREGATION_ENABLED)
   def maxFlushableAggregationMemoryRatio: Double =
-    conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
+    getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
+  // NOTE(review): this reads MAX_PARTIAL_AGGREGATION_MEMORY_RATIO, the same entry that
+  // maxFlushableAggregationMemoryRatio above reads; confirm whether a separate "extended"
+  // ratio entry was intended here.
   def maxExtendedFlushableAggregationMemoryRatio: Double =
-    conf.getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
+    getConf(MAX_PARTIAL_AGGREGATION_MEMORY_RATIO)
   def abandonFlushableAggregationMinPct: Int =
-    conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
+    getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
   def abandonFlushableAggregationMinRows: Int =
-    conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
+    getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
 
   // Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead
-  def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED)
+  def enableNativeWriter: Option[Boolean] = getConf(NATIVE_WRITER_ENABLED)
 
-  def enableNativeArrowReader: Boolean = 
conf.getConf(NATIVE_ARROW_READER_ENABLED)
+  def enableNativeArrowReader: Boolean = getConf(NATIVE_ARROW_READER_ENABLED)
 
   def directorySizeGuess: Long =
-    conf.getConf(DIRECTORY_SIZE_GUESS)
+    getConf(DIRECTORY_SIZE_GUESS)
   def filePreloadThreshold: Long =
-    conf.getConf(FILE_PRELOAD_THRESHOLD)
+    getConf(FILE_PRELOAD_THRESHOLD)
   def prefetchRowGroups: Int =
-    conf.getConf(PREFETCH_ROW_GROUPS)
+    getConf(PREFETCH_ROW_GROUPS)
   def loadQuantum: Long =
-    conf.getConf(LOAD_QUANTUM)
+    getConf(LOAD_QUANTUM)
   def maxCoalescedDistance: String =
-    conf.getConf(MAX_COALESCED_DISTANCE_BYTES)
+    getConf(MAX_COALESCED_DISTANCE_BYTES)
   def maxCoalescedBytes: Long =
-    conf.getConf(MAX_COALESCED_BYTES)
+    getConf(MAX_COALESCED_BYTES)
   def cachePrefetchMinPct: Int =
-    conf.getConf(CACHE_PREFETCH_MINPCT)
+    getConf(CACHE_PREFETCH_MINPCT)
 
-  def enableColumnarProjectCollapse: Boolean = 
conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
+  def enableColumnarProjectCollapse: Boolean = 
getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
 
-  def enableColumnarPartialProject: Boolean = 
conf.getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
+  def enableColumnarPartialProject: Boolean = 
getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
 
-  def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)
+  def awsSdkLogLevel: String = getConf(AWS_SDK_LOG_LEVEL)
 
-  def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)
+  def awsS3RetryMode: String = getConf(AWS_S3_RETRY_MODE)
 
-  def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)
+  def awsConnectionTimeout: String = getConf(AWS_S3_CONNECT_TIMEOUT)
 
-  def enableCastAvgAggregateFunction: Boolean = 
conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
+  def enableCastAvgAggregateFunction: Boolean = 
getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
 
   def dynamicOffHeapSizingEnabled: Boolean =
-    conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+    getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
 
-  def enableHiveFileFormatWriter: Boolean = 
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
+  def enableHiveFileFormatWriter: Boolean = 
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 
-  def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
+  def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
 
-  def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
+  def enableHdfsViewfs: Boolean = getConf(HDFS_VIEWFS_ENABLED)
 
   def enableBroadcastBuildRelationInOffheap: Boolean =
-    conf.getConf(VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP)
+    getConf(VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP)
 }
 
 object GlutenConfig {
   import SQLConf._
 
+  /** Starts a [[ConfigBuilder]] for the config named `key`. */
+  def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)
+
+  /**
+   * Starts a [[ConfigBuilder]] for `key` with an `onCreate` callback that calls
+   * `SQLConf.registerStaticConfigKey(key)`.
+   *
+   * NOTE(review): when the builder invokes the callback is not visible here; presumably once
+   * the entry is created -- confirm in ConfigBuilder.
+   */
+  def buildStaticConf(key: String): ConfigBuilder = {
+    ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
+  }
+
   val GLUTEN_ENABLED_BY_DEFAULT = true
   val GLUTEN_ENABLED_KEY = "spark.gluten.enabled"
   val GLUTEN_LIB_NAME = "spark.gluten.sql.columnar.libname"
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
 
b/shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
similarity index 50%
copy from 
shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
copy to 
shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
index 1a45572acd..8293a4a09b 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/internal/ConfigProvider.scala
@@ -16,32 +16,15 @@
  */
 package org.apache.spark.sql.internal
 
-import org.apache.spark.internal.config.ConfigReader
-
-import scala.collection.JavaConverters._
+/** A source of configuration values. */
+trait ConfigProvider {
+  /** Looks up `key`; the implementations in this file return `None` when it has no value. */
+  def get(key: String): Option[String]
+}
 
-object GlutenConfigUtil {
-  private def getConfString(reader: ConfigReader, key: String, value: String): 
String = {
-    Option(SQLConf.getConfigEntry(key))
-      .map {
-        _.readFrom(reader) match {
-          case o: Option[_] => o.map(_.toString).getOrElse(value)
-          case null => value
-          case v => v.toString
-        }
-      }
-      .getOrElse(value)
-  }
+/**
+ * A [[ConfigProvider]] that looks keys up in `conf.settings`; wrapping the lookup in `Option`
+ * turns a `null` result into `None`.
+ *
+ * NOTE(review): presumably `settings` holds only explicitly set values, not entry defaults --
+ * confirm against SQLConf.
+ */
+class SQLConfProvider(conf: SQLConf) extends ConfigProvider {
+  override def get(key: String): Option[String] = 
Option(conf.settings.get(key))
+}
 
-  def parseConfig(conf: Map[String, String]): Map[String, String] = {
-    val reader = new 
ConfigReader(conf.filter(_._1.startsWith("spark.gluten.")).asJava)
-    conf.map {
-      case (k, v) =>
-        if (k.startsWith("spark.gluten.")) {
-          (k, getConfString(reader, k, v))
-        } else {
-          (k, v)
-        }
-    }.toMap
-  }
+/** A [[ConfigProvider]] backed by the given `Map[String, String]`. */
+class MapProvider(conf: Map[String, String]) extends ConfigProvider {
+  override def get(key: String): Option[String] = conf.get(key)
 }
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
 
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
index 1a45572acd..87b90938a0 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala
@@ -16,15 +16,13 @@
  */
 package org.apache.spark.sql.internal
 
-import org.apache.spark.internal.config.ConfigReader
-
-import scala.collection.JavaConverters._
+import org.apache.gluten.config._
 
 object GlutenConfigUtil {
-  private def getConfString(reader: ConfigReader, key: String, value: String): 
String = {
-    Option(SQLConf.getConfigEntry(key))
+  private def getConfString(configProvider: ConfigProvider, key: String, 
value: String): String = {
+    Option(ConfigEntry.findEntry(key))
       .map {
-        _.readFrom(reader) match {
+        _.readFrom(configProvider) match {
           case o: Option[_] => o.map(_.toString).getOrElse(value)
           case null => value
           case v => v.toString
@@ -34,11 +32,11 @@ object GlutenConfigUtil {
   }
 
   def parseConfig(conf: Map[String, String]): Map[String, String] = {
-    val reader = new 
ConfigReader(conf.filter(_._1.startsWith("spark.gluten.")).asJava)
+    val provider = new 
MapProvider(conf.filter(_._1.startsWith("spark.gluten.")))
     conf.map {
       case (k, v) =>
         if (k.startsWith("spark.gluten.")) {
-          (k, getConfString(reader, k, v))
+          (k, getConfString(provider, k, v))
         } else {
           (k, v)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to