[ https://issues.apache.org/jira/browse/SPARK-14820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256826#comment-15256826 ]

Ali Tootoonchian commented on SPARK-14820:
------------------------------------------

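I've listed below the optimization steps performed by PushPredicateThroughJoin
for that specific query. Each block shows the rule's input (left, right,
joinCondition) and its output (newLeft, newRight, newJoinCond).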

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS l_shipmode#51]
+- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some((((((p_partkey#10 = l_partkey#38) && (p_size#15 >= 1)) && l_shipmode#51 IN (AIR,AIR REG)) && (l_shipinstruct#50 = DELIVER IN PERSON)) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

newLeft:
Filter (l_shipmode#51 IN (AIR,AIR REG) && (l_shipinstruct#50 = DELIVER IN PERSON))
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43,l_shipinstruct#33 AS l_shipinstruct#50,l_shipmode#34 AS l_shipmode#51]
   +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter (p_size#15 >= 1)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
   +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
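The split in this first pass can be reproduced with a small model. This is a minimal Python sketch, not Spark's Scala implementation: the `Pred` class and `split_conjuncts` function are hypothetical and only record each conjunct's text and the attributes it references. For an inner join, a conjunct whose references all come from the left child's output becomes a left Filter, one covered by the right child's output becomes a right Filter, and everything else stays in the join condition.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Pred:
    """Hypothetical stand-in for a Catalyst predicate: its text and referenced attributes."""
    text: str
    refs: FrozenSet[str]


def split_conjuncts(conjuncts: List[Pred], left_out: FrozenSet[str],
                    right_out: FrozenSet[str]) -> Tuple[List[Pred], List[Pred], List[Pred]]:
    """Partition the AND-ed conjuncts of an inner-join condition.

    Returns (left_only, right_only, common): conjuncts that can be evaluated on the
    left child alone, on the right child alone, and those that need both sides.
    """
    left_only, right_only, common = [], [], []
    for p in conjuncts:
        if p.refs <= left_out:
            left_only.append(p)
        elif p.refs <= right_out:
            right_only.append(p)
        else:
            common.append(p)
    return left_only, right_only, common


# Output attributes of the two Project nodes in the first pass.
lineitem_out = frozenset({"l_partkey#38", "l_quantity#41", "l_extendedprice#42",
                          "l_discount#43", "l_shipinstruct#50", "l_shipmode#51"})
part_out = frozenset({"p_partkey#10", "p_brand#13", "p_size#15", "p_container#16"})

# Top-level conjuncts of the first joinCondition; the OR is abbreviated.
conjuncts = [
    Pred("p_partkey#10 = l_partkey#38", frozenset({"p_partkey#10", "l_partkey#38"})),
    Pred("p_size#15 >= 1", frozenset({"p_size#15"})),
    Pred("l_shipmode#51 IN (AIR,AIR REG)", frozenset({"l_shipmode#51"})),
    Pred("l_shipinstruct#50 = DELIVER IN PERSON", frozenset({"l_shipinstruct#50"})),
    Pred("(Brand#12 branch) || (Brand#23 branch) || (Brand#34 branch)",
         frozenset({"p_brand#13", "p_container#16", "l_quantity#41", "p_size#15"})),
]

new_left, new_right, new_join = split_conjuncts(conjuncts, lineitem_out, part_out)
print([p.text for p in new_left])   # the two l_ship* conjuncts
print([p.text for p in new_right])  # ['p_size#15 >= 1']
print([p.text for p in new_join])   # the equi-join key and the OR
```

This gives the same split as newLeft, newRight and newJoinCond in the pass above. The OR references both `p_*` attributes and `l_quantity#41`, so as a whole it cannot be pushed to either side.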

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON)))
   +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1))
   +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((isnotnull(l_partkey#38) && isnotnull(p_partkey#10)) && ((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15))))))

newLeft:
Filter isnotnull(l_partkey#38)
+- Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
   +- Filter (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON)))
      +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Filter isnotnull(p_partkey#10)
+- Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
   +- Filter (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1))
      +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
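The `isnotnull(l_partkey#38) && isnotnull(p_partkey#10)` conjuncts in this second pass were not in the query. My understanding is that they come from Spark's constraint inference (the `InferFiltersFromConstraints` rule). An equality such as `p_partkey#10 = l_partkey#38` is never true when either side is null, so both keys must be non-null for a row to survive the inner join. The sketch below shows only that reasoning. `infer_not_null` is a hypothetical helper; the real rule works on Catalyst expressions and covers more cases.

```python
from typing import Iterable, List, Set, Tuple


def infer_not_null(equalities: Iterable[Tuple[str, str]], existing: Set[str]) -> List[str]:
    """Derive isnotnull(...) conjuncts implied by `a = b` predicates.

    `a = b` evaluates to null (not true) when either side is null, so in an inner
    join every surviving row has both attributes non-null. Predicates already in
    `existing` are skipped so the same filter is not added over and over.
    """
    inferred: List[str] = []
    for a, b in equalities:
        for attr in (a, b):
            pred = f"isnotnull({attr})"
            if pred not in existing and pred not in inferred:
                inferred.append(pred)
    return inferred


# The equi-join key that remained in the first pass's newJoinCond.
keys = [("p_partkey#10", "l_partkey#38")]
print(infer_not_null(keys, existing=set()))
# ['isnotnull(p_partkey#10)', 'isnotnull(l_partkey#38)']
print(infer_not_null(keys, existing={"isnotnull(p_partkey#10)"}))
# ['isnotnull(l_partkey#38)']
```

Each inferred conjunct references only one side. PushPredicateThroughJoin can therefore move it into that side's Filter, which is what newLeft and newRight show in this pass.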

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- Project [l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
      +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1)))
   +- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
      +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- Project [l_partkey#21,l_quantity#24,l_extendedprice#25,l_discount#26,l_shipinstruct#33,l_shipmode#34]
      +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1)))
   +- Project [p_partKey#0,p_brand#3,p_size#5,p_container#6]
      +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
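Between the second and third passes, `Filter isnotnull(l_partkey#38)` moved below the Project and became `isnotnull(cast(cast(l_partkey#21 as decimal(20,0)) as int))`. Pushing a Filter through a Project means rewriting every reference to a Project alias with the expression that defines it. Below is a minimal sketch of that substitution. It assumes a hypothetical tuple-based expression tree in which `cast_int` and `cast_decimal_20_0` stand in for the two casts. As far as I know, Spark only does this when the projected expressions are deterministic; this sketch does not check that. It also does not derive the extra `isnotnull(l_partkey#21)` that appears next to the substituted condition.

```python
from typing import Dict, Tuple, Union

# Hypothetical mini expression tree: an attribute is a str, a function call is a
# tuple (name, arg1, arg2, ...).
Expr = Union[str, Tuple]


def substitute(expr: Expr, aliases: Dict[str, Expr]) -> Expr:
    """Replace references to Project aliases with the expressions that define them."""
    if isinstance(expr, str):
        return aliases.get(expr, expr)
    name, *args = expr
    return (name, *(substitute(a, aliases) for a in args))


def render(expr: Expr) -> str:
    """Print an expression as name(arg, ...)."""
    if isinstance(expr, str):
        return expr
    name, *args = expr
    return f"{name}({', '.join(render(a) for a in args)})"


# Alias defined by the lineitem Project (only the one this example needs).
aliases = {"l_partkey#38": ("cast_int", ("cast_decimal_20_0", "l_partkey#21"))}

# Filter isnotnull(l_partkey#38) above the Project ...
above = ("isnotnull", "l_partkey#38")
# ... becomes this condition once it is pushed below the Project.
below = substitute(above, aliases)
print(render(below))  # isnotnull(cast_int(cast_decimal_20_0(l_partkey#21)))
```

The same rewrite explains why `p_size#15 >= 1` from the first pass shows up as `cast(cast(p_size#5 as decimal(20,0)) as int) >= 1` in the second.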

left:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
right:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1)))
   +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
joinCondition:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))

newLeft:
Project [cast(cast(l_partkey#21 as decimal(20,0)) as int) AS l_partkey#38,cast(l_quantity#24 as float) AS l_quantity#41,cast(l_extendedprice#25 as float) AS l_extendedprice#42,cast(l_discount#26 as float) AS l_discount#43]
+- Filter ((isnotnull(l_partkey#21) && isnotnull(cast(cast(l_partkey#21 as decimal(20,0)) as int))) && (isnotnull(l_shipinstruct#33) && (l_shipmode#34 IN (AIR,AIR REG) && (l_shipinstruct#33 = DELIVER IN PERSON))))
   +- LogicalRDD [l_orderkey#20,l_partkey#21,l_suppkey#22,l_linenumber#23,l_quantity#24,l_extendedprice#25,l_discount#26,l_tax#27,l_returnflag#28,l_linestatus#29,l_shipdate#30,l_commitdate#31,l_receiptdate#32,l_shipinstruct#33,l_shipmode#34,l_comment#35], MapPartitionsRDD[7] at apply at Transformer.scala:22
newRight:
Project [cast(cast(p_partKey#0 as decimal(20,0)) as int) AS p_partKey#10,p_brand#3 AS p_brand#13,cast(cast(p_size#5 as decimal(20,0)) as int) AS p_size#15,p_container#6 AS p_container#16]
+- Filter ((isnotnull(p_partKey#0) && isnotnull(cast(cast(p_partKey#0 as decimal(20,0)) as int))) && (isnotnull(p_size#5) && (cast(cast(p_size#5 as decimal(20,0)) as int) >= 1)))
   +- LogicalRDD [p_partKey#0,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8], MapPartitionsRDD[5] at apply at Transformer.scala:22
newJoinCond:
Some(((p_partkey#10 = l_partkey#38) && (((((((p_brand#13 = Brand#12) && p_container#16 IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity#41 >= 1.0)) && (l_quantity#41 <= 11.0)) && (p_size#15 <= 5)) || (((((p_brand#13 = Brand#23) && p_container#16 IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity#41 >= 10.0)) && (l_quantity#41 <= 20.0)) && (p_size#15 <= 10))) || (((((p_brand#13 = Brand#34) && p_container#16 IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity#41 >= 20.0)) && (l_quantity#41 <= 30.0)) && (p_size#15 <= 15)))))
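In the third and fourth passes, newLeft, newRight and newJoinCond are identical to their inputs, so PushPredicateThroughJoin has nothing left to push. Catalyst runs optimizer rules in batches. A fixed-point batch is repeated until a whole iteration leaves the plan unchanged or an iteration limit is reached. The loop below is a toy Python version of that idea: plans are plain integers, the two rules are made up, and `max_iterations` is an arbitrary cap. It shows how one rule's output can enable another rule on the next iteration. The trace above has the same shape, since the inferred `isnotnull` filters were pushed on a later pass.

```python
from typing import Callable, List, Tuple, TypeVar

Plan = TypeVar("Plan")


def run_to_fixed_point(plan: Plan, rules: List[Callable[[Plan], Plan]],
                       max_iterations: int = 100) -> Tuple[Plan, int]:
    """Apply all rules in order, repeating until one full iteration changes nothing.

    Returns the final plan and the number of iterations run. The last iteration is
    the no-op one that detected the fixed point, unless the cap was hit first.
    """
    for iteration in range(1, max_iterations + 1):
        before = plan
        for rule in rules:
            plan = rule(plan)
        if plan == before:  # no rule changed anything: fixed point reached
            return plan, iteration
    return plan, max_iterations  # gave up at the (arbitrary) cap


# Toy rules on integers: each rewrites the "plan" only when its pattern matches.
def divide_by_3(n: int) -> int:
    return n // 3 if n % 3 == 0 else n


def halve(n: int) -> int:
    return n // 2 if n % 2 == 0 else n


print(run_to_fixed_point(36, [divide_by_3, halve]))  # (1, 3)
```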

As I explained in the PDF file, this optimization can go further and push more
filter commands below the join, so that they run before the join starts. I'm
going to send a PR for this issue shortly. My plan is to improve
PushPredicateThroughJoin and make it more intelligent.
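The PDF is not reproduced here, so the following is only one possible reading of "push more filters before the join". The condition left in newJoinCond is an OR of three branches, and every branch has conjuncts on `part` alone and on `lineitem` alone. Any row pair that satisfies a branch also satisfies that branch's `lineitem`-only conjuncts. The OR of the per-branch `lineitem`-only conjuncts is therefore implied by the join condition, and it could be added as an extra Filter on `lineitem` while the original condition stays in the join. The same holds for `part`. The Python sketch below is hypothetical: `Pred`, `P` and `extract_side_filter` are illustrative names, and attribute ids are dropped for brevity. It gives up when some branch has no conjunct on the requested side, because that branch places no constraint on that side.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class Pred:
    """Hypothetical stand-in for a predicate: its text and the columns it references."""
    text: str
    refs: FrozenSet[str]


def P(text: str, *refs: str) -> Pred:
    return Pred(text, frozenset(refs))


def extract_side_filter(branches: List[List[Pred]],
                        side_cols: FrozenSet[str]) -> Optional[List[List[Pred]]]:
    """For an OR of AND-branches, keep only each branch's conjuncts on `side_cols`.

    The OR of the reduced branches is implied by the full condition, so it is safe
    to add it as an extra Filter on that side. Returns None if some branch has no
    conjunct on this side, since the implied filter would then always be true.
    """
    reduced = []
    for branch in branches:
        local = [p for p in branch if p.refs <= side_cols]
        if not local:
            return None
        reduced.append(local)
    return reduced


def render(branches: List[List[Pred]]) -> str:
    return " OR ".join("(" + " AND ".join(p.text for p in b) + ")" for b in branches)


# The three OR branches left in newJoinCond (attribute ids dropped).
branches = [
    [P("p_brand = Brand#12", "p_brand"),
     P("p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)", "p_container"),
     P("l_quantity >= 1.0", "l_quantity"), P("l_quantity <= 11.0", "l_quantity"),
     P("p_size <= 5", "p_size")],
    [P("p_brand = Brand#23", "p_brand"),
     P("p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)", "p_container"),
     P("l_quantity >= 10.0", "l_quantity"), P("l_quantity <= 20.0", "l_quantity"),
     P("p_size <= 10", "p_size")],
    [P("p_brand = Brand#34", "p_brand"),
     P("p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)", "p_container"),
     P("l_quantity >= 20.0", "l_quantity"), P("l_quantity <= 30.0", "l_quantity"),
     P("p_size <= 15", "p_size")],
]
part_cols = frozenset({"p_partkey", "p_brand", "p_size", "p_container"})
lineitem_cols = frozenset({"l_partkey", "l_quantity", "l_extendedprice", "l_discount"})

print(render(extract_side_filter(branches, lineitem_cols)))
print(render(extract_side_filter(branches, part_cols)))
```

The extra filters are redundant with the join condition, so the join still evaluates the full OR. The expected benefit is that rows which cannot match any branch are dropped before the shuffle. This trades some extra filter evaluation for less shuffled data.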

> Reduce shuffle data by pushing filter toward storage
> ----------------------------------------------------
>
>                 Key: SPARK-14820
>                 URL: https://issues.apache.org/jira/browse/SPARK-14820
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Ali Tootoonchian
>            Priority: Trivial
>         Attachments: Reduce Shuffle Data by pushing filter toward storage.pdf
>
>
> The SQL query planner could be made smarter about pushing filter commands down
> towards the storage layer. Optimizing the planner so that IO to the storage is
> reduced at the cost of running multiple filters (i.e., more compute) should be
> desirable when the system is IO-bound.
> An analysis and an example are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
