[ https://issues.apache.org/jira/browse/SPARK-46240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiang13021 updated SPARK-46240:
-------------------------------
    Description: 
Some rules (Rule[SparkPlan]) are applied while preparing the executedPlan. However, users currently have no way to add their own rules at this stage.
{code:scala}
// org.apache.spark.sql.execution.QueryExecution#preparations
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
  Seq(
    CoalesceBucketsInJoin,
    PlanDynamicPruningFilters(sparkSession),
    PlanSubqueries(sparkSession),
    RemoveRedundantProjects,
    EnsureRequirements(),
    // `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the
    // sort order of each node is checked to be valid.
    ReplaceHashWithSortAgg,
    // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
    // number of partitions when instantiating PartitioningCollection.
    RemoveRedundantSorts,
    DisableUnnecessaryBucketedScan,
    ApplyColumnarRulesAndInsertTransitions(
      sparkSession.sessionState.columnarRules, outputsColumnar = false),
    CollapseCodegenStages()) ++
    (if (subquery) {
      Nil
    } else {
      Seq(ReuseExchangeAndSubquery)
    })
}{code}
We need a way to add custom Rule[SparkPlan]s at this point. Currently, the only extension point for such rules lives in AQE, which requires AQE to be enabled and the query to meet the conditions for entering AdaptiveSparkPlanExec. This makes it difficult to implement certain extensions for simple SQL statements.
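
For illustration, here is a minimal sketch of the closest existing hook, injectQueryStagePrepRule on SparkSessionExtensions. NoopRule is a placeholder name used only for this sketch; rules injected this way run only on plans wrapped in AdaptiveSparkPlanExec, so a query that never enters AQE never sees them:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Placeholder rule used only to show where the existing hook fires.
case class NoopRule(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  // Existing AQE-only extension point: NoopRule runs only on plans
  // wrapped in AdaptiveSparkPlanExec.
  .withExtensions(_.injectQueryStagePrepRule(NoopRule))
  .getOrCreate()

// A simple scan-plus-filter query typically involves no shuffle, is not
// planned adaptively, and therefore never reaches NoopRule.
spark.sql("SELECT * FROM range(10) WHERE id > 5").show()
{code}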

For example, adding new datasource filters for external data sources is challenging. Modifying DataSourceStrategy directly makes it hard to stay in sync with future work in the community, while customizing the Strategy makes it difficult to add new functionality incrementally. If we define AQE rules instead, they never take effect for the simplest 'SELECT * FROM ... WHERE ...' statements. Therefore, it is necessary to introduce a customizable Rule[SparkPlan] hook between sparkPlan and executedPlan.

We could add an extension called "ExecutedPlanPrepRule" to SparkSessionExtensions, which would allow users to add their own rules.
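
A hypothetical sketch of how the proposed extension point might be used, assuming a new injectExecutedPlanPrepRule method analogous to the existing injectQueryStagePrepRule (the method name and LogPlanRule are illustrative only, not an existing API):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical user rule; Rule already mixes in Logging, so logInfo
// is available.
case class LogPlanRule(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    logInfo(s"Preparing executedPlan:\n$plan")
    plan
  }
}

val spark = SparkSession.builder()
  .master("local[*]")
  // Proposed API from this issue; it does not exist in Spark today.
  .withExtensions(_.injectExecutedPlanPrepRule(LogPlanRule))
  .getOrCreate()

// With the proposed hook, LogPlanRule would fire between sparkPlan and
// executedPlan even for this simple, non-AQE query.
spark.sql("SELECT * FROM range(10) WHERE id > 5").show()
{code}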

        Summary: Add ExecutedPlanPrepRules to SparkSessionExtensions  (was: Add PrepExecutedPlanRule to SparkSessionExtensions)

> Add ExecutedPlanPrepRules to SparkSessionExtensions
> ---------------------------------------------------
>
>                 Key: SPARK-46240
>                 URL: https://issues.apache.org/jira/browse/SPARK-46240
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0
>            Reporter: jiang13021
>            Priority: Major



