This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c3c89c116 [GLUTEN-9881][CORE] Minimize module dependency set of 
module gluten-core (#9900)
2c3c89c116 is described below

commit 2c3c89c116068cbd885e2d38fc470f38fdeaf2d0
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 13 21:09:31 2025 +0100

    [GLUTEN-9881][CORE] Minimize module dependency set of module gluten-core 
(#9900)
---
 .../apache/gluten/component/CHDeltaComponent.scala |   7 +-
 .../backendsapi/clickhouse/CHListenerApi.scala     |   6 +-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  14 +-
 .../backendsapi/clickhouse/CHTransformerApi.scala  |   6 +-
 .../gluten/component/VeloxDeltaComponent.scala     |   7 +-
 .../gluten/component/VeloxHudiComponent.scala      |   7 +-
 .../backendsapi/velox/VeloxListenerApi.scala       |   6 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  23 ++-
 .../gluten/execution/MiscOperatorSuite.scala       |   6 +-
 .../spark/memory/GlobalOffHeapMemorySuite.scala    |   4 +-
 gluten-arrow/pom.xml                               |  14 +-
 gluten-core/pom.xml                                |  17 --
 .../DynamicOffHeapSizingMemoryTarget.java          |   4 +-
 .../gluten/memory/memtarget/MemoryTargets.java     |   6 +-
 .../memory/memtarget/ThrowOnOomMemoryTarget.java   |  25 ++-
 .../memtarget/spark/TreeMemoryConsumers.java       |   4 +-
 .../scala/org/apache/gluten/GlutenPlugin.scala     | 155 +++++---------
 .../org/apache/gluten/component/Component.scala    |   3 +
 .../apache/gluten/config/GlutenCoreConfig.scala    | 222 +++++++++++++++++++++
 .../gluten/extension/GlutenSessionExtensions.scala |   6 +-
 .../extension/columnar/ColumnarRuleApplier.scala   |   5 +-
 .../extension/columnar/ColumnarRuleExecutor.scala  |  36 +---
 .../columnar/enumerated/EnumeratedApplier.scala    |  10 +-
 .../enumerated/planner/plan/GlutenPlanModel.scala  |   6 +-
 .../columnar/heuristic/HeuristicApplier.scala      |  12 +-
 .../extension/columnar/transition/Convention.scala |   3 +-
 .../columnar/transition/ConventionFunc.scala       |   8 +-
 .../extension/columnar/transition/Transition.scala |   4 +-
 .../extension/columnar/transition/package.scala    |   2 +-
 .../gluten/extension/injector/GlutenInjector.scala |  22 +-
 .../apache/spark/memory/GlobalOffHeapMemory.scala  |   4 +-
 .../execution/adaptive/GlutenCostEvaluator.scala   |   6 +-
 .../apache/spark/storage/BlockManagerUtil.scala    |   2 +-
 .../org/apache/spark/task/TaskResources.scala      |  11 +-
 .../org/apache/spark/util/SparkPlanUtil.scala      |  47 +++++
 .../org/apache/spark/util/SparkTaskUtil.scala      | 120 +++++++++++
 .../org/apache/spark/util/SparkVersionUtil.scala   |  15 +-
 .../memtarget/spark/TreeMemoryConsumerTest.java    |   4 +-
 .../org/apache/gluten/task/TaskResourceSuite.scala |   3 +-
 .../gluten/execution/OffloadIcebergScan.scala      |   5 +-
 .../apache/gluten/execution/OffloadKafkaScan.scala |   5 +-
 gluten-substrait/pom.xml                           |  17 ++
 .../gluten/backendsapi/SubstraitBackend.scala      | 110 +++++++++-
 .../org/apache/gluten/config/GlutenConfig.scala    | 207 ++-----------------
 .../org/apache/gluten/config/ReservedKeys.scala    |   0
 .../gluten/extension/columnar/LoggedRule.scala     |  48 +++++
 .../gluten/softaffinity/SoftAffinityManager.scala  |   0
 .../ConsistentHashSoftAffinityStrategy.scala       |   0
 .../strategy/SoftAffinityAllocationTrait.scala     |   0
 .../apache/spark/softaffinity/SoftAffinity.scala   |   0
 .../spark/softaffinity/SoftAffinityListener.scala  |   0
 .../spark/sql/execution/ui/GlutenUIUtils.scala     |   0
 .../gluten/config/SparkConfigUtilSuite.scala       |   0
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  19 +-
 .../apache/spark/util/SparkShimVersionUtil.scala   |  19 +-
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |   8 +-
 .../scala/org/apache/spark/TaskContextUtils.scala  |  47 -----
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  11 +-
 .../scala/org/apache/spark/TaskContextUtils.scala  |  48 -----
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  13 +-
 .../scala/org/apache/spark/TaskContextUtils.scala  |  49 -----
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  13 +-
 .../scala/org/apache/spark/TaskContextUtils.scala  |  49 -----
 67 files changed, 814 insertions(+), 728 deletions(-)

diff --git 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index 85caf73546..a39c56003a 100644
--- 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++ 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{OffloadDeltaFilter, OffloadDeltaNode, 
OffloadDeltaProject}
 import org.apache.gluten.extension.DeltaPostTransformRules
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
@@ -44,7 +45,9 @@ class CHDeltaComponent extends Component {
     legacy.injectTransform {
       c =>
         val offload = Seq(OffloadDeltaNode(), OffloadDeltaProject(), 
OffloadDeltaFilter())
-        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, 
offload), offload)
+        HeuristicTransform.Simple(
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+          offload)
     }
     val offloads: Seq[RasOffload] = Seq(
       RasOffload.from[ProjectExec](OffloadDeltaProject()),
@@ -53,7 +56,7 @@ class CHDeltaComponent extends Component {
     offloads.foreach(
       offload =>
         ras.injectRasRule(
-          c => RasOffload.Rule(offload, Validators.newValidator(c.glutenConf), 
Nil)))
+          c => RasOffload.Rule(offload, Validators.newValidator(new 
GlutenConfig(c.sqlConf)), Nil)))
     DeltaPostTransformRules.rules.foreach {
       r =>
         legacy.injectPostTransform(_ => r)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5cddfb6e8a..ba74884d2c 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig}
 import org.apache.gluten.execution.CHBroadcastBuildSideCache
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.UDFMappings
@@ -102,8 +102,8 @@ class CHListenerApi extends ListenerApi with Logging {
 
     // add memory limit for external sort
     if (conf.get(RuntimeSettings.MAX_BYTES_BEFORE_EXTERNAL_SORT) <= 0) {
-      if (conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, defaultValue = 
false)) {
-        val memSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 
0)
+      if (conf.getBoolean(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, 
defaultValue = false)) {
+        val memSize = 
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, 0)
         if (memSize > 0L) {
           val cores = conf.getInt("spark.executor.cores", 1).toLong
           val sortMemLimit = ((memSize / cores) * 0.8).toLong
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index f9867fea34..f0c81a62d9 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -104,7 +104,8 @@ object CHRuleApi {
     injector.injectTransform(
       c =>
         intercept(
-          HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), 
rewrites, offloads)))
+          HeuristicTransform
+            .WithRewrites(validatorBuilder(new GlutenConfig(c.sqlConf)), 
rewrites, offloads)))
 
     // Legacy: Post-transform rules.
     injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
@@ -120,8 +121,8 @@ object CHRuleApi {
     injector.injectPostTransform(
       c =>
         intercept(
-          
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
-            c.session)))
+          SparkPlanRules.extendedColumnarRule(
+            new 
GlutenConfig(c.sqlConf).extendedColumnarTransformRules)(c.session)))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatchType))
@@ -140,16 +141,17 @@ object CHRuleApi {
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPost(c => intercept(each(c.session))))
-    injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+    injector.injectPost(c => ColumnarCollapseTransformStages(new 
GlutenConfig(c.sqlConf)))
     injector.injectPost(
       c =>
         intercept(
-          
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))
+          SparkPlanRules.extendedColumnarRule(
+            new GlutenConfig(c.sqlConf).extendedColumnarPostRules)(c.session)))
     injector.injectPost(c => GlutenNoopWriterRule.apply(c.session))
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
-    injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
+    injector.injectFinal(c => GlutenFallbackReporter(new 
GlutenConfig(c.sqlConf), c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
   }
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index e0d4b9f83e..9da8eed202 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.TransformerApi
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.execution.{CHHashAggregateExecTransformer, 
WriteFilesExecTransformer}
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.SubstraitContext
@@ -98,9 +98,9 @@ class CHTransformerApi extends TransformerApi with Logging {
       backendPrefix: String): Unit = {
 
     require(backendPrefix == CHConfig.CONF_PREFIX)
-    if (nativeConfMap.getOrDefault(GlutenConfig.SPARK_OFFHEAP_ENABLED, 
"false").toBoolean) {
+    if (nativeConfMap.getOrDefault(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, 
"false").toBoolean) {
       val offHeapSize: Long =
-        SparkConfigUtil.get(nativeConfMap, 
GlutenConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+        SparkConfigUtil.get(nativeConfMap, 
GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
       if (offHeapSize > 0) {
         // Only set default max_bytes_before_external_group_by for CH when it 
is not set explicitly.
         val groupBySpillKey = 
CHConfig.runtimeSettings("max_bytes_before_external_group_by")
diff --git 
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index e09edb371a..d8648c987d 100644
--- 
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++ 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{OffloadDeltaFilter, OffloadDeltaProject, 
OffloadDeltaScan}
 import org.apache.gluten.extension.DeltaPostTransformRules
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
@@ -38,7 +39,9 @@ class VeloxDeltaComponent extends Component {
       c =>
         val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), 
OffloadDeltaFilter())
           .map(_.toStrcitRule())
-        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, 
offload), offload)
+        HeuristicTransform.Simple(
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+          offload)
     }
     val offloads: Seq[RasOffload] = Seq(
       RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
@@ -48,7 +51,7 @@ class VeloxDeltaComponent extends Component {
     offloads.foreach(
       offload =>
         ras.injectRasRule(
-          c => RasOffload.Rule(offload, Validators.newValidator(c.glutenConf), 
Nil)))
+          c => RasOffload.Rule(offload, Validators.newValidator(new 
GlutenConfig(c.sqlConf)), Nil)))
     DeltaPostTransformRules.rules.foreach {
       r =>
         legacy.injectPostTransform(_ => r)
diff --git 
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
 
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
index 146b7be7e8..6685066efa 100644
--- 
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
+++ 
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.OffloadHudiScan
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
@@ -36,13 +37,15 @@ class VeloxHudiComponent extends Component {
     legacy.injectTransform {
       c =>
         val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule())
-        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, 
offload), offload)
+        HeuristicTransform.Simple(
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+          offload)
     }
     ras.injectRasRule {
       c =>
         RasOffload.Rule(
           RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
-          Validators.newValidator(c.glutenConf),
+          Validators.newValidator(new GlutenConfig(c.sqlConf)),
           Nil)
     }
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 44bfd0aab3..e8e793be5c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.ListenerApi
 import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
-import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
 import org.apache.gluten.config.VeloxConfig._
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.UDFMappings
@@ -79,7 +79,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
     HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
 
     // Overhead memory limits.
-    val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
+    val offHeapSize = 
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY)
     val desiredOverheadSize = (0.3 * 
offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
     if (!SparkResourceUtil.isMemoryOverheadSet(conf)) {
       // If memory overhead is not set by user, automatically set it according 
to off-heap settings.
@@ -98,7 +98,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
           s" the recommended size 
${ByteUnit.BYTE.toMiB(desiredOverheadSize)}MiB." +
           s" This may cause OOM.")
     }
-    conf.set(GlutenConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES, overheadSize)
+    conf.set(GlutenCoreConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES, overheadSize)
 
     // Sql table cache serializer.
     if (conf.get(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index f53e28bd93..48e5ef1a43 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -86,7 +86,11 @@ object VeloxRuleApi {
         PullOutPostProject,
         ProjectColumnPruning)
     injector.injectTransform(
-      c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), 
rewrites, offloads))
+      c =>
+        HeuristicTransform.WithRewrites(
+          validatorBuilder(new GlutenConfig(c.sqlConf)),
+          rewrites,
+          offloads))
 
     // Legacy: Post-transform rules.
     injector.injectPostTransform(_ => 
AppendBatchResizeForShuffleInputAndOutput())
@@ -113,15 +117,16 @@ object VeloxRuleApi {
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPost(c => each(c.session)))
-    injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+    injector.injectPost(c => ColumnarCollapseTransformStages(new 
GlutenConfig(c.sqlConf)))
     injector.injectPost(c => GlutenNoopWriterRule(c.session))
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectFinal(
       c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatchType)))
