[ 
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-54437:
-----------------------------
    Description: 
Q1. What are you trying to do?
This document suggests that we set `outputPartitioning` to `SinglePartition` 
(instead of the default `UnknownPartitioning(0)`) when a query runs over small 
data, to help Spark generate more efficient execution plans for small queries.


Q2. What problem is this proposal NOT designed to solve?
This doc does not propose implementing a new fast execution engine for Spark. 
It only suggests a minimal-effort, low-hanging-fruit change that sets 
`outputPartitioning` to `SinglePartition` when we detect that the input data is 
small, over a set of supported plan shapes.

Q3. How is it done today, and what are the limits of current practice?
Currently, a query plan picks up the default partitioning when:
* its input is LocalTableScanExec, which uses the default UnknownPartitioning(0), or
* its input is DataSourceScanExec, which also uses UnknownPartitioning(0).

The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` inserts 
an Exchange node (to satisfy the partitioning requirement), which later turns 
into a shuffle. A shuffle breaks the execution into more stages, and each stage 
goes through the full lifecycle of a Spark job, which is expensive if the query 
itself only needs a few hundred milliseconds to run. This is the case when 
Spark runs a very simple aggregation over a LocalTableScan with only a few 
thousand rows, or over a small FileScan with <= 64MB of data. For such small 
data, running all the operators in a single stage without a shuffle is 
sufficient, and much faster.
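
As an illustrative sketch of the problem (the query is hypothetical, not from 
this proposal), even a tiny aggregation over an in-memory table plans an 
Exchange today:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative only: a tiny aggregation over a handful of in-memory rows.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")

// The physical plan contains an "Exchange hashpartitioning(key, ...)" node
// between the partial and final HashAggregate, because LocalTableScan
// reports UnknownPartitioning(0).
df.groupBy("key").sum("value").explain()
{code}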

 
Q4. What is new in your approach and why do you think it will be successful?
First of all, our approach will be guarded by a config flag, so it can be 
turned off; it is supposed to be off by default. A minimal sketch of such a 
flag is shown below.
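
For illustration only (the config key name here is hypothetical, not part of 
the proposal), such a flag could be defined in `SQLConf` along these lines:

{code:scala}
// Hypothetical config key; the proposal only requires an off-by-default flag.
val SINGLE_PARTITION_FOR_SMALL_DATA_ENABLED =
  buildConf("spark.sql.optimizer.singlePartitionForSmallData.enabled")
    .doc("When true, set outputPartitioning to SinglePartition on small " +
      "LocalTableScan/FileScan inputs over supported plan shapes, so that " +
      "EnsureRequirements can avoid inserting unnecessary shuffles.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(false)
{code}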

The new framework proposed here will:
# verify that the plan shape is supported,
# verify that the input data size is small enough,
# set the output partitioning to SinglePartition on the input data sources, including LocalTableScan and FileScan, and
# propagate the SinglePartition through the query plan.

Spark will then utilize the SinglePartition information to eliminate unneeded shuffles.

The rough code looks like the following:


{code:scala}
// Mark small scans as single-partitioned, guarded by the config flag.
// Returns true if the framework applies to this plan.
// (Sketch: the transformed plan would replace the original.)
def exploreSinglePartitionOpportunities(queryPlan: SparkPlan): Boolean =
  if (frameworkIsOn && isPlanSupported(queryPlan)) {
    queryPlan.transform {
      case l: LocalTableScanExec if isLocalTableSmall(l) => setSinglePartition(l)
      case f: FileSourceScanExec if isFileScanSmall(f) => setSinglePartition(f)
    }
    true
  } else {
    false
  }

// Propagate SinglePartition bottom-up through the physical plan.
def setOutputPartitioning(plan: SparkPlan): Unit = {
  for (child <- plan.children) {
    setOutputPartitioning(child)
  }
  plan.setOutputPartitioning(SinglePartition)
}

if (exploreSinglePartitionOpportunities(rootPlan)) {
  setOutputPartitioning(rootPlan)
}
{code}
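
With the framework on, the expected effect on a small aggregation is roughly 
the following plan change (a sketch; exact plan strings vary by Spark version):

{code}
-- Before: UnknownPartitioning(0) forces EnsureRequirements to add an Exchange
HashAggregate(keys=[key], functions=[sum(value)])
+- Exchange hashpartitioning(key, 200)
   +- HashAggregate(keys=[key], functions=[partial_sum(value)])
      +- LocalTableScan [key, value]

-- After: the scan reports SinglePartition, so the Exchange is eliminated
HashAggregate(keys=[key], functions=[sum(value)])
+- HashAggregate(keys=[key], functions=[partial_sum(value)])
   +- LocalTableScan [key, value]
{code}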


> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
>                 Key: SPARK-54437
>                 URL: https://issues.apache.org/jira/browse/SPARK-54437
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>


