[ 
https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276055#comment-15276055
 ] 

Yi Zhou commented on SPARK-15219:
---------------------------------

Posted the core physical plan

> [Spark SQL] it don't support to detect runtime temporary table for enabling 
> broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15219
>                 URL: https://issues.apache.org/jira/browse/SPARK-15219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yi Zhou
>
> We observed an interesting difference in broadcast hash join (similar to map 
> join in Hive) when comparing Spark SQL with Hive on the MR engine. The query 
> below is a multi-way join over 3 tables: product_reviews and 2 run-time 
> temporary result tables (fsr and fwr) produced by 'select' subqueries. Each 
> of 'fsr' and 'fwr' also contains a two-way join (1 table and 1 run-time 
> temporary table). The query runs slower in Spark SQL than in Hive on MR. We 
> investigated the difference between the two engines and found that Hive on 
> MR, with tuned map join parameters, runs a total of 5 map join tasks, while 
> Spark SQL runs only 2 broadcast hash join tasks, even when we set a larger 
> threshold (e.g., 1GB) for broadcast hash join. From our investigation, it 
> seems that when a join in Spark SQL involves a run-time temporary table, the 
> engine does not detect that table as a candidate for the broadcast hash join 
> optimization. 
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
>   SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
>   (
>     item_sk,
>     review_sentence,
>     sentiment,
>     sentiment_word
>   )
>   FROM product_reviews pr,
>   (
>     --store returns in week ending given date
>     SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
>     FROM store_returns sr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) sr_dateFilter
>     WHERE sr.sr_returned_date_sk = d_date_sk
>     GROUP BY sr_item_sk --across all store and web channels
>     HAVING sr_item_qty > 0
>   ) fsr,
>   (
>     --web returns in week ending given date
>     SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
>     FROM web_returns wr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) wr_dateFilter
>     WHERE wr.wr_returned_date_sk = d_date_sk
>     GROUP BY wr_item_sk  --across all store and web channels
>     HAVING wr_item_qty > 0
>   ) fwr
>   WHERE fsr.sr_item_sk = fwr.wr_item_sk
>   AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
>   -- equivalent across all store and web channels (within a tolerance of +/- 10%)
>   AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
> Physical Plan:
> {code}
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation bigbench_3t_sparksql, q19_spark_sql_run_query_0_result, None, Map(), false, false
> +- ConvertToSafe
>    +- Sort [item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC], true, 0
>       +- ConvertToUnsafe
>          +- Exchange rangepartitioning(item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC,200), None
>             +- ConvertToSafe
>                +- Project [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                   +- Filter (sentiment#539 = NEG)
>                      +- !Generate HiveGenericUDTF#io.bigdatabenchmark.v1.queries.q10.SentimentUDF(pr_item_sk#363L,pr_review_content#366), false, false, [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                         +- ConvertToSafe
>                            +- Project [pr_item_sk#363L,pr_review_content#366]
>                               +- Filter (abs((cast((sr_item_qty#356L - wr_item_qty#357L) as double) / (cast((sr_item_qty#356L + wr_item_qty#357L) as double) / 2.0))) <= 0.1)
>                                  +- SortMergeJoin [sr_item_sk#369L], [wr_item_sk#445L]
>                                     :- Sort [sr_item_sk#369L ASC], false, 0
>                                     :  +- Project [pr_item_sk#363L,sr_item_qty#356L,pr_review_content#366,sr_item_sk#369L]
>                                     :     +- SortMergeJoin [pr_item_sk#363L], [sr_item_sk#369L]
>                                     :        :- Sort [pr_item_sk#363L ASC], false, 0
>                                     :        :  +- TungstenExchange hashpartitioning(pr_item_sk#363L,200), None
>                                     :        :     +- ConvertToUnsafe
>                                     :        :        +- HiveTableScan [pr_item_sk#363L,pr_review_content#366], MetastoreRelation bigbench_3t_sparksql, product_reviews, Some(pr)
>                                     :        +- Sort [sr_item_sk#369L ASC], false, 0
>                                     :           +- Filter (sr_item_qty#356L > 0)
>                                     :              +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Final,isDistinct=false)], output=[sr_item_sk#369L,sr_item_qty#356L])
>                                     :                 +- TungstenExchange hashpartitioning(sr_item_sk#369L,200), None
>                                     :                    +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Partial,isDistinct=false)], output=[sr_item_sk#369L,sum#551L])
>                                     :                       +- Project [sr_item_sk#369L,sr_return_quantity#377]
>                                     :                          +- SortMergeJoin [sr_returned_date_sk#367L], [d_date_sk#387L]
>                                     :                             :- Sort [sr_returned_date_sk#367L ASC], false, 0
>                                     :                             :  +- TungstenExchange hashpartitioning(sr_returned_date_sk#367L,200), None
>                                     :                             :     +- ConvertToUnsafe
>                                     :                             :        +- HiveTableScan [sr_item_sk#369L,sr_return_quantity#377,sr_returned_date_sk#367L], MetastoreRelation bigbench_3t_sparksql, store_returns, Some(sr)
>                                     :                             +- Sort [d_date_sk#387L ASC], false, 0
>                                     :                                +- TungstenExchange hashpartitioning(d_date_sk#387L,200), None
>                                     :                                   +- Project [d_date_sk#387L]
>                                     :                                      +- BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
>                                     :                                         :- ConvertToUnsafe
>                                     :                                         :  +- HiveTableScan [d_date_sk#387L,d_week_seq#391], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                     :                                         +- Project [d_week_seq#419]
>                                     :                                            +- Filter d_date#417 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                     :                                               +- HiveTableScan [d_week_seq#419,d_date#417], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
>                                     +- Sort [wr_item_sk#445L ASC], false, 0
>                                        +- Filter (wr_item_qty#357L > 0)
>                                           +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Final,isDistinct=false)], output=[wr_item_sk#445L,wr_item_qty#357L])
>                                              +- TungstenExchange hashpartitioning(wr_item_sk#445L,200), None
>                                                 +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Partial,isDistinct=false)], output=[wr_item_sk#445L,sum#554L])
>                                                    +- Project [wr_item_sk#445L,wr_return_quantity#457]
>                                                       +- SortMergeJoin [wr_returned_date_sk#443L], [d_date_sk#467L]
>                                                          :- Sort [wr_returned_date_sk#443L ASC], false, 0
>                                                          :  +- TungstenExchange hashpartitioning(wr_returned_date_sk#443L,200), None
>                                                          :     +- ConvertToUnsafe
>                                                          :        +- HiveTableScan [wr_item_sk#445L,wr_return_quantity#457,wr_returned_date_sk#443L], MetastoreRelation bigbench_3t_sparksql, web_returns, Some(wr)
>                                                          +- Sort [d_date_sk#467L ASC], false, 0
>                                                             +- TungstenExchange hashpartitioning(d_date_sk#467L,200), None
>                                                                +- Project [d_date_sk#467L]
>                                                                   +- BroadcastHashJoin [d_week_seq#471], [d_week_seq#499], BuildRight
>                                                                      :- ConvertToUnsafe
>                                                                      :  +- HiveTableScan [d_date_sk#467L,d_week_seq#471], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                                                      +- Project [d_week_seq#499]
>                                                                         +- Filter d_date#497 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                                                            +- HiveTableScan [d_week_seq#499,d_date#497], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
> {code}
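
The join strategies in the quoted plan can be tallied mechanically, which makes the gap described in the issue easy to see. The following is a minimal Python sketch, not part of the original report: it assumes the plan is available as plain text, and the names count_join_operators and SAMPLE_PLAN are hypothetical, introduced only for illustration. SAMPLE_PLAN is an abbreviated excerpt of the plan above, not the full tree.

```python
import re
from collections import Counter

# Join operator names exactly as they appear in the quoted physical plan.
JOIN_OPERATORS = ("BroadcastHashJoin", "SortMergeJoin")


def count_join_operators(plan_text):
    """Count occurrences of each join operator in a physical plan string."""
    pattern = re.compile(r"\b(" + "|".join(JOIN_OPERATORS) + r")\b")
    return Counter(pattern.findall(plan_text))


# Abbreviated excerpt of the plan above (assumption: illustration only).
SAMPLE_PLAN = """
SortMergeJoin [sr_item_sk#369L], [wr_item_sk#445L]
:- SortMergeJoin [pr_item_sk#363L], [sr_item_sk#369L]
+- BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
"""

if __name__ == "__main__":
    counts = count_join_operators(SAMPLE_PLAN)
    for name in JOIN_OPERATORS:
        print(name, counts[name])
```

Run against the full plan text, a tally like this should find 2 BroadcastHashJoin and 4 SortMergeJoin operators. The two broadcast joins are the date_dim self-joins; every join that involves the aggregated fsr or fwr result falls back to SortMergeJoin.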



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
