[ 
https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276042#comment-15276042
 ] 

Herman van Hovell commented on SPARK-15219:
-------------------------------------------

[~jameszhouyi] Could you also post the query plan? Use either {{explain 
extended ...}} in SQL or {{df.explain(true)}} with DataFrames.
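
For reference, a minimal sketch of how to dump the plans (the two-table join below is illustrative, not the full q19 query):

{code}
-- SQL: prefix the query with EXPLAIN EXTENDED to print the parsed,
-- analyzed, optimized, and physical plans
EXPLAIN EXTENDED
SELECT *
FROM product_reviews pr
JOIN store_returns sr ON pr.pr_item_sk = sr.sr_item_sk;
{code}

With DataFrames, {{df.explain(true)}} prints the same set of plans.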

> [Spark SQL] runtime temporary tables are not detected for enabling the 
> broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15219
>                 URL: https://issues.apache.org/jira/browse/SPARK-15219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yi Zhou
>
> We observed an interesting difference in broadcast hash join behavior (similar 
> to map join in Hive) when comparing against the Hive on MR implementation. The 
> query below is a multi-way join over 3 relations: product_reviews plus 2 
> run-time temporary result tables (fsr and fwr) produced by 'select' 
> subqueries, and each of 'fsr' and 'fwr' itself contains a two-way join (1 
> table and 1 run-time temporary table). This runs slower than Hive on MR. 
> Comparing the two engines, we found a total of 5 map join tasks with tuned map 
> join parameters in Hive on MR, but only 2 broadcast hash join tasks in Spark 
> SQL, even after setting a larger broadcast hash join threshold (e.g., 1GB). 
> From our investigation, it seems that when a run-time temporary table 
> participates in a join, the Spark SQL engine does not detect it as a candidate 
> for the broadcast hash join optimization. 
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
>   SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
>   (
>     item_sk,
>     review_sentence,
>     sentiment,
>     sentiment_word
>   )
>   FROM product_reviews pr,
>   (
>     --store returns in week ending given date
>     SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
>     FROM store_returns sr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8', '2004-08-02', '2004-11-15', '2004-12-20' )
>     ) sr_dateFilter
>     WHERE sr.sr_returned_date_sk = d_date_sk
>     GROUP BY sr_item_sk --across all store and web channels
>     HAVING sr_item_qty > 0
>   ) fsr,
>   (
>     --web returns in week ending given date
>     SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
>     FROM web_returns wr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8', '2004-08-02', '2004-11-15', '2004-12-20' )
>     ) wr_dateFilter
>     WHERE wr.wr_returned_date_sk = d_date_sk
>     GROUP BY wr_item_sk  --across all store and web channels
>     HAVING wr_item_qty > 0
>   ) fwr
>   WHERE fsr.sr_item_sk = fwr.wr_item_sk
>   AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
>   -- equivalent across all store and web channels (within a tolerance of +/- 10%)
>   AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> ) extractedSentiments
> WHERE sentiment = 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
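> A sketch of the tuning knob involved (the value below is illustrative; 
> {{spark.sql.autoBroadcastJoinThreshold}} is in bytes and defaults to 10 MB):
> {code}
> -- raise the auto-broadcast threshold to ~1 GB so the planner may broadcast
> -- larger relations; setting it to -1 disables broadcast joins entirely
> SET spark.sql.autoBroadcastJoinThreshold=1073741824;
> {code}
> Even with this setting, the run-time temporary results (fsr and fwr) are not 
> picked up for broadcasting, which is the behavior reported above.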



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
