This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 619624a06a [VL] Rework the implementation of spark.gluten.enabled
(#7672)
619624a06a is described below
commit 619624a06a578290f5996511ec08595f2590aca4
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 25 19:09:36 2024 +0800
[VL] Rework the implementation of spark.gluten.enabled (#7672)
---
.../spark/sql/delta/commands/VacuumCommand.scala | 58 ++++----
.../spark/sql/delta/commands/VacuumCommand.scala | 13 +-
.../spark/sql/delta/commands/VacuumCommand.scala | 11 +-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 3 -
.../extension/CHAQEPropagateEmptyRelation.scala | 4 +-
.../extension/FallbackBroadcastHashJoinRules.scala | 3 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 3 -
.../gluten/datasource/ArrowConvertorRule.scala | 3 +-
.../gluten/extension/ArrowScanReplaceRule.scala | 3 +-
.../BloomFilterMightContainJointRewriteRule.scala | 3 +-
.../gluten/extension/CollectRewriteRule.scala | 3 +-
.../apache/gluten/extension/HLLRewriteRule.scala | 3 +-
.../gluten/execution/MiscOperatorSuite.scala | 70 +++++----
.../gluten/extension/GlutenSessionExtensions.scala | 28 +++-
.../extension/columnar/ColumnarRuleApplier.scala | 9 --
.../columnar/enumerated/EnumeratedApplier.scala | 10 +-
.../columnar/heuristic/HeuristicApplier.scala | 16 +--
.../gluten/extension/injector/GlutenInjector.scala | 49 +++----
.../extension/injector/InjectorControl.scala | 160 +++++++++++++++++++++
.../gluten/extension/injector/RuleInjector.scala | 5 +-
.../gluten/extension/injector/SparkInjector.scala | 43 +++---
.../apache/gluten/extension/injector/package.scala | 35 +++++
.../gluten/execution/VeloxIcebergSuite.scala | 16 +--
.../apache/gluten/utils/QueryPlanSelector.scala | 80 -----------
.../sql/execution/FallbackStrategiesSuite.scala | 15 +-
.../extension/GlutenSessionExtensionSuite.scala | 9 +-
.../sql/execution/FallbackStrategiesSuite.scala | 14 +-
.../extension/GlutenSessionExtensionSuite.scala | 9 +-
.../sql/execution/FallbackStrategiesSuite.scala | 14 +-
.../extension/GlutenSessionExtensionSuite.scala | 9 +-
.../sql/execution/FallbackStrategiesSuite.scala | 14 +-
.../extension/GlutenSessionExtensionSuite.scala | 9 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 4 +-
33 files changed, 423 insertions(+), 305 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index c5527933b2..939e6bcbf2 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta.commands
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta._
@@ -145,9 +145,11 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
val originalEnabledGluten =
-
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ spark.sparkContext.setLocalProperty(
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+ "false")
// --- modified end
val validFiles = snapshot.stateDS
@@ -284,31 +286,37 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
} else {
allFilesAndDirs
.where('modificationTime < deleteBeforeTimestamp || 'isDir)
- .mapPartitions { fileStatusIterator =>
- val reservoirBase = new Path(basePath)
- val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
- fileStatusIterator.flatMap { fileStatus =>
- if (fileStatus.isDir) {
- Iterator.single(relativize(fileStatus.getPath, fs,
reservoirBase, isDir = true))
- } else {
- val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
- val dirsWithSlash = dirs.map { p =>
- relativize(new Path(p), fs, reservoirBase, isDir = true)
- }
- dirsWithSlash ++ Iterator(
- relativize(new Path(fileStatus.path), fs, reservoirBase,
isDir = false))
+ .mapPartitions {
+ fileStatusIterator =>
+ val reservoirBase = new Path(basePath)
+ val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
+ fileStatusIterator.flatMap {
+ fileStatus =>
+ if (fileStatus.isDir) {
+ Iterator.single(
+ relativize(fileStatus.getPath, fs, reservoirBase,
isDir = true))
+ } else {
+ val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
+ val dirsWithSlash = dirs.map {
+ p => relativize(new Path(p), fs, reservoirBase, isDir
= true)
+ }
+ dirsWithSlash ++ Iterator(
+ relativize(new Path(fileStatus.path), fs,
reservoirBase, isDir = false))
+ }
}
- }
- }.groupBy($"value" as 'path)
+ }
+ .groupBy($"value".as('path))
.count()
.join(validFiles, Seq("path"), "leftanti")
.where('count === 1)
.select('path)
.as[String]
- .map { relativePath =>
- assert(!stringToPath(relativePath).isAbsolute,
- "Shouldn't have any absolute paths for deletion here.")
- pathToString(DeltaFileOperations.absolutePath(basePath,
relativePath))
+ .map {
+ relativePath =>
+ assert(
+ !stringToPath(relativePath).isAbsolute,
+ "Shouldn't have any absolute paths for deletion here.")
+ pathToString(DeltaFileOperations.absolutePath(basePath,
relativePath))
}
}
// --- modified end
@@ -371,10 +379,12 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+ originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+ "true")
}
// --- modified end
}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 9f82feeee2..e59645f58c 100644
---
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -21,19 +21,16 @@ package org.apache.spark.sql.delta.commands
import java.net.URI
import java.util.Date
import java.util.concurrent.TimeUnit
-
import scala.collection.JavaConverters._
-
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations
import
org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-
-import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -161,9 +158,9 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
val originalEnabledGluten =
-
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
// --- modified end
val validFiles = snapshot.stateDS
@@ -362,10 +359,10 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 9f455fb27b..5d05bdb868 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -28,8 +28,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations
import
org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
@@ -254,9 +253,9 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
val originalEnabledGluten =
-
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
// --- modified end
val validFiles =
@@ -461,10 +460,10 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
- QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+ GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 9a1ead1e6c..4323dc9558 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -26,7 +26,6 @@ import org.apache.gluten.extension.injector.{RuleInjector,
SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector,
RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser,
GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -39,8 +38,6 @@ import org.apache.spark.util.SparkPlanRules
class CHRuleApi extends RuleApi {
import CHRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
- injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
-
injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
index 6f5afa9726..6d99a2ceec 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.extension
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -28,8 +27,7 @@ import
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
ClickHouseBuildSideRelation}
case class CHAQEPropagateEmptyRelation(session: SparkSession) extends
Rule[SparkPlan] {
-
- def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session,
plan) {
+ def apply(plan: SparkPlan): SparkPlan = {
if (!(session.conf.get(CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY,
"true").toBoolean)) {
plan
} else {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
index 842dc76153..ec465a3c15 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
import
org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
@@ -38,7 +37,7 @@ import scala.util.control.Breaks.{break, breakable}
// to columnar while BHJ fallbacks, BroadcastExec need to be tagged not
transformable when applying
// queryStagePrepRules.
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession)
extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan =
PhysicalPlanSelector.maybe(session, plan) {
+ override def apply(plan: SparkPlan): SparkPlan = {
val columnarConf: GlutenConfig = GlutenConfig.getConf
plan.foreach {
case bhj: BroadcastHashJoinExec =>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 7cddba157d..a838c463c3 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -27,7 +27,6 @@ import
org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector,
RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
GlutenFallbackReporter}
@@ -35,8 +34,6 @@ class VeloxRuleApi extends RuleApi {
import VeloxRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
- injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
-
injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
index c4684e5a4b..b1b0b813f6 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.datasource
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.datasource.v2.ArrowCSVTable
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogicalPlanSelector
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.SparkSession
@@ -40,7 +39,7 @@ import scala.collection.convert.ImplicitConversions.`map
AsScala`
@Experimental
case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan]
{
- override def apply(plan: LogicalPlan): LogicalPlan =
LogicalPlanSelector.maybe(session, plan) {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
if (!BackendsApiManager.getSettings.enableNativeArrowReadFiles()) {
return plan
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
index dba8df5cf1..adfc6ca742 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.extension
import org.apache.gluten.datasource.ArrowCSVFileFormat
import org.apache.gluten.datasource.v2.ArrowCSVScan
import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
@@ -27,7 +26,7 @@ import
org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanEx
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan =
PhysicalPlanSelector.maybe(spark, plan) {
+ override def apply(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case plan: FileSourceScanExec if
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
ArrowFileSourceScanExec(plan)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
index 735d6ad41b..56a3d86a90 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
@@ -20,14 +20,13 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.expression.VeloxBloomFilterMightContain
import org.apache.gluten.expression.aggregate.VeloxBloomFilterAggregate
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
case class BloomFilterMightContainJointRewriteRule(spark: SparkSession)
extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan =
PhysicalPlanSelector.maybe(spark, plan) {
+ override def apply(plan: SparkPlan): SparkPlan = {
if (!GlutenConfig.getConf.enableNativeBloomFilter) {
return plan
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
index 85d31b8d02..48541b234e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension
import org.apache.gluten.expression.ExpressionMappings
import org.apache.gluten.expression.aggregate.{VeloxCollectList,
VeloxCollectSet}
-import org.apache.gluten.utils.LogicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression}
@@ -36,7 +35,7 @@ import scala.reflect.{classTag, ClassTag}
*/
case class CollectRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
import CollectRewriteRule._
- override def apply(plan: LogicalPlan): LogicalPlan =
LogicalPlanSelector.maybe(spark, plan) {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
if (!has[VeloxCollectSet] && !has[VeloxCollectList]) {
return plan
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
index 8b44005646..8ceee3d573 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
import org.apache.gluten.expression.aggregate.HLLAdapter
-import org.apache.gluten.utils.LogicalPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Literal
@@ -29,7 +28,7 @@ import
org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXP
import org.apache.spark.sql.types._
case class HLLRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan =
LogicalPlanSelector.maybe(spark, plan) {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) {
case a: Aggregate =>
a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index ba42b57f1b..7ddab8769f 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -64,35 +64,6 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
.set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
}
- test("field names contain non-ASCII characters") {
- withTempPath {
- path =>
- // scalastyle:off nonascii
- Seq((1, 2, 3, 4)).toDF("товары", "овары", "国ⅵ",
"中文").write.parquet(path.getCanonicalPath)
- // scalastyle:on
-
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
- runQueryAndCompare("select * from view") {
- checkGlutenOperatorMatch[FileSourceScanExecTransformer]
- }
- }
-
- withTempPath {
- path =>
- // scalastyle:off nonascii
- spark.range(10).toDF("中文").write.parquet(path.getCanonicalPath)
-
spark.read.parquet(path.getCanonicalPath).filter("`中文`>1").createOrReplaceTempView("view")
- // scalastyle:on
- runQueryAndCompare("select * from view") {
- checkGlutenOperatorMatch[FileSourceScanExecTransformer]
- }
- }
- }
-
- test("simple_select") {
- val df = runQueryAndCompare("select * from lineitem limit 1") { _ => }
- checkLengthAndPlan(df, 1)
- }
-
test("select_part_column") {
val df = runQueryAndCompare("select l_shipdate, l_orderkey from lineitem
limit 1") {
df =>
@@ -1924,4 +1895,45 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
}
+
+ // Since https://github.com/apache/incubator-gluten/pull/7330.
+ test("field names contain non-ASCII characters") {
+ withTempPath {
+ path =>
+ // scalastyle:off nonascii
+ Seq((1, 2, 3, 4)).toDF("товары", "овары", "国ⅵ",
"中文").write.parquet(path.getCanonicalPath)
+ // scalastyle:on
+
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
+ runQueryAndCompare("select * from view") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ }
+
+ withTempPath {
+ path =>
+ // scalastyle:off nonascii
+ spark.range(10).toDF("中文").write.parquet(path.getCanonicalPath)
+
spark.read.parquet(path.getCanonicalPath).filter("`中文`>1").createOrReplaceTempView("view")
+ // scalastyle:on
+ runQueryAndCompare("select * from view") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ }
+ }
+
+ test("test 'spark.gluten.enabled'") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "true") {
+ runQueryAndCompare("select * from lineitem limit 1") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+ runQueryAndCompare("select * from lineitem limit 1") {
+ checkSparkOperatorMatch[FileSourceScanExec]
+ }
+ }
+ runQueryAndCompare("select * from lineitem limit 1") {
+ checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+ }
+ }
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 697b41da9e..5c8b2260d3 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -16,19 +16,43 @@
*/
package org.apache.gluten.extension
+import org.apache.gluten.GlutenConfig
import org.apache.gluten.backend.Backend
import org.apache.gluten.extension.injector.RuleInjector
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions
-private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions
=> Unit) {
+private[gluten] class GlutenSessionExtensions
+ extends (SparkSessionExtensions => Unit)
+ with Logging {
+ import GlutenSessionExtensions._
override def apply(exts: SparkSessionExtensions): Unit = {
val injector = new RuleInjector(exts)
+ injector.control.disableOn {
+ session =>
+ val glutenEnabledGlobally = session.conf
+ .get(GlutenConfig.GLUTEN_ENABLED_KEY,
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
+ .toBoolean
+ val disabled = !glutenEnabledGlobally
+ logDebug(s"Gluten is disabled by variable: glutenEnabledGlobally:
$glutenEnabledGlobally")
+ disabled
+ }
+ injector.control.disableOn {
+ session =>
+ val glutenEnabledForThread =
+
Option(session.sparkContext.getLocalProperty(GLUTEN_ENABLE_FOR_THREAD_KEY))
+ .forall(_.toBoolean)
+ val disabled = !glutenEnabledForThread
+ logDebug(s"Gluten is disabled by variable: glutenEnabledForThread:
$glutenEnabledForThread")
+ disabled
+ }
Backend.get().injectRules(injector)
injector.inject()
}
}
-private[gluten] object GlutenSessionExtensions {
+object GlutenSessionExtensions {
val GLUTEN_SESSION_EXTENSION_NAME: String =
classOf[GlutenSessionExtensions].getCanonicalName
+ val GLUTEN_ENABLE_FOR_THREAD_KEY: String = "gluten.enabledForCurrentThread"
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index d275c58564..ecf13967e3 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
trait ColumnarRuleApplier {
@@ -28,8 +27,6 @@ trait ColumnarRuleApplier {
}
object ColumnarRuleApplier {
- type ColumnarRuleBuilder = ColumnarRuleCall => Rule[SparkPlan]
-
class ColumnarRuleCall(
val session: SparkSession,
val ac: AdaptiveContext,
@@ -38,10 +35,4 @@ object ColumnarRuleApplier {
new GlutenConfig(session.sessionState.conf)
}
}
-
- // A temporary workaround for applying toggle `spark.gluten.enabled`, to be
removed.
- trait SkipCondition {
- // True if the rule execution should be skipped.
- def skip(session: SparkSession, plan: SparkPlan): Boolean
- }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index c4d53653c0..7ddeb33c7d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -16,8 +16,8 @@
*/
package org.apache.gluten.extension.columnar.enumerated
-import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.columnar.{ColumnarRuleApplier,
ColumnarRuleExecutor}
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.gluten.logging.LogLevelUtil
@@ -38,17 +38,13 @@ import org.apache.spark.sql.execution.SparkPlan
@Experimental
class EnumeratedApplier(
session: SparkSession,
- skipConditions: Seq[SkipCondition],
- ruleBuilders: Seq[ColumnarRuleBuilder])
+ ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
- if (skipConditions.exists(_.skip(session, plan))) {
- return plan
- }
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
val finalPlan = maybeAqe {
apply0(ruleBuilders.map(b => b(call)), plan)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index a039e9a562..e4825d2eb7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -16,8 +16,8 @@
*/
package org.apache.gluten.extension.columnar.heuristic
-import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.columnar.{ColumnarRuleApplier,
ColumnarRuleExecutor}
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.gluten.logging.LogLevelUtil
@@ -32,20 +32,16 @@ import org.apache.spark.sql.execution.SparkPlan
*/
class HeuristicApplier(
session: SparkSession,
- skipConditions: Seq[SkipCondition],
- transformBuilders: Seq[ColumnarRuleBuilder],
- fallbackPolicyBuilders: Seq[ColumnarRuleBuilder],
- postBuilders: Seq[ColumnarRuleBuilder],
- finalBuilders: Seq[ColumnarRuleBuilder])
+ transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+ fallbackPolicyBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+ postBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+ finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
- if (skipConditions.exists(_.skip(session, plan))) {
- return plan
- }
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
makeRule(call).apply(plan)
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index db3310151f..498f040c90 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -19,67 +19,62 @@ package org.apache.gluten.extension.injector
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.extension.columnar.ColumnarRuleApplier
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
SkipCondition}
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
import scala.collection.mutable
/** Injector used to inject query planner rules into Gluten. */
-class GlutenInjector private[injector] {
+class GlutenInjector private[injector] (control: InjectorControl) {
import GlutenInjector._
- private val skipConditions: mutable.ListBuffer[SkipCondition] =
mutable.ListBuffer()
val legacy: LegacyInjector = new LegacyInjector()
val ras: RasInjector = new RasInjector()
private[injector] def inject(extensions: SparkSessionExtensions): Unit = {
- extensions.injectColumnar(session => new GlutenColumnarRule(session,
applier))
+ extensions.injectColumnar(
+ control.disabler().wrapColumnarRule(s => new GlutenColumnarRule(s,
applier)))
}
private def applier(session: SparkSession): ColumnarRuleApplier = {
val conf = new GlutenConfig(session.sessionState.conf)
if (conf.enableRas) {
- return ras.createApplier(session, skipConditions.toSeq)
+ return ras.createApplier(session)
}
- legacy.createApplier(session, skipConditions.toSeq)
- }
-
- def skipOn(skipCondition: SkipCondition): Unit = {
- skipConditions += skipCondition
+ legacy.createApplier(session)
}
}
object GlutenInjector {
class LegacyInjector {
- private val transformBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
- private val fallbackPolicyBuilders =
mutable.Buffer.empty[ColumnarRuleBuilder]
- private val postBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
- private val finalBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+ private val transformBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
+ private val fallbackPolicyBuilders = mutable.Buffer.empty[ColumnarRuleCall
=> Rule[SparkPlan]]
+ private val postBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
+ private val finalBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
- def injectTransform(builder: ColumnarRuleBuilder): Unit = {
+ def injectTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
transformBuilders += builder
}
- def injectFallbackPolicy(builder: ColumnarRuleBuilder): Unit = {
+ def injectFallbackPolicy(builder: ColumnarRuleCall => Rule[SparkPlan]):
Unit = {
fallbackPolicyBuilders += builder
}
- def injectPost(builder: ColumnarRuleBuilder): Unit = {
+ def injectPost(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
postBuilders += builder
}
- def injectFinal(builder: ColumnarRuleBuilder): Unit = {
+ def injectFinal(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
finalBuilders += builder
}
- private[injector] def createApplier(
- session: SparkSession,
- skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
+ private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
new HeuristicApplier(
session,
- skipConditions,
transformBuilders.toSeq,
fallbackPolicyBuilders.toSeq,
postBuilders.toSeq,
@@ -88,16 +83,14 @@ object GlutenInjector {
}
class RasInjector {
- private val ruleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+ private val ruleBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
- def inject(builder: ColumnarRuleBuilder): Unit = {
+ def inject(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
ruleBuilders += builder
}
- private[injector] def createApplier(
- session: SparkSession,
- skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
- new EnumeratedApplier(session, skipConditions, ruleBuilders.toSeq)
+ private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
+ new EnumeratedApplier(session, ruleBuilders.toSeq)
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
new file mode 100644
index 0000000000..fd16d3dbad
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.injector
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
+
+import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method}
+
+import scala.collection.mutable
+
+class InjectorControl private[injector] () {
+ import InjectorControl._
+ private val disablerBuffer: mutable.ListBuffer[Disabler] =
+ mutable.ListBuffer()
+ private var combined: Disabler = (_: SparkSession) => false
+
+ def disableOn(one: Disabler): Unit = synchronized {
+ disablerBuffer += one
+ // Update the combined disabler.
+ val disablerList = disablerBuffer.toList
+ combined = s => disablerList.exists(_.disabled(s))
+ }
+
+ private[injector] def disabler(): Disabler = synchronized {
+ combined
+ }
+}
+
+object InjectorControl {
+ trait Disabler {
+ // If true, the injected rule will be disabled.
+ protected[injector] def disabled(session: SparkSession): Boolean
+ }
+
+ private object Disabler {
+ implicit private[injector] class DisablerOps(disabler: Disabler) {
+ def wrapRule[TreeType <: TreeNode[_]](
+ ruleBuilder: SparkSession => Rule[TreeType]): SparkSession =>
Rule[TreeType] = session =>
+ {
+ val rule = ruleBuilder(session)
+ new Rule[TreeType] with DisablerAware {
+ override val ruleName: String = rule.ruleName
+ override def apply(plan: TreeType): TreeType = {
+ if (disabler.disabled(session)) {
+ return plan
+ }
+ rule(plan)
+ }
+ }
+ }
+
+ def wrapStrategy(strategyBuilder: StrategyBuilder): StrategyBuilder =
session => {
+ val strategy = strategyBuilder(session)
+ new Strategy with DisablerAware {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ if (disabler.disabled(session)) {
+ return Nil
+ }
+ strategy(plan)
+ }
+ }
+ }
+
+ def wrapParser(parserBuilder: ParserBuilder): ParserBuilder = (session,
parser) => {
+ val before = parser
+ val after = parserBuilder(session, before)
+ // Use a dynamic proxy to avoid Spark 3.2 binary-compatibility issues.
+ val before = parser
+ val after = parserBuilder(session, before)
+ // Use dynamic proxy to get rid of 3.2 compatibility issues.
+ java.lang.reflect.Proxy
+ .newProxyInstance(
+ classOf[ParserInterface].getClassLoader,
+ Array(classOf[ParserInterface], classOf[DisablerAware]),
+ new InvocationHandler {
+ override def invoke(proxy: Any, method: Method, args:
Array[AnyRef]): AnyRef = {
+ try {
+ if (disabler.disabled(session)) {
+ return method.invoke(before, args: _*)
+ }
+ method.invoke(after, args: _*)
+ } catch {
+ case e: InvocationTargetException =>
+ // Unwrap the InvocationTargetException and rethrow its cause.
+ throw e.getCause
+ }
+ }
+ }
+ )
+ .asInstanceOf[ParserInterface]
+ }
+
+ def wrapFunction(functionDescription: FunctionDescription):
FunctionDescription = {
+ val (identifier, info, builder) = functionDescription
+ val wrappedBuilder: FunctionBuilder = children => {
+ if (
+ disabler.disabled(SparkSession.getActiveSession.getOrElse(
+ throw new IllegalStateException("Active Spark session not
found")))
+ ) {
+ throw new UnsupportedOperationException(
+ s"Function ${info.getName} is not callable as Gluten is
disabled")
+ }
+ builder(children)
+ }
+ (identifier, info, wrappedBuilder)
+ }
+
+ def wrapColumnarRule(columnarRuleBuilder: ColumnarRuleBuilder):
ColumnarRuleBuilder =
+ session => {
+ val columnarRule = columnarRuleBuilder(session)
+ new ColumnarRule with DisablerAware {
+ override val preColumnarTransitions: Rule[SparkPlan] = {
+ new Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (disabler.disabled(session)) {
+ return plan
+ }
+ columnarRule.preColumnarTransitions.apply(plan)
+ }
+ }
+ }
+
+ override val postColumnarTransitions: Rule[SparkPlan] = {
+ new Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (disabler.disabled(session)) {
+ return plan
+ }
+ columnarRule.postColumnarTransitions.apply(plan)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * The entity (could be a rule, a parser, or a cost evaluator) that is
dynamically injected into Spark,
+ * whose effectiveness is controlled by a disabler.
+ */
+ trait DisablerAware
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
index 60a649387d..c497a24a07 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
@@ -20,8 +20,9 @@ import org.apache.spark.sql.SparkSessionExtensions
/** Injector used to inject query planner rules into Spark and Gluten. */
class RuleInjector(extensions: SparkSessionExtensions) {
- val spark: SparkInjector = new SparkInjector(extensions)
- val gluten: GlutenInjector = new GlutenInjector()
+ val control = new InjectorControl()
+ val spark: SparkInjector = new SparkInjector(control, extensions)
+ val gluten: GlutenInjector = new GlutenInjector(control)
private[extension] def inject(): Unit = {
// The regular Spark rules already injected with the `injectRules` of
`RuleApi` directly.
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
index 847a9349e4..87942c4155 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
@@ -16,44 +16,37 @@
*/
package org.apache.gluten.extension.injector
-import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.SparkSessionExtensions
/** Injector used to inject query planner rules into Spark. */
-class SparkInjector private[injector] (extensions: SparkSessionExtensions) {
-
- def injectQueryStagePrepRule(builder: SparkSession => Rule[SparkPlan]): Unit
= {
- extensions.injectQueryStagePrepRule(builder)
+class SparkInjector private[injector] (
+ control: InjectorControl,
+ extensions: SparkSessionExtensions) {
+ def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
+ extensions.injectQueryStagePrepRule(control.disabler().wrapRule(builder))
}
- def injectResolutionRule(builder: SparkSession => Rule[LogicalPlan]): Unit =
{
- extensions.injectResolutionRule(builder)
+ def injectResolutionRule(builder: RuleBuilder): Unit = {
+ extensions.injectResolutionRule(control.disabler().wrapRule(builder))
}
- def injectPostHocResolutionRule(builder: SparkSession => Rule[LogicalPlan]):
Unit = {
- extensions.injectPostHocResolutionRule(builder)
+ def injectPostHocResolutionRule(builder: RuleBuilder): Unit = {
+
extensions.injectPostHocResolutionRule(control.disabler().wrapRule(builder))
}
- def injectOptimizerRule(builder: SparkSession => Rule[LogicalPlan]): Unit = {
- extensions.injectOptimizerRule(builder)
+ def injectOptimizerRule(builder: RuleBuilder): Unit = {
+ extensions.injectOptimizerRule(control.disabler().wrapRule(builder))
}
- def injectPlannerStrategy(builder: SparkSession => Strategy): Unit = {
- extensions.injectPlannerStrategy(builder)
+ def injectPlannerStrategy(builder: StrategyBuilder): Unit = {
+ extensions.injectPlannerStrategy(control.disabler().wrapStrategy(builder))
}
- def injectParser(builder: (SparkSession, ParserInterface) =>
ParserInterface): Unit = {
- extensions.injectParser(builder)
+ def injectParser(builder: ParserBuilder): Unit = {
+ extensions.injectParser(control.disabler().wrapParser(builder))
}
- def injectFunction(
- functionDescription: (FunctionIdentifier, ExpressionInfo,
FunctionBuilder)): Unit = {
- extensions.injectFunction(functionDescription)
+ def injectFunction(functionDescription: FunctionDescription): Unit = {
+
extensions.injectFunction(control.disabler().wrapFunction(functionDescription))
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala
new file mode 100644
index 0000000000..7262dbdc2e
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
+
+package object injector {
+ type RuleBuilder = SparkSession => Rule[LogicalPlan]
+ type StrategyBuilder = SparkSession => Strategy
+ type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
+ type FunctionDescription = (FunctionIdentifier, ExpressionInfo,
FunctionBuilder)
+ type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
+ type ColumnarRuleBuilder = SparkSession => ColumnarRule
+}
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 5ebf8883c6..0d063d22f8 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -63,7 +63,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p string)
@@ -84,7 +84,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
// Partition key of integer type.
withSQLConf(
- GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(s"""
@@ -143,7 +143,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p int)
@@ -164,7 +164,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
// Partition key of integer type.
withSQLConf(
- GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(s"""
@@ -223,7 +223,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p int)
@@ -244,7 +244,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
// Partition key of integer type.
withSQLConf(
- GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(s"""
@@ -338,7 +338,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
- withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
spark.sql("""
|create table iceberg_mor_tb (
| id int,
@@ -390,7 +390,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
test("iceberg read mor table - merge into") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
- withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
spark.sql("""
|create table iceberg_mor_tb (
| id int,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
deleted file mode 100644
index 2d8d7b29e4..0000000000
---
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.utils
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-
-object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
- val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
- !shouldUseGluten(session, plan)
-}
-
-object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {}
-
-/** Select to decide whether a Spark plan can be accepted by Gluten for
further execution. */
-abstract class QueryPlanSelector[T <: QueryPlan[_]] extends Logging {
-
- private[this] def stackTrace(max: Int = 5): String = {
- val trim: Int = 6
- new Throwable().getStackTrace().slice(trim, trim + max).mkString("\n")
- }
-
- private def isGlutenEnabledForCurrentThread(session: SparkSession): Boolean
= {
- val enabled =
-
session.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
- if (enabled != null) {
- enabled.toBoolean
- } else {
- true
- }
- }
-
- def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
- val glutenEnabled = session.conf
- .get(GlutenConfig.GLUTEN_ENABLE_KEY,
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
- .toBoolean && isGlutenEnabledForCurrentThread(session)
- if (log.isDebugEnabled) {
- logDebug(s"shouldUseGluten: $glutenEnabled")
- logDebug(
- s"=========================\n" +
- s"running shouldUseGluten from:\n${stackTrace()}\n" +
- s"plan:\n${plan.treeString}\n" +
- "=========================")
- }
- glutenEnabled
- }
-
- def maybe(session: SparkSession, plan: T)(func: => T): T = {
- if (shouldUseGluten(session, plan)) func else plan
- }
-
- def maybeNil(session: SparkSession, plan: T)(func: => Seq[SparkPlan]):
Seq[SparkPlan] = {
- if (shouldUseGluten(session, plan)) func else Nil
- }
-}
-
-object QueryPlanSelector {
- // control the usage of gluten at thread level
- val GLUTEN_ENABLE_FOR_THREAD_KEY = "gluten.enabledForCurrentThread"
-}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 82d37b8ca1..fc31289119 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,18 +18,18 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
RemoveFallbackTagRule}
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
@@ -142,14 +142,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait
{
val thread = new Thread(
() => {
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ spark.sparkContext
+
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
val fallbackScanExec = fallbackPlan.collect {
case e: FileSourceScanExec if
!e.isInstanceOf[BasicScanExecTransformer] => true
}
assert(fallbackScanExec.size == 1)
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
null)
+ spark.sparkContext
+
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
val noFallbackScanExec = noFallbackPlan.collect { case _:
BasicScanExecTransformer => true }
assert(noFallbackScanExec.size == 1)
@@ -162,10 +164,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
- transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+ transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]):
HeuristicApplier = {
new HeuristicApplier(
spark,
- Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait
{
}
testGluten("test gluten extensions") {
-
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+ assert(
+ spark.sessionState.columnarRules
+ .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait
{
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
if (BackendTestUtils.isCHBackendLoaded()) {
- assert(
-
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+
assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
} else {
assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 866c16d52f..3e77672131 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackTags, RemoveFallbackTagRule}
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -153,14 +152,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait
{
val thread = new Thread(
() => {
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ spark.sparkContext
+
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
val fallbackScanExec = fallbackPlan.collect {
case e: FileSourceScanExec if
!e.isInstanceOf[BasicScanExecTransformer] => true
}
assert(fallbackScanExec.size == 1)
-
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
null)
+ spark.sparkContext
+
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
val noFallbackScanExec = noFallbackPlan.collect { case _:
BasicScanExecTransformer => true }
assert(noFallbackScanExec.size == 1)
@@ -173,10 +174,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
- transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+ transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]):
HeuristicApplier = {
new HeuristicApplier(
spark,
- Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait
{
}
testGluten("test gluten extensions") {
-
assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+ assert(
+ spark.sessionState.columnarRules
+ .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait
{
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
if (BackendTestUtils.isCHBackendLoaded()) {
- assert(
-
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+
assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
} else {
assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 866c16d52f..3e77672131 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -153,14 +152,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val thread = new Thread(
() => {
-          spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+          spark.sparkContext
+            .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
val fallbackScanExec = fallbackPlan.collect {
case e: FileSourceScanExec if
!e.isInstanceOf[BasicScanExecTransformer] => true
}
assert(fallbackScanExec.size == 1)
-          spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
+          spark.sparkContext
+            .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
val noFallbackScanExec = noFallbackPlan.collect { case _:
BasicScanExecTransformer => true }
assert(noFallbackScanExec.size == 1)
@@ -173,10 +174,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
- transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): HeuristicApplier = {
new HeuristicApplier(
spark,
- Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+ assert(
+ spark.sessionState.columnarRules
+ .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
} else {
assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 6318c0e06b..a214d9755e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -154,14 +153,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val thread = new Thread(
() => {
-          spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+          spark.sparkContext
+            .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
val fallbackScanExec = fallbackPlan.collect {
case e: FileSourceScanExec if
!e.isInstanceOf[BasicScanExecTransformer] => true
}
assert(fallbackScanExec.size == 1)
-          spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
+          spark.sparkContext
+            .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
val noFallbackScanExec = noFallbackPlan.collect { case _:
BasicScanExecTransformer => true }
assert(noFallbackScanExec.size == 1)
@@ -174,10 +175,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
- transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): HeuristicApplier = {
new HeuristicApplier(
spark,
- Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+ assert(
+ spark.sessionState.columnarRules
+ .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
} else {
assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index de111d33ed..dfe7e4a082 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -471,7 +471,7 @@ object GlutenConfig {
import SQLConf._
var GLUTEN_ENABLE_BY_DEFAULT = true
- val GLUTEN_ENABLE_KEY = "spark.gluten.enabled"
+ val GLUTEN_ENABLED_KEY = "spark.gluten.enabled"
val GLUTEN_LIB_NAME = "spark.gluten.sql.columnar.libname"
val GLUTEN_LIB_PATH = "spark.gluten.sql.columnar.libpath"
val GLUTEN_EXECUTOR_LIB_PATH = "spark.gluten.sql.columnar.executor.libpath"
@@ -810,7 +810,7 @@ object GlutenConfig {
}
val GLUTEN_ENABLED =
- buildConf(GLUTEN_ENABLE_KEY)
+ buildConf(GLUTEN_ENABLED_KEY)
.internal()
.doc("Whether to enable gluten. Default value is true. Just an
experimental property." +
" Recommend to enable/disable Gluten through the setting for
spark.plugins.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]