[
https://issues.apache.org/jira/browse/SPARK-14820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256826#comment-15256826
]
Ali Tootoonchian edited comment on SPARK-14820 at 4/26/16 4:18 PM:
-------------------------------------------------------------------
Below I've listed the optimizations that PushPredicateThroughJoin performs for
that specific query.
{noformat}
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS
l_shipmode#51]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some((((((p_partkey#10 = l_partkey#38) && (p_size#15 >= 1)) && l_shipmode#51 IN
(AIR,AIR REG)) && (l_shipinstruct#50 = DELIVER IN PERSON)) && (((((((p_brand#13
= Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
(l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) ||
(((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED
PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <=
10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG
PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) &&
(p_size#15 <= 15)))))
newLeft:
Filter (l_shipmode#51 IN (AIR,AIR REG) && (l_shipinstruct#50 = DELIVER IN
PERSON))
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS
l_shipmode#51]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter (p_size#15 >= 1)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) &&
(l_shipinstruct#33 = DELIVER IN PERSON)))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int)
>= 1))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((isnotnull(l_partkey#38) && isnotnull(p_partkey#10)) && ((p_partkey#10 =
l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM
BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) &&
(p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED
BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <=
20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16
IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) &&
(l_quantity#41 <= 30.0)) && (p_size#15 <= 15))))))
newLeft:
Filter isnotnull(l_partkey#38)
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG)
&& (l_shipinstruct#33 = DELIVER IN PERSON)))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter isnotnull(p_partkey#10)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as
int) >= 1))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- Project
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- Project
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS
l_partkey#38,cast(l_quantity#24 as float) AS
l_quantity#41,cast(l_extendedprice#25 as float) AS
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
+- LogicalRDD
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as
decimal(20,0)) as int) >= 1)))
+- LogicalRDD
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) &&
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0))
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23)
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >=
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 =
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
{noformat}
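The first step in the dump above can be read as a simple partition of the join condition's conjuncts by the side they reference. The following is a toy Python model of that step, not Spark's code: `Pred` and `split_conjuncts` are hypothetical names used only for illustration, column names are written without their `#id` suffixes, and the large OR expression is abbreviated to a placeholder string.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Pred:
    """One conjunct of the join condition plus the columns it references."""
    text: str
    refs: FrozenSet[str]


def split_conjuncts(
    conjuncts: List[Pred], left_cols: FrozenSet[str], right_cols: FrozenSet[str]
) -> Tuple[List[Pred], List[Pred], List[Pred]]:
    """Partition conjuncts into left-only, right-only, and remaining join predicates."""
    left_only, right_only, remaining = [], [], []
    for pred in conjuncts:
        if pred.refs <= left_cols:
            left_only.append(pred)      # can become a Filter on the left child
        elif pred.refs <= right_cols:
            right_only.append(pred)     # can become a Filter on the right child
        else:
            remaining.append(pred)      # needs both sides, stays in the join
    return left_only, right_only, remaining


# Output columns of the two Project nodes in the first "left:" / "right:" entries.
LEFT = frozenset({"l_partkey", "l_quantity", "l_extendedprice",
                  "l_discount", "l_shipinstruct", "l_shipmode"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})

conjuncts = [
    Pred("p_partkey = l_partkey", frozenset({"p_partkey", "l_partkey"})),
    Pred("p_size >= 1", frozenset({"p_size"})),
    Pred("l_shipmode IN (AIR, AIR REG)", frozenset({"l_shipmode"})),
    Pred("l_shipinstruct = DELIVER IN PERSON", frozenset({"l_shipinstruct"})),
    # Placeholder for the three-branch brand/container/quantity/size OR expression.
    Pred("<OR expression>", frozenset({"p_brand", "p_container", "l_quantity", "p_size"})),
]

new_left, new_right, new_join = split_conjuncts(conjuncts, LEFT, RIGHT)
print("newLeft filter: ", " && ".join(p.text for p in new_left))
print("newRight filter:", " && ".join(p.text for p in new_right))
print("newJoinCond:    ", " && ".join(p.text for p in new_join))
```

In this model the OR expression stays in the join condition as a whole because it mixes columns from both sides, which matches the first newJoinCond entry in the dump.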
As I explained in the attached PDF, this optimization can go further and push
more filters below the join so that they run before the join starts. I'm going
to send a PR for this issue shortly. My plan is to improve
PushPredicateThroughJoin and make it more intelligent.
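One possible way to push more filters below the join is sketched here in Python. This is an assumption for illustration and is not taken from the attached PDF or from Spark's source: `Pred`, `derive_side_filter`, and `render` are hypothetical names. The idea relies on the fact that (A1 && B1) || (A2 && B2) implies (A1 || A2), so a weaker single-side filter can be derived from the OR expression and applied early, while the original condition stays in the join.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class Pred:
    """A leaf predicate plus the columns it references."""
    text: str
    refs: FrozenSet[str]


def derive_side_filter(
    branches: List[List[Pred]], side_cols: FrozenSet[str]
) -> Optional[List[List[Pred]]]:
    """From an OR of AND-branches, keep only the predicates on one side.

    Returns None when some branch has no predicate on that side, because
    that branch would reduce to TRUE and the derived filter would be useless.
    """
    per_branch = []
    for branch in branches:
        kept = [p for p in branch if p.refs <= side_cols]
        if not kept:
            return None
        per_branch.append(kept)
    return per_branch


def render(per_branch: List[List[Pred]]) -> str:
    """Format the derived filter as an OR of ANDs."""
    return " || ".join("(" + " && ".join(p.text for p in b) + ")" for b in per_branch)


def branch(brand: str, containers: str, lo: float, hi: float, size: int) -> List[Pred]:
    """Build one AND-branch of the OR expression from the join condition."""
    return [
        Pred(f"p_brand = {brand}", frozenset({"p_brand"})),
        Pred(f"p_container IN ({containers})", frozenset({"p_container"})),
        Pred(f"l_quantity >= {lo}", frozenset({"l_quantity"})),
        Pred(f"l_quantity <= {hi}", frozenset({"l_quantity"})),
        Pred(f"p_size <= {size}", frozenset({"p_size"})),
    ]


LEFT = frozenset({"l_partkey", "l_quantity", "l_extendedprice", "l_discount"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})

branches = [
    branch("Brand#12", "SM CASE,SM BOX,SM PACK,SM PKG", 1.0, 11.0, 5),
    branch("Brand#23", "MED BAG,MED BOX,MED PKG,MED PACK", 10.0, 20.0, 10),
    branch("Brand#34", "LG CASE,LG BOX,LG PACK,LG PKG", 20.0, 30.0, 15),
]

left_filter = derive_side_filter(branches, LEFT)
right_filter = derive_side_filter(branches, RIGHT)
print("extra left filter: ", render(left_filter))
print("extra right filter:", render(right_filter))
```

The derived filters are only implied by the join condition, not equivalent to it, so in this sketch the full OR expression must still be evaluated in the join. Whether the extra filter pays off depends on its selectivity versus the added compute.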
> Reduce shuffle data by pushing filter toward storage
> ----------------------------------------------------
>
> Key: SPARK-14820
> URL: https://issues.apache.org/jira/browse/SPARK-14820
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Ali Tootoonchian
> Priority: Trivial
> Attachments: Reduce Shuffle Data by pushing filter toward storage.pdf
>
>
> The SQL query planner could be made intelligent enough to push filters down
> toward the storage layer. If the planner reduces IO to storage at the cost of
> running multiple filters (i.e., extra compute), this should be desirable when
> the system is IO bound.
> An analysis and an example are attached.