[ https://issues.apache.org/jira/browse/SPARK-14820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256826#comment-15256826 ]

Ali Tootoonchian edited comment on SPARK-14820 at 4/26/16 4:18 PM:
-------------------------------------------------------------------

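Below I've listed the optimization performed by PushPredicateThroughJoin for that 
specific query: the join inputs and condition before the rule (left/right/joinCondition) and after it (newLeft/newRight/newJoinCond).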
{noformat}
left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS 
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS 
l_shipmode#51]
+- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some((((((p_partkey#10 = l_partkey#38) && (p_size#15 >= 1)) && l_shipmode#51 IN 
(AIR,AIR REG)) && (l_shipinstruct#50 = DELIVER IN PERSON)) && (((((((p_brand#13 
= Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && 
(l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || 
(((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED 
PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 
10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG 
PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && 
(p_size#15 <= 15)))))

newLeft:
Filter (l_shipmode#51 IN (AIR,AIR REG) && (l_shipinstruct#50 = DELIVER IN 
PERSON))
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS 
l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS 
l_shipmode#51]
   +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter (p_size#15 >= 1)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
   +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && 
(l_shipinstruct#33 = DELIVER IN PERSON)))
   +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) 
>= 1))
   +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((isnotnull(l_partkey#38) && isnotnull(p_partkey#10)) && ((p_partkey#10 = 
l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM 
BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && 
(p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED 
BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 
20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 
IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && 
(l_quantity#41 <= 30.0)) && (p_size#15 <= 15))))))

newLeft:
Filter isnotnull(l_partkey#38)
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
   +- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) 
&& (l_shipinstruct#33 = DELIVER IN PERSON)))
      +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter isnotnull(p_partkey#10)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
   +- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as 
int) >= 1))
      +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as 
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN 
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- Project 
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
      +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as 
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as 
decimal(20,0)) as int) >= 1)))
   +- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
      +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as 
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN 
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- Project 
[l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
      +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as 
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as 
decimal(20,0)) as int) >= 1)))
   +- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
      +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as 
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN 
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as 
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as 
decimal(20,0)) as int) >= 1)))
   +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS 
l_partkey#38,cast(l_quantity#24 as float) AS 
l_quantity#41,cast(l_extendedprice#25 as float) AS 
l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as 
decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN 
(AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- LogicalRDD 
[l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35],
 MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS 
p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as 
int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as 
decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as 
decimal(20,0)) as int) >= 1)))
   +- LogicalRDD 
[p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8],
 MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && 
p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) 
&& (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) 
&& p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 
10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = 
Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
{noformat}
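The sketch below is meant to make the first two rounds of the listing easier to follow. It is a toy model in plain Python, not Spark's Catalyst API. `Pred`, `split_join_condition` and `infer_not_null` are hypothetical names, and the predicates are abbreviated from the plans above.

The sketch does two things:
- It moves conjuncts of the join condition that reference only one side below the join. Everything else stays in the join condition.
- It then pushes down the `isnotnull` checks implied by the equi-join key in the same way. In the listing, those checks appear to be added by a separate constraint-inference step rather than by PushPredicateThroughJoin itself.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

# Hypothetical toy model of a predicate (NOT Spark's Catalyst API): its display
# text, the attribute names it references, and, for an equality a = b, the pair.
@dataclass(frozen=True)
class Pred:
    text: str
    refs: frozenset
    eq: Optional[Tuple[str, str]] = None

def split_join_condition(conjuncts, left_attrs, right_attrs):
    """Partition join-condition conjuncts by the side(s) of the join they reference."""
    left_only, right_only, common = [], [], []
    for p in conjuncts:
        if p.refs <= left_attrs:
            left_only.append(p)    # can be evaluated below the join, on the left input
        elif p.refs <= right_attrs:
            right_only.append(p)   # can be evaluated below the join, on the right input
        else:
            common.append(p)       # needs both sides: stays in the join condition
    return left_only, right_only, common

def infer_not_null(conjuncts):
    """For an inner join, an equality a = b can only hold if neither a nor b is null."""
    inferred = []
    for p in conjuncts:
        if p.eq is not None:
            for attr in p.eq:
                inferred.append(Pred(f"isnotnull({attr})", frozenset({attr})))
    return inferred

# Attribute names and predicates abbreviated from round 1 of the listing above.
LEFT = frozenset({"l_partkey", "l_quantity", "l_extendedprice", "l_discount",
                  "l_shipinstruct", "l_shipmode"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})
conds = [
    Pred("p_partkey = l_partkey", frozenset({"p_partkey", "l_partkey"}),
         eq=("p_partkey", "l_partkey")),
    Pred("p_size >= 1", frozenset({"p_size"})),
    Pred("l_shipmode IN (AIR, AIR REG)", frozenset({"l_shipmode"})),
    Pred("l_shipinstruct = DELIVER IN PERSON", frozenset({"l_shipinstruct"})),
    Pred("(Brand#12 branch) OR (Brand#23 branch) OR (Brand#34 branch)",
         frozenset({"p_brand", "p_container", "p_size", "l_quantity"})),
]

# Round 1: single-sided conjuncts move to the inputs, the rest stays in the join.
left, right, common = split_join_condition(conds, LEFT, RIGHT)
print("newLeft filter: ", [p.text for p in left])
print("newRight filter:", [p.text for p in right])
print("newJoinCond:    ", [p.text for p in common])

# Round 2: the inferred null checks are single-sided, so they split cleanly too.
nl, nr, _ = split_join_condition(infer_not_null(common), LEFT, RIGHT)
print("newLeft filter: ", [p.text for p in nl])
print("newRight filter:", [p.text for p in nr])
```

After round 1, only the equi-join key and the large three-branch OR remain in the join condition. The OR references both sides, so a purely conjunct-level split cannot push any part of it down.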
As I explained in the attached PDF, this optimization can go further and push 
more filters below the join, so that they run before the join starts. I'm going to send a PR for this issue shortly. 
My plan is to improve PushPredicateThroughJoin and make it more intelligent.
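Below is one possible way to push more filters below the join. It is a sketch under stated assumptions: a toy Python model, not Catalyst, and not necessarily the approach described in the attached PDF. `Pred`, `derive_side_filter` and `P` are hypothetical names.

The idea rests on the shape of the remaining join condition:
- It is an OR of three branches.
- Each branch is a conjunction in which most conjuncts reference only one side.

Weakening each branch to just its single-side conjuncts gives a filter that the whole condition implies. For an inner join, that filter can therefore be evaluated below the join, while the original condition stays in the join.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical toy model (NOT Catalyst): a predicate's text and referenced attributes.
@dataclass(frozen=True)
class Pred:
    text: str
    refs: frozenset

def P(text, *attrs):
    return Pred(text, frozenset(attrs))

def derive_side_filter(branches: List[List[Pred]], side_attrs: frozenset) -> Optional[str]:
    """Given a condition of the form OR_i (AND_j p_ij), return a filter that only
    references side_attrs and is implied by the whole condition, or None.

    Each branch is weakened to the conjuncts that reference this side alone.
    If any branch has no such conjunct, that branch does not restrict this side,
    so no useful filter can be derived for it.
    """
    parts = []
    for branch in branches:
        local = [p.text for p in branch if p.refs <= side_attrs]
        if not local:
            return None
        parts.append("(" + " AND ".join(local) + ")")
    return " OR ".join(parts)

LEFT = frozenset({"l_partkey", "l_quantity", "l_shipinstruct", "l_shipmode"})
RIGHT = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})

# The three-branch OR left in newJoinCond in the listing above.
branches = [
    [P("p_brand = Brand#12", "p_brand"),
     P("p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)", "p_container"),
     P("l_quantity >= 1.0", "l_quantity"), P("l_quantity <= 11.0", "l_quantity"),
     P("p_size <= 5", "p_size")],
    [P("p_brand = Brand#23", "p_brand"),
     P("p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)", "p_container"),
     P("l_quantity >= 10.0", "l_quantity"), P("l_quantity <= 20.0", "l_quantity"),
     P("p_size <= 10", "p_size")],
    [P("p_brand = Brand#34", "p_brand"),
     P("p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)", "p_container"),
     P("l_quantity >= 20.0", "l_quantity"), P("l_quantity <= 30.0", "l_quantity"),
     P("p_size <= 15", "p_size")],
]

left_filter = derive_side_filter(branches, LEFT)    # extra filter for lineitem
right_filter = derive_side_filter(branches, RIGHT)  # extra filter for part
print(left_filter)
print(right_filter)
```

For this query, the sketch derives two extra filters:
- for lineitem, a filter on the three `l_quantity` ranges;
- for part, a filter on the brand/container/size combinations.

Both filters would shrink the inputs before they are shuffled for the join. The design choice is conservative: if even one branch has no single-side conjunct, no filter is derived for that side, because that branch alone could match any row there.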



> Reduce shuffle data by pushing filter toward storage
> ----------------------------------------------------
>
>                 Key: SPARK-14820
>                 URL: https://issues.apache.org/jira/browse/SPARK-14820
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Ali Tootoonchian
>            Priority: Trivial
>         Attachments: Reduce Shuffle Data by pushing filter toward storage.pdf
>
>
> The SQL query planner could be smarter about pushing filters down toward 
> the storage layer. If the planner is optimized so that IO to storage is 
> reduced at the cost of evaluating additional filters (i.e., extra compute), 
> this should be worthwhile when the system is IO-bound.
> The analysis and an example are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
