[ https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276042#comment-15276042 ]
Herman van Hovell commented on SPARK-15219:
-------------------------------------------

[~jameszhouyi] Could you also post the query plan? Use either {{explain extended ...}} in SQL or {{df.explain(true)}} using DataFrames.

> [Spark SQL] it don't support to detect runtime temporary table for enabling broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15219
>                 URL: https://issues.apache.org/jira/browse/SPARK-15219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yi Zhou
>
> We observed an interesting difference in broadcast hash join (similar to
> map join in Hive) when comparing Spark SQL with Hive on the MR engine. The
> query below is a multi-way join over 3 tables: product_reviews and 2
> run-time temporary result tables (fsr and fwr) produced by 'select'
> subqueries. Both 'fsr' and 'fwr' also contain a two-way join (1 table and
> 1 run-time temporary table). The query runs slower on Spark SQL than on
> Hive on MR. We investigated the difference between Spark SQL and the Hive
> on MR engine and found that Hive on MR runs a total of 5 map join tasks
> with tuned map join parameters, whereas Spark SQL runs only 2 broadcast
> hash join tasks, even if we set a larger threshold (e.g., 1GB) for
> broadcast hash join. From our investigation, it seems that when a join in
> the Spark SQL engine involves a run-time temporary table, that table is
> not detected as a candidate for the broadcast hash join optimization.
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
>   SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
>   (
>     item_sk,
>     review_sentence,
>     sentiment,
>     sentiment_word
>   )
>   FROM product_reviews pr,
>   (
>     --store returns in week ending given date
>     SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
>     FROM store_returns sr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) sr_dateFilter
>     WHERE sr.sr_returned_date_sk = d_date_sk
>     GROUP BY sr_item_sk --across all store and web channels
>     HAVING sr_item_qty > 0
>   ) fsr,
>   (
>     --web returns in week ending given date
>     SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
>     FROM web_returns wr,
>     (
>       -- within the week ending a given date
>       SELECT d1.d_date_sk
>       FROM date_dim d1, date_dim d2
>       WHERE d1.d_week_seq = d2.d_week_seq
>       AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
>     ) wr_dateFilter
>     WHERE wr.wr_returned_date_sk = d_date_sk
>     GROUP BY wr_item_sk --across all store and web channels
>     HAVING wr_item_qty > 0
>   ) fwr
>   WHERE fsr.sr_item_sk = fwr.wr_item_sk
>   AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
>   -- equivalent across all store and web channels (within a tolerance of +/- 10%)
>   AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
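
The plan requested in the comment could be captured roughly as follows. This is only a sketch under two assumptions: that the "larger threshold (e.g., 1GB)" in the description refers to {{spark.sql.autoBroadcastJoinThreshold}} (the issue does not name the setting), and that explaining the 'fsr' subquery on its own is enough to show whether its inner join is planned as a broadcast hash join. The full statement can be prefixed with {{EXPLAIN EXTENDED}} in the same way.

{code}
-- Assumption: this is the threshold the reporter raised; 1073741824 bytes = 1GB.
SET spark.sql.autoBroadcastJoinThreshold=1073741824;

-- Print the parsed, analyzed, optimized, and physical plans for the 'fsr' subquery.
EXPLAIN EXTENDED
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
  SELECT d1.d_date_sk
  FROM date_dim d1, date_dim d2
  WHERE d1.d_week_seq = d2.d_week_seq
  AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk
HAVING sr_item_qty > 0;
{code}

In the physical plan section of the output, the join operator names indicate which joins were planned as broadcast hash joins and which were not.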