[ 
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-54437:
-----------------------------
    Description: 
h4. Q1. What are you trying to do?
This document suggests that we set `outputPartitioning` to `SinglePartition` 
(instead of the default UnknownPartitioning(0)) when the query runs over small 
data, to help Spark generate more efficient execution plans for small queries.


h4. Q2. What problem is this proposal NOT designed to solve?
This doc does not propose implementing a new fast execution engine for Spark. 
It only suggests a minimal-effort, low-hanging-fruit change that sets 
`outputPartitioning` to `SinglePartition` when we detect that the input data is 
small, for a set of supported plan shapes.

h4. Q3. How is it done today, and what are the limits of current practice?
Currently, a query plan ends up with no useful partitioning information when:
* Its input is LocalTableScanExec, which uses the [default 
UnknownPartitioning(0)|https://github.com/apache/spark/blob/1716b292dec2ebe6c3d8dd75c9d4d59437219c08/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L163]
* Its input is DataSourceScanExec, which also [uses 
UnknownPartitioning(0)|https://github.com/apache/spark/blob/1716b292dec2ebe6c3d8dd75c9d4d59437219c08/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L478]

The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` inserts 
an Exchange node (to satisfy the partitioning requirement), which later turns 
into a shuffle. A shuffle breaks the execution into more stages, and each stage 
goes through the full lifecycle of a Spark job, which is expensive if the query 
itself only needs a few hundred milliseconds to run. This is the case when 
Spark runs a very simple aggregation over a LocalTableScan with only a few 
thousand rows, or over a small FileScan of <= 64 MB. For such small data, 
running all operators in a single stage without a shuffle is sufficient, and 
much faster.
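
As a concrete illustration of the limit (a minimal, self-contained repro under 
an assumed local session; not part of the proposal), even a tiny aggregation 
plans an Exchange today:

{code:scala}
import org.apache.spark.sql.SparkSession

// A tiny local table: a few thousand rows, far below 64 MB.
val spark = SparkSession.builder().master("local[*]").appName("small-agg-repro").getOrCreate()
import spark.implicits._

val df = (1 to 5000).toDF("id")

// A simple aggregation over a LocalTableScan. The physical plan still contains
// "Exchange hashpartitioning(...)" because the scan reports
// UnknownPartitioning(0) and EnsureRequirements inserts a shuffle to satisfy
// the aggregation's required distribution.
df.groupBy($"id" % 10).count().explain()
{code}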

 
h4. Q4. What is new in your approach and why do you think it will be successful?
First of all, our approach will be guarded by a config, so it can be turned 
off; it is expected to be off by default.

The new framework proposed here will:
# Verify that the plan shape is supported,
# Verify that the input data size is small enough,
# Set the output partitioning to SinglePartition on the input data sources, 
including LocalTableScan and FileScan,
# Propagate the SinglePartition through the query plan.
# Spark will then utilize the SinglePartition information to eliminate unneeded 
shuffles.

The rough code (pseudocode) looks like the following:


{code:scala}
// Pseudocode. Steps 1-3: when the framework is on and the plan shape is
// supported, mark every small scan as a SinglePartition source.
def exploreSinglePartitionOpportunities(): Boolean =
  if (frameworkIsOn && isPlanSupported(queryPlan)) {
    queryPlan.transform {
      case l: LocalTableScan if isSmall(l) => setSinglePartition(l)
      case f: FileScan if isSmall(f)       => setSinglePartition(f)
    }
    true
  } else {
    false
  }

// Step 4: propagate SinglePartition bottom-up through the whole plan.
def setOutputPartitioning(plan: LogicalPlan): Unit = {
  for (child <- plan.children) {
    setOutputPartitioning(child)
  }
  plan.setOutputPartitioning(SinglePartition)
}

// Step 5: with SinglePartition propagated, EnsureRequirements no longer needs
// to insert a shuffle.
if (exploreSinglePartitionOpportunities()) {
  setOutputPartitioning(rootPlan)
}
{code}
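
The `isSmall` check above is left abstract. A hypothetical sketch over the 
physical scan nodes (the threshold, the row bound, and the exact accessors are 
assumptions, not the final design) could be:

{code:scala}
import org.apache.spark.sql.execution.{FileSourceScanExec, LocalTableScanExec}

object SmallDataChecks {
  // Assumed configurable threshold, e.g. 64 MB.
  val smallDataThresholdBytes: Long = 64L * 1024 * 1024

  // Local tables: bound by row count (the bound value is an assumption).
  def isSmall(scan: LocalTableScanExec): Boolean =
    scan.rows.length <= 10000

  // File scans: bound by the total size of the files selected by the scan.
  def isSmall(scan: FileSourceScanExec): Boolean =
    scan.relation.location.sizeInBytes <= smallDataThresholdBytes
}
{code}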


An example: running a simple select query with a filter, an aggregation, and a 
sort. The input is a LocalTableScan with a few thousand rows; a query of 
roughly this shape is sketched below.
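
{code:scala}
// Illustrative query of the measured shape (column names are assumptions);
// reuses the `spark` session and implicits from the repro above.
val data = (1 to 5000).map(i => (i, i % 100)).toDF("id", "bucket")
val result = data
  .filter($"id" > 10)         // filter
  .groupBy($"bucket").count() // aggregation
  .sort($"bucket")            // sort
result.collect()
{code}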

With this framework enabled, we observed only one stage/one job created by 
Spark, and the e2e runtime was 150 ms:

 !without.png! 

Without this framework enabled, we observed two stages/two jobs and a 330 ms 
e2e runtime.
 !with.png! 


This is a 2x speedup on the selected example. The reason is that the data is 
small enough that it does not need more than one stage (one thread, one job, 
etc.) to execute. By telling Spark that this execution can use SinglePartition, 
Spark does not add the extra shuffle to satisfy the partitioning requirement.

h4. Q5. Who cares? If you are successful, what difference will it make?
The majority of Spark workloads are not as huge as scanning terabytes of data. 
They often scan only a single file of a few megabytes after file pruning, or 
consume a small local table. On the order of tens of millions of queries could 
be sped up by a meaningful percentage. By any measure, this is a non-trivial 
impact in terms of coverage.


h4. Q6. What are the risks?
The SinglePartition setup might cause performance regressions in certain cases, 
such as a JOIN with high cardinality: even though the JOIN inputs may be small, 
the output can explode. This risk is addressed by adding a layer in the 
framework that verifies the query plan shape and supports only a selected set 
of plan shapes at the beginning. For example, over small data, aggregation and 
sort should almost always be safe.
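
A hypothetical allowlist for the initial plan shapes (the operator set is an 
illustrative assumption, not a committed API) might look like:

{code:scala}
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, LocalTableScanExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Allow only linear chains of project/filter/aggregate/sort over a single
// small scan; exclude joins, whose output can explode even from small inputs.
def isPlanSupported(plan: SparkPlan): Boolean = plan match {
  case _: LocalTableScanExec | _: FileSourceScanExec => true
  case p: ProjectExec       => isPlanSupported(p.child)
  case f: FilterExec        => isPlanSupported(f.child)
  case a: HashAggregateExec => isPlanSupported(a.child)
  case s: SortExec          => isPlanSupported(s.child)
  case _                    => false
}
{code}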

The framework will also be off by default until it has been improved and 
expanded by the community and feedback indicates it is mature enough to enable 
by default.

h4. Q7. How long will it take?

About 4 weeks to build:
* The framework
* Support for the basic plan shapes
* A macro benchmark to help developers measure performance as they extend the 
framework and the supported plan shapes.


h4. Q8. What are the mid-term and final “exams” to check for success?
Enable the framework and provide benchmarks demonstrating that small queries 
speed up.



> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
>                 Key: SPARK-54437
>                 URL: https://issues.apache.org/jira/browse/SPARK-54437
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>         Attachments: with.png, without.png
>


