[
https://issues.apache.org/jira/browse/SPARK-14820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256826#comment-15256826
]
Ali Tootoonchian commented on SPARK-14820:
------------------------------------------
Below are the optimizations that PushPredicateThroughJoin performs for that
specific query.
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS
l_shipmode#51]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some((((((p_partkey#10 = l_partkey#38) && (p_size#15 >= 1)) && l_shipmode#51 IN
(AIR,AIR REG)) && (l_shipinstruct#50 = DELIVER IN PERSON)) && (((((((p_brand#13
= Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
(l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) ||
(((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED
PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <=
10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG
PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) &&
(p_size#15 <= 15)))))
newLeft:
Filter (l_shipmode#51 IN (AIR,AIR REG) && (l_shipinstruct#50 = DELIVER IN
PERSON))
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS
l_shipmode#51]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter (p_size#15 >= 1)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
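The first invocation above can be read as a partition of the join condition's top-level conjuncts: those that reference only left-side columns become the newLeft filter, those that reference only right-side columns become the newRight filter, and the rest stay in newJoinCond. The following is a minimal Python sketch of that partitioning, not Catalyst's actual Scala code. The names Pred and split_conjuncts are hypothetical, and the predicates are transcribed from the trace without expression ids.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Pred:
    """Toy predicate (hypothetical): display text plus the columns it references."""
    text: str
    refs: FrozenSet[str]


def split_conjuncts(
    conjuncts: List[Pred], left_cols: FrozenSet[str], right_cols: FrozenSet[str]
) -> Tuple[List[Pred], List[Pred], List[Pred]]:
    """Partition conjuncts into (left-only, right-only, must stay in the join)."""
    left_only, right_only, stay = [], [], []
    for p in conjuncts:
        if p.refs <= left_cols:        # every referenced column is on the left
            left_only.append(p)
        elif p.refs <= right_cols:     # every referenced column is on the right
            right_only.append(p)
        else:                          # mixes both sides, cannot be pushed as-is
            stay.append(p)
    return left_only, right_only, stay


# Output columns of the two Project nodes in the first trace.
LEFT = frozenset({"l_partkey", "l_quantity", "l_extendedprice",
                  "l_discount", "l_shipinstruct", "l_shipmode"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})

# Top-level conjuncts of the first joinCondition; the OR block is kept opaque.
CONJUNCTS = [
    Pred("p_partkey = l_partkey", frozenset({"p_partkey", "l_partkey"})),
    Pred("p_size >= 1", frozenset({"p_size"})),
    Pred("l_shipmode IN (AIR, AIR REG)", frozenset({"l_shipmode"})),
    Pred("l_shipinstruct = DELIVER IN PERSON", frozenset({"l_shipinstruct"})),
    Pred("(Brand#12 branch) || (Brand#23 branch) || (Brand#34 branch)",
         frozenset({"p_brand", "p_container", "l_quantity", "p_size"})),
]

new_left, new_right, new_join = split_conjuncts(CONJUNCTS, LEFT, RIGHT)
print("newLeft filter :", " && ".join(p.text for p in new_left))
print("newRight filter:", " && ".join(p.text for p in new_right))
print("newJoinCond    :", " && ".join(p.text for p in new_join))
```

In this sketch the OR block stays in the join condition as a whole because, taken as one conjunct, it references columns from both sides.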
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) &&
(l_shipinstruct#33 = DELIVER IN PERSON)))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int)
>= 1))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((isnotnull(l_partkey#38) && isnotnull(p_partkey#10)) && ((p_partkey#10 =
l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM
BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) &&
(p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED
BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <=
20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16
IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) &&
(l_quantity#41 <= 30.0)) && (p_size#15 <= 15))))))
newLeft:
Filter isnotnull(l_partkey#38)
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG)
&& (l_shipinstruct#33 = DELIVER IN PERSON)))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter isnotnull(p_partkey#10)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as
int) >= 1))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- Project
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- Project
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
As I explained in the PDF file, this optimization can go further and push more
filters below the join, so that they run before the join starts. I'm going to
send a PR for this issue shortly. My plan is to improve
PushPredicateThroughJoin and add more intelligence to it.
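One possible way to push more filters below the join, sketched here as an assumption and not necessarily what the attached PDF or the planned PR does, is to derive a weaker single-side filter from the OR block that remains in newJoinCond. If every OR branch contains at least one conjunct that references only one side, then the OR of those per-branch conjuncts is implied by the join condition and can be applied to that side before the join, while the original condition stays in place. The names Pred, derive_side_filter and render are hypothetical, and the predicates are transcribed from the trace without expression ids.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class Pred:
    """Toy predicate (hypothetical): display text plus the columns it references."""
    text: str
    refs: FrozenSet[str]


def derive_side_filter(
    branches: List[List[Pred]], side_cols: FrozenSet[str]
) -> Optional[List[List[Pred]]]:
    """Given an OR of AND-branches, keep only the conjuncts that reference
    side_cols. Returns None if some branch does not constrain that side,
    because then nothing is implied for it."""
    restricted = []
    for branch in branches:
        side_only = [p for p in branch if p.refs <= side_cols]
        if not side_only:
            return None
        restricted.append(side_only)
    return restricted


def render(branches: List[List[Pred]]) -> str:
    """Print an OR of AND-branches in the notation used by the plan dumps."""
    return " || ".join("(" + " && ".join(p.text for p in b) + ")" for b in branches)


def col(name: str) -> FrozenSet[str]:
    return frozenset({name})


# The three OR branches that remain in newJoinCond.
BRANCHES = [
    [Pred("p_brand = Brand#12", col("p_brand")),
     Pred("p_container IN (SM CASE, SM BOX, SM PACK, SM PKG)", col("p_container")),
     Pred("l_quantity >= 1.0", col("l_quantity")),
     Pred("l_quantity <= 11.0", col("l_quantity")),
     Pred("p_size <= 5", col("p_size"))],
    [Pred("p_brand = Brand#23", col("p_brand")),
     Pred("p_container IN (MED BAG, MED BOX, MED PKG, MED PACK)", col("p_container")),
     Pred("l_quantity >= 10.0", col("l_quantity")),
     Pred("l_quantity <= 20.0", col("l_quantity")),
     Pred("p_size <= 10", col("p_size"))],
    [Pred("p_brand = Brand#34", col("p_brand")),
     Pred("p_container IN (LG CASE, LG BOX, LG PACK, LG PKG)", col("p_container")),
     Pred("l_quantity >= 20.0", col("l_quantity")),
     Pred("l_quantity <= 30.0", col("l_quantity")),
     Pred("p_size <= 15", col("p_size"))],
]

LEFT = frozenset({"l_partkey", "l_quantity", "l_extendedprice", "l_discount"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})

left_extra = derive_side_filter(BRANCHES, LEFT)
right_extra = derive_side_filter(BRANCHES, RIGHT)
print("extra left filter :", render(left_extra))
print("extra right filter:", render(right_extra))
```

The derived filters are weaker than the original OR block, so they only reduce the rows that reach the join. The full condition still has to be evaluated in the join itself, which is the extra compute the issue description trades for less data moved.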
> Reduce shuffle data by pushing filter toward storage
> ----------------------------------------------------
>
> Key: SPARK-14820
> URL: https://issues.apache.org/jira/browse/SPARK-14820
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Ali Tootoonchian
> Priority: Trivial
> Attachments: Reduce Shuffle Data by pushing filter toward storage.pdf
>
>
> The SQL query planner can be given the intelligence to push filters down
> towards the storage layer. If we optimize the query planner such that the IO
> to the storage is reduced at the cost of running multiple filters (i.e.,
> compute), this should be desirable when the system is IO bound.
> A supporting analysis and an example are attached.