Dear all,

The initial support for adaptive 
execution [SPARK-9850<https://issues.apache.org/jira/browse/SPARK-9850>] in 
Spark SQL has been there since Spark 1.6, but there have been no updates since 
then. One of the key features of adaptive execution is determining the number 
of reducers automatically at runtime. This is a feature required by many Spark 
users, especially the infrastructure teams at many companies, since thousands 
of queries run on a cluster and the shuffle partition number may not be set 
properly for every one of them. A single shuffle partition number also doesn't 
work well for all stages in a query, because each stage has a different input 
data size. Other features of adaptive execution include optimizing the join 
strategy at runtime and handling skewed joins automatically, neither of which 
has been implemented in Spark.
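
To make the pain point concrete, here is a minimal sketch contrasting the 
static setting with the flags the current adaptive implementation uses (the 
configuration names are the ones I believe Spark 2.x uses; the values are 
illustrative only):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("shuffle-partition-sketch")
    // One static value applied to every shuffle stage of every query:
    // too few partitions for large stages, too many tiny tasks for small ones.
    .config("spark.sql.shuffle.partitions", "200")
    // The existing adaptive mode instead targets a per-reducer input size and
    // lets the coordinator pick the partition count at runtime.
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
    .getOrCreate()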

In the current implementation, an ExchangeCoordinator is used to determine the 
number of post-shuffle partitions for a stage. However, the coordinator is 
created when an Exchange is added, so it lacks a global picture of all the 
shuffle dependencies of a post-shuffle stage. For example, for a three-table 
join in a single stage, the same ExchangeCoordinator should be used across the 
three Exchanges, but currently two separate ExchangeCoordinators are added. 
The current approach also adds extra Exchanges in some cases. So I think it is 
time to rethink how to better support adaptive execution in Spark SQL. I have 
proposed a new approach in 
SPARK-23128<https://issues.apache.org/jira/browse/SPARK-23128>. A document 
describing the idea is available 
here<https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>.
 The idea of how to change a sort-merge join to a broadcast hash join at 
runtime is described in a separate 
doc<https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.
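
For illustration, a minimal sketch of the three-table case mentioned above 
(the table names t1/t2/t3 and the join column "key" are hypothetical):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // All three joins use the same key, so the post-shuffle stage consumes
  // three shuffle dependencies. Ideally one coordinator would pick a single
  // partition count for all the Exchanges feeding this stage.
  val joined = spark.table("t1")
    .join(spark.table("t2"), "key")
    .join(spark.table("t3"), "key")

  // Inspect the physical plan to see the Exchange operators.
  joined.explain()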

The docs have been there for a while, and I also have an implementation based 
on Spark 2.3 available at https://github.com/Intel-bigdata/spark-adaptive. The 
code is split into 7 PRs labeled AE2.3-0x if you look at the pull requests. I 
asked many partners, including Baidu, Alibaba, and JD.com, to evaluate the 
patch and received very good feedback. Baidu also shared their results on the 
JIRA. We also finished a 100 TB TPC-DS benchmark earlier using the patch, 
which passed all queries with good performance improvements.

I'd like to call for a review of the docs, and of the code as well, and we can 
discuss further in this thread. Thanks very much!

Thanks,
Carson
