Ihor Bobak created SPARK-8081:
---------------------------------
Summary: Problems with Optimized Logical Plan Generation
Key: SPARK-8081
URL: https://issues.apache.org/jira/browse/SPARK-8081
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.3.1, 1.2.2
Reporter: Ihor Bobak
The issue can be reproduced with this database:
https://drive.google.com/file/d/0B3DMXMfcPWF3UEtsTjJ3aGMzOEE/view
This is the Foodmart script for MySQL, together with a script for importing the
database into Hive.
The problem is as follows. When we run the following query in Spark SQL:
SELECT `time_by_day`.`the_year` `c0`
,`product_class`.`product_family` `c1`
,SUM(`sales_fact_1997`.`unit_sales`) `m0`
,SUM(`sales_fact_1997`.`store_cost`) `m1`
,SUM(`sales_fact_1997`.`store_sales`) `m2`
,COUNT(`sales_fact_1997`.`product_id`) `m3`
,COUNT(DISTINCT `sales_fact_1997`.`customer_id`) `m4`
,SUM((
CASE
WHEN `sales_fact_1997`.`promotion_id` = 0
THEN 0
ELSE `sales_fact_1997`.`store_sales`
END
)) `m5`
FROM `foodmart`.`time_by_day` `time_by_day`
CROSS JOIN `foodmart`.`sales_fact_1997` `sales_fact_1997`
CROSS JOIN `foodmart`.`product` `product`
CROSS JOIN `foodmart`.`product_class` `product_class`
WHERE
`sales_fact_1997`.`time_id` = `time_by_day`.`time_id`
AND `sales_fact_1997`.`product_id` =
`product`.`product_id`
AND `product`.`product_class_id` =
`product_class`.`product_class_id`
GROUP BY `time_by_day`.`the_year`
,`product_class`.`product_family`
the optimized plan is fine:
== Optimized Logical Plan ==
Aggregate [the_year#51,product_family#84], [the_year#51 AS
c0#0,product_family#84 AS c1#1,SUM(unit_sales#64) AS m0#2,SUM(store_cost#63) AS
m1#3,SUM(store_sales#62) AS m2#4,COUNT(product_id#57) AS m3#5L,COUNT(DISTINCT
customer_id#59) AS m4#6L,SUM(CASE WHEN (promotion_id#60 = 0) THEN 0.0 ELSE
store_sales#62) AS m5#7]
Project
[store_cost#63,the_year#51,store_sales#62,product_family#84,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
Join Inner, Some((product_class_id#65 = product_class_id#80))
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_class_id#65,product_id#57,promotion_id#60]
Join Inner, Some((product_id#57 = product_id#66))
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
Join Inner, Some((time_id#58 = time_id#47))
Project [time_id#47,the_year#51]
MetastoreRelation foodmart, time_by_day, Some(time_by_day)
Project
[store_cost#63,store_sales#62,unit_sales#64,customer_id#59,product_id#57,time_id#58,promotion_id#60]
MetastoreRelation foodmart, sales_fact_1997, Some(sales_fact_1997)
Project [product_id#66,product_class_id#65]
MetastoreRelation foodmart, product, Some(product)
Project [product_family#84,product_class_id#80]
MetastoreRelation foodmart, product_class, Some(product_class)
== Physical Plan ==
Aggregate false, [the_year#51,product_family#84], [the_year#51 AS
c0#0,product_family#84 AS c1#1,SUM(PartialSum#91) AS m0#2,SUM(PartialSum#92) AS
m1#3,SUM(PartialSum#93) AS m2#4,Coalesce(SUM(PartialCount#94L),0) AS
m3#5L,CombineAndCount(partialSets#95) AS m4#6L,SUM(PartialSum#96) AS m5#7]
Exchange (HashPartitioning [the_year#51,product_family#84], 200)
Aggregate true, [the_year#51,product_family#84],
[the_year#51,product_family#84,SUM(store_cost#63) AS
PartialSum#92,AddToHashSet(customer_id#59) AS
partialSets#95,SUM(store_sales#62) AS PartialSum#93,SUM(CASE WHEN
(promotion_id#60 = 0) THEN 0.0 ELSE store_sales#62) AS
PartialSum#96,SUM(unit_sales#64) AS PartialSum#91,COUNT(product_id#57) AS
PartialCount#94L]
Project
[store_cost#63,the_year#51,store_sales#62,product_family#84,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
BroadcastHashJoin [product_class_id#65], [product_class_id#80], BuildRight
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_class_id#65,product_id#57,promotion_id#60]
BroadcastHashJoin [product_id#57], [product_id#66], BuildRight
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
BroadcastHashJoin [time_id#47], [time_id#58], BuildRight
HiveTableScan [time_id#47,the_year#51], (MetastoreRelation foodmart,
time_by_day, Some(time_by_day)), None
HiveTableScan
[store_cost#63,store_sales#62,unit_sales#64,customer_id#59,product_id#57,time_id#58,promotion_id#60],
(MetastoreRelation foodmart, sales_fact_1997, Some(sales_fact_1997)), None
HiveTableScan [product_id#66,product_class_id#65], (MetastoreRelation
foodmart, product, Some(product)), None
HiveTableScan [product_family#84,product_class_id#80], (MetastoreRelation
foodmart, product_class, Some(product_class)), None
But as soon as we run the same query with the order of the tables product and
product_class swapped:
SELECT `time_by_day`.`the_year` `c0`
,`product_class`.`product_family` `c1`
,SUM(`sales_fact_1997`.`unit_sales`) `m0`
,SUM(`sales_fact_1997`.`store_cost`) `m1`
,SUM(`sales_fact_1997`.`store_sales`) `m2`
,COUNT(`sales_fact_1997`.`product_id`) `m3`
,COUNT(DISTINCT `sales_fact_1997`.`customer_id`) `m4`
,SUM((
CASE
WHEN `sales_fact_1997`.`promotion_id` = 0
THEN 0
ELSE `sales_fact_1997`.`store_sales`
END
)) `m5`
FROM `foodmart`.`time_by_day` `time_by_day`
CROSS JOIN `foodmart`.`sales_fact_1997` `sales_fact_1997`
CROSS JOIN `foodmart`.`product_class` `product_class`
CROSS JOIN `foodmart`.`product` `product`
WHERE
`sales_fact_1997`.`time_id` = `time_by_day`.`time_id`
AND `sales_fact_1997`.`product_id` =
`product`.`product_id`
AND `product`.`product_class_id` =
`product_class`.`product_class_id`
GROUP BY `time_by_day`.`the_year`
,`product_class`.`product_family`
we get the following:
== Optimized Logical Plan ==
Aggregate [the_year#51,product_family#69], [the_year#51 AS
c0#0,product_family#69 AS c1#1,SUM(unit_sales#64) AS m0#2,SUM(store_cost#63) AS
m1#3,SUM(store_sales#62) AS m2#4,COUNT(product_id#57) AS m3#5L,COUNT(DISTINCT
customer_id#59) AS m4#6L,SUM(CASE WHEN (promotion_id#60 = 0) THEN 0.0 ELSE
store_sales#62) AS m5#7]
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,product_family#69,promotion_id#60]
Join Inner, Some(((product_id#57 = product_id#71) && (product_class_id#70 =
product_class_id#65)))
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_class_id#65,product_id#57,product_family#69,promotion_id#60]
Join Inner, None
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
Join Inner, Some((time_id#58 = time_id#47))
Project [time_id#47,the_year#51]
MetastoreRelation foodmart, time_by_day, Some(time_by_day)
Project
[store_cost#63,store_sales#62,unit_sales#64,customer_id#59,product_id#57,time_id#58,promotion_id#60]
MetastoreRelation foodmart, sales_fact_1997, Some(sales_fact_1997)
Project [product_family#69,product_class_id#65]
MetastoreRelation foodmart, product_class, Some(product_class)
Project [product_id#71,product_class_id#70]
MetastoreRelation foodmart, product, Some(product)
== Physical Plan ==
Aggregate false, [the_year#51,product_family#69], [the_year#51 AS
c0#0,product_family#69 AS c1#1,SUM(PartialSum#91) AS m0#2,SUM(PartialSum#92) AS
m1#3,SUM(PartialSum#93) AS m2#4,Coalesce(SUM(PartialCount#94L),0) AS
m3#5L,CombineAndCount(partialSets#95) AS m4#6L,SUM(PartialSum#96) AS m5#7]
Exchange (HashPartitioning [the_year#51,product_family#69], 200)
Aggregate true, [the_year#51,product_family#69],
[the_year#51,product_family#69,SUM(store_cost#63) AS
PartialSum#92,AddToHashSet(customer_id#59) AS
partialSets#95,SUM(store_sales#62) AS PartialSum#93,SUM(CASE WHEN
(promotion_id#60 = 0) THEN 0.0 ELSE store_sales#62) AS
PartialSum#96,SUM(unit_sales#64) AS PartialSum#91,COUNT(product_id#57) AS
PartialCount#94L]
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,product_family#69,promotion_id#60]
BroadcastHashJoin [product_id#57,product_class_id#65],
[product_id#71,product_class_id#70], BuildRight
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_class_id#65,product_id#57,product_family#69,promotion_id#60]
CartesianProduct
Project
[store_cost#63,the_year#51,store_sales#62,unit_sales#64,customer_id#59,product_id#57,promotion_id#60]
BroadcastHashJoin [time_id#47], [time_id#58], BuildRight
HiveTableScan [time_id#47,the_year#51], (MetastoreRelation foodmart,
time_by_day, Some(time_by_day)), None
HiveTableScan
[store_cost#63,store_sales#62,unit_sales#64,customer_id#59,product_id#57,time_id#58,promotion_id#60],
(MetastoreRelation foodmart, sales_fact_1997, Some(sales_fact_1997)), None
HiveTableScan [product_family#69,product_class_id#65],
(MetastoreRelation foodmart, product_class, Some(product_class)), None
HiveTableScan [product_id#71,product_class_id#70], (MetastoreRelation
foodmart, product, Some(product)), None
As you can see, the plan contains a CartesianProduct, which causes an
OutOfMemoryError even on a database as small as this one.
The query was generated by Mondrian; it is not hand-written. I tested the same
query on MySQL, where it works just fine, and on Hive2 it also works fine
regardless of the order of the tables in the FROM clause.
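As a possible workaround (untested on my side, so this is only an assumption), rewriting the query with explicit JOIN ... ON syntax instead of CROSS JOIN plus WHERE predicates would hand the optimizer the join conditions directly, so it should not depend on the order of the tables. A sketch with a reduced select list:

```sql
-- Hypothetical workaround sketch (not verified): state each join
-- condition in an ON clause so the planner does not have to push
-- WHERE predicates down into the CROSS JOINs.
SELECT `time_by_day`.`the_year` `c0`
      ,`product_class`.`product_family` `c1`
      ,SUM(`sales_fact_1997`.`store_sales`) `m2`
FROM `foodmart`.`sales_fact_1997` `sales_fact_1997`
JOIN `foodmart`.`time_by_day` `time_by_day`
  ON `sales_fact_1997`.`time_id` = `time_by_day`.`time_id`
JOIN `foodmart`.`product` `product`
  ON `sales_fact_1997`.`product_id` = `product`.`product_id`
JOIN `foodmart`.`product_class` `product_class`
  ON `product`.`product_class_id` = `product_class`.`product_class_id`
GROUP BY `time_by_day`.`the_year`
        ,`product_class`.`product_family`
```

This does not help in my case, though, because the query text is generated by Mondrian and cannot easily be rewritten.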
Could you please fix this in a future Spark version?
Thanks.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]