[ https://issues.apache.org/jira/browse/SPARK-46240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-46240:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add ExecutedPlanPrepRules to SparkSessionExtensions
> ---------------------------------------------------
>
>                 Key: SPARK-46240
>                 URL: https://issues.apache.org/jira/browse/SPARK-46240
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0
>            Reporter: jiang13021
>            Priority: Major
>              Labels: pull-request-available
>
> Some rules (Rule[SparkPlan]) are applied when preparing the executedPlan.
> However, users currently have no way to add their own rules at this stage.
> {code:java}
> // org.apache.spark.sql.execution.QueryExecution#preparations
> private[execution] def preparations(
>     sparkSession: SparkSession,
>     adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
>     subquery: Boolean): Seq[Rule[SparkPlan]] = {
>   // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
>   // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
>   adaptiveExecutionRule.toSeq ++
>   Seq(
>     CoalesceBucketsInJoin,
>     PlanDynamicPruningFilters(sparkSession),
>     PlanSubqueries(sparkSession),
>     RemoveRedundantProjects,
>     EnsureRequirements(),
>     // `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the
>     // sort order of each node is checked to be valid.
>     ReplaceHashWithSortAgg,
>     // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
>     // number of partitions when instantiating PartitioningCollection.
>     RemoveRedundantSorts,
>     DisableUnnecessaryBucketedScan,
>     ApplyColumnarRulesAndInsertTransitions(
>       sparkSession.sessionState.columnarRules, outputsColumnar = false),
>     CollapseCodegenStages()) ++
>     (if (subquery) {
>       Nil
>     } else {
>       Seq(ReuseExchangeAndSubquery)
>     })
> }{code}
> We need the ability to add Rule[SparkPlan]s at this position. Currently, all
> such rules live inside AQE, which forces users to enable AQE and satisfy its
> requirements before their plans enter AdaptiveSparkPlanExec. This makes it
> difficult to implement certain extensions for simple SQL statements.
> For example, adding new datasource filters for external data sources is
> challenging. Modifying DataSourceStrategy directly makes it hard to stay in
> sync with future development in the community, while customizing the Strategy
> makes it difficult to append new functionality incrementally. Rules defined
> for AQE would not take effect for the simplest 'SELECT * FROM ... WHERE ...'
> statements. Therefore, it is necessary to introduce a customizable
> Rule[SparkPlan] phase between sparkPlan and executedPlan.
> We could add an extension point called "ExecutedPlanPrepRule" to
> SparkSessionExtensions, which would allow users to inject their own rules.
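> A minimal sketch of how such an extension point might be used. Note that
> injectExecutedPlanPrepRule and MyExternalFilterPushdown below are
> hypothetical names for the proposed API, not an existing Spark API:
> {code:java}
> import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
> import org.apache.spark.sql.catalyst.rules.Rule
> import org.apache.spark.sql.execution.SparkPlan
> 
> // Hypothetical user-defined rule, applied between sparkPlan and executedPlan.
> case class MyExternalFilterPushdown(session: SparkSession) extends Rule[SparkPlan] {
>   override def apply(plan: SparkPlan): SparkPlan = {
>     // e.g. rewrite scan nodes to push additional filters to an external source
>     plan
>   }
> }
> 
> class MyExtensions extends (SparkSessionExtensions => Unit) {
>   override def apply(extensions: SparkSessionExtensions): Unit = {
>     // Hypothetical injection method proposed by this ticket, following the
>     // pattern of existing methods such as injectQueryStagePrepRule:
>     extensions.injectExecutedPlanPrepRule(session => MyExternalFilterPushdown(session))
>   }
> }
> // Registered as usual via: spark.sql.extensions=com.example.MyExtensions{code}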



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
