[https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Carson Wang updated SPARK-23128:
--------------------------------
Description:
SPARK-9850 proposed the basic idea of adaptive execution in Spark. It added a
new DAGScheduler API for submitting a single map stage. The current
implementation of adaptive execution in Spark SQL supports changing the number
of reducers at runtime: an ExchangeCoordinator determines the number of
post-shuffle partitions for a stage that fetches shuffle data from one or more
stages. The current implementation adds the ExchangeCoordinator at the same
time as the Exchanges, which has several limitations. First, it may introduce
additional shuffles that degrade performance; this can be seen in the
EnsureRequirements rule where it adds ExchangeCoordinators. Second, adding
ExchangeCoordinators while adding Exchanges is a poor fit because at that point
we lack a global picture of all shuffle dependencies of a post-shuffle stage.
For example, for a three-table join in a single stage, the same
ExchangeCoordinator should be used by all three Exchanges, but currently two
separate ExchangeCoordinators are added. Third, the current framework makes it
hard to flexibly implement other adaptive-execution features, such as changing
the execution plan and handling skewed joins at runtime.
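To illustrate why a single coordinator per post-shuffle stage matters, here is a minimal Python sketch of the coordinator idea. It assumes that each pre-shuffle stage reports a byte size per reducer partition and that contiguous reducer partitions are packed until a target input size is reached. The function name `coalesce_partitions`, the `target_size` parameter, and the sample sizes `t1`, `t2`, `t3` are hypothetical; this is not Spark's ExchangeCoordinator code.

```python
from typing import List, Sequence


def coalesce_partitions(
    bytes_by_shuffle: Sequence[Sequence[int]],
    target_size: int,
) -> List[int]:
    """Return the start indices of the coalesced post-shuffle partitions.

    bytes_by_shuffle[s][p] is the (hypothetical) size in bytes of reducer
    partition p produced by pre-shuffle stage s. Every stage must have the
    same number of reducer partitions, so one set of boundaries can be
    applied to every Exchange feeding the post-shuffle stage.
    """
    if not bytes_by_shuffle:
        return []
    num_partitions = len(bytes_by_shuffle[0])
    if any(len(sizes) != num_partitions for sizes in bytes_by_shuffle):
        raise ValueError("all shuffles must have the same number of partitions")
    if num_partitions == 0:
        return []

    start_indices = [0]
    current_size = 0
    for p in range(num_partitions):
        # Combined input size of reducer partition p across every shuffle.
        partition_size = sum(sizes[p] for sizes in bytes_by_shuffle)
        # Start a new post-shuffle partition if adding p would exceed the target.
        if p > 0 and current_size + partition_size > target_size:
            start_indices.append(p)
            current_size = 0
        current_size += partition_size
    return start_indices


# Hypothetical per-reducer-partition sizes for a three-table join.
t1 = [10, 50, 20, 60, 5]
t2 = [10, 10, 30, 10, 5]
t3 = [5, 30, 10, 10, 0]

# One coordinator that sees all three shuffle dependencies.
print(coalesce_partitions([t1, t2, t3], 100))  # [0, 1, 2, 3]
# Two separate coordinators, each seeing only part of the picture.
print(coalesce_partitions([t1, t2], 100))      # [0, 2, 3]
print(coalesce_partitions([t3], 100))          # [0]
```

With two separate coordinators the boundaries come out as `[0, 2, 3]` and `[0]`, so the three Exchanges would not agree on how reducer partitions are grouped. The shared coordinator produces one set of boundaries, `[0, 1, 2, 3]`, that all three Exchanges can use.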
We'd like to introduce a new approach to adaptive execution in Spark SQL that
addresses these limitations. The idea is described at
[https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]
was:
SPARK-9850 proposed the basic idea of adaptive execution in Spark. It added a
new DAGScheduler API for submitting a single map stage. The current
implementation of adaptive execution in Spark SQL supports changing the number
of reducers at runtime: an ExchangeCoordinator determines the number of
post-shuffle partitions for a stage that fetches shuffle data from one or more
stages. The current implementation adds the ExchangeCoordinator at the same
time as the Exchanges, which has several limitations. First, it may introduce
additional shuffles that degrade performance; this can be seen in the
EnsureRequirements rule where it adds ExchangeCoordinators. Second, adding
ExchangeCoordinators while adding Exchanges is a poor fit because at that point
we lack a global picture of all shuffle dependencies of a post-shuffle stage.
For example, for a three-table join in a single stage, the same
ExchangeCoordinator should be used by all three Exchanges, but currently two
separate ExchangeCoordinators are added. Third, the current framework makes it
hard to flexibly implement other adaptive-execution features, such as changing
the execution plan and handling skewed joins at runtime.
We'd like to introduce QueryStage and a new approach to adaptive execution in
Spark SQL that addresses these limitations. The idea is described at
https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing
> A new approach to do adaptive execution in Spark SQL
> ----------------------------------------------------
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.1
> Reporter: Carson Wang
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)