This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f610059f1 [CORE] Rework Gluten + DPP compatibility (#6035)
f610059f1 is described below
commit f610059f1ded8c0942ee8dd0b82b0223d3180a40
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jun 12 10:22:26 2024 +0800
[CORE] Rework Gluten + DPP compatibility (#6035)
---
.../DataSourceScanTransformerRegister.scala | 8 +-
.../gluten/execution/ScanTransformerFactory.scala | 42 +++------
.../gluten/expression/ExpressionConverter.scala | 101 +-------------------
.../extension/columnar/MiscColumnarRules.scala | 102 ++++++++++++++++++++-
.../extension/columnar/OffloadSingleNode.scala | 68 ++------------
.../extension/columnar/TransformHintRule.scala | 2 +-
.../columnar/enumerated/EnumeratedApplier.scala | 5 +-
.../columnar/heuristic/HeuristicApplier.scala | 5 +-
.../execution/ColumnarSubqueryBroadcastExec.scala | 50 ++++------
.../gluten/execution/DeltaScanTransformer.scala | 6 +-
.../execution/DeltaScanTransformerProvider.scala | 6 +-
.../gluten/execution/IcebergScanTransformer.scala | 6 +-
.../execution/IcebergTransformerProvider.scala | 6 +-
13 files changed, 153 insertions(+), 254 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
index b899790c3..5b46c2385 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -44,15 +43,12 @@ trait DataSourceScanTransformerRegister {
  val scanClassName: String

  def createDataSourceTransformer(
-      batchScan: FileSourceScanExec,
-      newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
    throw new UnsupportedOperationException(
      "This should not be called, please implement this method in child class.");
  }

-  def createDataSourceV2Transformer(
-      batchScan: BatchScanExec,
-      newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
+  def createDataSourceV2Transformer(batchScan: BatchScanExec): BatchScanExecTransformerBase = {
    throw new UnsupportedOperationException(
      "This should not be called, please implement this method in child class.");
  }
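
After this change a register implementation no longer receives rewritten partition filters from the caller; it reads them off the scan node itself. A minimal sketch of a custom register against the new API (the provider, format, and transformer names here are hypothetical; the real Delta and Iceberg providers further below follow the same pattern):

    import org.apache.spark.sql.execution.FileSourceScanExec

    class MyFormatTransformerProvider extends DataSourceScanTransformerRegister {
      // Fully-qualified class name of the file format this register handles (hypothetical).
      override val scanClassName: String = "com.example.MyFileFormat"

      override def createDataSourceTransformer(
          batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
        // Partition filters now come straight from the scan node; any DPP expressions
        // inside them are rewritten later by the RewriteSubqueryBroadcast rule.
        MyFormatScanTransformer(batchScan) // hypothetical, built like DeltaScanTransformer
      }
    }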
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index fc2b8f506..fcb9e983e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.columnar.TransformHints
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -37,14 +36,7 @@ object ScanTransformerFactory {
  def createFileSourceScanTransformer(
      scanExec: FileSourceScanExec,
-      allPushDownFilters: Option[Seq[Expression]] = None,
-      validation: Boolean = false): FileSourceScanExecTransformerBase = {
-    // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
-    val newPartitionFilters = if (validation) {
-      scanExec.partitionFilters
-    } else {
-      ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters)
-    }
+      allPushDownFilters: Option[Seq[Expression]] = None): FileSourceScanExecTransformerBase = {
    val fileFormat = scanExec.relation.fileFormat
    lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
      case Some(clz) =>
@@ -52,13 +44,13 @@ object ScanTransformerFactory {
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[DataSourceScanTransformerRegister]
- .createDataSourceTransformer(scanExec, newPartitionFilters)
+ .createDataSourceTransformer(scanExec)
case _ =>
- new FileSourceScanExecTransformer(
+ FileSourceScanExecTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
- newPartitionFilters,
+ scanExec.partitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
allPushDownFilters.getOrElse(scanExec.dataFilters),
@@ -69,8 +61,7 @@ object ScanTransformerFactory {
}
private def lookupBatchScanTransformer(
- batchScanExec: BatchScanExec,
- newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
+ batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
val scan = batchScanExec.scan
lookupDataSourceScanTransformer(scan.getClass.getName) match {
case Some(clz) =>
@@ -78,14 +69,14 @@ object ScanTransformerFactory {
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[DataSourceScanTransformerRegister]
- .createDataSourceV2Transformer(batchScanExec, newPartitionFilters)
+ .createDataSourceV2Transformer(batchScanExec)
case _ =>
scan match {
case _: FileScan =>
- new BatchScanExecTransformer(
+ BatchScanExecTransformer(
batchScanExec.output,
batchScanExec.scan,
- newPartitionFilters,
+ batchScanExec.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
)
case _ =>
@@ -99,14 +90,7 @@ object ScanTransformerFactory {
allPushDownFilters: Option[Seq[Expression]] = None,
validation: Boolean = false): SparkPlan = {
if (supportedBatchScan(batchScan.scan)) {
-      val newPartitionFilters = if (validation) {
-        // No transformation is needed for DynamicPruningExpressions
-        // during the validation process.
-        batchScan.runtimeFilters
-      } else {
-        ExpressionConverter.transformDynamicPruningExpr(batchScan.runtimeFilters)
-      }
-      val transformer = lookupBatchScanTransformer(batchScan, newPartitionFilters)
+      val transformer = lookupBatchScanTransformer(batchScan)
if (!validation && allPushDownFilters.isDefined) {
transformer.setPushDownFilters(allPushDownFilters.get)
// Validate again if allPushDownFilters is defined.
@@ -125,12 +109,8 @@ object ScanTransformerFactory {
    if (validation) {
      throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}")
    }
-    // If filter expressions aren't empty, we need to transform the inner operators,
-    // and fallback the BatchScanExec itself.
-    val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
-      .transformDynamicPruningExpr(batchScan.runtimeFilters))
-    TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
-    newSource
+    TransformHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.")
+    batchScan
}
}
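
With the validation flag gone from createFileSourceScanTransformer, validation-time call sites simply hand over the scan node. A sketch of the new call path, mirroring the TransformHintRule hunk below (assuming a FileSourceScanExec named `plan` is in scope):

    val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
    transformer.doValidate().tagOnFallback(plan)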
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 9ebe44f6c..5d0af9e52 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -16,10 +16,8 @@
*/
package org.apache.gluten.expression
-import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.test.TestStats
import org.apache.gluten.utils.DecimalArithmeticUtil
@@ -29,9 +27,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
-import org.apache.spark.sql.execution.{ScalarSubquery, _}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
+import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.hive.HiveUDFTransformer
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -680,99 +676,4 @@ object ExpressionConverter extends SQLConfHelper with Logging {
}
substraitExprName
}
-
-  /**
-   * Transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in DynamicPruningExpression.
-   *
-   * @param partitionFilters
-   *   The partition filter of Scan
-   * @return
-   *   Transformed partition filter
-   */
-  def transformDynamicPruningExpr(partitionFilters: Seq[Expression]): Seq[Expression] = {
-
-    def convertBroadcastExchangeToColumnar(
-        exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
-      val newChild = Transitions.toBackendBatchPlan(exchange.child)
-      ColumnarBroadcastExchangeExec(exchange.mode, newChild)
-    }
-
-    if (
-      GlutenConfig.getConf.enableScanOnly || !GlutenConfig.getConf.enableColumnarBroadcastExchange
-    ) {
-      // Disable ColumnarSubqueryBroadcast for scan-only execution
-      // or ColumnarBroadcastExchange was disabled.
-      partitionFilters
-    } else {
-      partitionFilters.map {
-        case dynamicPruning: DynamicPruningExpression =>
-          dynamicPruning.transform {
-            // Lookup inside subqueries for duplicate exchanges.
-            case in: InSubqueryExec =>
-              in.plan match {
-                case s: SubqueryBroadcastExec =>
-                  val newIn = s
-                    .transform {
-                      case exchange: BroadcastExchangeExec =>
-                        convertBroadcastExchangeToColumnar(exchange)
-                    }
-                    .asInstanceOf[SubqueryBroadcastExec]
-                  val transformSubqueryBroadcast = ColumnarSubqueryBroadcastExec(
-                    newIn.name,
-                    newIn.index,
-                    newIn.buildKeys,
-                    newIn.child)
-
-                  // When AQE is on, spark will apply ReuseAdaptiveSubquery rule first,
-                  // it will reuse vanilla SubqueryBroadcastExec,
-                  // and then use gluten ColumnarOverrides rule to transform Subquery,
-                  // so all the SubqueryBroadcastExec in the ReusedSubqueryExec will be transformed
-                  // to a new ColumnarSubqueryBroadcastExec for each SubqueryBroadcastExec,
-                  // which will lead to execute ColumnarSubqueryBroadcastExec.relationFuture
-                  // repeatedly even in the ReusedSubqueryExec.
-                  //
-                  // On the other hand, it needs to use
-                  // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map
-                  // for each query.
-                  newIn.child match {
-                    case a: AdaptiveSparkPlanExec if SQLConf.get.subqueryReuseEnabled =>
-                      // When AQE is on and reuseSubquery is on.
-                      a.context.subqueryCache
-                        .update(newIn.canonicalized, transformSubqueryBroadcast)
-                    case _ =>
-                  }
-                  in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec])
-                case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] =>
-                  val newIn = r.child
-                    .transform {
-                      case exchange: BroadcastExchangeExec =>
-                        convertBroadcastExchangeToColumnar(exchange)
-                    }
-                    .asInstanceOf[SubqueryBroadcastExec]
-                  newIn.child match {
-                    case a: AdaptiveSparkPlanExec =>
-                      // Only when AQE is on, it needs to replace SubqueryBroadcastExec
-                      // with reused ColumnarSubqueryBroadcastExec
-                      val cachedSubquery = a.context.subqueryCache.get(newIn.canonicalized)
-                      if (cachedSubquery.isDefined) {
-                        in.copy(plan = ReusedSubqueryExec(cachedSubquery.get))
-                      } else {
-                        val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec" +
-                          "by the ${newIn.canonicalized}"
-                        logWarning(errMsg)
-                        throw new UnsupportedOperationException(errMsg)
-                      }
-                    case _ =>
-                      val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec" +
-                        "by the ${newIn.canonicalized}"
-                      logWarning(errMsg)
-                      throw new UnsupportedOperationException(errMsg)
-                  }
-                case _ => in
-              }
-          }
-        case e: Expression => e
-      }
-    }
-  }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index fab973ffb..8ed2137f4 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,14 +16,15 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, Transitions}
import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeLike}
+import org.apache.spark.sql.internal.SQLConf
object MiscColumnarRules {
object TransformPreOverrides {
@@ -58,6 +59,101 @@ object MiscColumnarRules {
}
}
+  // Replaces all SubqueryBroadcastExec instances used by sub-queries with
+  // ColumnarSubqueryBroadcastExec. This prevents query execution from failing because of a
+  // fallen-back SubqueryBroadcastExec whose child plan has columnar output (e.g., an adaptive
+  // Spark plan whose final plan is fully offloaded). ColumnarSubqueryBroadcastExec is compatible
+  // with both row-based and columnar child plans, so it is always functional.
+  case class RewriteSubqueryBroadcast() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = {
+      val out = plan.transformWithSubqueries {
+        case p =>
+          // Since https://github.com/apache/incubator-gluten/pull/1851.
+          //
+          // When AQE is on, the AQE sub-query cache should already be filled with
+          // row-based SubqueryBroadcastExec for reuse. Thus we do the same
+          // memoize-and-reuse work here for the replaced columnar version.
+          val reuseRemoved = removeReuses(p)
+          val replaced = replace(reuseRemoved)
+          replaced
+      }
+      out
+    }
+
+    private def removeReuses(p: SparkPlan): SparkPlan = {
+      val out = p.transformExpressions {
+        case pe: ExecSubqueryExpression =>
+          val newPlan = pe.plan match {
+            case ReusedSubqueryExec(s: SubqueryBroadcastExec) =>
+              // Remove ReusedSubqueryExec. We will re-create the reuses in the subsequent
+              // method #replace.
+              //
+              // We assume reused sub-queries are only met in AQE execution. When AQE is off,
+              // Spark adds reuses only after applying columnar rules, via the preparation rule
+              // ReuseExchangeAndSubquery.
+              assert(s.child.isInstanceOf[AdaptiveSparkPlanExec])
+              s
+            case other =>
+              other
+          }
+          pe.withNewPlan(newPlan)
+      }
+      out
+    }
+
+    private def replace(p: SparkPlan): SparkPlan = {
+      val out = p.transformExpressions {
+        case pe: ExecSubqueryExpression =>
+          val newPlan = pe.plan match {
+            case s: SubqueryBroadcastExec =>
+              val columnarSubqueryBroadcast = toColumnarSubqueryBroadcast(s)
+              val maybeReused = columnarSubqueryBroadcast.child match {
+                case a: AdaptiveSparkPlanExec if SQLConf.get.subqueryReuseEnabled =>
+                  val cached = a.context.subqueryCache.get(columnarSubqueryBroadcast.canonicalized)
+                  if (cached.nonEmpty) {
+                    // Reuse the one in cache.
+                    ReusedSubqueryExec(cached.get)
+                  } else {
+                    // Place columnar sub-query broadcast into cache, then return it.
+                    a.context.subqueryCache
+                      .update(columnarSubqueryBroadcast.canonicalized, columnarSubqueryBroadcast)
+                    columnarSubqueryBroadcast
+                  }
+                case _ =>
+                  // We are not in AQE.
+                  columnarSubqueryBroadcast
+              }
+              maybeReused
+            case other => other
+          }
+          pe.withNewPlan(newPlan)
+      }
+      out
+    }
+
+    private def toColumnarBroadcastExchange(
+        exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
+      val newChild = Transitions.toBackendBatchPlan(exchange.child)
+      ColumnarBroadcastExchangeExec(exchange.mode, newChild)
+    }
+
+    private def toColumnarSubqueryBroadcast(
+        from: SubqueryBroadcastExec): ColumnarSubqueryBroadcastExec = {
+      val newChild = from.child match {
+        case exchange: BroadcastExchangeExec =>
+          toColumnarBroadcastExchange(exchange)
+        case aqe: AdaptiveSparkPlanExec =>
+          // Keep the child if it is an AQE plan, even if its supportsColumnar == false.
+          // ColumnarSubqueryBroadcastExec is compatible with both row-based
+          // and columnar inputs.
+          aqe
+        case other => other
+      }
+      val out = ColumnarSubqueryBroadcastExec(from.name, from.index, from.buildKeys, newChild)
+      out
+    }
+  }
+
// Remove topmost columnar-to-row otherwise AQE throws error.
// See: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec#newQueryStage
//
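
Since RewriteSubqueryBroadcast is a plain Rule[SparkPlan], the appliers below only need to append it to their validation-rule lists. A minimal standalone sketch, assuming a resolved physical plan named `plan` is in scope:

    // Rewrites every SubqueryBroadcastExec reachable through subquery expressions
    // into ColumnarSubqueryBroadcastExec, re-creating AQE sub-query reuses on the way.
    val rewritten: SparkPlan = RewriteSubqueryBroadcast()(plan)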
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 6e4d37f63..8cd2a5fb6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -20,18 +20,16 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
-import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
@@ -254,17 +252,7 @@ object OffloadOthers {
    def doReplace(p: SparkPlan): SparkPlan = {
      val plan = p
      if (TransformHints.isNotTransformable(plan)) {
-        logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.")
-        plan match {
-          case plan: BatchScanExec =>
-            return applyScanNotTransformable(plan)
-          case plan: FileSourceScanExec =>
-            return applyScanNotTransformable(plan)
-          case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-            return applyScanNotTransformable(plan)
-          case p =>
-            return p
-        }
+        return plan
      }
      plan match {
        case plan: BatchScanExec =>
@@ -404,44 +392,6 @@ object OffloadOthers {
}
}
-  // Since https://github.com/apache/incubator-gluten/pull/2701
-  private def applyScanNotTransformable(plan: SparkPlan): SparkPlan = plan match {
-    case plan: FileSourceScanExec =>
-      val newPartitionFilters =
-        ExpressionConverter.transformDynamicPruningExpr(plan.partitionFilters)
-      val newSource = plan.copy(partitionFilters = newPartitionFilters)
-      if (plan.logicalLink.nonEmpty) {
-        newSource.setLogicalLink(plan.logicalLink.get)
-      }
-      TransformHints.tag(newSource, TransformHints.getHint(plan))
-      newSource
-    case plan: BatchScanExec =>
-      val newPartitionFilters: Seq[Expression] = plan.scan match {
-        case scan: FileScan =>
-          ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters)
-        case _ =>
-          ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters)
-      }
-      val newSource = plan.copy(runtimeFilters = newPartitionFilters)
-      if (plan.logicalLink.nonEmpty) {
-        newSource.setLogicalLink(plan.logicalLink.get)
-      }
-      TransformHints.tag(newSource, TransformHints.getHint(plan))
-      newSource
-    case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-      val newPartitionFilters: Seq[Expression] =
-        ExpressionConverter.transformDynamicPruningExpr(
-          HiveTableScanExecTransformer.getPartitionFilters(plan))
-      val newSource = HiveTableScanExecTransformer.copyWith(plan, newPartitionFilters)
-      if (plan.logicalLink.nonEmpty) {
-        newSource.setLogicalLink(plan.logicalLink.get)
-      }
-      TransformHints.tag(newSource, TransformHints.getHint(plan))
-      newSource
-    case other =>
-      throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
-  }
-
/**
* Apply scan transformer for file source and batch source,
* 1. create new filter and scan transformer, 2. validate, tag new scan as unsupported if
@@ -456,18 +406,13 @@ object OffloadOthers {
          transformer
        } else {
          logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
-          val newSource = plan.copy(partitionFilters = transformer.getPartitionFilters())
-          TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
-          newSource
+          TransformHints.tagNotTransformable(plan, validationResult.reason.get)
+          plan
        }
      case plan: BatchScanExec =>
        ScanTransformerFactory.createBatchScanTransformer(plan)
-
      case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
        // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
-        val newPartitionFilters: Seq[Expression] =
-          ExpressionConverter.transformDynamicPruningExpr(
-            HiveTableScanExecTransformer.getPartitionFilters(plan))
        val hiveTableScanExecTransformer =
          BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
        val validateResult = hiveTableScanExecTransformer.doValidate()
@@ -476,9 +421,8 @@
          return hiveTableScanExecTransformer
        }
        logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
-        val newSource = HiveTableScanExecTransformer.copyWith(plan, newPartitionFilters)
-        TransformHints.tagNotTransformable(newSource, validateResult.reason.get)
-        newSource
+        TransformHints.tagNotTransformable(plan, validateResult.reason.get)
+        plan
      case other =>
        throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.")
    }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index ca35c74f6..d32cf2d22 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -372,7 +372,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
          // If filter expressions aren't empty, we need to transform the inner operators.
          if (plan.partitionFilters.isEmpty) {
            val transformer =
-              ScanTransformerFactory.createFileSourceScanTransformer(plan, validation = true)
+              ScanTransformerFactory.createFileSourceScanTransformer(plan)
            transformer.doValidate().tagOnFallback(plan)
          }
        case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index a259641f5..26201dc1b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
-import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow}
+import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
import org.apache.gluten.metrics.GlutenTimeMetric
@@ -101,7 +101,8 @@ class EnumeratedApplier(session: SparkSession)
(_: SparkSession) => RemoveTransitions,
(spark: SparkSession) => FallbackOnANSIMode(spark),
(spark: SparkSession) => PlanOneRowRelation(spark),
- (_: SparkSession) => FallbackEmptySchemaRelation()
+ (_: SparkSession) => FallbackEmptySchemaRelation(),
+ (_: SparkSession) => RewriteSubqueryBroadcast()
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
List((spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark)) :::
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 941677a6b..eb5c561bf 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.heuristic
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
-import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
@@ -112,7 +112,8 @@ class HeuristicApplier(session: SparkSession)
(spark: SparkSession) => FallbackOnANSIMode(spark),
(spark: SparkSession) => FallbackMultiCodegens(spark),
(spark: SparkSession) => PlanOneRowRelation(spark),
- (_: SparkSession) => FallbackEmptySchemaRelation()
+ (_: SparkSession) => FallbackEmptySchemaRelation(),
+ (_: SparkSession) => RewriteSubqueryBroadcast()
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
List(
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
index a74d428fd..2c1edd04b 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
@@ -24,8 +24,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelation, HashJoin, LongHashedRelation}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.ThreadUtils
@@ -75,35 +73,25 @@ case class ColumnarSubqueryBroadcastExec(
      SQLExecution.withExecutionId(session, executionId) {
        val rows = GlutenTimeMetric.millis(longMetric("collectTime")) {
          _ =>
-            val exchangeChild = child match {
-              case exec: ReusedExchangeExec =>
-                exec.child
-              case _ =>
-                child
-            }
-            if (
-              exchangeChild.isInstanceOf[ColumnarBroadcastExchangeExec] ||
-              exchangeChild.isInstanceOf[AdaptiveSparkPlanExec]
-            ) {
-              // transform broadcasted columnar value to Array[InternalRow] by key
-              exchangeChild
-                .executeBroadcast[BuildSideRelation]
-                .value
-                .transform(buildKeys(index))
-                .distinct
-            } else {
-              val broadcastRelation = exchangeChild.executeBroadcast[HashedRelation]().value
-              val (iter, expr) = if (broadcastRelation.isInstanceOf[LongHashedRelation]) {
-                (broadcastRelation.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
-              } else {
-                (
-                  broadcastRelation.keys(),
-                  BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
-              }
-
-              val proj = UnsafeProjection.create(expr)
-              val keyIter = iter.map(proj).map(_.copy())
-              keyIter.toArray[InternalRow].distinct
+            val relation = child.executeBroadcast[Any]().value
+            relation match {
+              case b: BuildSideRelation =>
+                // Transform columnar broadcast value to Array[InternalRow] by key.
+                b.transform(buildKeys(index)).distinct
+              case h: HashedRelation =>
+                val (iter, expr) = if (h.isInstanceOf[LongHashedRelation]) {
+                  (h.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
+                } else {
+                  (
+                    h.keys(),
+                    BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
+                }
+                val proj = UnsafeProjection.create(expr)
+                val keyIter = iter.map(proj).map(_.copy())
+                keyIter.toArray[InternalRow].distinct
+              case other =>
+                throw new UnsupportedOperationException(
+                  s"Unrecognizable broadcast relation: $other")
            }
          }
val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes).sum
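
The rewritten collect path dispatches on the broadcast payload instead of on the child operator's node type. Condensed from the hunk above (with `child`, `buildKeys` and `index` as in the operator), the shape of the dispatch is:

    child.executeBroadcast[Any]().value match {
      case b: BuildSideRelation =>
        // Columnar child: extract the distinct keyed rows from the build-side relation.
        b.transform(buildKeys(index)).distinct
      case h: HashedRelation =>
        // Row-based child: project the build key out of the hashed relation's keys.
        val expr = h match {
          case _: LongHashedRelation => HashJoin.extractKeyExprAt(buildKeys, index)
          case _ => BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable)
        }
        val proj = UnsafeProjection.create(expr)
        h.keys().map(proj).map(_.copy()).toArray[InternalRow].distinct
      case other =>
        throw new UnsupportedOperationException(s"Unrecognizable broadcast relation: $other")
    }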
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 9e97a3687..1cd735cf7 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -82,14 +82,12 @@ case class DeltaScanTransformer(
object DeltaScanTransformer {
- def apply(
- scanExec: FileSourceScanExec,
- newPartitionFilters: Seq[Expression]): DeltaScanTransformer = {
+ def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
new DeltaScanTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
- newPartitionFilters,
+ scanExec.partitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
index a7cecde7c..e482150b8 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec
class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
@@ -24,8 +23,7 @@ class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
  override val scanClassName: String =
    "org.apache.spark.sql.delta.DeltaParquetFileFormat"
  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec,
-      newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan, newPartitionFilters)
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+    DeltaScanTransformer(batchScan)
  }
}
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 6e079bf7e..5a735b802 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -80,13 +80,11 @@ case class IcebergScanTransformer(
}
object IcebergScanTransformer {
- def apply(
- batchScan: BatchScanExec,
- newPartitionFilters: Seq[Expression]): IcebergScanTransformer = {
+ def apply(batchScan: BatchScanExec): IcebergScanTransformer = {
new IcebergScanTransformer(
batchScan.output,
batchScan.scan,
- newPartitionFilters,
+ batchScan.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan),
keyGroupedPartitioning = SparkShimLoader.getSparkShims.getKeyGroupedPartitioning(batchScan),
commonPartitionValues = SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan)
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
index 1ebeebf00..dc521f39c 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
class IcebergTransformerProvider extends DataSourceScanTransformerRegister {
@@ -24,8 +23,7 @@ class IcebergTransformerProvider extends DataSourceScanTransformerRegister {
override val scanClassName: String =
"org.apache.iceberg.spark.source.SparkBatchQueryScan"
override def createDataSourceV2Transformer(
- batchScan: BatchScanExec,
- newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
- IcebergScanTransformer(batchScan, newPartitionFilters)
+ batchScan: BatchScanExec): BatchScanExecTransformerBase = {
+ IcebergScanTransformer(batchScan)
}
}