This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b69a6e8bc [GLUTEN-7031][VL] Minimized backend API (#7218)
b69a6e8bc is described below

commit b69a6e8bcf6432199fad3863a003913c3ab2cd10
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 13 14:06:33 2024 +0800

    [GLUTEN-7031][VL] Minimized backend API (#7218)
---
 .../LowCopyFileSegmentShuffleInputStream.java      | 12 +--
 .../vectorized/LowCopyNettyShuffleInputStream.java | 10 +--
 .../vectorized/OnHeapCopyShuffleInputStream.java   | 50 ++++++------
 ...i.Backend => org.apache.gluten.backend.Backend} |  0
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  9 ++-
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  2 +-
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  2 +-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  3 +
 .../clickhouse/CHSparkPlanExecApi.scala            |  4 -
 ...i.Backend => org.apache.gluten.backend.Backend} |  0
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 20 +++--
 .../backendsapi/velox/VeloxListenerApi.scala       |  5 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  3 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 20 -----
 .../apache/gluten/test/VeloxBackendTestBase.java   | 26 ++++---
 gluten-core/pom.xml                                |  5 ++
 .../apache/gluten/exception/GlutenException.java   | 10 ---
 .../DynamicOffHeapSizingMemoryTarget.java          |  2 +
 .../gluten/memory/memtarget/MemoryTargets.java     |  2 +
 .../gluten/softaffinity/SoftAffinityManager.scala  |  5 +-
 .../strategy/SoftAffinityAllocationTrait.scala     |  0
 .../strategy/SoftAffinityStrategy.scala            |  6 +-
 .../apache/spark/softaffinity/SoftAffinity.scala   |  3 +-
 .../spark/softaffinity}/SoftAffinityListener.scala | 11 ++-
 .../scala/org/apache/gluten/GlutenPlugin.scala     | 72 ++++++++---------
 .../scala/org/apache/gluten/backend/Backend.scala  | 90 +++++++++-------------
 .../gluten/extension/GlutenColumnarRule.scala      | 10 +--
 .../gluten/extension/GlutenSessionExtensions.scala |  4 +-
 .../extension/columnar/ColumnarRuleApplier.scala   |  9 ++-
 .../columnar/enumerated/EnumeratedApplier.scala    | 22 +++---
 .../columnar/heuristic/FallbackNode.scala          | 22 +++---
 .../columnar/heuristic/HeuristicApplier.scala      | 41 +++++-----
 .../extension/columnar/transition/Convention.scala |  0
 .../columnar/transition/ConventionFunc.scala       |  6 +-
 .../columnar/transition/ConventionReq.scala        |  4 +-
 .../extension/columnar/transition/Transition.scala |  0
 .../columnar/transition/Transitions.scala          |  4 +-
 .../extension/columnar/transition/package.scala    |  0
 .../gluten/extension/injector/GlutenInjector.scala | 26 +++++--
 .../gluten/extension/injector/RuleInjector.scala   |  0
 .../gluten/extension/injector/SparkInjector.scala  |  0
 .../gluten/extension}/util/AdaptiveContext.scala   |  2 +-
 .../src/main/scala/org/apache/gluten/gluten.scala  |  0
 .../org/apache/gluten/logging}/LogLevelUtil.scala  |  2 +-
 .../org/apache/spark/util/SparkResourceUtil.scala  |  0
 gluten-substrait/pom.xml                           |  5 --
 .../java/org/apache/gluten/test/TestStats.java     | 25 +++---
 .../gluten/backendsapi/BackendSettingsApi.scala    |  8 --
 .../gluten/backendsapi/BackendsApiManager.scala    | 26 +------
 .../gluten/backendsapi/SparkPlanExecApi.scala      | 11 ---
 .../{Backend.scala => SubstraitBackend.scala}      | 40 +++++-----
 .../org/apache/gluten/extension/GlutenPlan.scala   |  5 +-
 .../extension/columnar/ExpandFallbackPolicy.scala  | 10 +--
 .../extension/columnar/MiscColumnarRules.scala     |  3 +-
 .../extension/columnar/OffloadSingleNode.scala     |  3 +-
 .../columnar/enumerated/EnumeratedTransform.scala  |  2 +-
 .../scala/org/apache/gluten/utils/PlanUtil.scala   |  6 +-
 .../apache/gluten/utils/QueryPlanSelector.scala    |  6 +-
 .../spark/listener/GlutenListenerFactory.scala     | 35 ---------
 .../ColumnarCollapseTransformStages.scala          |  3 +-
 .../sql/execution/GlutenFallbackReporter.scala     |  2 +-
 .../apache/spark/sql/hive/HiveUDAFInspector.scala  |  0
 .../spark/softaffinity/SoftAffinitySuite.scala     |  1 -
 .../SoftAffinityWithRDDInfoSuite.scala             |  3 +-
 .../spark/sql/execution/ui/GlutenEventUtils.scala  |  6 --
 .../execution/ui/GlutenSQLAppStatusListener.scala  | 10 ++-
 .../sql/execution/FallbackStrategiesSuite.scala    |  3 +-
 .../extension/GlutenSessionExtensionSuite.scala    |  5 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  3 +-
 .../extension/GlutenSessionExtensionSuite.scala    |  5 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  3 +-
 .../extension/GlutenSessionExtensionSuite.scala    |  5 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  3 +-
 .../extension/GlutenSessionExtensionSuite.scala    |  5 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 15 ++--
 75 files changed, 360 insertions(+), 421 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
