[
https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yi Zhou updated SPARK-15219:
----------------------------
Description:
We observed an interesting thing about broadcast Hash join( similar to Map Join
in Hive) when comparing the implementation by Hive on MR engine. The below query
is a multi-way join operation based on 3 tables including product_reviews, 2
run-time temporary result tables (fsr and fwr) from 'select' query operation and
also there is a two-way join(1 table and 1 run-time temporary table) in both
'fsr' and 'fwr', which causes slower performance than Hive on MR. We investigated
the difference between Spark SQL and Hive on MR engine and found that there are
total of 5 map join tasks with tuned map join parameters in Hive on MR but
there are only 2 broadcast hash join tasks in Spark SQL even if we set a larger
threshold(e.g.,1GB) for broadcast hash join. From our investigation, it seems
that if there is a run-time temporary table in a join operation, the Spark SQL engine
it will not detect such table for enabling broadcast hash join optimization.
Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because Sorting/distribute by with UDTF in
select clause is not allowed
SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
(
item_sk,
review_sentence,
sentiment,
sentiment_word
)
FROM product_reviews pr,
(
--store returns in week ending given date
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk --across all store and web channels
HAVING sr_item_qty > 0
) fsr,
(
--web returns in week ending given date
SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
FROM web_returns wr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) wr_dateFilter
WHERE wr.wr_returned_date_sk = d_date_sk
GROUP BY wr_item_sk --across all store and web channels
HAVING wr_item_qty > 0
) fwr
WHERE fsr.sr_item_sk = fwr.wr_item_sk
AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
-- equivalent across all store and web channels (within a tolerance of +/-
10%)
AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
)extractedSentiments
WHERE sentiment= 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
Physical Plan:
{code}
== Physical Plan ==
InsertIntoHiveTable MetastoreRelation bigbench_3t_sparksql,
q19_spark_sql_run_query_0_result, None, Map(), false, false
+- ConvertToSafe
+- Sort [item_sk#537L ASC,review_sentence#538 ASC,sentiment#539
ASC,sentiment_word#540 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(item_sk#537L ASC,review_sentence#538
ASC,sentiment#539 ASC,sentiment_word#540 ASC,200), None
+- ConvertToSafe
+- Project
[item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
+- Filter (sentiment#539 = NEG)
+- !Generate
HiveGenericUDTF#io.bigdatabenchmark.v1.queries.q10.SentimentUDF(pr_item_sk#363L,pr_review_content#366),
false, false,
[item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
+- ConvertToSafe
+- Project [pr_item_sk#363L,pr_review_content#366]
+- Filter (abs((cast((sr_item_qty#356L -
wr_item_qty#357L) as double) / (cast((sr_item_qty#356L + wr_item_qty#357L) as
double) / 2.0))) <= 0.1)
+- SortMergeJoin [sr_item_sk#369L],
[wr_item_sk#445L]
:- Sort [sr_item_sk#369L ASC], false, 0
: +- Project
[pr_item_sk#363L,sr_item_qty#356L,pr_review_content#366,sr_item_sk#369L]
: +- SortMergeJoin [pr_item_sk#363L],
[sr_item_sk#369L]
: :- Sort [pr_item_sk#363L ASC],
false, 0
: : +- TungstenExchange
hashpartitioning(pr_item_sk#363L,200), None
: : +- ConvertToUnsafe
: : +- HiveTableScan
[pr_item_sk#363L,pr_review_content#366], MetastoreRelation
bigbench_3t_sparksql, product_reviews, Some(pr)
: +- Sort [sr_item_sk#369L ASC],
false, 0
: +- Filter (sr_item_qty#356L > 0)
: +-
TungstenAggregate(key=[sr_item_sk#369L],
functions=[(sum(cast(sr_return_quantity#377 as
bigint)),mode=Final,isDistinct=false)],
output=[sr_item_sk#369L,sr_item_qty#356L])
: +- TungstenExchange
hashpartitioning(sr_item_sk#369L,200), None
: +-
TungstenAggregate(key=[sr_item_sk#369L],
functions=[(sum(cast(sr_return_quantity#377 as
bigint)),mode=Partial,isDistinct=false)], output=[sr_item_sk#369L,sum#551L])
: +- Project
[sr_item_sk#369L,sr_return_quantity#377]
: +- SortMergeJoin
[sr_returned_date_sk#367L], [d_date_sk#387L]
: :- Sort
[sr_returned_date_sk#367L ASC], false, 0
: : +-
TungstenExchange hashpartitioning(sr_returned_date_sk#367L,200), None
: : +-
ConvertToUnsafe
: : +-
HiveTableScan
[sr_item_sk#369L,sr_return_quantity#377,sr_returned_date_sk#367L],
MetastoreRelation bigbench_3t_sparksql, store_returns, Some(sr)
: +- Sort
[d_date_sk#387L ASC], false, 0
: +-
TungstenExchange hashpartitioning(d_date_sk#387L,200), None
: +-
Project [d_date_sk#387L]
: +-
BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
:
:- ConvertToUnsafe
: :
+- HiveTableScan [d_date_sk#387L,d_week_seq#391], MetastoreRelation
bigbench_3t_sparksql, date_dim, Some(d1)
:
+- Project [d_week_seq#419]
:
+- Filter d_date#417 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
:
+- HiveTableScan [d_week_seq#419,d_date#417], MetastoreRelation
bigbench_3t_sparksql, date_dim, Some(d2)
+- Sort [wr_item_sk#445L ASC], false, 0
+- Filter (wr_item_qty#357L > 0)
+-
TungstenAggregate(key=[wr_item_sk#445L],
functions=[(sum(cast(wr_return_quantity#457 as
bigint)),mode=Final,isDistinct=false)],
output=[wr_item_sk#445L,wr_item_qty#357L])
+- TungstenExchange
hashpartitioning(wr_item_sk#445L,200), None
+-
TungstenAggregate(key=[wr_item_sk#445L],
functions=[(sum(cast(wr_return_quantity#457 as
bigint)),mode=Partial,isDistinct=false)], output=[wr_item_sk#445L,sum#554L])
+- Project
[wr_item_sk#445L,wr_return_quantity#457]
+- SortMergeJoin
[wr_returned_date_sk#443L], [d_date_sk#467L]
:- Sort
[wr_returned_date_sk#443L ASC], false, 0
: +- TungstenExchange
hashpartitioning(wr_returned_date_sk#443L,200), None
: +-
ConvertToUnsafe
: +-
HiveTableScan
[wr_item_sk#445L,wr_return_quantity#457,wr_returned_date_sk#443L],
MetastoreRelation bigbench_3t_sparksql, web_returns, Some(wr)
+- Sort
[d_date_sk#467L ASC], false, 0
+- TungstenExchange
hashpartitioning(d_date_sk#467L,200), None
+- Project
[d_date_sk#467L]
+-
BroadcastHashJoin [d_week_seq#471], [d_week_seq#499], BuildRight
:-
ConvertToUnsafe
: +-
HiveTableScan [d_date_sk#467L,d_week_seq#471], MetastoreRelation
bigbench_3t_sparksql, date_dim, Some(d1)
+- Project
[d_week_seq#499]
+-
Filter d_date#497 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
+-
HiveTableScan [d_week_seq#499,d_date#497], MetastoreRelation
bigbench_3t_sparksql, date_dim, Some(d2)
{code}
was:
We observed an interesting thing about broadcast Hash join( similar to Map Join
in Hive) when comparing the implementation by Hive on MR engine. The below query
is a multi-way join operation based on 3 tables including product_reviews, 2
run-time temporary result tables (fsr and fwr) from 'select' query operation and
also there is a two-way join(1 table and 1 run-time temporary table) in both
'fsr' and 'fwr', which causes slower performance than Hive on MR. We investigated
the difference between Spark SQL and Hive on MR engine and found that there are
total of 5 map join tasks with tuned map join parameters in Hive on MR but
there are only 2 broadcast hash join tasks in Spark SQL even if we set a larger
threshold(e.g.,1GB) for broadcast hash join. From our investigation, it seems
that if there is a run-time temporary table in a join operation, the Spark SQL engine
it will not detect such table for enabling broadcast hash join optimization.
Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because Sorting/distribute by with UDTF in
select clause is not allowed
SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
(
item_sk,
review_sentence,
sentiment,
sentiment_word
)
FROM product_reviews pr,
(
--store returns in week ending given date
SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
FROM store_returns sr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) sr_dateFilter
WHERE sr.sr_returned_date_sk = d_date_sk
GROUP BY sr_item_sk --across all store and web channels
HAVING sr_item_qty > 0
) fsr,
(
--web returns in week ending given date
SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
FROM web_returns wr,
(
-- within the week ending a given date
SELECT d1.d_date_sk
FROM date_dim d1, date_dim d2
WHERE d1.d_week_seq = d2.d_week_seq
AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
) wr_dateFilter
WHERE wr.wr_returned_date_sk = d_date_sk
GROUP BY wr_item_sk --across all store and web channels
HAVING wr_item_qty > 0
) fwr
WHERE fsr.sr_item_sk = fwr.wr_item_sk
AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
-- equivalent across all store and web channels (within a tolerance of +/-
10%)
AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
)extractedSentiments
WHERE sentiment= 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
> [Spark SQL] it doesn't support detecting runtime temporary tables for enabling
> broadcast hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-15219
> URL: https://issues.apache.org/jira/browse/SPARK-15219
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Yi Zhou
>
> We observed an interesting thing about broadcast Hash join( similar to Map
> Join in Hive) when comparing the implementation by Hive on MR engine. The
> below query is a multi-way join operation based on 3 tables including
> product_reviews, 2 run-time temporary result tables(fsr and fwr) from
> 'select' query operation and also there is a two-way join (1 table and 1
> run-time temporary table) in both 'fsr' and 'fwr', which causes slower
> performance than Hive on MR. We investigated the difference between Spark SQL
> and Hive on MR engine and found that there are total of 5 map join tasks with
> tuned map join parameters in Hive on MR but there are only 2 broadcast hash
> join tasks in Spark SQL even if we set a larger threshold(e.g.,1GB) for
> broadcast hash join. From our investigation, it seems that if there is a
> run-time temporary table in a join operation, the Spark SQL engine will not
> detect such table for enabling broadcast hash join optimization.
> Core SQL snippet:
> {code}
> INSERT INTO TABLE q19_spark_sql_power_test_0_result
> SELECT *
> FROM
> ( --wrap in additional FROM(), because Sorting/distribute by with UDTF in
> select clause is not allowed
> SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
> (
> item_sk,
> review_sentence,
> sentiment,
> sentiment_word
> )
> FROM product_reviews pr,
> (
> --store returns in week ending given date
> SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
> FROM store_returns sr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15',
> '2004-12-20' )
> ) sr_dateFilter
> WHERE sr.sr_returned_date_sk = d_date_sk
> GROUP BY sr_item_sk --across all store and web channels
> HAVING sr_item_qty > 0
> ) fsr,
> (
> --web returns in week ending given date
> SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
> FROM web_returns wr,
> (
> -- within the week ending a given date
> SELECT d1.d_date_sk
> FROM date_dim d1, date_dim d2
> WHERE d1.d_week_seq = d2.d_week_seq
> AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15',
> '2004-12-20' )
> ) wr_dateFilter
> WHERE wr.wr_returned_date_sk = d_date_sk
> GROUP BY wr_item_sk --across all store and web channels
> HAVING wr_item_qty > 0
> ) fwr
> WHERE fsr.sr_item_sk = fwr.wr_item_sk
> AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
> -- equivalent across all store and web channels (within a tolerance of +/-
> 10%)
> AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
> )extractedSentiments
> WHERE sentiment= 'NEG' --if there are any major negative reviews.
> ORDER BY item_sk,review_sentence,sentiment,sentiment_word
> ;
> {code}
> Physical Plan:
> {code}
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation bigbench_3t_sparksql,
> q19_spark_sql_run_query_0_result, None, Map(), false, false
> +- ConvertToSafe
> +- Sort [item_sk#537L ASC,review_sentence#538 ASC,sentiment#539
> ASC,sentiment_word#540 ASC], true, 0
> +- ConvertToUnsafe
> +- Exchange rangepartitioning(item_sk#537L ASC,review_sentence#538
> ASC,sentiment#539 ASC,sentiment_word#540 ASC,200), None
> +- ConvertToSafe
> +- Project
> [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
> +- Filter (sentiment#539 = NEG)
> +- !Generate
> HiveGenericUDTF#io.bigdatabenchmark.v1.queries.q10.SentimentUDF(pr_item_sk#363L,pr_review_content#366),
> false, false,
> [item_sk#537L,review_sentence#538,sentiment#539,sentiment_word#540]
> +- ConvertToSafe
> +- Project [pr_item_sk#363L,pr_review_content#366]
> +- Filter (abs((cast((sr_item_qty#356L -
> wr_item_qty#357L) as double) / (cast((sr_item_qty#356L + wr_item_qty#357L) as
> double) / 2.0))) <= 0.1)
> +- SortMergeJoin [sr_item_sk#369L],
> [wr_item_sk#445L]
> :- Sort [sr_item_sk#369L ASC], false, 0
> : +- Project
> [pr_item_sk#363L,sr_item_qty#356L,pr_review_content#366,sr_item_sk#369L]
> : +- SortMergeJoin [pr_item_sk#363L],
> [sr_item_sk#369L]
> : :- Sort [pr_item_sk#363L ASC],
> false, 0
> : : +- TungstenExchange
> hashpartitioning(pr_item_sk#363L,200), None
> : : +- ConvertToUnsafe
> : : +- HiveTableScan
> [pr_item_sk#363L,pr_review_content#366], MetastoreRelation
> bigbench_3t_sparksql, product_reviews, Some(pr)
> : +- Sort [sr_item_sk#369L ASC],
> false, 0
> : +- Filter (sr_item_qty#356L >
> 0)
> : +-
> TungstenAggregate(key=[sr_item_sk#369L],
> functions=[(sum(cast(sr_return_quantity#377 as
> bigint)),mode=Final,isDistinct=false)],
> output=[sr_item_sk#369L,sr_item_qty#356L])
> : +- TungstenExchange
> hashpartitioning(sr_item_sk#369L,200), None
> : +-
> TungstenAggregate(key=[sr_item_sk#369L],
> functions=[(sum(cast(sr_return_quantity#377 as
> bigint)),mode=Partial,isDistinct=false)], output=[sr_item_sk#369L,sum#551L])
> : +- Project
> [sr_item_sk#369L,sr_return_quantity#377]
> : +-
> SortMergeJoin [sr_returned_date_sk#367L], [d_date_sk#387L]
> : :- Sort
> [sr_returned_date_sk#367L ASC], false, 0
> : : +-
> TungstenExchange hashpartitioning(sr_returned_date_sk#367L,200), None
> : : +-
> ConvertToUnsafe
> : : +-
> HiveTableScan
> [sr_item_sk#369L,sr_return_quantity#377,sr_returned_date_sk#367L],
> MetastoreRelation bigbench_3t_sparksql, store_returns, Some(sr)
> : +- Sort
> [d_date_sk#387L ASC], false, 0
> : +-
> TungstenExchange hashpartitioning(d_date_sk#387L,200), None
> : +-
> Project [d_date_sk#387L]
> : +-
> BroadcastHashJoin [d_week_seq#391], [d_week_seq#419], BuildRight
> :
> :- ConvertToUnsafe
> :
> : +- HiveTableScan [d_date_sk#387L,d_week_seq#391], MetastoreRelation
> bigbench_3t_sparksql, date_dim, Some(d1)
> :
> +- Project [d_week_seq#419]
> :
> +- Filter d_date#417 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
> :
> +- HiveTableScan [d_week_seq#419,d_date#417], MetastoreRelation
> bigbench_3t_sparksql, date_dim, Some(d2)
> +- Sort [wr_item_sk#445L ASC], false, 0
> +- Filter (wr_item_qty#357L > 0)
> +-
> TungstenAggregate(key=[wr_item_sk#445L],
> functions=[(sum(cast(wr_return_quantity#457 as
> bigint)),mode=Final,isDistinct=false)],
> output=[wr_item_sk#445L,wr_item_qty#357L])
> +- TungstenExchange
> hashpartitioning(wr_item_sk#445L,200), None
> +-
> TungstenAggregate(key=[wr_item_sk#445L],
> functions=[(sum(cast(wr_return_quantity#457 as
> bigint)),mode=Partial,isDistinct=false)], output=[wr_item_sk#445L,sum#554L])
> +- Project
> [wr_item_sk#445L,wr_return_quantity#457]
> +- SortMergeJoin
> [wr_returned_date_sk#443L], [d_date_sk#467L]
> :- Sort
> [wr_returned_date_sk#443L ASC], false, 0
> : +-
> TungstenExchange hashpartitioning(wr_returned_date_sk#443L,200), None
> : +-
> ConvertToUnsafe
> : +-
> HiveTableScan
> [wr_item_sk#445L,wr_return_quantity#457,wr_returned_date_sk#443L],
> MetastoreRelation bigbench_3t_sparksql, web_returns, Some(wr)
> +- Sort
> [d_date_sk#467L ASC], false, 0
> +-
> TungstenExchange hashpartitioning(d_date_sk#467L,200), None
> +- Project
> [d_date_sk#467L]
> +-
> BroadcastHashJoin [d_week_seq#471], [d_week_seq#499], BuildRight
> :-
> ConvertToUnsafe
> : +-
> HiveTableScan [d_date_sk#467L,d_week_seq#471], MetastoreRelation
> bigbench_3t_sparksql, date_dim, Some(d1)
> +-
> Project [d_week_seq#499]
> +-
> Filter d_date#497 IN (2004-03-8,2004-08-02,2004-11-15,2004-12-20)
> +-
> HiveTableScan [d_week_seq#499,d_date#497], MetastoreRelation
> bigbench_3t_sparksql, date_dim, Some(d2)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]