Hi,

I would like to share some of our experience using adaptive execution (AE) in 
eBay’s data warehouse.


  1.  Saving a lot of manual setting and tuning effort. Setting 
spark.sql.shuffle.partitions query by query is tedious; with AE, we just need 
to set one big number for all queries.
  2.  Saving memory. With AE, we can start fewer executors and fewer tasks for 
small stages, which greatly helps save memory. We see a 1.5x improvement in 
MB-seconds for many queries.
  3.  Reducing execution time by converting SortMergeJoin to BroadcastHashJoin. 
Some kinds of queries benefit a lot from this, especially when one side of the 
intermediate tables is skewed or quite large; we see a 2-3x improvement.
  4.  Reducing the number of output files. Based on the partition data size, AE 
can merge small files in the last stage, which is quite helpful for HDFS.
  5.  Handling skewed joins. Data skew is difficult to handle in the current 
Spark engine because it is hard to detect in the planning phase. AE can detect 
skewed data at runtime and dynamically increase parallelism for the skewed 
task. We see a 1.6x improvement in execution time.
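
To make item 5 a bit more concrete, here is a minimal Python sketch of how 
skewed partitions might be detected from per-partition shuffle sizes once a 
stage has finished. The rule (larger than a multiple of the median and larger 
than an absolute floor), the thresholds, and the function name are assumptions 
for illustration only, not the logic implemented in the AE patch.

```python
from math import ceil
from statistics import median

MB = 1024 * 1024


def plan_skew_splits(partition_bytes, factor=10, min_skew_bytes=64 * MB):
    """Return {partition_index: number_of_subtasks} for skewed partitions.

    A partition counts as skewed when it is larger than `factor` times the
    median partition size AND larger than `min_skew_bytes`. Both thresholds
    are hypothetical values chosen for this sketch.
    """
    med = median(partition_bytes)
    target = max(med, 1)  # aim each sub-task at roughly the median size
    splits = {}
    for index, size in enumerate(partition_bytes):
        if size > factor * med and size > min_skew_bytes:
            splits[index] = ceil(size / target)
    return splits


# One partition is far larger than the rest, so only it gets extra parallelism.
sizes = [100 * MB, 120 * MB, 90 * MB, 4000 * MB, 110 * MB]
skew_plan = plan_skew_splits(sizes)
```

Only the oversized partition is split; the others keep one task each, which is 
the "increase parallelism only where needed" behaviour described above.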

AE is landing in our biggest Spark cluster, and we can share more real 
production performance numbers in the near future.

Thanks,
Yucai

From: "Wang, Carson" <carson.w...@intel.com>
Date: Tuesday, July 31, 2018 at 4:06 PM
To: Marco Gaido <marcogaid...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>
Cc: "xyliyuanj...@gmail.com" <xyliyuanj...@gmail.com>, Spark dev list 
<dev@spark.apache.org>
Subject: RE: [DISCUSS] Adaptive execution in Spark SQL

Thanks Marco and Wenchen for reviewing. It sounds good to target this for 3.0.

I can also share more data on the benchmark. In the 100 TB TPC-DS benchmark we 
ran on a 100-node cluster, 90% of the 103 queries showed a performance gain, 
and 46% of them were more than 1.1x faster. Individual queries gained up to 
3.8x (q8: 3.8x, q81: 2.1x, q30: 2.1x, q51: 1.8x, q61: 1.6x, q60: 1.6x …). In 
addition, 5 queries that previously failed pass successfully in adaptive 
execution mode. The detailed report is available here: 
https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb

Thanks,
Carson

From: Marco Gaido [mailto:marcogaid...@gmail.com]
Sent: Tuesday, July 31, 2018 3:00 PM
To: Wenchen Fan <cloud0...@gmail.com>
Cc: xyliyuanj...@gmail.com; Wang, Carson <carson.w...@intel.com>; Spark dev 
list <dev@spark.apache.org>
Subject: Re: [DISCUSS] Adaptive execution in Spark SQL

Hi all,

I also like this idea very much, and I think it may bring other performance 
improvements in the future as well.

Thanks to everybody who worked on this.

I agree with targeting this feature for 3.0.

Thanks everybody,
Bests.
Marco

On Tue, 31 Jul 2018, 08:39 Wenchen Fan, <cloud0...@gmail.com> wrote:
Hi Carson and Yuanjian,

Thanks for contributing to this project and sharing the production use cases! I 
believe the adaptive execution will be a very important feature of Spark SQL 
and will definitely benefit a lot of users.

I went through the design docs, and the high-level design totally makes sense 
to me. Since the code freeze of Spark 2.4 is close, I'm afraid we may not have 
enough time to review the code and merge it. How about we target this feature 
for Spark 3.0?

Besides, it would be great if we can have some real benchmark numbers for it.

Thanks,
Wenchen

On Tue, Jul 31, 2018 at 2:26 PM Yuanjian Li <xyliyuanj...@gmail.com> wrote:
Thanks Carson, great note!
Baidu has actually ported this patch to our internal fork. I collected some use 
cases and the performance improvements observed while using this patch 
internally at Baidu, summarized in the following 3 scenarios:
1. SortMergeJoin to BroadcastJoin
Converting SortMergeJoin to BroadcastJoin at nodes deep in the plan tree brings 
us a 50% to 200% boost in query performance. This optimization frequently 
applies in BI scenarios, such as joining several tables with filters in 
subqueries.
2. Long-running applications or using Spark as a service
Here, a long-running application is one whose duration is close to 1 hour. 
Using Spark as a service means using spark-shell and continuously submitting 
SQL, or using a service built on Spark such as Zeppelin, Livy, or our internal 
SQL service Baidu BigSQL. In such scenarios all Spark jobs share the same 
partition number, so enabling AE and adding configs for the expected task 
input, including data size, row count, min/max partition number, etc., brings 
us a 50% to 100% boost in performance.
3. GraphFrame jobs
The last scenario is an application that uses GraphFrame. In this case, the 
user has a 2-dimensional graph with 1 billion edges and runs the connected 
components algorithm in GraphFrame. With AE enabled, the duration of the app 
drops from 58 min to 32 min, a speedup of roughly 1.8x.

The detailed screenshots and configs are in the PDF attached to the JIRA 
SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>.

Thanks,
Yuanjian Li

On Sat, Jul 28, 2018 at 12:49 AM Wang, Carson <carson.w...@intel.com> wrote:
Dear all,

The initial support for adaptive execution [SPARK-9850 
<https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has been 
there since Spark 1.6, but it has not been updated since then. One of the key 
features in adaptive execution is determining the number of reducers 
automatically at runtime. This feature is required by many Spark users, 
especially the infrastructure teams in many companies, because there are 
thousands of queries running on the cluster and the shuffle partition number 
may not be set properly for every query. A single shuffle partition number also 
doesn’t work well for all stages in a query, because each stage has a different 
input data size. Other features in adaptive execution include optimizing the 
join strategy at runtime and handling skewed joins automatically, neither of 
which has been implemented in Spark.
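
As a rough illustration of choosing the reducer count at runtime, the Python 
sketch below packs adjacent post-shuffle partitions into groups of about a 
target size, using the map output sizes observed after the shuffle. The 
function names and the greedy packing rule are assumptions made for this 
example; they are not taken from the Spark code base.

```python
def total_sizes(per_shuffle_sizes):
    """Sum partition sizes across all shuffle inputs of one stage.

    `per_shuffle_sizes` holds one list per shuffle, all of the same length,
    indexed by post-shuffle partition id.
    """
    return [sum(column) for column in zip(*per_shuffle_sizes)]


def coalesce_partitions(partition_bytes, target_bytes):
    """Group adjacent partitions so each group stays near `target_bytes`.

    Returns (start, end) index ranges with `end` exclusive. A group is closed
    as soon as adding the next partition would push it past the target, so a
    single oversized partition still forms its own group.
    """
    groups = []
    start, current = 0, 0
    for index, size in enumerate(partition_bytes):
        if index > start and current + size > target_bytes:
            groups.append((start, index))
            start, current = index, 0
        current += size
    if partition_bytes:
        groups.append((start, len(partition_bytes)))
    return groups


# Two shuffles feed the same stage; the sizes are in MB for readability.
combined = total_sizes([[5, 10, 20, 30, 40, 2, 3], [5, 10, 10, 10, 10, 3, 2]])
reducers = coalesce_partitions(combined, target_bytes=64)
```

Seven configured shuffle partitions become three reducer tasks here, which is 
why a single large shuffle partition number can be set for all queries.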

In the current implementation, an ExchangeCoordinator is used to determine the 
number of post-shuffle partitions for a stage. However, the exchange 
coordinator is added at the time the Exchange is added, so it lacks a global 
picture of all shuffle dependencies of a post-shuffle stage. For example, for a 
join of 3 tables in a single stage, the same ExchangeCoordinator should be 
used in all three Exchanges, but currently two separate ExchangeCoordinators 
are added. It also adds additional Exchanges in some cases. So I think it is 
time to rethink how to better support adaptive execution in Spark SQL. I have 
proposed a new approach in SPARK-23128 
<https://issues.apache.org/jira/browse/SPARK-23128>. The idea is described in 
this document: 
https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing 
How to change a sort merge join to a broadcast hash join at runtime is 
described in a separate doc: 
https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing
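
For readers who have not opened the join doc, the core decision can be 
sketched in a few lines of Python: once the shuffle stages feeding a join have 
run, their actual output sizes are known, and the join can be re-planned if one 
side turns out to be small. The function below is only an assumption-laden 
illustration (the name, return shape, and tie-breaking are hypothetical); the 
10 MB default mirrors Spark's default for spark.sql.autoBroadcastJoinThreshold.

```python
MB = 1024 * 1024
GB = 1024 * MB


def choose_join(left_bytes, right_bytes, broadcast_threshold=10 * MB):
    """Pick a join strategy from the measured sizes of both join inputs.

    Returns (strategy, build_side). The smaller input is broadcast when it
    fits under the threshold; otherwise the planned sort merge join is kept.
    """
    side, size = min(
        (("left", left_bytes), ("right", right_bytes)),
        key=lambda candidate: candidate[1],
    )
    if size <= broadcast_threshold:
        return ("BroadcastHashJoin", side)
    return ("SortMergeJoin", None)


# A filter shrank the right side to 2 MB at runtime, so it can be broadcast.
small_right = choose_join(5 * GB, 2 * MB)
# Both sides are still large, so the sort merge join is kept.
both_large = choose_join(5 * GB, 3 * GB)
```

The planner can only estimate these sizes before execution, which is why the 
decision is more reliable when it is made from runtime statistics.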

The docs have been there for a while, and I also have an implementation based 
on Spark 2.3 available at https://github.com/Intel-bigdata/spark-adaptive. 
The code is split into 7 PRs labeled AE2.3-0x, which you can find among the 
pull requests. I asked many partners, including Baidu, Alibaba, and JD.com, to 
evaluate the patch and received very good feedback. Baidu also shared their 
results on the JIRA. We also finished a 100 TB TPC-DS benchmark earlier using 
the patch, which passed all queries with good performance improvement.

I’d like to call for a review of the docs, and even the code, so that we can 
discuss further in this thread. Thanks very much!

Thanks,
Carson
