This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f610059f1 [CORE] Rework Gluten + DPP compatibility (#6035)
f610059f1 is described below

commit f610059f1ded8c0942ee8dd0b82b0223d3180a40
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jun 12 10:22:26 2024 +0800

    [CORE] Rework Gluten + DPP compatibility (#6035)
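
    Previously, DPP (dynamic partition pruning) support relied on
    ExpressionConverter.transformDynamicPruningExpr rewriting each scan's
    partition filters while the scan was being transformed. This change
    removes that path and instead registers a dedicated columnar rule,
    MiscColumnarRules.RewriteSubqueryBroadcast, in both HeuristicApplier
    and EnumeratedApplier. The rule replaces every SubqueryBroadcastExec
    with ColumnarSubqueryBroadcastExec up front, re-creating AQE sub-query
    reuses against the columnar version. Scan transformer factories and
    registries now take the scan node alone, and
    ColumnarSubqueryBroadcastExec reads either a columnar BuildSideRelation
    or a row-based HashedRelation from its child's broadcast, so it works
    whether or not the sub-query's plan was offloaded.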
---
 .../DataSourceScanTransformerRegister.scala        |   8 +-
 .../gluten/execution/ScanTransformerFactory.scala  |  42 +++------
 .../gluten/expression/ExpressionConverter.scala    | 101 +-------------------
 .../extension/columnar/MiscColumnarRules.scala     | 102 ++++++++++++++++++++-
 .../extension/columnar/OffloadSingleNode.scala     |  68 ++------------
 .../extension/columnar/TransformHintRule.scala     |   2 +-
 .../columnar/enumerated/EnumeratedApplier.scala    |   5 +-
 .../columnar/heuristic/HeuristicApplier.scala      |   5 +-
 .../execution/ColumnarSubqueryBroadcastExec.scala  |  50 ++++------
 .../gluten/execution/DeltaScanTransformer.scala    |   6 +-
 .../execution/DeltaScanTransformerProvider.scala   |   6 +-
 .../gluten/execution/IcebergScanTransformer.scala  |   6 +-
 .../execution/IcebergTransformerProvider.scala     |   6 +-
 13 files changed, 153 insertions(+), 254 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
index b899790c3..5b46c2385 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
@@ -44,15 +43,12 @@ trait DataSourceScanTransformerRegister {
   val scanClassName: String
 
   def createDataSourceTransformer(
-      batchScan: FileSourceScanExec,
-      newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
     throw new UnsupportedOperationException(
       "This should not be called, please implement this method in child 
class.");
   }
 
-  def createDataSourceV2Transformer(
-      batchScan: BatchScanExec,
-      newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
+  def createDataSourceV2Transformer(batchScan: BatchScanExec): BatchScanExecTransformerBase = {
     throw new UnsupportedOperationException(
       "This should not be called, please implement this method in child 
class.");
   }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index fc2b8f506..fcb9e983e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.columnar.TransformHints
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -37,14 +36,7 @@ object ScanTransformerFactory {
 
   def createFileSourceScanTransformer(
       scanExec: FileSourceScanExec,
-      allPushDownFilters: Option[Seq[Expression]] = None,
-      validation: Boolean = false): FileSourceScanExecTransformerBase = {
-    // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
-    val newPartitionFilters = if (validation) {
-      scanExec.partitionFilters
-    } else {
-      ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters)
-    }
+      allPushDownFilters: Option[Seq[Expression]] = None): FileSourceScanExecTransformerBase = {
     val fileFormat = scanExec.relation.fileFormat
     lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
       case Some(clz) =>
@@ -52,13 +44,13 @@ object ScanTransformerFactory {
           .getDeclaredConstructor()
           .newInstance()
           .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceTransformer(scanExec, newPartitionFilters)
+          .createDataSourceTransformer(scanExec)
       case _ =>
-        new FileSourceScanExecTransformer(
+        FileSourceScanExecTransformer(
           scanExec.relation,
           scanExec.output,
           scanExec.requiredSchema,
-          newPartitionFilters,
+          scanExec.partitionFilters,
           scanExec.optionalBucketSet,
           scanExec.optionalNumCoalescedBuckets,
           allPushDownFilters.getOrElse(scanExec.dataFilters),
@@ -69,8 +61,7 @@ object ScanTransformerFactory {
   }
 
   private def lookupBatchScanTransformer(
-      batchScanExec: BatchScanExec,
-      newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
+      batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
     val scan = batchScanExec.scan
     lookupDataSourceScanTransformer(scan.getClass.getName) match {
       case Some(clz) =>
@@ -78,14 +69,14 @@ object ScanTransformerFactory {
           .getDeclaredConstructor()
           .newInstance()
           .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceV2Transformer(batchScanExec, newPartitionFilters)
+          .createDataSourceV2Transformer(batchScanExec)
       case _ =>
         scan match {
           case _: FileScan =>
-            new BatchScanExecTransformer(
+            BatchScanExecTransformer(
               batchScanExec.output,
               batchScanExec.scan,
-              newPartitionFilters,
+              batchScanExec.runtimeFilters,
              table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
             )
           case _ =>
@@ -99,14 +90,7 @@ object ScanTransformerFactory {
       allPushDownFilters: Option[Seq[Expression]] = None,
       validation: Boolean = false): SparkPlan = {
     if (supportedBatchScan(batchScan.scan)) {
-      val newPartitionFilters = if (validation) {
-        // No transformation is needed for DynamicPruningExpressions
-        // during the validation process.
-        batchScan.runtimeFilters
-      } else {
-        ExpressionConverter.transformDynamicPruningExpr(batchScan.runtimeFilters)
-      }
-      val transformer = lookupBatchScanTransformer(batchScan, newPartitionFilters)
+      val transformer = lookupBatchScanTransformer(batchScan)
       if (!validation && allPushDownFilters.isDefined) {
         transformer.setPushDownFilters(allPushDownFilters.get)
         // Validate again if allPushDownFilters is defined.
@@ -125,12 +109,8 @@ object ScanTransformerFactory {
       if (validation) {
        throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}")
       }
-      // If filter expressions aren't empty, we need to transform the inner operators,
-      // and fallback the BatchScanExec itself.
-      val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
-        .transformDynamicPruningExpr(batchScan.runtimeFilters))
-      TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
-      newSource
+      TransformHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.")
+      batchScan
     }
   }
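
The factory's registry path above instantiates a DataSourceScanTransformerRegister
implementation reflectively and now hands it only the scan node. A toy, self-contained
sketch of that reflective lookup (Provider, ParquetProvider and RegistryDemo are
hypothetical stand-ins, not Gluten classes):

    // Resolve a provider class by name, instantiate it through its no-arg
    // constructor, and delegate creation to it -- the same pattern as
    // lookupDataSourceScanTransformer + createDataSourceTransformer above.
    trait Provider {
      def create(scan: String): String
    }

    class ParquetProvider extends Provider {
      override def create(scan: String): String = s"transformer for $scan"
    }

    object RegistryDemo {
      def main(args: Array[String]): Unit = {
        // In the real code the class name comes from a registry keyed by the
        // file format / scan class name.
        val className = classOf[ParquetProvider].getName
        val provider = Class.forName(className)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[Provider]
        println(provider.create("ParquetFileFormat")) // transformer for ParquetFileFormat
      }
    }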
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 9ebe44f6c..5d0af9e52 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.gluten.expression
 
-import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.test.TestStats
 import org.apache.gluten.utils.DecimalArithmeticUtil
@@ -29,9 +27,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
-import org.apache.spark.sql.execution.{ScalarSubquery, _}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
+import org.apache.spark.sql.execution.ScalarSubquery
 import org.apache.spark.sql.hive.HiveUDFTransformer
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -680,99 +676,4 @@ object ExpressionConverter extends SQLConfHelper with Logging {
     }
     substraitExprName
   }
-
-  /**
-   * Transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in DynamicPruningExpression.
-   *
-   * @param partitionFilters
-   *   The partition filter of Scan
-   * @return
-   *   Transformed partition filter
-   */
-  def transformDynamicPruningExpr(partitionFilters: Seq[Expression]): Seq[Expression] = {
-
-    def convertBroadcastExchangeToColumnar(
-        exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
-      val newChild = Transitions.toBackendBatchPlan(exchange.child)
-      ColumnarBroadcastExchangeExec(exchange.mode, newChild)
-    }
-
-    if (
-      GlutenConfig.getConf.enableScanOnly || !GlutenConfig.getConf.enableColumnarBroadcastExchange
-    ) {
-      // Disable ColumnarSubqueryBroadcast for scan-only execution
-      // or ColumnarBroadcastExchange was disabled.
-      partitionFilters
-    } else {
-      partitionFilters.map {
-        case dynamicPruning: DynamicPruningExpression =>
-          dynamicPruning.transform {
-            // Lookup inside subqueries for duplicate exchanges.
-            case in: InSubqueryExec =>
-              in.plan match {
-                case s: SubqueryBroadcastExec =>
-                  val newIn = s
-                    .transform {
-                      case exchange: BroadcastExchangeExec =>
-                        convertBroadcastExchangeToColumnar(exchange)
-                    }
-                    .asInstanceOf[SubqueryBroadcastExec]
-                  val transformSubqueryBroadcast = ColumnarSubqueryBroadcastExec(
-                    newIn.name,
-                    newIn.index,
-                    newIn.buildKeys,
-                    newIn.child)
-
-                  // When AQE is on, spark will apply ReuseAdaptiveSubquery rule first,
-                  // it will reuse vanilla SubqueryBroadcastExec,
-                  // and then use gluten ColumnarOverrides rule to transform Subquery,
-                  // so all the SubqueryBroadcastExec in the ReusedSubqueryExec will be transformed
-                  // to a new ColumnarSubqueryBroadcastExec for each SubqueryBroadcastExec,
-                  // which will lead to execute ColumnarSubqueryBroadcastExec.relationFuture
-                  // repeatedly even in the ReusedSubqueryExec.
-                  //
-                  // On the other hand, it needs to use
-                  // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map
-                  // for each query.
-                  newIn.child match {
-                    case a: AdaptiveSparkPlanExec if SQLConf.get.subqueryReuseEnabled =>
-                      // When AQE is on and reuseSubquery is on.
-                      a.context.subqueryCache
-                        .update(newIn.canonicalized, transformSubqueryBroadcast)
-                    case _ =>
-                  }
-                  in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec])
-                case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] =>
-                  val newIn = r.child
-                    .transform {
-                      case exchange: BroadcastExchangeExec =>
-                        convertBroadcastExchangeToColumnar(exchange)
-                    }
-                    .asInstanceOf[SubqueryBroadcastExec]
-                  newIn.child match {
-                    case a: AdaptiveSparkPlanExec =>
-                      // Only when AQE is on, it needs to replace SubqueryBroadcastExec
-                      // with reused ColumnarSubqueryBroadcastExec
-                      val cachedSubquery = a.context.subqueryCache.get(newIn.canonicalized)
-                      if (cachedSubquery.isDefined) {
-                        in.copy(plan = ReusedSubqueryExec(cachedSubquery.get))
-                      } else {
-                        val errMsg = "Can not get the reused 
ColumnarSubqueryBroadcastExec" +
-                          "by the ${newIn.canonicalized}"
-                        logWarning(errMsg)
-                        throw new UnsupportedOperationException(errMsg)
-                      }
-                    case _ =>
-                      val errMsg = "Can not get the reused 
ColumnarSubqueryBroadcastExec" +
-                        "by the ${newIn.canonicalized}"
-                      logWarning(errMsg)
-                      throw new UnsupportedOperationException(errMsg)
-                  }
-                case _ => in
-              }
-          }
-        case e: Expression => e
-      }
-    }
-  }
 }
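
The broadcast-exchange conversion and AQE subqueryCache bookkeeping removed above are not
dropped; they reappear, restructured as the plan-level rule RewriteSubqueryBroadcast in
MiscColumnarRules.scala below, which runs once over the whole plan instead of inside
expression conversion.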
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index fab973ffb..8ed2137f4 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, Transitions}
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeLike}
+import org.apache.spark.sql.internal.SQLConf
 
 object MiscColumnarRules {
   object TransformPreOverrides {
@@ -58,6 +59,101 @@ object MiscColumnarRules {
     }
   }
 
+  // Replaces all SubqueryBroadcastExec instances used by sub-queries with ColumnarSubqueryBroadcastExec.
+  // This prevents query execution from failing when a fallen-back SubqueryBroadcastExec has a
+  // child plan with columnar output (e.g., an adaptive Spark plan whose final plan is fully
+  // offloaded). ColumnarSubqueryBroadcastExec is compatible with both row-based and columnar
+  // child plans, so it is always functional.
+  case class RewriteSubqueryBroadcast() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = {
+      val out = plan.transformWithSubqueries {
+        case p =>
+          // Since https://github.com/apache/incubator-gluten/pull/1851.
+          //
+          // When AQE is on, the AQE sub-query cache should already be filled with
+          // row-based SubqueryBroadcastExec instances for reuse. Thus we do the same
+          // memoize-and-reuse work here for the replaced columnar version.
+          val reuseRemoved = removeReuses(p)
+          val replaced = replace(reuseRemoved)
+          replaced
+      }
+      out
+    }
+
+    private def removeReuses(p: SparkPlan): SparkPlan = {
+      val out = p.transformExpressions {
+        case pe: ExecSubqueryExpression =>
+          val newPlan = pe.plan match {
+            case ReusedSubqueryExec(s: SubqueryBroadcastExec) =>
+              // Remove ReusedSubqueryExec. We will re-create the reuses in the subsequent
+              // method #replace.
+              //
+              // We assume reused sub-queries are only met in AQE execution. When AQE is
+              // off, Spark adds reuses only after applying columnar rules, via the
+              // preparation rule ReuseExchangeAndSubquery.
+              assert(s.child.isInstanceOf[AdaptiveSparkPlanExec])
+              s
+            case other =>
+              other
+          }
+          pe.withNewPlan(newPlan)
+      }
+      out
+    }
+
+    private def replace(p: SparkPlan): SparkPlan = {
+      val out = p.transformExpressions {
+        case pe: ExecSubqueryExpression =>
+          val newPlan = pe.plan match {
+            case s: SubqueryBroadcastExec =>
+              val columnarSubqueryBroadcast = toColumnarSubqueryBroadcast(s)
+              val maybeReused = columnarSubqueryBroadcast.child match {
+                case a: AdaptiveSparkPlanExec if SQLConf.get.subqueryReuseEnabled =>
+                  val cached = a.context.subqueryCache.get(columnarSubqueryBroadcast.canonicalized)
+                  if (cached.nonEmpty) {
+                    // Reuse the one in cache.
+                    ReusedSubqueryExec(cached.get)
+                  } else {
+                    // Place the columnar sub-query broadcast into the cache, then return it.
+                    a.context.subqueryCache
+                      .update(columnarSubqueryBroadcast.canonicalized, columnarSubqueryBroadcast)
+                    columnarSubqueryBroadcast
+                  }
+                case _ =>
+                  // We are not in AQE.
+                  columnarSubqueryBroadcast
+              }
+              maybeReused
+            case other => other
+          }
+          pe.withNewPlan(newPlan)
+      }
+      out
+    }
+
+    private def toColumnarBroadcastExchange(
+        exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
+      val newChild = Transitions.toBackendBatchPlan(exchange.child)
+      ColumnarBroadcastExchangeExec(exchange.mode, newChild)
+    }
+
+    private def toColumnarSubqueryBroadcast(
+        from: SubqueryBroadcastExec): ColumnarSubqueryBroadcastExec = {
+      val newChild = from.child match {
+        case exchange: BroadcastExchangeExec =>
+          toColumnarBroadcastExchange(exchange)
+        case aqe: AdaptiveSparkPlanExec =>
+          // Keep the child if it is an AQE plan, even if its supportsColumnar == false.
+          // ColumnarSubqueryBroadcastExec is compatible with both row-based
+          // and columnar inputs.
+          aqe
+        case other => other
+      }
+      val out = ColumnarSubqueryBroadcastExec(from.name, from.index, from.buildKeys, newChild)
+      out
+    }
+  }
+
   // Remove topmost columnar-to-row otherwise AQE throws error.
   // See: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec#newQueryStage
   //
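
The removeReuses/replace pair above implements a memoize-and-reuse protocol keyed by the
canonicalized sub-query plan: the first occurrence is placed in the AQE subqueryCache and
kept as-is, and every later occurrence is rewritten to reuse the cached one. A minimal,
self-contained Scala sketch of that protocol (Broadcast, Reused and the string key are
hypothetical stand-ins for ColumnarSubqueryBroadcastExec, ReusedSubqueryExec and the
canonicalized plan; the mutable map stands in for the AQE subqueryCache):

    import scala.collection.mutable

    sealed trait Subquery
    case class Broadcast(key: String) extends Subquery    // ~ ColumnarSubqueryBroadcastExec
    case class Reused(original: Subquery) extends Subquery // ~ ReusedSubqueryExec

    object ReuseDemo {
      private val cache = mutable.Map.empty[String, Subquery] // ~ AQE subqueryCache

      def rewrite(s: Broadcast): Subquery =
        cache.get(s.key) match {
          case Some(cached) => Reused(cached) // later occurrence: reuse the memoized one
          case None =>
            cache.update(s.key, s) // first occurrence: memoize, then keep as-is
            s
        }

      def main(args: Array[String]): Unit = {
        println(rewrite(Broadcast("dpp#1"))) // Broadcast(dpp#1)
        println(rewrite(Broadcast("dpp#1"))) // Reused(Broadcast(dpp#1))
      }
    }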
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 6e4d37f63..8cd2a5fb6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -20,18 +20,16 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
-import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
@@ -254,17 +252,7 @@ object OffloadOthers {
     def doReplace(p: SparkPlan): SparkPlan = {
       val plan = p
       if (TransformHints.isNotTransformable(plan)) {
-        logDebug(s"Columnar Processing for ${plan.getClass} is under row 
guard.")
-        plan match {
-          case plan: BatchScanExec =>
-            return applyScanNotTransformable(plan)
-          case plan: FileSourceScanExec =>
-            return applyScanNotTransformable(plan)
-          case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-            return applyScanNotTransformable(plan)
-          case p =>
-            return p
-        }
+        return plan
       }
       plan match {
         case plan: BatchScanExec =>
@@ -404,44 +392,6 @@ object OffloadOthers {
       }
     }
 
-    // Since https://github.com/apache/incubator-gluten/pull/2701
-    private def applyScanNotTransformable(plan: SparkPlan): SparkPlan = plan match {
-      case plan: FileSourceScanExec =>
-        val newPartitionFilters =
-          ExpressionConverter.transformDynamicPruningExpr(plan.partitionFilters)
-        val newSource = plan.copy(partitionFilters = newPartitionFilters)
-        if (plan.logicalLink.nonEmpty) {
-          newSource.setLogicalLink(plan.logicalLink.get)
-        }
-        TransformHints.tag(newSource, TransformHints.getHint(plan))
-        newSource
-      case plan: BatchScanExec =>
-        val newPartitionFilters: Seq[Expression] = plan.scan match {
-          case scan: FileScan =>
-            ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters)
-          case _ =>
-            ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters)
-        }
-        val newSource = plan.copy(runtimeFilters = newPartitionFilters)
-        if (plan.logicalLink.nonEmpty) {
-          newSource.setLogicalLink(plan.logicalLink.get)
-        }
-        TransformHints.tag(newSource, TransformHints.getHint(plan))
-        newSource
-      case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-        val newPartitionFilters: Seq[Expression] =
-          ExpressionConverter.transformDynamicPruningExpr(
-            HiveTableScanExecTransformer.getPartitionFilters(plan))
-        val newSource = HiveTableScanExecTransformer.copyWith(plan, newPartitionFilters)
-        if (plan.logicalLink.nonEmpty) {
-          newSource.setLogicalLink(plan.logicalLink.get)
-        }
-        TransformHints.tag(newSource, TransformHints.getHint(plan))
-        newSource
-      case other =>
-        throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
-    }
-
     /**
      * Apply scan transformer for file source and batch source,
      *   1. create new filter and scan transformer, 2. validate, tag new scan as unsupported if
@@ -456,18 +406,13 @@ object OffloadOthers {
           transformer
         } else {
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
unsupported.")
-          val newSource = plan.copy(partitionFilters = 
transformer.getPartitionFilters())
-          TransformHints.tagNotTransformable(newSource, 
validationResult.reason.get)
-          newSource
+          TransformHints.tagNotTransformable(plan, validationResult.reason.get)
+          plan
         }
       case plan: BatchScanExec =>
         ScanTransformerFactory.createBatchScanTransformer(plan)
-
       case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
         // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
-        val newPartitionFilters: Seq[Expression] =
-          ExpressionConverter.transformDynamicPruningExpr(
-            HiveTableScanExecTransformer.getPartitionFilters(plan))
        val hiveTableScanExecTransformer =
          BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
         val validateResult = hiveTableScanExecTransformer.doValidate()
@@ -476,9 +421,8 @@ object OffloadOthers {
           return hiveTableScanExecTransformer
         }
         logDebug(s"Columnar Processing for ${plan.getClass} is currently 
unsupported.")
-        val newSource = HiveTableScanExecTransformer.copyWith(plan, 
newPartitionFilters)
-        TransformHints.tagNotTransformable(newSource, 
validateResult.reason.get)
-        newSource
+        TransformHints.tagNotTransformable(plan, validateResult.reason.get)
+        plan
       case other =>
        throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.")
     }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index ca35c74f6..d32cf2d22 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -372,7 +372,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
           // If filter expressions aren't empty, we need to transform the inner operators.
           if (plan.partitionFilters.isEmpty) {
             val transformer =
-              ScanTransformerFactory.createFileSourceScanTransformer(plan, validation = true)
+              ScanTransformerFactory.createFileSourceScanTransformer(plan)
             transformer.doValidate().tagOnFallback(plan)
           }
         case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index a259641f5..26201dc1b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.enumerated
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
-import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow}
+import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
 import org.apache.gluten.metrics.GlutenTimeMetric
@@ -101,7 +101,8 @@ class EnumeratedApplier(session: SparkSession)
       (_: SparkSession) => RemoveTransitions,
       (spark: SparkSession) => FallbackOnANSIMode(spark),
       (spark: SparkSession) => PlanOneRowRelation(spark),
-      (_: SparkSession) => FallbackEmptySchemaRelation()
+      (_: SparkSession) => FallbackEmptySchemaRelation(),
+      (_: SparkSession) => RewriteSubqueryBroadcast()
    ) :::
      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
       List((spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark)) :::
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 941677a6b..eb5c561bf 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.heuristic
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
-import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
 import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
@@ -112,7 +112,8 @@ class HeuristicApplier(session: SparkSession)
       (spark: SparkSession) => FallbackOnANSIMode(spark),
       (spark: SparkSession) => FallbackMultiCodegens(spark),
       (spark: SparkSession) => PlanOneRowRelation(spark),
-      (_: SparkSession) => FallbackEmptySchemaRelation()
+      (_: SparkSession) => FallbackEmptySchemaRelation(),
+      (_: SparkSession) => RewriteSubqueryBroadcast()
    ) :::
      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
       List(
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
index a74d428fd..2c1edd04b 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
@@ -24,8 +24,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelation, HashJoin, LongHashedRelation}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.ThreadUtils
@@ -75,35 +73,25 @@ case class ColumnarSubqueryBroadcastExec(
       SQLExecution.withExecutionId(session, executionId) {
         val rows = GlutenTimeMetric.millis(longMetric("collectTime")) {
           _ =>
-            val exchangeChild = child match {
-              case exec: ReusedExchangeExec =>
-                exec.child
-              case _ =>
-                child
-            }
-            if (
-              exchangeChild.isInstanceOf[ColumnarBroadcastExchangeExec] ||
-              exchangeChild.isInstanceOf[AdaptiveSparkPlanExec]
-            ) {
-              // transform broadcasted columnar value to Array[InternalRow] by key
-              exchangeChild
-                .executeBroadcast[BuildSideRelation]
-                .value
-                .transform(buildKeys(index))
-                .distinct
-            } else {
-              val broadcastRelation = exchangeChild.executeBroadcast[HashedRelation]().value
-              val (iter, expr) = if (broadcastRelation.isInstanceOf[LongHashedRelation]) {
-                (broadcastRelation.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
-              } else {
-                (
-                  broadcastRelation.keys(),
-                  BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
-              }
-
-              val proj = UnsafeProjection.create(expr)
-              val keyIter = iter.map(proj).map(_.copy())
-              keyIter.toArray[InternalRow].distinct
+            val relation = child.executeBroadcast[Any]().value
+            relation match {
+              case b: BuildSideRelation =>
+                // Transform columnar broadcast value to Array[InternalRow] by key.
+                b.transform(buildKeys(index)).distinct
+              case h: HashedRelation =>
+                val (iter, expr) = if (h.isInstanceOf[LongHashedRelation]) {
+                  (h.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
+                } else {
+                  (
+                    h.keys(),
+                    BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
+                }
+                val proj = UnsafeProjection.create(expr)
+                val keyIter = iter.map(proj).map(_.copy())
+                keyIter.toArray[InternalRow].distinct
+              case other =>
+                throw new UnsupportedOperationException(
+                  s"Unrecognizable broadcast relation: $other")
             }
         }
         val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes).sum
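
In the HashedRelation branch above, the net effect is: iterate the relation's keys,
project out the build key at position `index`, and de-duplicate. A simplified sketch,
under the assumption that a row can be modeled as Seq[Any] (no InternalRow or
UnsafeProjection involved):

    object KeyExtractDemo {
      // Project the key column at `index` out of each broadcast row, then dedupe --
      // mirroring iter.map(proj).map(_.copy()) followed by toArray + distinct above.
      def keysAt(rows: Iterator[Seq[Any]], index: Int): Array[Any] =
        rows.map(_(index)).toArray.distinct

      def main(args: Array[String]): Unit = {
        val rows = Iterator(Seq(1, "a"), Seq(2, "b"), Seq(1, "c"))
        println(keysAt(rows, 0).mkString(", ")) // 1, 2
      }
    }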
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 9e97a3687..1cd735cf7 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -82,14 +82,12 @@ case class DeltaScanTransformer(
 
 object DeltaScanTransformer {
 
-  def apply(
-      scanExec: FileSourceScanExec,
-      newPartitionFilters: Seq[Expression]): DeltaScanTransformer = {
+  def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
     new DeltaScanTransformer(
       scanExec.relation,
       scanExec.output,
       scanExec.requiredSchema,
-      newPartitionFilters,
+      scanExec.partitionFilters,
       scanExec.optionalBucketSet,
       scanExec.optionalNumCoalescedBuckets,
       scanExec.dataFilters,
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
index a7cecde7c..e482150b8 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.FileSourceScanExec
 
 class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
@@ -24,8 +23,7 @@ class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
   override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
 
   override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec,
-      newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan, newPartitionFilters)
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+    DeltaScanTransformer(batchScan)
   }
 }
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 6e079bf7e..5a735b802 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -80,13 +80,11 @@ case class IcebergScanTransformer(
 }
 
 object IcebergScanTransformer {
-  def apply(
-      batchScan: BatchScanExec,
-      newPartitionFilters: Seq[Expression]): IcebergScanTransformer = {
+  def apply(batchScan: BatchScanExec): IcebergScanTransformer = {
     new IcebergScanTransformer(
       batchScan.output,
       batchScan.scan,
-      newPartitionFilters,
+      batchScan.runtimeFilters,
       table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan),
      keyGroupedPartitioning = SparkShimLoader.getSparkShims.getKeyGroupedPartitioning(batchScan),
      commonPartitionValues = SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan)
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
index 1ebeebf00..dc521f39c 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 class IcebergTransformerProvider extends DataSourceScanTransformerRegister {
@@ -24,8 +23,7 @@ class IcebergTransformerProvider extends DataSourceScanTransformerRegister {
   override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
 
   override def createDataSourceV2Transformer(
-      batchScan: BatchScanExec,
-      newPartitionFilters: Seq[Expression]): BatchScanExecTransformerBase = {
-    IcebergScanTransformer(batchScan, newPartitionFilters)
+      batchScan: BatchScanExec): BatchScanExecTransformerBase = {
+    IcebergScanTransformer(batchScan)
   }
 }

