[ https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276055#comment-15276055 ]
Yi Zhou commented on SPARK-15219:
---------------------------------
Posted the core physical plan below.
> [Spark SQL] runtime temporary tables are not detected as candidates for the
> broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-15219
> URL: https://issues.apache.org/jira/browse/SPARK-15219
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Yi Zhou
>
> We observed an interesting behavior of broadcast hash join (similar to map
> join in Hive) when comparing Spark SQL against Hive on the MR engine. The
> query below is a multi-way join over 3 inputs: the product_reviews table and
> 2 runtime temporary result tables ('fsr' and 'fwr') produced by subqueries,
> and each of 'fsr' and 'fwr' itself contains a two-way join between 1 base
> table and 1 runtime temporary table. This query runs slower on Spark SQL
> than on Hive on MR. Comparing the two engines, we found that Hive on MR runs
> a total of 5 map join tasks (with tuned map join parameters), while Spark
> SQL runs only 2 broadcast hash join tasks, even after setting a larger
> threshold (e.g., 1 GB) for broadcast hash join. From our investigation, it
> seems that when a runtime temporary table participates in a join, the Spark
> SQL engine does not detect it as a candidate for the broadcast hash join
> optimization.
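> For reference, "setting a larger threshold" refers to
> spark.sql.autoBroadcastJoinThreshold, which Spark SQL compares against a
> relation's estimated size when deciding whether to broadcast a join side. A
> minimal sketch of the settings meant here (the 1 GB value is just the
> example threshold mentioned above):
> {code}
> -- Spark SQL: broadcast a join side only if its estimated size in bytes
> -- is below this threshold (here ~1 GB).
> SET spark.sql.autoBroadcastJoinThreshold=1073741824;
>
> -- Hive on MR analog, for comparison: auto-convert common joins to map
> -- joins when the small side is under the given file size.
> SET hive.auto.convert.join=true;
> SET hive.mapjoin.smalltable.filesize=1073741824;
> {code}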
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because sorting/distribute by with a UDTF in the select clause is not allowed
>   SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
>   (
>     item_sk,
>     review_sentence,
>     sentiment,
>     sentiment_word
>   )
>   FROM product_reviews pr,
>   (
>     --store returns in week ending given date
>     SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
>     FROM store_returns sr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8', '2004-08-02', '2004-11-15', '2004-12-20' )
>     ) sr_dateFilter
>     WHERE sr.sr_returned_date_sk = d_date_sk
>     GROUP BY sr_item_sk --across all store and web channels
>     HAVING sr_item_qty > 0
>   ) fsr,
>   (
>     --web returns in week ending given date
>     SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
>     FROM web_returns wr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8', '2004-08-02', '2004-11-15', '2004-12-20' )
>     ) wr_dateFilter
>     WHERE wr.wr_returned_date_sk = d_date_sk
>     GROUP BY wr_item_sk --across all store and web channels
>     HAVING wr_item_qty > 0
>   ) fwr
>   WHERE fsr.sr_item_sk = fwr.wr_item_sk
>   AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
>   -- equivalent across all store and web channels (within a tolerance of +/- 10%)
>   AND abs( (sr_item_qty - wr_item_qty) / ((sr_item_qty + wr_item_qty) / 2) ) <= 0.1
> ) extractedSentiments
> WHERE sentiment = 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk, review_sentence, sentiment, sentiment_word
> ;
> {code}
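> A plan like the one below can be printed with Spark SQL's EXPLAIN statement
> (a minimal sketch; the two-table query here is only an illustration, not the
> full benchmark query):
> {code}
> -- Print the physical plan; look for BroadcastHashJoin vs. SortMergeJoin
> -- operators to see which join strategy the planner chose.
> EXPLAIN
> SELECT pr.pr_item_sk
> FROM product_reviews pr
> JOIN store_returns sr ON pr.pr_item_sk = sr.sr_item_sk;
> {code}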
> Physical Plan:
> {code}
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation bigbench_3t_sparksql, q19_spark_sql_run_query_0_result, None, Map(), false, false
> +- ConvertToSafe
>    +- Sort [item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC], true, 0
>       +- ConvertToUnsafe
>          +- Exchange rangepartitioning(item_sk#537L ASC,review_sentence#538 ASC,sentiment#539 ASC,sentiment_word#540 ASC,200), None
>             +- ConvertToSafe
>                +- Project [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                   +- Filter (sentiment#539 = NEG)
>                      +- !Generate HiveGenericUDTF#io.bigdatabenchmark.v1.queries.q10.SentimentUDF(pr_item_sk#363L,pr_review_content#366), false, false, [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
>                         +- ConvertToSafe
>                            +- Project [pr_item_sk#363L,pr_review_content#366]
>                               +- Filter (abs((cast((sr_item_qty#356L - wr_item_qty#357L) as double) / (cast((sr_item_qty#356L + wr_item_qty#357L) as double) / 2.0))) <= 0.1)
>                                  +- SortMergeJoin [sr_item_sk#369L], [wr_item_sk#445L]
>                                     :- Sort [sr_item_sk#369L ASC], false, 0
>                                     :  +- Project [pr_item_sk#363L,sr_item_qty#356L,pr_review_content#366,sr_item_sk#369L]
>                                     :     +- SortMergeJoin [pr_item_sk#363L], [sr_item_sk#369L]
>                                     :        :- Sort [pr_item_sk#363L ASC], false, 0
>                                     :        :  +- TungstenExchange hashpartitioning(pr_item_sk#363L,200), None
>                                     :        :     +- ConvertToUnsafe
>                                     :        :        +- HiveTableScan [pr_item_sk#363L,pr_review_content#366], MetastoreRelation bigbench_3t_sparksql, product_reviews, Some(pr)
>                                     :        +- Sort [sr_item_sk#369L ASC], false, 0
>                                     :           +- Filter (sr_item_qty#356L > 0)
>                                     :              +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Final,isDistinct=false)], output=[sr_item_sk#369L,sr_item_qty#356L])
>                                     :                 +- TungstenExchange hashpartitioning(sr_item_sk#369L,200), None
>                                     :                    +- TungstenAggregate(key=[sr_item_sk#369L], functions=[(sum(cast(sr_return_quantity#377 as bigint)),mode=Partial,isDistinct=false)], output=[sr_item_sk#369L,sum#551L])
>                                     :                       +- Project [sr_item_sk#369L,sr_return_quantity#377]
>                                     :                          +- SortMergeJoin [sr_returned_date_sk#367L], [d_date_sk#387L]
>                                     :                             :- Sort [sr_returned_date_sk#367L ASC], false, 0
>                                     :                             :  +- TungstenExchange hashpartitioning(sr_returned_date_sk#367L,200), None
>                                     :                             :     +- ConvertToUnsafe
>                                     :                             :        +- HiveTableScan [sr_item_sk#369L,sr_return_quantity#377,sr_returned_date_sk#367L], MetastoreRelation bigbench_3t_sparksql, store_returns, Some(sr)
>                                     :                             +- Sort [d_date_sk#387L ASC], false, 0
>                                     :                                +- TungstenExchange hashpartitioning(d_date_sk#387L,200), None
>                                     :                                   +- Project [d_date_sk#387L]
>                                     :                                      +- BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
>                                     :                                         :- ConvertToUnsafe
>                                     :                                         :  +- HiveTableScan [d_date_sk#387L,d_week_seq#391], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                     :                                         +- Project [d_week_seq#419]
>                                     :                                            +- Filter d_date#417 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                     :                                               +- HiveTableScan [d_week_seq#419,d_date#417], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
>                                     +- Sort [wr_item_sk#445L ASC], false, 0
>                                        +- Filter (wr_item_qty#357L > 0)
>                                           +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Final,isDistinct=false)], output=[wr_item_sk#445L,wr_item_qty#357L])
>                                              +- TungstenExchange hashpartitioning(wr_item_sk#445L,200), None
>                                                 +- TungstenAggregate(key=[wr_item_sk#445L], functions=[(sum(cast(wr_return_quantity#457 as bigint)),mode=Partial,isDistinct=false)], output=[wr_item_sk#445L,sum#554L])
>                                                    +- Project [wr_item_sk#445L,wr_return_quantity#457]
>                                                       +- SortMergeJoin [wr_returned_date_sk#443L], [d_date_sk#467L]
>                                                          :- Sort [wr_returned_date_sk#443L ASC], false, 0
>                                                          :  +- TungstenExchange hashpartitioning(wr_returned_date_sk#443L,200), None
>                                                          :     +- ConvertToUnsafe
>                                                          :        +- HiveTableScan [wr_item_sk#445L,wr_return_quantity#457,wr_returned_date_sk#443L], MetastoreRelation bigbench_3t_sparksql, web_returns, Some(wr)
>                                                          +- Sort [d_date_sk#467L ASC], false, 0
>                                                             +- TungstenExchange hashpartitioning(d_date_sk#467L,200), None
>                                                                +- Project [d_date_sk#467L]
>                                                                   +- BroadcastHashJoin [d_week_seq#471], [d_week_seq#499], BuildRight
>                                                                      :- ConvertToUnsafe
>                                                                      :  +- HiveTableScan [d_date_sk#467L,d_week_seq#471], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d1)
>                                                                      +- Project [d_week_seq#499]
>                                                                         +- Filter d_date#497 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
>                                                                            +- HiveTableScan [d_week_seq#499,d_date#497], MetastoreRelation bigbench_3t_sparksql, date_dim, Some(d2)
> {code}
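> Note that the only 2 BroadcastHashJoin operators above sit on the date_dim
> base table; the aggregated 'fsr' and 'fwr' results always go through
> SortMergeJoin. For permanent Hive tables, Spark SQL estimates a relation's
> size from metastore statistics, which can be refreshed as below (a minimal
> sketch; runtime temporary results carry no such statistics, which is
> presumably why they are never considered for broadcast):
> {code}
> -- Refresh the metastore size statistics that Spark SQL compares against
> -- spark.sql.autoBroadcastJoinThreshold when planning a broadcast join.
> ANALYZE TABLE date_dim COMPUTE STATISTICS noscan;
> {code}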