[
https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yi Zhou updated SPARK-15219:
----------------------------
Description:
We observed an interesting behavior of broadcast hash join (similar to map join
in Hive) when comparing against the implementation on the Hive on MR engine.
The query below is a multi-way join over 3 inputs: the product_reviews table
and 2 run-time temporary result tables (fsr and fwr) produced by SELECT
subqueries; in addition, both 'fsr' and 'fwr' contain a two-way join (1 table
joined with 1 run-time temporary table). The query runs slower than on Hive on
MR. We investigated the difference between Spark SQL and the Hive on MR engine
and found that Hive on MR generates a total of 5 map join tasks with tuned map
join parameters, while Spark SQL generates only 2 broadcast hash join tasks
even when we set a larger threshold (e.g., 1GB) for broadcast hash join. From
our investigation, it seems that when a run-time temporary table participates
in a join, Spark SQL does not detect it as a candidate for the broadcast hash
join optimization.
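The observation can be reproduced by raising the broadcast threshold and
checking the physical plan. A minimal sketch (run in the Spark SQL CLI; the
simplified join below only illustrates the pattern, it is not the exact
benchmark query):
{code}
-- Raise the auto broadcast threshold to 1GB (default is 10MB) so that even a
-- fairly large build side would be eligible for broadcast hash join.
SET spark.sql.autoBroadcastJoinThreshold=1073741824;

-- Inspect the physical plan of a join against a run-time temporary result
-- (an aggregated subquery). Per the observation above, no BroadcastHashJoin
-- is planned for the subquery side even with the enlarged threshold.
EXPLAIN
SELECT pr.pr_item_sk, t.sr_item_qty
FROM product_reviews pr
JOIN (SELECT sr_item_sk, SUM(sr_return_quantity) AS sr_item_qty
      FROM store_returns
      GROUP BY sr_item_sk) t
  ON pr.pr_item_sk = t.sr_item_sk;
{code}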
Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
(
item_sk,
review_sentence,
sentiment,
sentiment_word
)
FROM product_reviews pr,
(
--store returns in week ending given date
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk --across all store and web channels
HAVING sr_item_qty > 0
) fsr,
(
--web returns in week ending given date
SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
FROM web_returns wr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) wr_dateFilter
WHERE wr.wr_returned_date_sk = d_date_sk
GROUP BY wr_item_sk --across all store and web channels
HAVING wr_item_qty > 0
) fwr
WHERE fsr.sr_item_sk = fwr.wr_item_sk
AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
-- equivalent across all store and web channels (within a tolerance of +/- 10%)
AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
)extractedSentiments
WHERE sentiment= 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
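For comparison, the "tuned map join parameters" on the Hive on MR side were
along these lines (a sketch; the exact parameter values used in our runs are
assumptions here):
{code}
-- Hive on MR settings that allow the small side of each join, including the
-- intermediate fsr/fwr results, to be converted into a map join.
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-- combined small-table size limit for the no-conditional-task conversion
set hive.auto.convert.join.noconditionaltask.size=1000000000;
set hive.mapjoin.smalltable.filesize=1000000000;
{code}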
was:
We observed an interesting behavior of broadcast hash join (similar to map join
in Hive) when comparing against the implementation on the Hive on MR engine.
The query below is a multi-way join over 3 inputs: the product_reviews table
and 2 run-time temporary result tables (fsr and fwr) produced by SELECT
subqueries; in addition, both 'fsr' and 'fwr' contain a two-way join (1 table
joined with 1 run-time temporary table). We investigated the difference between
Spark SQL and the Hive on MR engine and found that Hive on MR generates a total
of 5 map join tasks with tuned map join parameters, while Spark SQL generates
only 2 broadcast hash join tasks even when we set a larger threshold (e.g.,
1GB) for broadcast hash join. From our investigation, it seems that when a
run-time temporary table participates in a join, Spark SQL does not detect it
as a candidate for the broadcast hash join optimization.
Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
(
item_sk,
review_sentence,
sentiment,
sentiment_word
)
FROM product_reviews pr,
(
--store returns in week ending given date
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk --across all store and web channels
HAVING sr_item_qty > 0
) fsr,
(
--web returns in week ending given date
SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
FROM web_returns wr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) wr_dateFilter
WHERE wr.wr_returned_date_sk = d_date_sk
GROUP BY wr_item_sk --across all store and web channels
HAVING wr_item_qty > 0
) fwr
WHERE fsr.sr_item_sk = fwr.wr_item_sk
AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
-- equivalent across all store and web channels (within a tolerance of +/- 10%)
AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
)extractedSentiments
WHERE sentiment= 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
> [Spark SQL] Runtime temporary tables are not detected for enabling broadcast
> hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-15219
> URL: https://issues.apache.org/jira/browse/SPARK-15219
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Yi Zhou
>
> We observed an interesting behavior of broadcast hash join (similar to map
> join in Hive) when comparing against the implementation on the Hive on MR
> engine. The query below is a multi-way join over 3 inputs: the
> product_reviews table and 2 run-time temporary result tables (fsr and fwr)
> produced by SELECT subqueries; in addition, both 'fsr' and 'fwr' contain a
> two-way join (1 table joined with 1 run-time temporary table). The query runs
> slower than on Hive on MR. We investigated the difference between Spark SQL
> and the Hive on MR engine and found that Hive on MR generates a total of 5
> map join tasks with tuned map join parameters, while Spark SQL generates only
> 2 broadcast hash join tasks even when we set a larger threshold (e.g., 1GB)
> for broadcast hash join. From our investigation, it seems that when a
> run-time temporary table participates in a join, Spark SQL does not detect it
> as a candidate for the broadcast hash join optimization.
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
> SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
> (
> item_sk,
> review_sentence,
> sentiment,
> sentiment_word
> )
> FROM product_reviews pr,
> (
> --store returns in week ending given date
> SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
> FROM store_returns sr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
> ) sr_dateFilter
> WHERE sr.sr_returned_date_sk = d_date_sk
> GROUP BY sr_item_sk --across all store and web channels
> HAVING sr_item_qty > 0
> ) fsr,
> (
> --web returns in week ending given date
> SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
> FROM web_returns wr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
> ) wr_dateFilter
> WHERE wr.wr_returned_date_sk = d_date_sk
> GROUP BY wr_item_sk --across all store and web channels
> HAVING wr_item_qty > 0
> ) fwr
> WHERE fsr.sr_item_sk = fwr.wr_item_sk
> AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
> -- equivalent across all store and web channels (within a tolerance of +/- 10%)
> AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]