[
https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276042#comment-15276042
]
Herman van Hovell commented on SPARK-15219:
-------------------------------------------
[~jameszhouyi] Could you also post the query plan? Use either {{explain
extended ...}} in SQL or {{df.explain(true)}} on a DataFrame.
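As a minimal sketch of the SQL route, the plan for one of the two-way joins could be captured as shown here. It reuses the 'fsr' sub-query from the report and assumes the broadcast threshold is set through {{spark.sql.autoBroadcastJoinThreshold}} (a value in bytes; 1073741824 corresponds to the 1GB mentioned in the description). The formatting and the choice of sub-query are illustrative only.
{code}
-- Assumption: threshold in bytes (1 GB), matching the example value in the report.
SET spark.sql.autoBroadcastJoinThreshold=1073741824;

-- Print the parsed, analyzed, optimized and physical plans for the 'fsr' sub-query.
EXPLAIN EXTENDED
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
  -- within the week ending a given date
  SELECT d1.d_date_sk
  FROM date_dim d1, date_dim d2
  WHERE d1.d_week_seq = d2.d_week_seq
  AND d2.d_date IN ('2004-03-8', '2004-08-02', '2004-11-15', '2004-12-20')
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk
HAVING sr_item_qty > 0;
{code}
In the physical plan, a join that was planned as a broadcast join should appear as a BroadcastHashJoin node. A SortMergeJoin node in its place would indicate that the broadcast was not chosen for that join.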
> [Spark SQL] it don't support to detect runtime temporary table for enabling
> broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-15219
> URL: https://issues.apache.org/jira/browse/SPARK-15219
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Yi Zhou
>
> We observed an interesting difference in broadcast hash join (similar to map
> join in Hive) when comparing Spark SQL with Hive on the MR engine. The
> query below is a multi-way join over 3 tables: product_reviews and 2
> run-time temporary result tables (fsr and fwr) produced by 'select'
> sub-queries. Both 'fsr' and 'fwr' also contain a two-way join (1 table and 1
> run-time temporary table). This query performs slower on Spark SQL than on
> Hive on MR. We investigated the difference between Spark SQL and Hive on MR
> and found that there are a total of 5 map join tasks with tuned map join
> parameters in Hive on MR, but only 2 broadcast hash join tasks in Spark SQL,
> even if we set a larger threshold (e.g., 1GB) for broadcast hash join. From
> our investigation, it seems that when a join in Spark SQL involves a
> run-time temporary table, the engine does not detect that table as a
> candidate for the broadcast hash join optimization.
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because sorting/distribute by with UDTF in select clause is not allowed
> SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
> (
> item_sk,
> review_sentence,
> sentiment,
> sentiment_word
> )
> FROM product_reviews pr,
> (
> --store returns in week ending given date
> SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
> FROM store_returns sr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15',
> '2004-12-20' )
> ) sr_dateFilter
> WHERE sr.sr_returned_date_sk = d_date_sk
> GROUP BY sr_item_sk --across all store and web channels
> HAVING sr_item_qty > 0
> ) fsr,
> (
> --web returns in week ending given date
> SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
> FROM web_returns wr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15',
> '2004-12-20' )
> ) wr_dateFilter
> WHERE wr.wr_returned_date_sk = d_date_sk
> GROUP BY wr_item_sk --across all store and web channels
> HAVING wr_item_qty > 0
> ) fwr
> WHERE fsr.sr_item_sk = fwr.wr_item_sk
> AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
> -- equivalent across all store and web channels (within a tolerance of +/- 10%)
> AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}