index 3d9157a8e..62dd57664 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
@@ -89,11 +89,11 @@ public class LowCopyFileSegmentShuffleInputStream implements ShuffleInputStream
 
   @Override
   public void close() {
-    GlutenException.wrap(
-        () -> {
-          channel.close();
-          in.close();
-          return null;
-        });
+    try {
+      channel.close();
+      in.close();
+    } catch (Exception e) {
+      throw new GlutenException(e);
+    }
   }
 }
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
index 0eb921579..ca6ad213d 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
@@ -65,10 +65,10 @@ public class LowCopyNettyShuffleInputStream implements ShuffleInputStream {
 
   @Override
   public void close() {
-    GlutenException.wrap(
-        () -> {
-          in.close();
-          return null;
-        });
+    try {
+      in.close();
+    } catch (Exception e) {
+      throw new GlutenException(e);
+    }
   }
 }
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
index ea7a1e236..0b1bd60be 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
@@ -37,24 +37,26 @@ public class OnHeapCopyShuffleInputStream implements ShuffleInputStream {
 
   @Override
   public long read(long destAddress, long maxReadSize) {
-    return GlutenException.wrap(
-        () -> {
-          int maxReadSize32 = Math.toIntExact(maxReadSize);
-          if (buffer == null || maxReadSize32 > buffer.length) {
-            this.buffer = new byte[maxReadSize32];
-          }
-          // The code conducts copy as long as 'in' wraps off-heap data,
-          // which is about to be moved to heap
-          int read = in.read(buffer, 0, maxReadSize32);
-          if (read == -1 || read == 0) {
-            return 0;
-          }
-          // The code conducts copy, from heap to off-heap
-          // memCopyFromHeap(buffer, destAddress, read);
-          PlatformDependent.copyMemory(buffer, 0, destAddress, read);
-          bytesRead += read;
-          return read;
-        });
+    try {
+
+      int maxReadSize32 = Math.toIntExact(maxReadSize);
+      if (buffer == null || maxReadSize32 > buffer.length) {
+        this.buffer = new byte[maxReadSize32];
+      }
+      // The code conducts copy as long as 'in' wraps off-heap data,
+      // which is about to be moved to heap
+      int read = in.read(buffer, 0, maxReadSize32);
+      if (read == -1 || read == 0) {
+        return 0;
+      }
+      // The code conducts copy, from heap to off-heap
+      // memCopyFromHeap(buffer, destAddress, read);
+      PlatformDependent.copyMemory(buffer, 0, destAddress, read);
+      bytesRead += read;
+      return read;
+    } catch (Exception e) {
+      throw new GlutenException(e);
+    }
   }
 
   @Override
@@ -69,11 +71,11 @@ public class OnHeapCopyShuffleInputStream implements ShuffleInputStream {
 
   @Override
   public void close() {
-    GlutenException.wrap(
-        () -> {
-          in.close();
-          in = null;
-          return null;
-        });
+    try {
+      in.close();
+      in = null;
+    } catch (Exception e) {
+      throw new GlutenException(e);
+    }
   }
 }
diff --git a/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend b/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
similarity index 100%
rename from backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
rename to backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 45aee4322..9fd66b473 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -17,10 +17,12 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
 
@@ -43,10 +45,11 @@ import java.util.Locale
 
 import scala.util.control.Breaks.{break, breakable}
 
-class CHBackend extends Backend {
+class CHBackend extends SubstraitBackend {
   override def name(): String = CHBackend.BACKEND_NAME
-  override def buildInfo(): BackendBuildInfo =
-    BackendBuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
+  override def batchType: Convention.BatchType = CHBatch
+  override def buildInfo(): Backend.BuildInfo =
+    Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
   override def iteratorApi(): IteratorApi = new CHIteratorApi
   override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
   override def transformerApi(): TransformerApi = new CHTransformerApi
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index f33e767e1..d3a0b7e28 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -20,13 +20,13 @@ import org.apache.gluten.GlutenNumaBindingInfo
 import org.apache.gluten.backendsapi.IteratorApi
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.memory.CHThreadGroup
 import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.gluten.utils.LogLevelUtil
 import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}
 
 import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index c53448cdd..d84181fec 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -17,9 +17,9 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.MetricsApi
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.metrics._
 import org.apache.gluten.substrait.{AggregationParams, JoinParams}
-import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 04644d4a2..d5e079508 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
 import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
 import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
@@ -34,6 +35,8 @@ import org.apache.spark.util.SparkPlanRules
 class CHRuleApi extends RuleApi {
   import CHRuleApi._
   override def injectRules(injector: RuleInjector): Unit = {
+    injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
+
     injectSpark(injector.spark)
     injectLegacy(injector.gluten.legacy)
     injectRas(injector.gluten.ras)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index f765a75d2..c9c6a14a8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -24,7 +24,6 @@ import org.apache.gluten.expression._
 import org.apache.gluten.extension.ExpressionExtensionTrait
 import org.apache.gluten.extension.columnar.AddFallbackTagRule
 import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
-import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
 import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -67,9 +66,6 @@ import scala.collection.mutable.ArrayBuffer
 
 class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
 
-  /** The columnar-batch type this backend is using. */
-  override def batchType: Convention.BatchType = CHBatch
-
   /** Transform GetArrayItem to Substrait. */
   override def genGetArrayItemTransformer(
       substraitExprName: String,
diff --git a/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend b/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
similarity index 100%
rename from backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
rename to backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index ecd053a63..19985d6b8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -17,11 +17,14 @@
 package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.{GlutenConfig, VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME}
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
@@ -32,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -43,10 +48,17 @@ import org.apache.hadoop.fs.Path
 
 import scala.util.control.Breaks.breakable
 
-class VeloxBackend extends Backend {
+class VeloxBackend extends SubstraitBackend {
   override def name(): String = VeloxBackend.BACKEND_NAME
-  override def buildInfo(): BackendBuildInfo =
-    BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME)
+  override def batchType: Convention.BatchType = VeloxBatch
+  override def batchTypeFunc(): BatchOverride = {
+    case i: InMemoryTableScanExec
+        if i.supportsColumnar && i.relation.cacheBuilder.serializer
+          .isInstanceOf[ColumnarCachedBatchSerializer] =>
+      VeloxBatch
+  }
+  override def buildInfo(): Backend.BuildInfo =
+    Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME)
   override def iteratorApi(): IteratorApi = new VeloxIteratorApi
   override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
   override def transformerApi(): TransformerApi = new VeloxTransformerApi
@@ -454,7 +466,5 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportColumnarArrowUdf(): Boolean = true
 
-  override def generateHdfsConfForLibhdfs(): Boolean = true
-
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 02597f815..2cec8d392 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.jni.{JniLibLoader, JniWorkspace}
 import org.apache.gluten.udf.UdfJniWrapper
 import org.apache.gluten.utils._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
@@ -44,6 +44,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
   override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
     val conf = pc.conf()
 
+    // Generate HDFS client configurations.
+    HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
+
     // Overhead memory limits.
     val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
     val desiredOverheadSize = (0.1 * offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 6204ab092..7f95c78be 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -28,6 +28,7 @@ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
 import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
 import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
 
@@ -35,6 +36,8 @@ class VeloxRuleApi extends RuleApi {
   import VeloxRuleApi._
 
   override def injectRules(injector: RuleInjector): Unit = {
+    injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
+
     injectSpark(injector.spark)
     injectLegacy(injector.gluten.legacy)
     injectRas(injector.gluten.ras)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 4a9bfef55..64c212690 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -23,8 +23,6 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
 import org.apache.gluten.extension.columnar.FallbackTags
-import org.apache.gluten.extension.columnar.transition.Convention
-import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult}
 
@@ -42,7 +40,6 @@ import org.apache.spark.sql.catalyst.optimizer.BuildSide
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
@@ -61,23 +58,6 @@ import javax.ws.rs.core.UriBuilder
 
 class VeloxSparkPlanExecApi extends SparkPlanExecApi {
 
-  /** The columnar-batch type this backend is using. */
-  override def batchType: Convention.BatchType = {
-    VeloxBatch
-  }
-
-  /**
-   * Overrides [[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is using to
-   * determine the convention (its row-based processing / columnar-batch processing support) of a
-   * plan with a user-defined function that accepts a plan then returns batch type it outputs.
-   */
-  override def batchTypeFunc(): BatchOverride = {
-    case i: InMemoryTableScanExec
-        if i.supportsColumnar && i.relation.cacheBuilder.serializer
-          .isInstanceOf[ColumnarCachedBatchSerializer] =>
-      VeloxBatch
-  }
-
   /** Transform GetArrayItem to Substrait. */
   override def genGetArrayItemTransformer(
       substraitExprName: String,
diff --git a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 9e5843060..211731159 100644
--- a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++ b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -22,25 +22,26 @@ import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.plugin.PluginContext;
 import org.apache.spark.resource.ResourceInformation;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.util.Map;
 
-/** For testing Velox backend without starting a Spark context. */
 public abstract class VeloxBackendTestBase {
+  private static final ListenerApi API = new VeloxListenerApi();
+
   @BeforeClass
   public static void setup() {
-    final ListenerApi api = new VeloxListenerApi();
-    api.onDriverStart(mockSparkContext(), mockPluginContext());
+    API.onExecutorStart(mockPluginContext());
   }
 
-  private static SparkContext mockSparkContext() {
-    // Not yet implemented.
-    return null;
+  @AfterClass
+  public static void tearDown() {
+    API.onExecutorShutdown();
   }
 
   private static PluginContext mockPluginContext() {
@@ -52,9 +53,7 @@ public abstract class VeloxBackendTestBase {
 
       @Override
       public SparkConf conf() {
-        final SparkConf conf = new SparkConf();
-        conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
-        return conf;
+        return newSparkConf();
       }
 
       @Override
@@ -83,4 +82,11 @@ public abstract class VeloxBackendTestBase {
       }
     };
   }
+
+  @NotNull
+  private static SparkConf newSparkConf() {
+    final SparkConf conf = new SparkConf();
+    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
+    return conf;
+  }
 }
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 448bc9ee2..5e077a8b7 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-ui</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
index 147e69719..adbc219df 100644
--- a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
+++ b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.exception;
 
-import java.util.concurrent.Callable;
-
 public class GlutenException extends RuntimeException {
 
   public GlutenException() {}
@@ -33,12 +31,4 @@ public class GlutenException extends RuntimeException {
   public GlutenException(Throwable cause) {
     super(cause);
   }
-
-  public static <V> V wrap(Callable<V> callable) {
-    try {
-      return callable.call();
-    } catch (Exception e) {
-      throw new GlutenException(e);
-    }
-  }
 }
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index b7f15d830..ea18a05d0 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -18,11 +18,13 @@ package org.apache.gluten.memory.memtarget;
 
 import org.apache.gluten.GlutenConfig;
 
+import org.apache.spark.annotation.Experimental;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+@Experimental
 public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
   private static final Logger LOG = LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class);
   private final MemoryTarget delegated;
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index bb1e7102b..6f7cc9bd9 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.memory.MemoryUsageStatsBuilder;
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
 
+import org.apache.spark.annotation.Experimental;
 import org.apache.spark.memory.TaskMemoryManager;
 
 import java.util.Map;
@@ -42,6 +43,7 @@ public final class MemoryTargets {
     return new OverAcquire(target, overTarget, overAcquiredRatio);
   }
 
+  @Experimental
   public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) {
     if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
       return new DynamicOffHeapSizingMemoryTarget(memoryTarget);
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
similarity index 99%
rename from gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
rename to gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 222c7f91e..7cb0b2fa2 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++ b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -19,18 +19,15 @@ package org.apache.gluten.softaffinity
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.softaffinity.strategy.SoftAffinityStrategy
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogLevelUtil
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListenerStageCompleted, 
SparkListenerStageSubmitted, SparkListenerTaskEnd}
 import org.apache.spark.sql.execution.datasources.FilePartition
-
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.gluten.logging.LogLevelUtil
 
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import scala.collection.mutable
 import scala.util.Random
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
 
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
rename to 
gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
 
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
similarity index 92%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
rename to 
gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
index 7a45a07b4..bc36c3b1e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
+++ 
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.softaffinity.strategy
 
 import org.apache.spark.internal.Logging
 
-import scala.collection.mutable.LinkedHashSet
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
@@ -32,7 +32,7 @@ class SoftAffinityStrategy extends 
SoftAffinityAllocationTrait with Logging {
     } else {
       val candidatesSize = candidates.size
       val halfCandidatesSize = candidatesSize / softAffinityReplicationNum
-      val resultSet = new LinkedHashSet[(String, String)]
+      val resultSet = new mutable.LinkedHashSet[(String, String)]
 
       // TODO: try to use ConsistentHash
       val mod = file.hashCode % candidatesSize
@@ -41,7 +41,7 @@ class SoftAffinityStrategy extends 
SoftAffinityAllocationTrait with Logging {
       if (candidates(c1).isDefined) {
         resultSet.add(candidates(c1).get)
       }
-      for (i <- 1 to (softAffinityReplicationNum - 1)) {
+      for (i <- 1 until softAffinityReplicationNum) {
         val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
         if (candidates(c2).isDefined) {
           resultSet.add(candidates(c2).get)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
 b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
similarity index 98%
rename from 
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
rename to 
gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
index cfdce5360..0e439d245 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.spark.softaffinity
 
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
-import org.apache.gluten.utils.LogLevelUtil
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.execution.datasources.FilePartition
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
 
b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
similarity index 89%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
rename to 
gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
index b9ffd62dc..d6b9a8d70 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
+++ 
b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.softaffinity.scheduler
+package org.apache.spark.softaffinity
 
 import org.apache.gluten.softaffinity.SoftAffinityManager
-
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 
 class SoftAffinityListener extends SparkListener with Logging {
-
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
     SoftAffinityManager.updateStageMap(event)
   }
@@ -46,3 +45,9 @@ class SoftAffinityListener extends SparkListener with Logging 
{
     SoftAffinityManager.handleExecutorRemoved(execId)
   }
 }
+
+object SoftAffinityListener {
+  def register(sc: SparkContext): Unit = {
+    sc.listenerBus.addToStatusQueue(new SoftAffinityListener())
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala 
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
similarity index 83%
rename from gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala
rename to gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 66bab72bf..72998c0b0 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -17,19 +17,18 @@
 package org.apache.gluten
 
 import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.events.GlutenBuildInfoEvent
 import org.apache.gluten.exception.GlutenException
-import 
org.apache.gluten.extension.GlutenSessionExtensions.{GLUTEN_SESSION_EXTENSION_NAME,
 SPARK_SESSION_EXTS_KEY}
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.task.TaskListener
-import org.apache.gluten.test.TestStats
 
-import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext, 
TaskFailedReason}
+import org.apache.spark.{SparkConf, SparkContext, TaskFailedReason}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
 import org.apache.spark.internal.Logging
-import org.apache.spark.listener.GlutenListenerFactory
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.execution.ui.GlutenEventUtils
+import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.ui.{GlutenEventUtils, 
GlutenSQLAppStatusListener}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.SparkResourceUtil
@@ -54,23 +53,21 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
 
   override def init(sc: SparkContext, pluginContext: PluginContext): 
util.Map[String, String] = {
     _sc = Some(sc)
-    GlutenEventUtils.registerListener(sc)
+    GlutenSQLAppStatusListener.register(sc)
     postBuildInfoEvent(sc)
 
     val conf = pluginContext.conf()
-    if (conf.getBoolean(GlutenConfig.UT_STATISTIC.key, defaultValue = false)) {
-      // Only statistic in UT, not thread safe
-      TestStats.beginStatistic()
-    }
 
     setPredefinedConfigs(sc, conf)
-    if (BackendsApiManager.getSettings.generateHdfsConfForLibhdfs()) {
-      HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
+    // Initialize Backends API.
+    Backend.get().onDriverStart(sc, pluginContext)
+    if (
+      sc.getConf.getBoolean(
+        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
+        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)
+    ) {
+      SoftAffinityListener.register(sc)
     }
-    // Initialize Backends API
-    BackendsApiManager.initialize()
-    BackendsApiManager.getListenerApiInstance.onDriverStart(sc, pluginContext)
-    GlutenListenerFactory.addToSparkListenerBus(sc)
 
     Collections.emptyMap()
   }
@@ -86,11 +83,11 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
   }
 
   override def shutdown(): Unit = {
-    BackendsApiManager.getListenerApiInstance.onDriverShutdown()
+    Backend.get().onDriverShutdown()
   }
 
   private def postBuildInfoEvent(sc: SparkContext): Unit = {
-    val buildInfo = BackendsApiManager.getBuildInfo
+    val buildInfo = Backend.get().buildInfo()
 
     // export gluten version to property to spark
     System.setProperty("gluten.version", VERSION)
@@ -107,10 +104,10 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
     glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME)
     glutenBuildInfo.put("Gluten Build Time", BUILD_DATE)
     glutenBuildInfo.put("Gluten Repo URL", REPO_URL)
-    glutenBuildInfo.put("Backend", buildInfo.backend)
-    glutenBuildInfo.put("Backend Branch", buildInfo.backendBranch)
-    glutenBuildInfo.put("Backend Revision", buildInfo.backendRevision)
-    glutenBuildInfo.put("Backend Revision Time", buildInfo.backendRevisionTime)
+    glutenBuildInfo.put("Backend", buildInfo.name)
+    glutenBuildInfo.put("Backend Branch", buildInfo.branch)
+    glutenBuildInfo.put("Backend Revision", buildInfo.revision)
+    glutenBuildInfo.put("Backend Revision Time", buildInfo.revisionTime)
     val infoMap = glutenBuildInfo.toMap
     val loggingInfo = infoMap.toSeq
       .sortBy(_._1)
@@ -130,12 +127,13 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
 
   private def setPredefinedConfigs(sc: SparkContext, conf: SparkConf): Unit = {
     // sql extensions
-    val extensions = if (conf.contains(SPARK_SESSION_EXTS_KEY)) {
-      s"${conf.get(SPARK_SESSION_EXTS_KEY)},$GLUTEN_SESSION_EXTENSION_NAME"
+    val extensions = if 
(conf.contains(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)) {
+      s"${conf.get(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)}," +
+        s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}"
     } else {
-      s"$GLUTEN_SESSION_EXTENSION_NAME"
+      s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}"
     }
-    conf.set(SPARK_SESSION_EXTS_KEY, extensions)
+    conf.set(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY, extensions)
 
     // adaptive custom cost evaluator class
     if (GlutenConfig.getConf.enableGluten && 
GlutenConfig.getConf.enableGlutenCostEvaluator) {
@@ -230,13 +228,19 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
         conservativeOffHeapPerTask.toString)
     }
 
-    // disable vanilla columnar readers, to prevent columnar-to-columnar 
conversions
-    if (BackendsApiManager.getSettings.disableVanillaColumnarReaders(conf)) {
+    // Disable vanilla columnar readers, to prevent columnar-to-columnar 
conversions.
+    // FIXME: Do we still need this trick since
+    //  https://github.com/apache/incubator-gluten/pull/1931 was merged?
+    if (
+      !conf.getBoolean(
+        GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
+        GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
+    ) {
       // FIXME Hongze 22/12/06
       //  BatchScan.scala in shim was not always loaded by class loader.
       //  The file should be removed and the "ClassCastException" issue caused 
by
       //  spark.sql.<format>.enableVectorizedReader=true should be fixed in 
another way.
-      //  Before the issue was fixed we force the use of vanilla row reader by 
using
+      //  Before the issue is fixed we force the use of vanilla row reader by 
using
       //  the following statement.
       conf.set("spark.sql.parquet.enableVectorizedReader", "false")
       conf.set("spark.sql.orc.enableVectorizedReader", "false")
@@ -262,15 +266,13 @@ private[gluten] class GlutenExecutorPlugin extends 
ExecutorPlugin {
   override def init(ctx: PluginContext, extraConf: util.Map[String, String]): 
Unit = {
     val conf = ctx.conf()
 
-    // Initialize Backends API
-    // TODO categorize the APIs by driver's or executor's
-    BackendsApiManager.initialize()
-    BackendsApiManager.getListenerApiInstance.onExecutorStart(ctx)
+    // Initialize Backends API.
+    Backend.get().onExecutorStart(ctx)
   }
 
   /** Clean up and terminate this plugin. For example: close the native 
engine. */
   override def shutdown(): Unit = {
-    BackendsApiManager.getListenerApiInstance.onExecutorShutdown()
+    Backend.get().onExecutorShutdown()
     super.shutdown()
   }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
 b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
similarity index 52%
copy from 
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
copy to gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index e4f5cbdc9..5f82a2ee7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -14,18 +14,47 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.backendsapi
+package org.apache.gluten.backend
+
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
+import org.apache.gluten.extension.injector.RuleInjector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
 
 import java.util.ServiceLoader
 
 import scala.collection.JavaConverters
 
-object BackendsApiManager {
+trait Backend {
+  import Backend._
+
+  /** Base information. */
+  def name(): String
+  def buildInfo(): BuildInfo
+
+  /** Spark listeners. */
+  def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
+  def onDriverShutdown(): Unit = {}
+  def onExecutorStart(pc: PluginContext): Unit = {}
+  def onExecutorShutdown(): Unit = {}
 
-  private lazy val backend: Backend = initializeInternal()
+  /** The columnar-batch type this backend is using. */
+  def batchType: Convention.BatchType
 
-  /** Initialize all backends api. */
-  private def initializeInternal(): Backend = {
+  /**
+   * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
+   * determine the convention (its row-based processing / columnar-batch 
processing support) of a
+   * plan with a user-defined function that accepts a plan then returns batch 
type it outputs.
+   */
+  def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
+
+  /** Query planner rules. */
+  def injectRules(injector: RuleInjector): Unit
+}
+
+object Backend {
+  private val be: Backend = {
     val discoveredBackends =
       
JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq
     if (discoveredBackends.isEmpty) {
@@ -40,54 +69,9 @@ object BackendsApiManager {
     backend
   }
 
-  /**
-   * Automatically detect the backend api.
-   * @return
-   */
-  def initialize(): String = {
-    getBackendName
-  }
-
-  // Note: Do not make direct if-else checks based on output of the method.
-  // Any form of backend-specific code should be avoided from appearing in 
common module
-  // (e.g. gluten-substrait, gluten-data)
-  def getBackendName: String = {
-    backend.name()
-  }
-
-  def getBuildInfo: BackendBuildInfo = {
-    backend.buildInfo()
+  def get(): Backend = {
+    be
   }
 
-  def getListenerApiInstance: ListenerApi = {
-    backend.listenerApi()
-  }
-
-  def getIteratorApiInstance: IteratorApi = {
-    backend.iteratorApi()
-  }
-
-  def getSparkPlanExecApiInstance: SparkPlanExecApi = {
-    backend.sparkPlanExecApi()
-  }
-
-  def getTransformerApiInstance: TransformerApi = {
-    backend.transformerApi()
-  }
-
-  def getValidatorApiInstance: ValidatorApi = {
-    backend.validatorApi()
-  }
-
-  def getMetricsApiInstance: MetricsApi = {
-    backend.metricsApi()
-  }
-
-  def getRuleApiInstance: RuleApi = {
-    backend.ruleApi()
-  }
-
-  def getSettings: BackendSettingsApi = {
-    backend.settings
-  }
+  case class BuildInfo(name: String, branch: String, revision: String, 
revisionTime: String)
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
similarity index 96%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index c5a9afec3..ce6d50350 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -16,9 +16,9 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.extension.columnar._
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier
 import org.apache.gluten.extension.columnar.transition.Transitions
-import org.apache.gluten.utils.LogLevelUtil
+import org.apache.gluten.logging.LogLevelUtil
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-object ColumnarOverrideRules {
+object GlutenColumnarRule {
 
   // Utilities to infer columnar rule's caller's property:
   // ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
@@ -92,14 +92,14 @@ object ColumnarOverrideRules {
   }
 }
 
-case class ColumnarOverrideRules(
+case class GlutenColumnarRule(
     session: SparkSession,
     applierBuilder: SparkSession => ColumnarRuleApplier)
   extends ColumnarRule
   with Logging
   with LogLevelUtil {
 
-  import ColumnarOverrideRules._
+  import GlutenColumnarRule._
 
   /**
    * Note: Do not implement this API. We basically inject all of Gluten's 
physical rules through
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
similarity index 92%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 4456dda61..42c9cc94b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.injector.RuleInjector
 
 import org.apache.spark.sql.SparkSessionExtensions
@@ -27,7 +27,7 @@ import java.util.Objects
 private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions 
=> Unit) {
   override def apply(exts: SparkSessionExtensions): Unit = {
     val injector = new RuleInjector()
-    BackendsApiManager.getRuleApiInstance.injectRules(injector)
+    Backend.get().injectRules(injector)
     injector.inject(exts)
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
similarity index 90%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 9b78ccd11..b47b2f338 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -17,9 +17,9 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -74,6 +74,11 @@ object ColumnarRuleApplier {
       logOnLevel(transformPlanLogLevel, message(plan, out, millisTime))
       out
     }
+  }
 
+  // A temporary workaround for applying toggle `spark.gluten.enabled`, to be 
removed.
+  trait SkipCondition {
+    // True if the rule execution should be skipped.
+    def skip(session: SparkSession, plan: SparkPlan): Boolean
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
similarity index 82%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 6ce4e24ed..de6ac2a7d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -17,9 +17,9 @@
 package org.apache.gluten.extension.columnar.enumerated
 
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall}
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
@@ -36,20 +36,24 @@ import org.apache.spark.sql.execution.SparkPlan
  * implementing them in EnumeratedTransform.
  */
 @Experimental
-class EnumeratedApplier(session: SparkSession, ruleBuilders: 
Seq[ColumnarRuleBuilder])
+class EnumeratedApplier(
+    session: SparkSession,
+    skipConditions: Seq[SkipCondition],
+    ruleBuilders: Seq[ColumnarRuleBuilder])
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
   private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    if (skipConditions.exists(_.skip(session, plan))) {
+      return plan
+    }
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
-    PhysicalPlanSelector.maybe(session, plan) {
-      val finalPlan = maybeAqe {
-        apply0(ruleBuilders.map(b => b(call)), plan)
-      }
-      finalPlan
+    val finalPlan = maybeAqe {
+      apply0(ruleBuilders.map(b => b(call)), plan)
     }
+    finalPlan
   }
 
   private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan 
= {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
similarity index 58%
copy from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
copy to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
index bccbd38b2..89409e5ad 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
@@ -14,19 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension.injector
+package org.apache.gluten.extension.columnar.heuristic
 
-import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 
-/** Injector used to inject query planner rules into Spark and Gluten. */
-class RuleInjector {
-  val spark: SparkInjector = new SparkInjector()
-  val gluten: GlutenInjector = new GlutenInjector()
-
-  private[extension] def inject(extensions: SparkSessionExtensions): Unit = {
-    spark.inject(extensions)
-    gluten.inject(extensions)
-  }
+/** A wrapper to specify the plan is fallback plan, the caller side should 
unwrap it. */
+case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+  override def output: Seq[Attribute] = fallbackPlan.output
 }
-
-object RuleInjector {}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
similarity index 80%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 85f44878f..8e612c6ae 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -17,9 +17,9 @@
 package org.apache.gluten.extension.columnar.heuristic
 
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall}
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 class HeuristicApplier(
     session: SparkSession,
+    skipConditions: Seq[SkipCondition],
     transformBuilders: Seq[ColumnarRuleBuilder],
     fallbackPolicyBuilders: Seq[ColumnarRuleBuilder],
     postBuilders: Seq[ColumnarRuleBuilder],
@@ -42,28 +43,30 @@ class HeuristicApplier(
   private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    if (skipConditions.exists(_.skip(session, plan))) {
+      return plan
+    }
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
     makeRule(call).apply(plan)
   }
 
-  private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] =
+  private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = {
     plan =>
-      PhysicalPlanSelector.maybe(session, plan) {
-        val finalPlan = prepareFallback(plan) {
-          p =>
-            val suggestedPlan = transformPlan("transform", 
transformRules(call), p)
-            transformPlan("fallback", fallbackPolicies(call), suggestedPlan) 
match {
-              case FallbackNode(fallbackPlan) =>
-                // we should use vanilla c2r rather than native c2r,
-                // and there should be no `GlutenPlan` any more,
-                // so skip the `postRules()`.
-                fallbackPlan
-              case plan =>
-                transformPlan("post", postRules(call), plan)
-            }
-        }
-        transformPlan("final", finalRules(call), finalPlan)
+      val finalPlan = prepareFallback(plan) {
+        p =>
+          val suggestedPlan = transformPlan("transform", transformRules(call), 
p)
+          transformPlan("fallback", fallbackPolicies(call), suggestedPlan) 
match {
+            case FallbackNode(fallbackPlan) =>
+              // we should use vanilla c2r rather than native c2r,
+              // and there should be no `GlutenPlan` any more,
+              // so skip the `postRules()`.
+              fallbackPlan
+            case plan =>
+              transformPlan("post", postRules(call), plan)
+          }
       }
+      transformPlan("final", finalRules(call), finalPlan)
+  }
 
   private def transformPlan(
       phase: String,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
similarity index 97%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index beb809474..21662f503 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -60,7 +60,7 @@ object ConventionFunc {
         return PartialFunction.empty
       }
     }
-    BackendsApiManager.getSparkPlanExecApiInstance.batchTypeFunc()
+    Backend.get().batchTypeFunc()
   }
 
   private class BuiltinFunc(o: BatchOverride) extends ConventionFunc {
@@ -86,7 +86,7 @@ object ConventionFunc {
         val batchType = if (a.supportsColumnar) {
           // By default, we execute columnar AQE with backend batch output.
           // See 
org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
-          BackendsApiManager.getSparkPlanExecApiInstance.batchType
+          Backend.get().batchType
         } else {
           Convention.BatchType.None
         }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
similarity index 94%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index 65422b380..cb76ec4de 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 
 import org.apache.spark.sql.execution.SparkPlan
 
@@ -58,7 +58,7 @@ object ConventionReq {
   val vanillaBatch: ConventionReq =
     Impl(RowType.Any, BatchType.Is(Convention.BatchType.VanillaBatch))
   lazy val backendBatch: ConventionReq =
-    Impl(RowType.Any, 
BatchType.Is(BackendsApiManager.getSparkPlanExecApiInstance.batchType))
+    Impl(RowType.Any, BatchType.Is(Backend.get().batchType))
 
   def get(plan: SparkPlan): ConventionReq = 
ConventionFunc.create().conventionReqOf(plan)
   def of(rowType: RowType, batchType: BatchType): ConventionReq = 
Impl(rowType, batchType)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
similarity index 96%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 602f0303c..3ba09efef 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -89,7 +89,7 @@ object Transitions {
   }
 
   def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
-    val backendBatchType = 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    val backendBatchType = Backend.get().batchType
     val out = toBatchPlan(plan, backendBatchType)
     out
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
similarity index 77%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index 728e569cc..ca76e61b7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -17,9 +17,9 @@
 package org.apache.gluten.extension.injector
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
SkipCondition}
 import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 
@@ -30,20 +30,25 @@ import scala.collection.mutable
 /** Injector used to inject query planner rules into Gluten. */
 class GlutenInjector private[injector] {
   import GlutenInjector._
+  private val skipConditions: mutable.ListBuffer[SkipCondition] = 
mutable.ListBuffer()
   val legacy: LegacyInjector = new LegacyInjector()
   val ras: RasInjector = new RasInjector()
 
   private[injector] def inject(extensions: SparkSessionExtensions): Unit = {
-    val ruleBuilder = (session: SparkSession) => new 
ColumnarOverrideRules(session, applier)
+    val ruleBuilder = (session: SparkSession) => new 
GlutenColumnarRule(session, applier)
     extensions.injectColumnar(session => ruleBuilder(session))
   }
 
   private def applier(session: SparkSession): ColumnarRuleApplier = {
     val conf = new GlutenConfig(session.sessionState.conf)
     if (conf.enableRas) {
-      return ras.createApplier(session)
+      return ras.createApplier(session, skipConditions.toSeq)
     }
-    legacy.createApplier(session)
+    legacy.createApplier(session, skipConditions.toSeq)
+  }
+
+  def skipOn(skipCondition: SkipCondition): Unit = {
+    skipConditions += skipCondition
   }
 }
 
@@ -70,9 +75,12 @@ object GlutenInjector {
       finalBuilders += builder
     }
 
-    private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
+    private[injector] def createApplier(
+        session: SparkSession,
+        skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
       new HeuristicApplier(
         session,
+        skipConditions,
         transformBuilders.toSeq,
         fallbackPolicyBuilders.toSeq,
         postBuilders.toSeq,
@@ -87,8 +95,10 @@ object GlutenInjector {
       ruleBuilders += builder
     }
 
-    private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
-      new EnumeratedApplier(session, ruleBuilders.toSeq)
+    private[injector] def createApplier(
+        session: SparkSession,
+        skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
+      new EnumeratedApplier(session, skipConditions, ruleBuilders.toSeq)
     }
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
similarity index 98%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
index de72bc4bc..b0f42e796 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension.columnar.util
+package org.apache.gluten.extension.util
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SparkPlan
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala 
b/gluten-core/src/main/scala/org/apache/gluten/gluten.scala
similarity index 100%
rename from gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala
rename to gluten-core/src/main/scala/org/apache/gluten/gluten.scala
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala 
b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
similarity index 97%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
index cbd52b2d0..07bbde2ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.utils
+package org.apache.gluten.logging
 
 import org.apache.spark.internal.Logging
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
similarity index 100%
rename from 
gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml
index 77bb9f3c3..a863b7957 100644
--- a/gluten-substrait/pom.xml
+++ b/gluten-substrait/pom.xml
@@ -32,11 +32,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.gluten</groupId>
-      <artifactId>gluten-ui</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <!-- Prevent our dummy JAR from being included in Spark distributions or 
uploaded to YARN -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java 
b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
index 598ab556f..37d9e09bb 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.test;
 
+import org.apache.gluten.GlutenConfig;
+
 import java.util.*;
 
 /** Only use in UT Env. It's not thread safe. */
@@ -24,7 +26,6 @@ public class TestStats {
   private static final String ROW_FORMAT =
       
"<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>";
 
-  private static boolean UT_ENV = false;
   private static final Map<String, CaseInfo> caseInfos = new HashMap<>();
   private static String currentCase;
   public static int offloadGlutenUnitNumber = 0;
@@ -35,8 +36,8 @@ public class TestStats {
   public static int suiteTestNumber = 0;
   public static int offloadGlutenTestNumber = 0;
 
-  public static void beginStatistic() {
-    UT_ENV = true;
+  private static boolean enabled() {
+    return GlutenConfig.getConf().collectUtStats();
   }
 
   public static void reset() {
@@ -56,7 +57,7 @@ public class TestStats {
   public static int totalOffloadGlutenCaseNumber = 0;
 
   public static void printMarkdown(String suitName) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -105,7 +106,7 @@ public class TestStats {
   }
 
   public static void addFallBackClassName(String className) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -117,7 +118,7 @@ public class TestStats {
   }
 
   public static void addFallBackCase() {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -127,7 +128,7 @@ public class TestStats {
   }
 
   public static void addExpressionClassName(String className) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -138,7 +139,7 @@ public class TestStats {
   }
 
   public static Set<String> getFallBackClassName() {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return Collections.emptySet();
     }
 
@@ -150,7 +151,7 @@ public class TestStats {
   }
 
   public static void addIgnoreCaseName(String caseName) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -160,7 +161,7 @@ public class TestStats {
   }
 
   public static void resetCase() {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -171,7 +172,7 @@ public class TestStats {
   }
 
   public static void startCase(String caseName) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
@@ -180,7 +181,7 @@ public class TestStats {
   }
 
   public static void endCase(boolean status) {
-    if (!UT_ENV) {
+    if (!enabled()) {
       return;
     }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 451cb2fd2..0aa5d6ae8 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans._
@@ -72,11 +71,6 @@ trait BackendSettingsApi {
   // Whether to fallback aggregate at the same time if its empty-output child 
is fallen back.
   def fallbackAggregateWithEmptyOutputChild(): Boolean = false
 
-  def disableVanillaColumnarReaders(conf: SparkConf): Boolean =
-    !conf.getBoolean(
-      GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
-      GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
-
   def recreateJoinExecOnFallback(): Boolean = false
 
   /**
@@ -141,7 +135,5 @@ trait BackendSettingsApi {
 
   def supportColumnarArrowUdf(): Boolean = false
 
-  def generateHdfsConfForLibhdfs(): Boolean = false
-
   def needPreComputeRangeFrameBoundary(): Boolean = false
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index e4f5cbdc9..942058cc5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -16,28 +16,14 @@
  */
 package org.apache.gluten.backendsapi
 
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters
+import org.apache.gluten.backend.Backend
 
 object BackendsApiManager {
-
-  private lazy val backend: Backend = initializeInternal()
+  private lazy val backend: SubstraitBackend = initializeInternal()
 
   /** Initialize all backends api. */
-  private def initializeInternal(): Backend = {
-    val discoveredBackends =
-      
JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq
-    if (discoveredBackends.isEmpty) {
-      throw new IllegalStateException("Backend implementation not discovered 
from JVM classpath")
-    }
-    if (discoveredBackends.size != 1) {
-      throw new IllegalStateException(
-        s"More than one Backend implementation discovered from JVM classpath: 
" +
-          s"${discoveredBackends.map(_.name()).toList}")
-    }
-    val backend = discoveredBackends.head
-    backend
+  private def initializeInternal(): SubstraitBackend = {
+    Backend.get().asInstanceOf[SubstraitBackend]
   }
 
   /**
@@ -55,10 +41,6 @@ object BackendsApiManager {
     backend.name()
   }
 
-  def getBuildInfo: BackendBuildInfo = {
-    backend.buildInfo()
-  }
-
   def getListenerApiInstance: ListenerApi = {
     backend.listenerApi()
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index dd4150806..b82730e72 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
-import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
 
@@ -53,16 +52,6 @@ import scala.collection.JavaConverters._
 
 trait SparkPlanExecApi {
 
-  /** The columnar-batch type this backend is using. */
-  def batchType: Convention.BatchType
-
-  /**
-   * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
-   * determine the convention (its row-based processing / columnar-batch 
processing support) of a
-   * plan with a user-defined function that accepts a plan then returns batch 
type it outputs.
-   */
-  def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
-
   /**
    * Generate FilterExecTransformer.
    *
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
similarity index 59%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 3a5975522..d7785663d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -16,30 +16,34 @@
  */
 package org.apache.gluten.backendsapi
 
-trait Backend {
-  def name(): String
-
-  def buildInfo(): BackendBuildInfo
-
+import org.apache.gluten.backend.Backend
+import org.apache.gluten.extension.injector.RuleInjector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
+
+trait SubstraitBackend extends Backend {
+  final override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit 
= {
+    listenerApi().onDriverStart(sc, pc)
+  }
+  final override def onDriverShutdown(): Unit = {
+    listenerApi().onDriverShutdown()
+  }
+  final override def onExecutorStart(pc: PluginContext): Unit = {
+    listenerApi().onExecutorStart(pc)
+  }
+  final override def onExecutorShutdown(): Unit = {
+    listenerApi().onExecutorShutdown()
+  }
+  final override def injectRules(injector: RuleInjector): Unit = {
+    ruleApi().injectRules(injector)
+  }
   def iteratorApi(): IteratorApi
-
   def sparkPlanExecApi(): SparkPlanExecApi
-
   def transformerApi(): TransformerApi
-
   def validatorApi(): ValidatorApi
-
   def metricsApi(): MetricsApi
-
   def listenerApi(): ListenerApi
-
   def ruleApi(): RuleApi
-
   def settings(): BackendSettingsApi
 }
-
-case class BackendBuildInfo(
-    backend: String,
-    backendBranch: String,
-    backendRevision: String,
-    backendRevisionTime: String)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 0c70e1ea7..14d038b15 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -17,15 +17,16 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression.TransformerState
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.PlanBuilder
 import org.apache.gluten.substrait.rel.RelNode
 import org.apache.gluten.test.TestStats
-import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.sql.execution.SparkPlan
 
@@ -108,7 +109,7 @@ trait GlutenPlan extends SparkPlan with 
Convention.KnownBatchType with LogLevelU
   }
 
   protected def batchType0(): Convention.BatchType = {
-    BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    Backend.get().batchType
   }
 
   protected def doValidateInternal(): ValidationResult = 
ValidationResult.succeeded
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
index 29e1caae7..9c0ddac16 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
@@ -18,12 +18,10 @@ package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.heuristic.FallbackNode
 import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
RowToColumnarLike, Transitions}
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
@@ -275,9 +273,3 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, 
originalPlan: SparkP
     def DO_NOT_FALLBACK(): FallbackInfo = FallbackInfo()
   }
 }
-
-/** A wrapper to specify the plan is fallback plan, the caller side should 
unwrap it. */
-case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode {
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
-  override def output: Seq[Attribute] = fallbackPlan.output
-}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 3bfa611ed..69b74f3a6 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,7 +17,8 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
Transitions}
-import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
+import org.apache.gluten.logging.LogLevelUtil
+import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 9ca60177c..6f1b1fa3b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -21,8 +21,9 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
+import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 007f18fca..3737dd4af 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -19,12 +19,12 @@ package org.apache.gluten.extension.columnar.enumerated
 import org.apache.gluten.extension.columnar.{OffloadExchange, OffloadJoin, 
OffloadOthers}
 import org.apache.gluten.extension.columnar.transition.ConventionReq
 import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.planner.GlutenOptimization
 import org.apache.gluten.planner.cost.GlutenCostModel
 import org.apache.gluten.planner.property.Conv
 import org.apache.gluten.ras.property.PropertySet
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index fe7eb5566..e72677ebf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.sql.execution._
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 
 object PlanUtil {
   private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
-    Convention.get(i).batchType == 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    Convention.get(i).batchType == Backend.get().batchType
   }
 
   def isGlutenTableCache(plan: SparkPlan): Boolean = {
@@ -44,6 +44,6 @@ object PlanUtil {
   }
 
   def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
-    Convention.get(plan).batchType == 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    Convention.get(plan).batchType == Backend.get().batchType
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
index 2cb077d50..cd063ce31 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.utils
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -26,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 
 object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
+  val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
+    !shouldUseGluten(session, plan)
+
   override protected def validate(plan: SparkPlan): Boolean = {
     BackendsApiManager.getValidatorApiInstance.doSparkPlanValidate(plan)
   }
@@ -55,7 +59,7 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends 
Logging {
 
   protected def validate(plan: T): Boolean
 
-  private[this] def shouldUseGluten(session: SparkSession, plan: T): Boolean = 
{
+  def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
     val glutenEnabled = session.conf
       .get(GlutenConfig.GLUTEN_ENABLE_KEY, 
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
       .toBoolean && isGlutenEnabledForCurrentThread(session)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
deleted file mode 100644
index 721711af5..000000000
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.listener
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
-
-import org.apache.spark.SparkContext
-
-object GlutenListenerFactory {
-  def addToSparkListenerBus(sc: SparkContext): Unit = {
-    if (
-      sc.getConf.getBoolean(
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE
-      )
-    ) {
-      sc.listenerBus.addToStatusQueue(new SoftAffinityListener())
-    }
-  }
-}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index e5925e3ac..c57df192c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution._
 import org.apache.gluten.extension.columnar.transition.Convention
@@ -165,7 +166,7 @@ case class ColumnarInputAdapter(child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def supportsColumnar: Boolean = true
   override def batchType(): Convention.BatchType =
-    BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    Backend.get().batchType
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
child.executeColumnar()
   override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 67ecf81b9..f6e23e7cf 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.FallbackTags
-import org.apache.gluten.utils.LogLevelUtil
+import org.apache.gluten.logging.LogLevelUtil
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
rename to 
gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index ea3e50e81..9337317d9 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.softaffinity
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.GlutenPartition
 import org.apache.gluten.softaffinity.SoftAffinityManager
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanBuilder
 
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 55f25309d..c91df0b6d 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark.softaffinity
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.GlutenPartition
 import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanBuilder
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListenerExecutorAdded, 
SparkListenerExecutorRemoved, SparkListenerStageCompleted, 
SparkListenerStageSubmitted, SparkListenerTaskEnd, StageInfo, TaskInfo, 
TaskLocality}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
diff --git 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
index 2d61f0c4b..245c678fe 100644
--- 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
+++ 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
@@ -26,12 +26,6 @@ object GlutenEventUtils {
     sc.listenerBus.post(event)
   }
 
-  def registerListener(sc: SparkContext): Unit = {
-    val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
-    val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore)
-    sc.listenerBus.addToStatusQueue(listener)
-  }
-
   def attachUI(sc: SparkContext): Unit = {
     val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
     val statusStore = new GlutenSQLAppStatusStore(kvStore)
diff --git 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
index 7c236b4e8..b9a590f67 100644
--- 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
+++ 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.ui
 
 import org.apache.gluten.events.{GlutenBuildInfoEvent, GlutenPlanFallbackEvent}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -102,3 +102,11 @@ class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
     toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId))
   }
 }
+
+object GlutenSQLAppStatusListener {
+  def register(sc: SparkContext): Unit = {
+    val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+    val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore)
+    sc.listenerBus.addToStatusQueue(listener)
+  }
+}
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 5d171a36b..82d37b8ca 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -165,6 +165,7 @@ private object FallbackStrategiesSuite {
       transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
+      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), 
c.ac.originalPlan())),
       List(
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait 
{
   }
 
   testGluten("test gluten extensions") {
-    assert(
-      
spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
 
     
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index a4da5c127..866c16d52 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -176,6 +176,7 @@ private object FallbackStrategiesSuite {
       transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
+      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), 
c.ac.originalPlan())),
       List(
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait 
{
   }
 
   testGluten("test gluten extensions") {
-    assert(
-      
spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
 
     
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index a4da5c127..866c16d52 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -176,6 +176,7 @@ private object FallbackStrategiesSuite {
       transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
+      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), 
c.ac.originalPlan())),
       List(
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait 
{
   }
 
   testGluten("test gluten extensions") {
-    assert(
-      
spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
 
     
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index bbdeebfe6..6318c0e06 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -177,6 +177,7 @@ private object FallbackStrategiesSuite {
       transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
+      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), 
c.ac.originalPlan())),
       List(
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait 
{
   }
 
   testGluten("test gluten extensions") {
-    assert(
-      
spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
 
     
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 927a39332..1d3c8e45d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -406,6 +406,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def debug: Boolean = conf.getConf(DEBUG_ENABLED)
   def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
+  def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
   def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
   def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID)
   def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID)
@@ -835,7 +836,7 @@ object GlutenConfig {
       .createWithDefault(true)
 
   val VANILLA_VECTORIZED_READERS_ENABLED =
-    buildConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders")
+    buildStaticConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders")
       .internal()
       .doc("Enable or disable vanilla vectorized scan.")
       .booleanConf
@@ -1622,6 +1623,12 @@ object GlutenConfig {
       .stringConf
       .createWithDefault("/tmp")
 
+  val UT_STATISTIC =
+    buildStaticConf("spark.gluten.sql.ut.statistic")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val BENCHMARK_TASK_STAGEID =
     buildConf("spark.gluten.sql.benchmark_task.stageId")
       .internal()
@@ -1680,12 +1687,6 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val UT_STATISTIC =
-    buildConf("spark.gluten.sql.ut.statistic")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   // FIXME: This only works with CH backend.
   val EXTENDED_COLUMNAR_TRANSFORM_RULES =
     buildConf("spark.gluten.sql.columnar.extended.columnar.transform.rules")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to