Hi all,

I also like this idea very much, and I think it may also bring other
performance improvements in the future.

Thanks to everybody who worked on this.

I agree with targeting this feature for 3.0.

Thanks everybody,
Best regards,
Marco

On Tue, 31 Jul 2018, 08:39 Wenchen Fan, <cloud0...@gmail.com> wrote:

> Hi Carson and Yuanjian,
>
> Thanks for contributing to this project and sharing the production use
> cases! I believe adaptive execution will be a very important feature of
> Spark SQL and will definitely benefit a lot of users.
>
> I went through the design docs, and the high-level design totally makes
> sense to me. Since the code freeze of Spark 2.4 is close, I'm afraid we may
> not have enough time to review the code and merge it; how about we target
> this feature for Spark 3.0?
>
> Besides, it would be great if we could get some real benchmark numbers for
> it.
>
> Thanks,
> Wenchen
>
> On Tue, Jul 31, 2018 at 2:26 PM Yuanjian Li <xyliyuanj...@gmail.com>
> wrote:
>
>> Thanks Carson, great note!
>> Baidu has actually ported this patch into our internal fork. I collected
>> some use cases and performance numbers from Baidu's internal usage of
>> this patch, summarized in the following three scenarios:
>> 1. SortMergeJoin to BroadcastJoin
>> Transforming a SortMergeJoin into a BroadcastJoin deep in the plan tree
>> can bring us a 50% to 200% boost in query performance, and this strategy
>> is frequently hit in BI scenarios, such as joining several tables with
>> filter conditions in subqueries.
>> 2. Long-running applications, or Spark used as a service
>> Here, a long-running application refers to one whose duration approaches
>> 1 hour. Using Spark as a service refers to keeping a spark-shell session
>> alive and repeatedly submitting SQL, or using services such as Zeppelin,
>> Livy, or our internal SQL service, Baidu BigSQL. In such scenarios all
>> Spark jobs share the same partition number, so enabling AE and adding
>> configs about expected task info (data size, row count, min/max partition
>> number, etc.) brings us a 50%-100% performance improvement; a config
>> sketch follows the list below.
>> 3. GraphFrame jobs
>> The last scenario is an application using GraphFrame: the user has a
>> 2-dimensional graph with 1 billion edges and runs the connected
>> components algorithm in GraphFrame (a usage sketch also follows below).
>> With AE enabled, the application duration dropped from 58 min to 32 min,
>> almost a 100% performance improvement.
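>>
>> For readers who want to try this, a minimal sketch of the configs we set
>> for scenario 2. The first two configs exist in stock Spark 2.x; the
>> min/max partition bounds are specific to the SPARK-23128 patch, and the
>> exact names may differ across versions:
>>
>>   // Sketch: enabling AE for a long-running SQL service (spark-shell).
>>   spark.conf.set("spark.sql.adaptive.enabled", "true")
>>   // Target input size per post-shuffle partition; the reducer count is
>>   // then derived at runtime from the actual map output statistics.
>>   spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
>>     "67108864")  // 64 MB per merged partition
>>   // Bounds on the runtime-chosen partition number (patch-specific).
>>   spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "20")
>>   spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "500")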
>>
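>> And for scenario 3, a hedged sketch of the GraphFrame job shape (the
>> input paths and schemas are placeholders; the connected components call
>> is the standard GraphFrames API):
>>
>>   import org.graphframes.GraphFrame
>>
>>   // connectedComponents requires a checkpoint directory.
>>   spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
>>
>>   // Placeholder DataFrames standing in for the 1-billion-edge graph.
>>   val vertices = spark.read.parquet("/path/to/vertices") // column: id
>>   val edges = spark.read.parquet("/path/to/edges")  // columns: src, dst
>>
>>   val graph = GraphFrame(vertices, edges)
>>   // Connected components runs many shuffle-heavy iterations, which is
>>   // exactly where AE's runtime partition tuning pays off.
>>   val components = graph.connectedComponents.run()
>>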
>> The detailed screenshots and configs are in the PDF attached to JIRA
>> SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>.
>>
>> Thanks,
>> Yuanjian Li
>>
>> Wang, Carson <carson.w...@intel.com> wrote on Sat, Jul 28, 2018 at 12:49 AM:
>>
>>> Dear all,
>>>
>>>
>>>
>>> The initial support for adaptive execution [SPARK-9850
>>> <https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has
>>> been there since Spark 1.6, but there have been no further updates since
>>> then. One of the key features of adaptive execution is determining the
>>> number of reducers automatically at runtime. This is a feature required
>>> by many Spark users, especially the infrastructure teams in many
>>> companies, as there are thousands of queries running on a cluster where
>>> the shuffle partition number may not be set properly for every query. The
>>> same shuffle partition number also doesn’t work well for all stages in a
>>> query, because each stage has a different input data size. Other features
>>> of adaptive execution include optimizing the join strategy at runtime and
>>> handling skewed joins automatically; these have not been implemented in
>>> Spark yet.
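>>>
>>> To make the reducer-count feature concrete, here is a self-contained
>>> sketch (my own illustration, not Spark's actual code) of the underlying
>>> rule: merge contiguous map-output partitions until each merged partition
>>> reaches roughly a target input size:
>>>
>>>   // Coalesce contiguous post-shuffle partitions up to targetSize bytes;
>>>   // returns groups of original partition indices, one group per reducer.
>>>   def coalescePartitions(sizes: Array[Long],
>>>                          targetSize: Long): Seq[Seq[Int]] = {
>>>     val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
>>>     var current = Vector.empty[Int]
>>>     var currentSize = 0L
>>>     for ((size, i) <- sizes.zipWithIndex) {
>>>       // Close the current group once adding this partition would
>>>       // overshoot the target.
>>>       if (current.nonEmpty && currentSize + size > targetSize) {
>>>         groups += current; current = Vector.empty; currentSize = 0L
>>>       }
>>>       current :+= i
>>>       currentSize += size
>>>     }
>>>     if (current.nonEmpty) groups += current
>>>     groups.toSeq
>>>   }
>>>
>>>   // Ten 10 MB map outputs with a 64 MB target collapse into 2 reducers:
>>>   // coalescePartitions(Array.fill(10)(10L << 20), 64L << 20).length == 2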
>>>
>>>
>>>
>>> In the current implementation, an ExchangeCoordinator is used to
>>> determine the number of post-shuffle partitions for a stage. However, the
>>> coordinator is added while each Exchange is being added, so it actually
>>> lacks a global picture of all the shuffle dependencies of a post-shuffle
>>> stage. E.g., for a join of 3 tables in a single stage, the same
>>> ExchangeCoordinator should be used across the three Exchanges, but
>>> currently two separate ExchangeCoordinators will be added. It also adds
>>> additional Exchanges in some cases. So I think it is time to rethink how
>>> to better support adaptive execution in Spark SQL. I have proposed a new
>>> approach in SPARK-23128
>>> <https://issues.apache.org/jira/browse/SPARK-23128>. A document
>>> describing the idea is available here
>>> <https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>.
>>> The idea of how to change a sort merge join to a broadcast hash join at
>>> runtime is described in a separate doc
>>> <https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.
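>>>
>>> As a one-glance illustration of that second doc, a hedged sketch of the
>>> runtime decision (the type and function names here are mine, not the
>>> patch's actual classes): once a query stage is materialized its real
>>> output size is known, so the static plan choice can be revisited:
>>>
>>>   sealed trait JoinStrategy
>>>   case object SortMergeJoin extends JoinStrategy
>>>   case object BroadcastHashJoin extends JoinStrategy
>>>
>>>   // After both child stages finish, compare the smaller side's actual
>>>   // output size against the broadcast threshold.
>>>   def chooseJoin(leftBytes: Long, rightBytes: Long,
>>>                  broadcastThreshold: Long): JoinStrategy =
>>>     if (math.min(leftBytes, rightBytes) <= broadcastThreshold)
>>>       BroadcastHashJoin  // broadcast the now provably small side
>>>     else
>>>       SortMergeJoin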
>>>
>>>
>>>
>>>
>>> The docs have been there for a while, and I also have an implementation
>>> based on Spark 2.3 available at
>>> https://github.com/Intel-bigdata/spark-adaptive. The code is split into
>>> 7 PRs labeled AE2.3-0x in the pull requests. I asked many partners,
>>> including Baidu, Alibaba, and JD.com, to evaluate the patch, and received
>>> very good feedback. Baidu also shared their results on the JIRA. We also
>>> completed a 100 TB TPC-DS benchmark with the patch earlier, which passed
>>> all queries with good performance improvements.
>>>
>>>
>>>
>>> I’d like to call for a review of the docs, and even the code, and we can
>>> discuss further in this thread. Thanks very much!
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Carson
>>>
>>>
>>>
>>