-    injector.injectFinal(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
-    injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
+    injector.injectFinal(
+      c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), 
c.session))
+    injector.injectFinal(c => GlutenFallbackReporter(new 
GlutenConfig(c.sqlConf), c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
   }
 
@@ -176,7 +181,7 @@ object VeloxRuleApi {
     offloads.foreach(
       offload =>
         injector.injectRasRule(
-          c => RasOffload.Rule(offload, validatorBuilder(c.glutenConf), 
rewrites)))
+          c => RasOffload.Rule(offload, validatorBuilder(new 
GlutenConfig(c.sqlConf)), rewrites)))
 
     // Gluten RAS: Post rules.
     injector.injectPostTransform(_ => 
AppendBatchResizeForShuffleInputAndOutput())
@@ -199,13 +204,15 @@ object VeloxRuleApi {
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPostTransform(c => each(c.session)))
-    injector.injectPostTransform(c => 
ColumnarCollapseTransformStages(c.glutenConf))
+    injector.injectPostTransform(c => ColumnarCollapseTransformStages(new 
GlutenConfig(c.sqlConf)))
     injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectPostTransform(
       c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatchType)))
-    injector.injectPostTransform(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
-    injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, 
c.session))
+    injector.injectPostTransform(
+      c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), 
c.session))
+    injector.injectPostTransform(
+      c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
   }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 4bd6012471..33b7f140fb 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
 import org.apache.gluten.expression.VeloxDummyExpression
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -2012,11 +2012,11 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
   }
 
   test("test 'spark.gluten.enabled'") {
-    withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") {
+    withSQLConf(GlutenCoreConfig.GLUTEN_ENABLED.key -> "true") {
       runQueryAndCompare("select * from lineitem limit 1") {
         checkGlutenOperatorMatch[FileSourceScanExecTransformer]
       }
-      withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
+      withSQLConf(GlutenCoreConfig.GLUTEN_ENABLED.key -> "false") {
         runQueryAndCompare("select * from lineitem limit 1") {
           checkSparkOperatorMatch[FileSourceScanExec]
         }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 288acabe2b..d4d36c6560 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.memory;
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
@@ -36,7 +36,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
     val conf = SQLConf.get
     conf.setConfString("spark.memory.offHeap.enabled", "true")
     conf.setConfString("spark.memory.offHeap.size", "400")
-    
conf.setConfString(GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
 "100")
+    
conf.setConfString(GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
 "100")
   }
 
   test("Sanity") {
diff --git a/gluten-arrow/pom.xml b/gluten-arrow/pom.xml
index cb88e4fbc8..c6e036ca5c 100644
--- a/gluten-arrow/pom.xml
+++ b/gluten-arrow/pom.xml
@@ -47,7 +47,19 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.gluten</groupId>
-      <artifactId>gluten-core</artifactId>
+      <artifactId>gluten-substrait</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>${sparkshim.artifactId}</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>spark-sql-columnar-shims-common</artifactId>
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 172ed5930e..ba0e8fb99b 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -19,23 +19,6 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.gluten</groupId>
-      <artifactId>${sparkshim.artifactId}</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.gluten</groupId>
-      <artifactId>spark-sql-columnar-shims-common</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.gluten</groupId>
-      <artifactId>gluten-ui</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <!-- Prevent our dummy JAR from being included in Spark distributions or 
uploaded to YARN -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index 6b981903f4..c9e9aed7b9 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.memory.memtarget;
 
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
 import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
 import org.apache.gluten.proto.MemoryUsageStats;
 
@@ -40,7 +40,7 @@ public class DynamicOffHeapSizingMemoryTarget implements 
MemoryTarget, KnownName
 
   static {
     final long maxOnHeapSize = Runtime.getRuntime().maxMemory();
-    final double fractionForSizing = 
GlutenConfig.get().dynamicOffHeapSizingMemoryFraction();
+    final double fractionForSizing = 
GlutenCoreConfig.get().dynamicOffHeapSizingMemoryFraction();
     // Since when dynamic off-heap sizing is enabled, we commingle on-heap
     // and off-heap memory, we set the off-heap size to the usable on-heap 
size. We will
     // size it with a memory fraction, which can be aggressively set, but the 
default
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index ff2a4da03e..135a82fa58 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.memory.memtarget;
 
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
 import org.apache.gluten.memory.MemoryUsageStatsBuilder;
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
 
@@ -50,7 +50,7 @@ public final class MemoryTargets {
 
   @Experimental
   public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget 
memoryTarget) {
-    if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) {
+    if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
       return new DynamicOffHeapSizingMemoryTarget();
     }
 
@@ -63,7 +63,7 @@ public final class MemoryTargets {
       Spiller spiller,
       Map<String, MemoryUsageStatsBuilder> virtualChildren) {
     final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.factory(tmm);
-    if (GlutenConfig.get().memoryIsolation()) {
+    if (GlutenCoreConfig.get().memoryIsolation()) {
       return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller, 
virtualChildren);
     }
     final TreeMemoryTarget root = factory.legacyRoot();
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index af5a3ff0ce..b03e23a898 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.memory.memtarget;
 
-import org.apache.gluten.config.GlutenConfig$;
+import org.apache.gluten.config.GlutenCoreConfig$;
 
 import org.apache.spark.memory.SparkMemoryUtil;
 import org.apache.spark.sql.internal.SQLConf;
@@ -62,44 +62,47 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget 
{
         .append(
             String.format(
                 "\t%s=%s",
-                GlutenConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
+                
GlutenCoreConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
                 reformatBytes(
                     SQLConf.get()
                         .getConfString(
-                            
GlutenConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
+                            
GlutenCoreConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
         .append(System.lineSeparator())
         .append(
             String.format(
                 "\t%s=%s",
-                
GlutenConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+                
GlutenCoreConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
                 reformatBytes(
                     SQLConf.get()
                         .getConfString(
-                            
GlutenConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key()))))
+                            GlutenCoreConfig$.MODULE$
+                                .COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES()
+                                .key()))))
         .append(System.lineSeparator())
         .append(
             String.format(
                 "\t%s=%s",
-                
GlutenConfig$.MODULE$.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+                
GlutenCoreConfig$.MODULE$.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
                 reformatBytes(
                     SQLConf.get()
                         .getConfString(
-                            GlutenConfig$.MODULE$
+                            GlutenCoreConfig$.MODULE$
                                 
.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES()
                                 .key()))))
         .append(System.lineSeparator())
         .append(
             String.format(
                 "\t%s=%s",
-                GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
-                
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
+                GlutenCoreConfig$.MODULE$.SPARK_OFFHEAP_ENABLED_KEY(),
+                
SQLConf.get().getConfString(GlutenCoreConfig$.MODULE$.SPARK_OFFHEAP_ENABLED_KEY())))
         .append(System.lineSeparator())
         .append(
             String.format(
                 "\t%s=%s",
-                GlutenConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
+                
GlutenCoreConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
                 SQLConf.get()
-                    
.getConfString(GlutenConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key())))
+                    .getConfString(
+                        
GlutenCoreConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key())))
         .append(System.lineSeparator());
     // Dump all consumer usages to exception body
     errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target));
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index f158ded4c5..a1f2106e3c 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.memory.memtarget.spark;
 
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
 import org.apache.gluten.memory.memtarget.Spillers;
 import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
 
@@ -77,7 +77,7 @@ public final class TreeMemoryConsumers {
      * <p>See <a 
href="https://github.com/oap-project/gluten/issues/3030";>GLUTEN-3030</a>
      */
     public TreeMemoryTarget isolatedRoot() {
-      return 
ofCapacity(GlutenConfig.get().conservativeTaskOffHeapMemorySize());
+      return 
ofCapacity(GlutenCoreConfig.get().conservativeTaskOffHeapMemorySize());
     }
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala 
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 0208d6d0db..19bddf0fd7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -16,11 +16,8 @@
  */
 package org.apache.gluten
 
-import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.component.Component
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.config.GlutenConfig._
-import org.apache.gluten.events.GlutenBuildInfoEvent
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.initializer.CodedInputStreamClassInitializer
@@ -30,11 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext, 
TaskFailedReason}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.softaffinity.SoftAffinityListener
-import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
-import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, 
GlutenUIUtils}
 import org.apache.spark.sql.internal.SparkConfigUtil._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.SparkResourceUtil
@@ -55,92 +48,44 @@ class GlutenPlugin extends SparkPlugin {
 }
 
 private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
-  private var _sc: Option[SparkContext] = None
+  import GlutenDriverPlugin._
 
   override def init(sc: SparkContext, pluginContext: PluginContext): 
util.Map[String, String] = {
-    _sc = Some(sc)
     val conf = pluginContext.conf()
-
-    // Register Gluten listeners
-    GlutenSQLAppStatusListener.register(sc)
-    if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
-      SoftAffinityListener.register(sc)
+    // Spark SQL extensions
+    val extensionSeq = conf.get(SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
+    if 
(!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
 {
+      conf.set(
+        SPARK_SESSION_EXTENSIONS,
+        extensionSeq :+ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)
     }
 
-    postBuildInfoEvent(sc)
-
     setPredefinedConfigs(conf)
 
-    // Initialize Backend.
-    Component.sorted().foreach(_.onDriverStart(sc, pluginContext))
-
+    val components = Component.sorted()
+    printComponentInfo(components)
+    components.foreach(_.onDriverStart(sc, pluginContext))
     Collections.emptyMap()
   }
 
   override def registerMetrics(appId: String, pluginContext: PluginContext): 
Unit = {
-    _sc.foreach {
-      sc =>
-        if (GlutenUIUtils.uiEnabled(sc)) {
-          GlutenUIUtils.attachUI(sc)
-          logInfo("Gluten SQL Tab has been attached.")
-        }
-    }
+    Component.sorted().foreach(_.registerMetrics(appId, pluginContext))
   }
 
   override def shutdown(): Unit = {
     Component.sorted().reverse.foreach(_.onDriverShutdown())
   }
+}
 
-  private def postBuildInfoEvent(sc: SparkContext): Unit = {
-    // export gluten version to property to spark
-    System.setProperty("gluten.version", VERSION)
-
-    val glutenBuildInfo = new mutable.LinkedHashMap[String, String]()
-
-    val components = Component.sorted()
-    glutenBuildInfo.put("Components", 
components.map(_.buildInfo().name).mkString(", "))
-    components.foreach {
-      comp =>
-        val buildInfo = comp.buildInfo()
-        glutenBuildInfo.put(s"Component ${buildInfo.name} Branch", 
buildInfo.branch)
-        glutenBuildInfo.put(s"Component ${buildInfo.name} Revision", 
buildInfo.revision)
-        glutenBuildInfo.put(s"Component ${buildInfo.name} Revision Time", 
buildInfo.revisionTime)
-    }
-
-    glutenBuildInfo.put("Gluten Version", VERSION)
-    glutenBuildInfo.put("GCC Version", GCC_VERSION)
-    glutenBuildInfo.put("Java Version", JAVA_COMPILE_VERSION)
-    glutenBuildInfo.put("Scala Version", SCALA_COMPILE_VERSION)
-    glutenBuildInfo.put("Spark Version", SPARK_COMPILE_VERSION)
-    glutenBuildInfo.put("Hadoop Version", HADOOP_COMPILE_VERSION)
-    glutenBuildInfo.put("Gluten Branch", BRANCH)
-    glutenBuildInfo.put("Gluten Revision", REVISION)
-    glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME)
-    glutenBuildInfo.put("Gluten Build Time", BUILD_DATE)
-    glutenBuildInfo.put("Gluten Repo URL", REPO_URL)
-
-    val loggingInfo = glutenBuildInfo
-      .map { case (name, value) => s"$name: $value" }
-      .mkString(
-        "Gluten build 
info:\n==============================================================\n",
-        "\n",
-        "\n=============================================================="
-      )
-    logInfo(loggingInfo)
-    if (GlutenUIUtils.uiEnabled(sc)) {
-      val event = GlutenBuildInfoEvent(glutenBuildInfo.toMap)
-      GlutenUIUtils.postEvent(sc, event)
-    }
-  }
-
+private object GlutenDriverPlugin extends Logging {
   private def checkOffHeapSettings(conf: SparkConf): Unit = {
-    if (conf.get(DYNAMIC_OFFHEAP_SIZING_ENABLED)) {
+    if (conf.get(GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED)) {
       // When dynamic off-heap sizing is enabled, off-heap mode is not 
strictly required to be
       // enabled. Skip the check.
       return
     }
 
-    if (conf.get(COLUMNAR_MEMORY_UNTRACKED)) {
+    if (conf.get(GlutenCoreConfig.COLUMNAR_MEMORY_UNTRACKED)) {
       // When untracked memory mode is enabled, off-heap mode is not strictly 
required to be
       // enabled. Skip the check.
       return
@@ -148,66 +93,60 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
 
     val minOffHeapSize = "1MB"
     if (
-      !conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
-      conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < 
JavaUtils.byteStringAsBytes(
+      !conf.getBoolean(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, 
defaultValue = false) ||
+      conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < 
JavaUtils.byteStringAsBytes(
         minOffHeapSize)
     ) {
       throw new GlutenException(
-        s"Must set '$SPARK_OFFHEAP_ENABLED' to true " +
-          s"and set '$SPARK_OFFHEAP_SIZE_KEY' to be greater than 
$minOffHeapSize")
+        s"Must set '${GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY}' to true " +
+          s"and set '${GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater 
" +
+          s"than $minOffHeapSize")
     }
   }
 
   private def setPredefinedConfigs(conf: SparkConf): Unit = {
-    // Spark SQL extensions
-    val extensionSeq = conf.get(SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
-    if 
(!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
 {
-      conf.set(
-        SPARK_SESSION_EXTENSIONS,
-        extensionSeq :+ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)
-    }
-
-    // adaptive custom cost evaluator class
-    val enableGlutenCostEvaluator = 
conf.get(GlutenConfig.COST_EVALUATOR_ENABLED)
-    if (enableGlutenCostEvaluator) {
-      conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS, 
classOf[GlutenCostEvaluator].getName)
-    }
-
     // check memory off-heap enabled and size.
     checkOffHeapSettings(conf)
 
     // Get the off-heap size set by user.
-    val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
+    val offHeapSize = 
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY)
 
     // Set off-heap size in bytes.
-    conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES, offHeapSize)
+    conf.set(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES, offHeapSize)
 
     // Set off-heap size in bytes per task.
     val taskSlots = SparkResourceUtil.getTaskSlots(conf)
-    conf.set(NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)
+    conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)
     val offHeapPerTask = offHeapSize / taskSlots
-    conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES, offHeapPerTask)
+    conf.set(GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES, 
offHeapPerTask)
 
     // Pessimistic off-heap sizes, with the assumption that all non-borrowable 
storage memory
     // determined by spark.memory.storageFraction was used.
     val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
     val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / 
taskSlots
-    conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES, 
conservativeOffHeapPerTask)
-
-    // Disable vanilla columnar readers, to prevent columnar-to-columnar 
conversions.
-    // FIXME: Do we still need this trick since
-    //  https://github.com/apache/incubator-gluten/pull/1931 was merged?
-    if (!conf.get(VANILLA_VECTORIZED_READERS_ENABLED)) {
-      // FIXME Hongze 22/12/06
-      //  BatchScan.scala in shim was not always loaded by class loader.
-      //  The file should be removed and the "ClassCastException" issue caused 
by
-      //  spark.sql.<format>.enableVectorizedReader=true should be fixed in 
another way.
-      //  Before the issue is fixed we force the use of vanilla row reader by 
using
-      //  the following statement.
-      conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, false)
-      conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED, false)
-      conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED, false)
+    conf.set(
+      GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES,
+      conservativeOffHeapPerTask)
+  }
+
+  private def printComponentInfo(components: Seq[Component]): Unit = {
+    val componentInfo = mutable.LinkedHashMap[String, String]()
+    componentInfo.put("Components", 
components.map(_.buildInfo().name).mkString(", "))
+    components.foreach {
+      comp =>
+        val buildInfo = comp.buildInfo()
+        componentInfo.put(s"Component ${buildInfo.name} Branch", 
buildInfo.branch)
+        componentInfo.put(s"Component ${buildInfo.name} Revision", 
buildInfo.revision)
+        componentInfo.put(s"Component ${buildInfo.name} Revision Time", 
buildInfo.revisionTime)
     }
+    val loggingInfo = componentInfo
+      .map { case (name, value) => s"$name: $value" }
+      .mkString(
+        "Gluten 
components:\n==============================================================\n",
+        "\n",
+        "\n=============================================================="
+      )
+    logInfo(loggingInfo)
   }
 }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala 
b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
index 642ada4fcd..fdd76870f7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
@@ -61,6 +61,9 @@ trait Component {
   def onExecutorStart(pc: PluginContext): Unit = {}
   def onExecutorShutdown(): Unit = {}
 
+  /** Registers metrics for this component; only called on the Driver. */
+  def registerMetrics(appId: String, pluginContext: PluginContext): Unit = {}
+
   /**
    * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
    * determine the convention (its row-based processing / columnar-batch 
processing support) of a
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala 
b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala
new file mode 100644
index 0000000000..f3912007cd
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
+
+class GlutenCoreConfig(conf: SQLConf) extends Logging {
+  import GlutenCoreConfig._
+
+  private lazy val configProvider = new SQLConfProvider(conf)
+
+  def getConf[T](entry: ConfigEntry[T]): T = {
+    require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
+    entry.readFrom(configProvider)
+  }
+
+  def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
+
+  def enableRas: Boolean = getConf(RAS_ENABLED)
+
+  def rasCostModel: String = getConf(RAS_COST_MODEL)
+
+  def memoryUntracked: Boolean = getConf(COLUMNAR_MEMORY_UNTRACKED)
+
+  def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+
+  def taskOffHeapMemorySize: Long = 
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
+
+  def conservativeTaskOffHeapMemorySize: Long =
+    getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
+
+  def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
+
+  def memoryOverAcquiredRatio: Double = 
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
+
+  def memoryReservationBlockSize: Long = 
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
+
+  def dynamicOffHeapSizingEnabled: Boolean =
+    getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+
+  def dynamicOffHeapSizingMemoryFraction: Double =
+    getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
+}
+
+object GlutenCoreConfig {
+  def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)
+
+  def buildStaticConf(key: String): ConfigBuilder = {
+    ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
+  }
+
+  def get: GlutenCoreConfig = {
+    new GlutenCoreConfig(SQLConf.get)
+  }
+
+  val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+  val SPARK_OFFHEAP_ENABLED_KEY = "spark.memory.offHeap.enabled"
+
+  val GLUTEN_ENABLED =
+    buildConf("spark.gluten.enabled")
+      .internal()
+      .doc("Whether to enable gluten. Default value is true. Just an 
experimental property." +
+        " Recommend to enable/disable Gluten through the setting for 
spark.plugins.")
+      .booleanConf
+      .createWithDefault(true)
+
+  // Options used by RAS.
+  val RAS_ENABLED =
+    buildConf("spark.gluten.ras.enabled")
+      .doc(
+        "Enables RAS (relational algebra selector) during physical " +
+          "planning to generate more efficient query plan. Note, this feature 
doesn't bring " +
+          "performance profits by default. Try exploring option 
`spark.gluten.ras.costModel` " +
+          "for advanced usage.")
+      .booleanConf
+      .createWithDefault(false)
+
+  // FIXME: This option is no longer only used by RAS. Should change key to
+  //  `spark.gluten.costModel` or something similar.
+  val RAS_COST_MODEL =
+    buildConf("spark.gluten.ras.costModel")
+      .doc(
+        "The class name of user-defined cost model that will be used by 
Gluten's transition " +
+          "planner as well as by RAS. If not specified, a legacy built-in cost 
model will be " +
+          "used. The legacy cost model helps RAS planner exhaustively offload 
computations, and " +
+          "helps transition planner choose columnar-to-columnar transition 
over others.")
+      .stringConf
+      .createWithDefaultString("legacy")
+
+  val COLUMNAR_MEMORY_UNTRACKED =
+    buildStaticConf("spark.gluten.memory.untracked")
+      .internal()
+      .doc(
+        "When enabled, turn all native memory allocations in Gluten into 
untracked. Spark " +
+          "will be unaware of the allocations so will not trigger 
spill-to-disk operations " +
+          "or Spark OOMs. Should only be used for testing or other 
non-production use cases.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COLUMNAR_MEMORY_ISOLATION =
+    buildConf("spark.gluten.memory.isolation")
+      .internal()
+      .doc("Enable isolated memory mode. If true, Gluten controls the maximum 
off-heap memory " +
+        "can be used by each task to X, X = executor memory / max task slots. 
It's recommended " +
+        "to set true if Gluten serves concurrent queries within a single 
session, since not all " +
+        "memory Gluten allocated is guaranteed to be spillable. In the case, 
the feature should " +
+        "be enabled to avoid OOM.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+    buildConf("spark.gluten.memoryOverhead.size.in.bytes")
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
+  val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
+    buildConf("spark.gluten.memory.offHeap.size.in.bytes")
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
+  val COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES =
+    buildConf("spark.gluten.memory.task.offHeap.size.in.bytes")
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
+  val COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES =
+    buildConf("spark.gluten.memory.conservative.task.offHeap.size.in.bytes")
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
+  val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
+    buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
+      .internal()
+      .doc("If larger than 0, Velox backend will try over-acquire this ratio 
of the total " +
+        "allocated memory as backup to avoid OOM.")
+      .doubleConf
+      .checkValue(d => d >= 0.0d, "Over-acquired ratio should be larger than 
or equals 0")
+      .createWithDefault(0.3d)
+
+  val COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE =
+    buildConf("spark.gluten.memory.reservationBlockSize")
+      .internal()
+      .doc("Block size of native reservation listener reserve memory from 
Spark.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8MB")
+
+  val NUM_TASK_SLOTS_PER_EXECUTOR =
+    buildConf("spark.gluten.numTaskSlotsPerExecutor")
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .intConf
+      .createWithDefaultString("-1")
+
+  // Since https://github.com/apache/incubator-gluten/issues/5439.
+  val DYNAMIC_OFFHEAP_SIZING_ENABLED =
+    buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
+      .internal()
+      .doc(
+        "Experimental: When set to true, the offheap config 
(spark.memory.offHeap.size) will " +
+          "be ignored and instead we will consider onheap and offheap memory 
in combination, " +
+          "both counting towards the executor memory config 
(spark.executor.memory). We will " +
+          "make use of JVM APIs to determine how much onheap memory is in use, 
alongside tracking " +
+          "offheap allocations made by Gluten. We will then proceed to 
enforcing a total memory " +
+          "quota, calculated by the sum of what memory is committed and in use 
in the Java " +
+          "heap. Since the calculation of the total quota happens as offheap 
allocation happens " +
+          "and not as JVM heap memory is allocated, it is possible that we can 
oversubscribe " +
+          "memory. Additionally, note that this change is experimental and may 
have performance " +
+          "implications.")
+      .booleanConf
+      .createWithDefault(false)
+
+  // Since https://github.com/apache/incubator-gluten/issues/5439.
+  val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
+    
buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
+      .internal()
+      .doc(
+        "Experimental: Determines the memory fraction used to determine the 
total " +
+          "memory available for offheap and onheap allocations when the 
dynamic offheap " +
+          "sizing feature is enabled. The default is set to match 
spark.executor.memoryFraction.")
+      .doubleConf
+      .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must 
between [0, 1]")
+      .createWithDefault(0.6)
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 4e7b2a034c..6097b821e0 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.component.Component
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.internal.Logging
@@ -32,7 +32,9 @@ private[gluten] class GlutenSessionExtensions
     injector.control.disableOn {
       session =>
         val glutenEnabledGlobally = session.conf
-          .get(GlutenConfig.GLUTEN_ENABLED.key, 
GlutenConfig.GLUTEN_ENABLED.defaultValueString)
+          .get(
+            GlutenCoreConfig.GLUTEN_ENABLED.key,
+            GlutenCoreConfig.GLUTEN_ENABLED.defaultValueString)
           .toBoolean
         val disabled = !glutenEnabledGlobally
         logDebug(s"Gluten is disabled by variable: glutenEnabledGlobally: 
$glutenEnabledGlobally")
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 7257865507..70659f58ce 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension.caller.CallerInfo
 
 import org.apache.spark.sql.SparkSession
@@ -31,8 +30,6 @@ object ColumnarRuleApplier {
       val session: SparkSession,
       val caller: CallerInfo,
       val outputsColumnar: Boolean) {
-    val glutenConf: GlutenConfig = {
-      new GlutenConfig(session.sessionState.conf)
-    }
+    val sqlConf = session.sessionState.conf
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
index e10d9bd0b4..d62f27a297 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
@@ -16,49 +16,15 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.metrics.GlutenTimeMetric
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.SparkPlan
 
 class ColumnarRuleExecutor(phase: String, rules: Seq[Rule[SparkPlan]])
   extends RuleExecutor[SparkPlan] {
-  import ColumnarRuleExecutor._
-  private val batch: Batch =
-    Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new 
LoggedRule(r)): _*)
+  private val batch: Batch = Batch(s"Columnar (Phase [$phase])", Once, rules: 
_*)
 
   // TODO: Remove this exclusion then manage to pass Spark's idempotence check.
   override protected val excludedOnceBatches: Set[String] = Set(batch.name)
 
   override protected def batches: Seq[Batch] = Seq(batch)
 }
-
-object ColumnarRuleExecutor {
-  private class LoggedRule(delegate: Rule[SparkPlan])
-    extends Rule[SparkPlan]
-    with Logging
-    with LogLevelUtil {
-
-    override val ruleName: String = delegate.ruleName
-
-    private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime: 
Long): String =
-      if (!oldPlan.fastEquals(newPlan)) {
-        s"""
-           |=== Applying Rule $ruleName took $millisTime ms ===
-           |${sideBySide(oldPlan.treeString, 
newPlan.treeString).mkString("\n")}
-           """.stripMargin
-      } else {
-        s"Rule $ruleName has no effect, took $millisTime ms."
-      }
-
-    override def apply(plan: SparkPlan): SparkPlan = {
-      val (out, millisTime) = 
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
-      logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out, 
millisTime))
-      out
-    }
-  }
-}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 2f5c8c4472..762069c373 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -35,13 +35,19 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 class EnumeratedApplier(
     session: SparkSession,
-    ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
+    ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+    ruleWrappers: Seq[Rule[SparkPlan] => Rule[SparkPlan]])
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
+
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
     val call = new ColumnarRuleCall(session, CallerInfo.create(), 
outputsColumnar)
-    val finalPlan = apply0(ruleBuilders.map(b => b(call)), plan)
+    val finalPlan = apply0(
+      ruleBuilders
+        .map(b => b(call))
+        .map(r => ruleWrappers.foldLeft(r) { case (r, wrapper) => wrapper(r) 
}),
+      plan)
     finalPlan
   }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index fed7bca248..ca542f3d15 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -17,12 +17,12 @@
 package org.apache.gluten.extension.columnar.enumerated.planner.plan
 
 import org.apache.gluten.ras.PlanModel
-import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
-import org.apache.spark.task.{SparkTaskUtil, TaskResources}
+import org.apache.spark.task.TaskResources
+import org.apache.spark.util.SparkTaskUtil
 
 import java.util.{Objects, Properties}
 
@@ -32,7 +32,7 @@ object GlutenPlanModel {
   }
 
   private object PlanModelImpl extends PlanModel[SparkPlan] {
-    private val fakeTc = 
SparkShimLoader.getSparkShims.createTestTaskContext(new Properties())
+    private val fakeTc = SparkTaskUtil.createTestTaskContext(new Properties())
     private def fakeTc[T](body: => T): T = {
       assert(!TaskResources.inSparkTask())
       SparkTaskUtil.setTaskContext(fakeTc)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 493319d423..5fa74f6416 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -35,7 +35,8 @@ class HeuristicApplier(
     transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
     fallbackPolicyBuilders: Seq[ColumnarRuleCall => SparkPlan => 
Rule[SparkPlan]],
     postBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
-    finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
+    finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+    ruleWrappers: Seq[Rule[SparkPlan] => Rule[SparkPlan]])
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
@@ -65,8 +66,13 @@ class HeuristicApplier(
   private def transformPlan(
       phase: String,
       rules: Seq[Rule[SparkPlan]],
-      plan: SparkPlan): SparkPlan =
-    new ColumnarRuleExecutor(phase, rules).execute(plan)
+      plan: SparkPlan): SparkPlan = {
+    val wrappedRules = ruleWrappers.foldLeft(rules) {
+      case (rules, wrapper) =>
+        rules.map(wrapper)
+    }
+    new ColumnarRuleExecutor(phase, wrappedRules).execute(plan)
+  }
 
   /**
    * Rules to let planner create a suggested Gluten plan being sent to 
`fallbackPolicies` in which
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 75ca8a775b..8ab66bf561 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -193,8 +193,7 @@ object Convention {
 
   object KnownRowTypeForSpark33OrLater {
     private val lteSpark32: Boolean = {
-      val v = SparkVersionUtil.majorMinorVersion()
-      SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
+      SparkVersionUtil.compareMajorMinorVersion((3, 2)) <= 0
     }
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index 83711e71f6..b28fbe4f36 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -18,12 +18,12 @@ package org.apache.gluten.extension.columnar.transition
 
 import org.apache.gluten.component.Component
 import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
-import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.util.SparkPlanUtil
 
 /** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]] 
from a query plan. */
 sealed trait ConventionFunc {
@@ -98,7 +98,7 @@ object ConventionFunc {
       val out = plan match {
         case k: Convention.KnownRowType =>
           k.rowType()
-        case _ if SparkShimLoader.getSparkShims.supportsRowBased(plan) =>
+        case _ if SparkPlanUtil.supportsRowBased(plan) =>
           Convention.RowType.VanillaRowType
         case _ =>
           Convention.RowType.None
@@ -108,7 +108,7 @@ object ConventionFunc {
     }
 
     private def checkRowType(plan: SparkPlan, rowType: Convention.RowType): 
Unit = {
-      if (SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+      if (SparkPlanUtil.supportsRowBased(plan)) {
         assert(
           rowType != Convention.RowType.None,
           s"Plan ${plan.nodeName} supports row-based execution, " +
@@ -173,7 +173,7 @@ object ConventionFunc {
           ConventionReq.of(
             ConventionReq.RowType.Any,
             ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatchType)))
-      case write: DataWritingCommandExec if 
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
+      case write: DataWritingCommandExec if 
SparkPlanUtil.isPlannedV1Write(write) =>
         // To align with 
ApplyColumnarRulesAndInsertTransitions#insertTransitions
         Seq(ConventionReq.any)
       case u: UnionExec =>
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
index 83289fb973..6a9dfa0623 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.columnar.cost.GlutenCostModel
 
@@ -94,7 +94,7 @@ object Transition {
       private val graphCache = mutable.Map[String, TransitionGraph]()
 
       private def graph(): TransitionGraph = synchronized {
-        val aliasOrClass = GlutenConfig.get.rasCostModel
+        val aliasOrClass = GlutenCoreConfig.get.rasCostModel
         graphCache.getOrElseUpdate(
           aliasOrClass, {
             val base = GlutenCostModel.find(aliasOrClass)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 7a90947527..56f3cc612f 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SparkVersionUtil
 
 package object transition {
   private val gteSpark33: Boolean = {
-    
SparkVersionUtil.compareMajorMinorVersion(SparkVersionUtil.majorMinorVersion(), 
(3, 3)) >= 0
+    SparkVersionUtil.compareMajorMinorVersion((3, 3)) >= 0
   }
 
   type TransitionGraph = FloydWarshallGraph[TransitionGraph.Vertex, Transition]
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index fa8704509e..f6b8957f7e 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.injector
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
@@ -44,7 +44,7 @@ class GlutenInjector private[injector] (control: 
InjectorControl) {
   }
 
   private def applier(session: SparkSession): ColumnarRuleApplier = {
-    val conf = new GlutenConfig(session.sessionState.conf)
+    val conf = new GlutenCoreConfig(session.sessionState.conf)
     if (conf.enableRas) {
       return ras.createApplier(session)
     }
@@ -61,6 +61,7 @@ object GlutenInjector {
       mutable.Buffer.empty[ColumnarRuleCall => SparkPlan => Rule[SparkPlan]]
     private val postBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
     private val finalBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
+    private val ruleWrappers = mutable.Buffer.empty[Rule[SparkPlan] => 
Rule[SparkPlan]]
 
     def injectPreTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit 
= {
       preTransformBuilders += builder
@@ -86,6 +87,10 @@ object GlutenInjector {
       finalBuilders += builder
     }
 
+    def injectRuleWrapper(wrapper: Rule[SparkPlan] => Rule[SparkPlan]): Unit = 
{
+      ruleWrappers += wrapper
+    }
+
     private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
       new HeuristicApplier(
         session,
@@ -93,7 +98,8 @@ object GlutenInjector {
           c => createHeuristicTransform(c)) ++ postTransformBuilders).toSeq,
         fallbackPolicyBuilders.toSeq,
         postBuilders.toSeq,
-        finalBuilders.toSeq
+        finalBuilders.toSeq,
+        ruleWrappers.toSeq
       )
     }
 
@@ -107,6 +113,7 @@ object GlutenInjector {
     private val preTransformBuilders = mutable.Buffer.empty[ColumnarRuleCall 
=> Rule[SparkPlan]]
     private val rasRuleBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
RasRule[SparkPlan]]
     private val postTransformBuilders = mutable.Buffer.empty[ColumnarRuleCall 
=> Rule[SparkPlan]]
+    private val ruleWrappers = mutable.Buffer.empty[Rule[SparkPlan] => 
Rule[SparkPlan]]
 
     def injectPreTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit 
= {
       preTransformBuilders += builder
@@ -120,17 +127,22 @@ object GlutenInjector {
       postTransformBuilders += builder
     }
 
+    def injectRuleWrapper(wrapper: Rule[SparkPlan] => Rule[SparkPlan]): Unit = 
{
+      ruleWrappers += wrapper
+    }
+
     private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
       new EnumeratedApplier(
         session,
         (preTransformBuilders ++ Seq(
-          c => createEnumeratedTransform(c)) ++ postTransformBuilders).toSeq)
+          c => createEnumeratedTransform(c)) ++ postTransformBuilders).toSeq,
+        ruleWrappers.toSeq)
     }
 
     def createEnumeratedTransform(call: ColumnarRuleCall): EnumeratedTransform 
= {
       // Build RAS rules.
       val rules = rasRuleBuilders.map(_(call))
-      val costModel = GlutenCostModel.find(call.glutenConf.rasCostModel)
+      val costModel = GlutenCostModel.find(new 
GlutenCoreConfig(call.sqlConf).rasCostModel)
       // Create transform.
       EnumeratedTransform(costModel, rules.toSeq)
     }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index 359aa86dd1..c0988b8eca 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.memory
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.memory.memtarget.{MemoryTarget, NoopMemoryTarget}
 
@@ -30,7 +30,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, 
NoopMemoryTarget}
  * BlockId to be extended by user, TestBlockId is chosen for the storage 
memory reservations.
  */
 object GlobalOffHeapMemory {
-  private val target: MemoryTarget = if (GlutenConfig.get.memoryUntracked) {
+  private val target: MemoryTarget = if (GlutenCoreConfig.get.memoryUntracked) 
{
     new NoopMemoryTarget()
   } else {
     new GlobalOffHeapMemoryTarget()
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 0d006a4848..e4204770f4 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.SparkPlan
@@ -29,7 +29,7 @@ import org.apache.spark.util.{SparkVersionUtil, Utils}
  */
 case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
   private val ltSpark33: Boolean = {
-    
SparkVersionUtil.compareMajorMinorVersion(SparkVersionUtil.majorMinorVersion(), 
(3, 3)) < 0
+    SparkVersionUtil.compareMajorMinorVersion((3, 3)) < 0
   }
 
   private val vanillaCostEvaluator: CostEvaluator = {
@@ -46,7 +46,7 @@ case class GlutenCostEvaluator() extends CostEvaluator with 
SQLConfHelper {
   }
 
   override def evaluateCost(plan: SparkPlan): Cost = {
-    if (GlutenConfig.get.enableGluten) {
+    if (GlutenCoreConfig.get.enableGluten) {
       new GlutenCost(vanillaCostEvaluator, plan)
     } else {
       vanillaCostEvaluator.evaluateCost(plan)
diff --git 
a/shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala 
b/gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
similarity index 98%
rename from 
shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
index 5a68d15d2a..f2692d566d 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 
 import scala.reflect.ClassTag
 
-object BlockManagerUtils {
+object BlockManagerUtil {
   def setTestMemoryStore(conf: SparkConf, memoryManager: MemoryManager, 
isDriver: Boolean): Unit = {
     val store = new MemoryStore(
       conf,
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala 
b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index 4c46457faf..fb06465c19 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -16,15 +16,14 @@
  */
 package org.apache.spark.task
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.memory.SimpleMemoryUsageRecorder
-import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.task.TaskListener
 
 import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, 
UnknownReason}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
+import org.apache.spark.util.{SparkTaskUtil, TaskCompletionListener, 
TaskFailureListener}
 
 import java.util
 import java.util.{Collections, Properties, UUID}
@@ -43,7 +42,7 @@ object TaskResources extends TaskListener with Logging {
   val ACCUMULATED_LEAK_BYTES = new AtomicLong(0L)
 
   private def newUnsafeTaskContext(properties: Properties): TaskContext = {
-    SparkShimLoader.getSparkShims.createTestTaskContext(properties)
+    SparkTaskUtil.createTestTaskContext(properties)
   }
 
   implicit private class PropertiesOps(properties: Properties) {
@@ -65,8 +64,8 @@ object TaskResources extends TaskListener with Logging {
         properties.put(key, value)
       case _ =>
     }
-    properties.setIfMissing(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
-    properties.setIfMissing(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, "1TB")
+    properties.setIfMissing(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, "true")
+    properties.setIfMissing(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, "1TB")
     TaskContext.setTaskContext(newUnsafeTaskContext(properties))
   }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala
new file mode 100644
index 0000000000..4f5c213e5a
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.internal.SQLConf
+
+object SparkPlanUtil {
+  private val lteSpark32 = SparkVersionUtil.compareMajorMinorVersion((3, 2)) 
<= 0
+  private val lteSpark33 = SparkVersionUtil.compareMajorMinorVersion((3, 3)) 
<= 0
+
+  def supportsRowBased(plan: SparkPlan): Boolean = {
+    if (lteSpark32) {
+      return !plan.supportsColumnar
+    }
+
+    val m = classOf[SparkPlan].getMethod("supportsRowBased")
+    m.invoke(plan).asInstanceOf[Boolean]
+  }
+
+  def isPlannedV1Write(plan: DataWritingCommandExec): Boolean = {
+    if (lteSpark33) {
+      return false
+    }
+
+    val v1WriteCommandClass =
+      
Utils.classForName("org.apache.spark.sql.execution.datasources.V1WriteCommand")
+    val plannedWriteEnabled =
+      SQLConf.get.getConfString("spark.sql.optimizer.plannedWrite.enabled", 
"true").toBoolean
+    v1WriteCommandClass.isAssignableFrom(plan.cmd.getClass) && 
plannedWriteEnabled
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
new file mode 100644
index 0000000000..02df5e224f
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.{SparkConf, TaskContext, TaskContextImpl}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.storage.BlockManagerUtil
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+object SparkTaskUtil {
+  def setTaskContext(taskContext: TaskContext): Unit = {
+    TaskContext.setTaskContext(taskContext)
+  }
+
+  def unsetTaskContext(): Unit = {
+    TaskContext.unset()
+  }
+
+  def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
+    taskContext.taskMemoryManager()
+  }
+
+  def createTestTaskContext(properties: Properties): TaskContext = {
+    val conf = new SparkConf()
+    conf.setAll(properties.asScala)
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+    BlockManagerUtil.setTestMemoryStore(conf, memoryManager, isDriver = false)
+    val stageId = -1.asInstanceOf[Object]
+    val stageAttemptNumber = -1.asInstanceOf[Object]
+    val partitionId = -1.asInstanceOf[Object]
+    val taskAttemptId = -1L.asInstanceOf[Object]
+    val attemptNumber = -1.asInstanceOf[Object]
+    val numPartitions = -1.asInstanceOf[Object] // Added in Spark 3.4.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 
-1L).asInstanceOf[Object]
+    val localProperties = properties.asInstanceOf[Object]
+    val metricsSystem =
+      MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", 
conf).asInstanceOf[Object]
+    val taskMetrics = TaskMetrics.empty.asInstanceOf[Object]
+    val cpus = 1.asInstanceOf[Object] // Added in Spark 3.3.
+    val resources = Map.empty.asInstanceOf[Object]
+
+    val ctor = {
+      val ctors = classOf[TaskContextImpl].getDeclaredConstructors
+      assert(ctors.size == 1)
+      ctors.head
+    }
+
+    if (SparkVersionUtil.compareMajorMinorVersion((3, 2)) <= 0) {
+      return ctor
+        .newInstance(
+          stageId,
+          stageAttemptNumber,
+          partitionId,
+          taskAttemptId,
+          attemptNumber,
+          taskMemoryManager,
+          localProperties,
+          metricsSystem,
+          taskMetrics,
+          resources
+        )
+        .asInstanceOf[TaskContext]
+    }
+
+    if (SparkVersionUtil.compareMajorMinorVersion((3, 3)) == 0) {
+      return ctor
+        .newInstance(
+          stageId,
+          stageAttemptNumber,
+          partitionId,
+          taskAttemptId,
+          attemptNumber,
+          taskMemoryManager,
+          localProperties,
+          metricsSystem,
+          taskMetrics,
+          cpus,
+          resources
+        )
+        .asInstanceOf[TaskContext]
+    }
+
+    // Since Spark 3.4.
+    ctor
+      .newInstance(
+        stageId,
+        stageAttemptNumber,
+        partitionId,
+        taskAttemptId,
+        attemptNumber,
+        numPartitions,
+        taskMemoryManager,
+        localProperties,
+        metricsSystem,
+        taskMetrics,
+        cpus,
+        resources
+      )
+      .asInstanceOf[TaskContext]
+  }
+}
diff --git 
a/shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
similarity index 68%
rename from 
shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 22917639bf..0313432427 100644
--- a/shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -17,18 +17,11 @@
 package org.apache.spark.util
 
 object SparkVersionUtil {
-  def majorMinorVersion(version: String = org.apache.spark.SPARK_VERSION): 
(Int, Int) = {
-    VersionUtils.majorMinorVersion(version)
-  }
-
-  def majorMinorPatchVersion(version: String): Option[(Int, Int, Int)] = {
-    VersionUtils.majorMinorPatchVersion(version)
-  }
-
   // Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > 
other.
-  def compareMajorMinorVersion(one: (Int, Int), other: (Int, Int)): Int = {
+  def compareMajorMinorVersion(other: (Int, Int)): Int = {
+    val current = 
VersionUtils.majorMinorVersion(org.apache.spark.SPARK_VERSION)
     val base = 1000
-    assert(one._2 < base && other._2 < base)
-    one._1 * base + one._2 - (other._1 * base + other._2)
+    assert(current._2 < base && other._2 < base)
+    current._1 * base + current._2 - (other._1 * base + other._2)
   }
 }
diff --git 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index a31a1d6d35..b59856eef6 100644
--- 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++ 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.memory.memtarget.spark;
 
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
 import org.apache.gluten.memory.memtarget.MemoryTarget;
 import org.apache.gluten.memory.memtarget.Spiller;
 import org.apache.gluten.memory.memtarget.Spillers;
@@ -42,7 +42,7 @@ public class TreeMemoryConsumerTest {
     conf.setConfString("spark.memory.offHeap.enabled", "true");
     conf.setConfString("spark.memory.offHeap.size", "400");
     conf.setConfString(
-        GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(), 
"100");
+        
GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(), 
"100");
   }
 
   @Test
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala 
b/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
index 026b717621..8e4fd48284 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.task
 import org.apache.spark.memory.{MemoryConsumer, MemoryMode}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.task.{SparkTaskUtil, TaskResource, TaskResources}
+import org.apache.spark.task.{TaskResource, TaskResources}
+import org.apache.spark.util.SparkTaskUtil
 
 import org.scalatest.funsuite.AnyFunSuite
 
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
index 6747b79ffc..44b0597503 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -40,7 +41,7 @@ object OffloadIcebergScan {
       c =>
         val offload = Seq(OffloadIcebergScan())
         HeuristicTransform.Simple(
-          Validators.newValidator(c.glutenConf, offload),
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
           offload
         )
     }
@@ -50,7 +51,7 @@ object OffloadIcebergScan {
       c =>
         RasOffload.Rule(
           RasOffload.from[BatchScanExec](OffloadIcebergScan()),
-          Validators.newValidator(c.glutenConf),
+          Validators.newValidator(new GlutenConfig(c.sqlConf)),
           Nil)
     }
   }
diff --git 
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
 
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
index 262fa82f20..4538e41f37 100644
--- 
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
+++ 
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -40,7 +41,7 @@ object OffloadKafkaScan {
       c =>
         val offload = Seq(OffloadKafkaScan())
         HeuristicTransform.Simple(
-          Validators.newValidator(c.glutenConf, offload),
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
           offload
         )
     }
@@ -50,7 +51,7 @@ object OffloadKafkaScan {
       c =>
         RasOffload.Rule(
           RasOffload.from[BatchScanExec](OffloadKafkaScan()),
-          Validators.newValidator(c.glutenConf),
+          Validators.newValidator(new GlutenConfig(c.sqlConf)),
           Nil)
     }
   }
diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml
index b91ada96e4..2eecf49407 100644
--- a/gluten-substrait/pom.xml
+++ b/gluten-substrait/pom.xml
@@ -32,6 +32,23 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>${sparkshim.artifactId}</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>spark-sql-columnar-shims-common</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-ui</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- Prevent our dummy JAR from being included in Spark distributions or 
uploaded to YARN -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 37be117105..e90df4ad06 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -16,16 +16,50 @@
  */
 package org.apache.gluten.backendsapi
 
+import org.apache.gluten.GlutenBuildInfo
 import org.apache.gluten.backend.Backend
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED
+import org.apache.gluten.events.GlutenBuildInfoEvent
+import org.apache.gluten.extension.columnar.LoggedRule
 import org.apache.gluten.extension.injector.Injector
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
+import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, 
GlutenUIUtils}
+import org.apache.spark.sql.internal.SparkConfigUtil._
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util.Collections
+
+import scala.collection.mutable
+
+trait SubstraitBackend extends Backend with Logging {
+  import SubstraitBackend._
+  private var _sc: Option[SparkContext] = None
 
-trait SubstraitBackend extends Backend {
   final override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit 
= {
+    _sc = Some(sc)
+    val conf = pc.conf()
+
+    // Register Gluten listeners
+    GlutenSQLAppStatusListener.register(sc)
+    if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
+      SoftAffinityListener.register(sc)
+    }
+
+    postBuildInfoEvent(sc)
+
+    setPredefinedConfigs(conf)
+
+    Collections.emptyMap()
+
     listenerApi().onDriverStart(sc, pc)
   }
+
   final override def onDriverShutdown(): Unit = {
     listenerApi().onDriverShutdown()
   }
@@ -36,8 +70,21 @@ trait SubstraitBackend extends Backend {
     listenerApi().onExecutorShutdown()
   }
   final override def injectRules(injector: Injector): Unit = {
+    injector.gluten.legacy.injectRuleWrapper(r => new LoggedRule(r))
+    injector.gluten.ras.injectRuleWrapper(r => new LoggedRule(r))
     ruleApi().injectRules(injector)
   }
+
+  final override def registerMetrics(appId: String, pluginContext: 
PluginContext): Unit = {
+    _sc.foreach {
+      sc =>
+        if (GlutenUIUtils.uiEnabled(sc)) {
+          GlutenUIUtils.attachUI(sc)
+          logInfo("Gluten SQL Tab has been attached.")
+        }
+    }
+  }
+
   def iteratorApi(): IteratorApi
   def sparkPlanExecApi(): SparkPlanExecApi
   def transformerApi(): TransformerApi
@@ -47,3 +94,62 @@ trait SubstraitBackend extends Backend {
   def ruleApi(): RuleApi
   def settings(): BackendSettingsApi
 }
+
+object SubstraitBackend extends Logging {
+
+  /** Since https://github.com/apache/incubator-gluten/pull/2247. */
+  private def postBuildInfoEvent(sc: SparkContext): Unit = {
+    // export gluten version to property to spark
+    System.setProperty("gluten.version", GlutenBuildInfo.VERSION)
+
+    val glutenBuildInfo = new mutable.LinkedHashMap[String, String]()
+
+    glutenBuildInfo.put("Gluten Version", GlutenBuildInfo.VERSION)
+    glutenBuildInfo.put("GCC Version", GlutenBuildInfo.GCC_VERSION)
+    glutenBuildInfo.put("Java Version", GlutenBuildInfo.JAVA_COMPILE_VERSION)
+    glutenBuildInfo.put("Scala Version", GlutenBuildInfo.SCALA_COMPILE_VERSION)
+    glutenBuildInfo.put("Spark Version", GlutenBuildInfo.SPARK_COMPILE_VERSION)
+    glutenBuildInfo.put("Hadoop Version", 
GlutenBuildInfo.HADOOP_COMPILE_VERSION)
+    glutenBuildInfo.put("Gluten Branch", GlutenBuildInfo.BRANCH)
+    glutenBuildInfo.put("Gluten Revision", GlutenBuildInfo.REVISION)
+    glutenBuildInfo.put("Gluten Revision Time", GlutenBuildInfo.REVISION_TIME)
+    glutenBuildInfo.put("Gluten Build Time", GlutenBuildInfo.BUILD_DATE)
+    glutenBuildInfo.put("Gluten Repo URL", GlutenBuildInfo.REPO_URL)
+
+    val loggingInfo = glutenBuildInfo
+      .map { case (name, value) => s"$name: $value" }
+      .mkString(
+        "Gluten build 
info:\n==============================================================\n",
+        "\n",
+        "\n=============================================================="
+      )
+    logInfo(loggingInfo)
+    if (GlutenUIUtils.uiEnabled(sc)) {
+      val event = GlutenBuildInfoEvent(glutenBuildInfo.toMap)
+      GlutenUIUtils.postEvent(sc, event)
+    }
+  }
+
+  private def setPredefinedConfigs(conf: SparkConf): Unit = {
+    // adaptive custom cost evaluator class
+    val enableGlutenCostEvaluator = 
conf.get(GlutenConfig.COST_EVALUATOR_ENABLED)
+    if (enableGlutenCostEvaluator) {
+      conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS, 
classOf[GlutenCostEvaluator].getName)
+    }
+
+    // Disable vanilla columnar readers, to prevent columnar-to-columnar 
conversions.
+    // FIXME: Do we still need this trick since
+    //  https://github.com/apache/incubator-gluten/pull/1931 was merged?
+    if (!conf.get(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)) {
+      // FIXME Hongze 22/12/06
+      //  BatchScan.scala in shim was not always loaded by class loader.
+      //  The file should be removed and the "ClassCastException" issue caused 
by
+      //  spark.sql.<format>.enableVectorizedReader=true should be fixed in 
another way.
+      //  Before the issue is fixed we force the use of vanilla row reader by 
using
+      //  the following statement.
+      conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, false)
+      conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED, false)
+      conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED, false)
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
similarity index 86%
rename from 
gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 194e5da239..15f6ecda94 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.gluten.config
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, 
SQLConfProvider}
+import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
 
 import com.google.common.collect.ImmutableList
 import org.apache.hadoop.security.UserGroupInformation
@@ -33,20 +32,11 @@ case class GlutenNumaBindingInfo(
     totalCoreRange: Array[String] = null,
     numCoresPerExecutor: Int = -1) {}
 
-class GlutenConfig(conf: SQLConf) extends Logging {
+class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
   import GlutenConfig._
 
-  private lazy val configProvider = new SQLConfProvider(conf)
-
-  def getConf[T](entry: ConfigEntry[T]): T = {
-    require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
-    entry.readFrom(configProvider)
-  }
-
   def enableAnsiMode: Boolean = conf.ansiEnabled
 
-  def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
-
   def glutenUiEnabled: Boolean = getConf(GLUTEN_UI_ENABLED)
 
   // FIXME the option currently controls both JVM and native validation 
against a Substrait plan.
@@ -247,26 +237,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     }
   }
 
-  def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
-
-  def memoryUntracked: Boolean = getConf(COLUMNAR_MEMORY_UNTRACKED)
-
-  def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
-
-  def taskOffHeapMemorySize: Long = 
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
-
-  def memoryOverAcquiredRatio: Double = 
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
-
-  def memoryReservationBlockSize: Long = 
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
-
-  def conservativeTaskOffHeapMemorySize: Long =
-    getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
-
-  // Options used by RAS.
-  def enableRas: Boolean = getConf(RAS_ENABLED)
-
-  def rasCostModel: String = getConf(RAS_COST_MODEL)
-
   def cartesianProductTransformerEnabled: Boolean =
     getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
 
@@ -342,12 +312,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableCastAvgAggregateFunction: Boolean = 
getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
 
-  def dynamicOffHeapSizingEnabled: Boolean =
-    getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
-
-  def dynamicOffHeapSizingMemoryFraction: Double =
-    getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
-
   def enableHiveFileFormatWriter: Boolean = 
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 
   def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
@@ -441,8 +405,6 @@ object GlutenConfig {
   val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
   val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
   val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
-  val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
-  val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
   val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
   val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = 
"spark.unsafe.sorter.spill.reader.buffer.size"
@@ -471,7 +433,7 @@ object GlutenConfig {
     val keys = Set(
       DEBUG_ENABLED.key,
       BENCHMARK_SAVE_DIR.key,
-      COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+      GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
       COLUMNAR_MAX_BATCH_SIZE.key,
       SHUFFLE_WRITER_BUFFER_SIZE.key,
       SQLConf.LEGACY_SIZE_OF_NULL.key,
@@ -598,8 +560,8 @@ object GlutenConfig {
       (
         "spark.gluten.sql.columnar.backend.velox.IOThreads",
         conf.getOrElse(
-          NUM_TASK_SLOTS_PER_EXECUTOR.key,
-          NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
+          GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key,
+          GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
       (COLUMNAR_SHUFFLE_CODEC.key, ""),
       (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
       (DEBUG_CUDF.key, DEBUG_CUDF.defaultValueString),
@@ -622,11 +584,10 @@ object GlutenConfig {
       // datasource config
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
       // datasource config end
-
-      COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
-      COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
-      COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
-      SPARK_OFFHEAP_ENABLED,
+      GlutenCoreConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
+      GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
+      GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+      GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
       DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
       SPARK_REDACTION_REGEX,
       LEGACY_TIME_PARSER_POLICY.key,
@@ -659,13 +620,11 @@ object GlutenConfig {
     nativeConfMap
   }
 
-  val GLUTEN_ENABLED =
-    buildConf("spark.gluten.enabled")
-      .internal()
-      .doc("Whether to enable gluten. Default value is true. Just an 
experimental property." +
-        " Recommend to enable/disable Gluten through the setting for 
spark.plugins.")
-      .booleanConf
-      .createWithDefault(true)
+  val GLUTEN_ENABLED = GlutenCoreConfig.GLUTEN_ENABLED
+
+  val RAS_ENABLED = GlutenCoreConfig.RAS_ENABLED
+
+  val RAS_COST_MODEL = GlutenCoreConfig.RAS_COST_MODEL
 
   val GLUTEN_UI_ENABLED = buildStaticConf("spark.gluten.ui.enabled")
     .doc(
@@ -1180,77 +1139,6 @@ object GlutenConfig {
       .stringConf
       .createOptional
 
-  val NUM_TASK_SLOTS_PER_EXECUTOR =
-    buildConf("spark.gluten.numTaskSlotsPerExecutor")
-      .internal()
-      .doc(
-        "Must provide default value since non-execution operations " +
-          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
-          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
-      .intConf
-      .createWithDefaultString("-1")
-
-  val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
-    buildConf("spark.gluten.memoryOverhead.size.in.bytes")
-      .internal()
-      .doc(
-        "Must provide default value since non-execution operations " +
-          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
-          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("0")
-
-  val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
-    buildConf("spark.gluten.memory.offHeap.size.in.bytes")
-      .internal()
-      .doc(
-        "Must provide default value since non-execution operations " +
-          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
-          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("0")
-
-  val COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES =
-    buildConf("spark.gluten.memory.task.offHeap.size.in.bytes")
-      .internal()
-      .doc(
-        "Must provide default value since non-execution operations " +
-          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
-          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("0")
-
-  val COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES =
-    buildConf("spark.gluten.memory.conservative.task.offHeap.size.in.bytes")
-      .internal()
-      .doc(
-        "Must provide default value since non-execution operations " +
-          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
-          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("0")
-
-  val COLUMNAR_MEMORY_ISOLATION =
-    buildConf("spark.gluten.memory.isolation")
-      .internal()
-      .doc("Enable isolated memory mode. If true, Gluten controls the maximum 
off-heap memory " +
-        "can be used by each task to X, X = executor memory / max task slots. 
It's recommended " +
-        "to set true if Gluten serves concurrent queries within a single 
session, since not all " +
-        "memory Gluten allocated is guaranteed to be spillable. In the case, 
the feature should " +
-        "be enabled to avoid OOM.")
-      .booleanConf
-      .createWithDefault(false)
-
-  val COLUMNAR_MEMORY_UNTRACKED =
-    buildStaticConf("spark.gluten.memory.untracked")
-      .internal()
-      .doc(
-        "When enabled, turn all native memory allocations in Gluten into 
untracked. Spark " +
-          "will be unaware of the allocations so will not trigger 
spill-to-disk operations " +
-          "or Spark OOMs. Should only be used for testing or other 
non-production use cases.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
     buildConf("spark.gluten.memory.backtrace.allocation")
       .internal()
@@ -1259,45 +1147,6 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(false)
 
-  val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
-    buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
-      .internal()
-      .doc("If larger than 0, Velox backend will try over-acquire this ratio 
of the total " +
-        "allocated memory as backup to avoid OOM.")
-      .doubleConf
-      .checkValue(d => d >= 0.0d, "Over-acquired ratio should be larger than 
or equals 0")
-      .createWithDefault(0.3d)
-
-  val COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE =
-    buildConf("spark.gluten.memory.reservationBlockSize")
-      .internal()
-      .doc("Block size of native reservation listener reserve memory from 
Spark.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("8MB")
-
-  // Options used by RAS.
-  val RAS_ENABLED =
-    buildConf("spark.gluten.ras.enabled")
-      .doc(
-        "Enables RAS (relational algebra selector) during physical " +
-          "planning to generate more efficient query plan. Note, this feature 
doesn't bring " +
-          "performance profits by default. Try exploring option 
`spark.gluten.ras.costModel` " +
-          "for advanced usage.")
-      .booleanConf
-      .createWithDefault(false)
-
-  // FIXME: This option is no longer only used by RAS. Should change key to
-  //  `spark.gluten.costModel` or something similar.
-  val RAS_COST_MODEL =
-    buildConf("spark.gluten.ras.costModel")
-      .doc(
-        "The class name of user-defined cost model that will be used by 
Gluten's transition " +
-          "planner as well as by RAS. If not specified, a legacy built-in cost 
model will be " +
-          "used. The legacy cost model helps RAS planner exhaustively offload 
computations, and " +
-          "helps transition planner choose columnar-to-columnar transition 
over others.")
-      .stringConf
-      .createWithDefaultString("legacy")
-
   val TRANSFORM_PLAN_LOG_LEVEL =
     buildConf("spark.gluten.sql.transform.logLevel")
       .internal()
@@ -1634,34 +1483,6 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val DYNAMIC_OFFHEAP_SIZING_ENABLED =
-    buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
-      .internal()
-      .doc(
-        "Experimental: When set to true, the offheap config 
(spark.memory.offHeap.size) will " +
-          "be ignored and instead we will consider onheap and offheap memory 
in combination, " +
-          "both counting towards the executor memory config 
(spark.executor.memory). We will " +
-          "make use of JVM APIs to determine how much onheap memory is use, 
alongside tracking " +
-          "offheap allocations made by Gluten. We will then proceed to 
enforcing a total memory " +
-          "quota, calculated by the sum of what memory is committed and in use 
in the Java " +
-          "heap. Since the calculation of the total quota happens as offheap 
allocation happens " +
-          "and not as JVM heap memory is allocated, it is possible that we can 
oversubscribe " +
-          "memory. Additionally, note that this change is experimental and may 
have performance " +
-          "implications.")
-      .booleanConf
-      .createWithDefault(false)
-
-  val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
-    
buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
-      .internal()
-      .doc(
-        "Experimental: Determines the memory fraction used to determine the 
total " +
-          "memory available for offheap and onheap allocations when the 
dynamic offheap " +
-          "sizing feature is enabled. The default is set to match 
spark.executor.memoryFraction.")
-      .doubleConf
-      .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must 
between [0, 1]")
-      .createWithDefault(0.6)
-
   val CELEBORN_FALLBACK_ENABLED =
     
buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
       .internal()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/config/ReservedKeys.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
new file mode 100644
index 0000000000..8aa68c4ad1
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.logging.LogLevelUtil
+import org.apache.gluten.metrics.GlutenTimeMetric
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.execution.SparkPlan
+
+/** Since https://github.com/apache/incubator-gluten/pull/7606. */
+class LoggedRule(delegate: Rule[SparkPlan]) extends Rule[SparkPlan] with 
Logging with LogLevelUtil {
+
+  override val ruleName: String = delegate.ruleName
+
+  private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime: 
Long): String =
+    if (!oldPlan.fastEquals(newPlan)) {
+      s"""
+         |=== Applying Rule $ruleName took $millisTime ms ===
+         |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+           """.stripMargin
+    } else {
+      s"Rule $ruleName has no effect, took $millisTime ms."
+    }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    val (out, millisTime) = 
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
+    logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out, 
millisTime))
+    out
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala 
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
rename to 
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
rename to 
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
rename to 
gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
similarity index 100%
rename from 
gluten-core/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
rename to 
gluten-substrait/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 0eacdcedd2..c0e3d1a349 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -179,7 +179,8 @@ private object FallbackStrategiesSuite {
         c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
         _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
-      List(_ => RemoveFallbackTagRule())
+      List(_ => RemoveFallbackTagRule()),
+      Nil
     )
   }
 
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 4ca6ce7b04..6f3894d23b 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -189,7 +189,8 @@ private object FallbackStrategiesSuite {
         c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
         _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
-      List(_ => RemoveFallbackTagRule())
+      List(_ => RemoveFallbackTagRule()),
+      Nil
     )
   }
 
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 4ca6ce7b04..6f3894d23b 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -189,7 +189,8 @@ private object FallbackStrategiesSuite {
         c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
         _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
-      List(_ => RemoveFallbackTagRule())
+      List(_ => RemoveFallbackTagRule()),
+      Nil
     )
   }
 
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 31b622e51c..de2436a39e 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -190,7 +190,8 @@ private object FallbackStrategiesSuite {
         c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
         _ => ColumnarCollapseTransformStages(GlutenConfig.get)
       ),
-      List(_ => RemoveFallbackTagRule())
+      List(_ => RemoveFallbackTagRule()),
+      Nil
     )
   }
 
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index c777873758..942c74e41c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.sql.shims
 import org.apache.gluten.GlutenBuildInfo.SPARK_COMPILE_VERSION
 import org.apache.gluten.expression.Sig
 
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.TaskInfo
@@ -37,8 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
-import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec, 
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
DataSourceV2ScanExecBase}
@@ -48,13 +47,13 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.spark.util.SparkVersionUtil
+import org.apache.spark.util.SparkShimVersionUtil
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
-import java.util.{Map => JMap, Properties}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -69,10 +68,10 @@ case class SparkShimDescriptor(major: Int, minor: Int, 
patch: Int) {
 
 object SparkShimDescriptor {
   def apply(version: String): SparkShimDescriptor = {
-    SparkVersionUtil.majorMinorPatchVersion(version) match {
+    SparkShimVersionUtil.sparkMajorMinorPatchVersion(version) match {
       case Some((major, minor, patch)) => SparkShimDescriptor(major, minor, 
patch)
       case None =>
-        val (major, minor) = SparkVersionUtil.majorMinorVersion(version)
+        val (major, minor) = 
SparkShimVersionUtil.sparkMajorMinorVersion(version)
         SparkShimDescriptor(major, minor, 0)
     }
   }
@@ -175,8 +174,6 @@ trait SparkShims {
 
   def enableNativeWriteFilesByDefault(): Boolean = false
 
-  def createTestTaskContext(properties: Properties): TaskContext
-
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] 
= {
     // Since Spark 3.4, the `sc.broadcast` has been optimized to use 
`sc.broadcastInternal`.
     // More details see SPARK-39983.
@@ -258,16 +255,12 @@ trait SparkShims {
   def extractExpressionTimestampAddUnit(timestampAdd: Expression): 
Option[Seq[String]] =
     Option.empty
 
-  def supportsRowBased(plan: SparkPlan): Boolean = !plan.supportsColumnar
-
   def withTryEvalMode(expr: Expression): Boolean = false
 
   def withAnsiEvalMode(expr: Expression): Boolean = false
 
   def dateTimestampFormatInReadIsDefaultValue(csvOptions: CSVOptions, 
timeZone: String): Boolean
 
-  def isPlannedV1Write(write: DataWritingCommandExec): Boolean = false
-
   def createParquetFilters(
       conf: SQLConf,
       schema: MessageType,
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala 
b/shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
similarity index 66%
rename from gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala
rename to 
shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
index ee6d357d55..ec3baef921 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
@@ -14,21 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.task
+package org.apache.spark.util
 
-import org.apache.spark.TaskContext
-import org.apache.spark.memory.TaskMemoryManager
-
-object SparkTaskUtil {
-  def setTaskContext(taskContext: TaskContext): Unit = {
-    TaskContext.setTaskContext(taskContext)
-  }
-
-  def unsetTaskContext(): Unit = {
-    TaskContext.unset()
+object SparkShimVersionUtil {
+  def sparkMajorMinorVersion(version: String = 
org.apache.spark.SPARK_VERSION): (Int, Int) = {
+    VersionUtils.majorMinorVersion(version)
   }
 
-  def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
-    taskContext.taskMemoryManager()
+  def sparkMajorMinorPatchVersion(version: String): Option[(Int, Int, Int)] = {
+    VersionUtils.majorMinorPatchVersion(version)
   }
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 1a71371c71..5e690fd09e 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.sql.shims.SparkShims
 import org.apache.gluten.utils.ExceptionUtils
 
-import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, 
TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext}
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.shuffle.ShuffleHandle
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -58,7 +58,7 @@ import org.apache.parquet.crypto.ParquetCryptoRuntimeException
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
 class Spark32Shims extends SparkShims {
 
@@ -162,10 +162,6 @@ class Spark32Shims extends SparkShims {
     List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session))
   }
 
-  override def createTestTaskContext(properties: Properties): TaskContext = {
-    TaskContextUtils.createTestTaskContext(properties)
-  }
-
   def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 8ff7717fb9..0000000000
--- a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
-  def createTestTaskContext(properties: Properties): TaskContext = {
-    val conf = new SparkConf()
-    conf.setAll(properties.asScala)
-    val memoryManager = UnifiedMemoryManager(conf, 1)
-    BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
-    new TaskContextImpl(
-      -1,
-      -1,
-      -1,
-      -1L,
-      -1,
-      new TaskMemoryManager(memoryManager, -1L),
-      properties,
-      MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
-      TaskMetrics.empty,
-      Map.empty
-    )
-  }
-}
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index e8a827fe79..9be098288b 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -35,9 +35,8 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan}
@@ -61,7 +60,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
 class Spark33Shims extends SparkShims {
   override def getDistribution(
@@ -259,10 +258,6 @@ class Spark33Shims extends SparkShims {
     List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session))
   }
 
-  override def createTestTaskContext(properties: Properties): TaskContext = {
-    TaskContextUtils.createTestTaskContext(properties)
-  }
-
   def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
@@ -349,8 +344,6 @@ class Spark33Shims extends SparkShims {
     }
   }
 
-  override def supportsRowBased(plan: SparkPlan): Boolean = plan.supportsRowBased
-
   override def dateTimestampFormatInReadIsDefaultValue(
       csvOptions: CSVOptions,
       timeZone: String): Boolean = {
diff --git a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 058467888f..0000000000
--- a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
-  def createTestTaskContext(properties: Properties): TaskContext = {
-    val conf = new SparkConf()
-    conf.setAll(properties.asScala)
-    val memoryManager = UnifiedMemoryManager(conf, 1)
-    BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
-    new TaskContextImpl(
-      -1,
-      -1,
-      -1,
-      -1L,
-      -1,
-      new TaskMemoryManager(memoryManager, -1L),
-      properties,
-      MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
-      TaskMetrics.empty,
-      1,
-      Map.empty
-    )
-  }
-}
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index b1afb8045c..508e256106 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -63,7 +62,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
 import scala.reflect.ClassTag
 
@@ -318,10 +317,6 @@ class Spark34Shims extends SparkShims {
 
   override def enableNativeWriteFilesByDefault(): Boolean = true
 
-  override def createTestTaskContext(properties: Properties): TaskContext = {
-    TaskContextUtils.createTestTaskContext(properties)
-  }
-
   override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     SparkContextUtils.broadcastInternal(sc, value)
   }
@@ -546,8 +541,6 @@ class Spark34Shims extends SparkShims {
     }
   }
 
-  override def supportsRowBased(plan: SparkPlan): Boolean = plan.supportsRowBased
-
   override def withTryEvalMode(expr: Expression): Boolean = {
     expr match {
       case a: Add => a.evalMode == EvalMode.TRY
@@ -577,10 +570,6 @@ class Spark34Shims extends SparkShims {
     csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
   }
 
-  override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
-    write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
-  }
-
   override def createParquetFilters(
       conf: SQLConf,
       schema: MessageType,
diff --git a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 267b177920..0000000000
--- a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
-  def createTestTaskContext(properties: Properties): TaskContext = {
-    val conf = new SparkConf()
-    conf.setAll(properties.asScala)
-    val memoryManager = UnifiedMemoryManager(conf, 1)
-    BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
-    new TaskContextImpl(
-      -1,
-      -1,
-      -1,
-      -1L,
-      -1,
-      -1,
-      new TaskMemoryManager(memoryManager, -1L),
-      properties,
-      MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
-      TaskMetrics.empty,
-      1,
-      Map.empty
-    )
-  }
-}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index db9601235c..7ee55675ba 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -66,7 +65,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -352,10 +351,6 @@ class Spark35Shims extends SparkShims {
 
   override def enableNativeWriteFilesByDefault(): Boolean = true
 
-  override def createTestTaskContext(properties: Properties): TaskContext = {
-    TaskContextUtils.createTestTaskContext(properties)
-  }
-
   override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     SparkContextUtils.broadcastInternal(sc, value)
   }
@@ -579,8 +574,6 @@ class Spark35Shims extends SparkShims {
     }
   }
 
-  override def supportsRowBased(plan: SparkPlan): Boolean = plan.supportsRowBased
-
   override def withTryEvalMode(expr: Expression): Boolean = {
     expr match {
       case a: Add => a.evalMode == EvalMode.TRY
@@ -610,10 +603,6 @@ class Spark35Shims extends SparkShims {
     csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
   }
 
-  override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
-    write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
-  }
-
   override def createParquetFilters(
       conf: SQLConf,
       schema: MessageType,
diff --git a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 267b177920..0000000000
--- a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
-  def createTestTaskContext(properties: Properties): TaskContext = {
-    val conf = new SparkConf()
-    conf.setAll(properties.asScala)
-    val memoryManager = UnifiedMemoryManager(conf, 1)
-    BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
-    new TaskContextImpl(
-      -1,
-      -1,
-      -1,
-      -1L,
-      -1,
-      -1,
-      new TaskMemoryManager(memoryManager, -1L),
-      properties,
-      MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
-      TaskMetrics.empty,
-      1,
-      Map.empty
-    )
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to