[https://issues.apache.org/jira/browse/SPARK-20313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148463#comment-16148463]
Tejas Patil commented on SPARK-20313:
-------------------------------------
I tried to replicate what you shared in the JIRA but don't see anything wrong
with what the planner is doing. In both approaches, `SortMergeJoin` is always
picked. The second approach joins the individual partitions one by one and
then unions the results. Depending on your data size and configuration, a
hash-based join may have been used in your case, which would explain why the
latter approach is faster.
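For readers less familiar with the two join families mentioned above, the following sketch contrasts them on plain Scala collections. It is a simplified single-machine analogue written under stated assumptions, not Spark's code: `Row`, `sortMergeJoin` and `hashJoin` are hypothetical names, and Spark's physical operators (`SortMergeJoinExec`, `ShuffledHashJoinExec`, `BroadcastHashJoinExec`) additionally shuffle or broadcast data between executors. The sort-merge variant has to sort both inputs before merging them, while the hash variant only builds a map over one side and probes it with the other. Whether Spark picks a hash-based join is influenced by settings such as `spark.sql.autoBroadcastJoinThreshold` and `spark.sql.join.preferSortMergeJoin`.

```scala
// Hypothetical in-memory analogue of two join strategies; not Spark code.
object JoinStrategies {
  // Assumed schema: a join key plus one payload column.
  final case class Row(key: Int, value: String)

  // Sort-merge join: sort both sides by key, then walk them in lockstep.
  def sortMergeJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, String, String)] = {
    val l = left.sortBy(_.key).toVector
    val r = right.sortBy(_.key).toVector
    val out = Vector.newBuilder[(Int, String, String)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val lk = l(i).key
      val rk = r(j).key
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Find the run of equal keys on each side and emit their cross product.
        val iEnd = l.indexWhere(_.key != lk, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(_.key != rk, j) match { case -1 => r.length; case n => n }
        for (a <- i until iEnd; b <- j until jEnd)
          out += ((lk, l(a).value, r(b).value))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }

  // Hash join: build a map over one side, then probe it with each row of the other.
  // Output tuples are (key, buildValue, probeValue).
  def hashJoin(build: Seq[Row], probe: Seq[Row]): Seq[(Int, String, String)] = {
    val table: Map[Int, Seq[Row]] = build.groupBy(_.key)
    for {
      p <- probe
      b <- table.getOrElse(p.key, Seq.empty)
    } yield (p.key, b.value, p.value)
  }

  def main(args: Array[String]): Unit = {
    val t1 = Seq(Row(1, "a"), Row(2, "b"), Row(2, "c"), Row(4, "d"))
    val t2 = Seq(Row(2, "x"), Row(3, "y"), Row(4, "z"), Row(4, "w"))
    // Both strategies return the same rows; only the order may differ, so compare sorted.
    println(sortMergeJoin(t1, t2).sorted == hashJoin(build = t1, probe = t2).sorted)
  }
}
```

The two functions produce the same joined rows for any input; they differ only in the work done to get there (sorting both sides versus hashing one side), which is where the performance difference between plans comes from.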
Approach #1
{noformat}
import org.apache.spark.sql.functions
val df1 = hc.sql("SELECT * FROM bucketed_partitioned_1").filter(functions.col("ds").between("1", "5"))
val df2 = hc.sql("SELECT * FROM bucketed_partitioned_2").filter(functions.col("ds").between("1", "5"))
val df3 = df1.join(df2, Seq("ds", "user_id"))
df3.explain(true)
== Physical Plan ==
*Project [ds#38, user_id#36, name#37, name#45]
+- *SortMergeJoin [ds#38, user_id#36], [ds#46, user_id#44], [ds#38, user_id#36], [ds#46, user_id#44], Inner
   :- *Sort [ds#38 ASC NULLS FIRST, user_id#36 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ds#38, user_id#36, 200)
   :     +- *Filter isnotnull(user_id#36)
   :        +- HiveTableScan [user_id#36, name#37, ds#38], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#36, name#37], [ds#38], [isnotnull(ds#38), (ds#38 >= 1), (ds#38 <= 5)]
   +- *Sort [ds#46 ASC NULLS FIRST, user_id#44 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ds#46, user_id#44, 200)
         +- *Filter isnotnull(user_id#44)
            +- HiveTableScan [user_id#44, name#45, ds#46], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#44, name#45], [ds#46], [isnotnull(ds#46), (ds#46 >= 1), (ds#46 <= 5)]
{noformat}
Approach #2
{noformat}
import org.apache.spark.sql.functions
val df1 = hc.sql("SELECT * FROM bucketed_partitioned_1")
val df2 = hc.sql("SELECT * FROM bucketed_partitioned_2")
val dsValues = Seq("1111-11-11", "4444-44-44")
val df3 = dsValues.map(dsValue => {
  val df1filtered = df1.filter(functions.col("ds") === dsValue)
  val df2filtered = df2.filter(functions.col("ds") === dsValue)
  df1filtered.join(df2filtered, Seq("user_id")) // ds removed from the join keys
}).reduce(_ union _)
df3.explain(true)
== Physical Plan ==
Union
:- *Project [user_id#63, name#64, ds#65, name#71, ds#72]
:  +- *SortMergeJoin [user_id#63], [user_id#70], [user_id#63], [user_id#70], Inner
:     :- *Sort [user_id#63 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(user_id#63, 200)
:     :     +- *Filter isnotnull(user_id#63)
:     :        +- HiveTableScan [user_id#63, name#64, ds#65], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#63, name#64], [ds#65], [isnotnull(ds#65), (ds#65 = 1111-11-11)]
:     +- *Sort [user_id#70 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(user_id#70, 200)
:           +- *Filter isnotnull(user_id#70)
:              +- HiveTableScan [user_id#70, name#71, ds#72], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#70, name#71], [ds#72], [isnotnull(ds#72), (ds#72 = 1111-11-11)]
+- *Project [user_id#63, name#64, ds#65, name#71, ds#72]
   +- *SortMergeJoin [user_id#63], [user_id#70], [user_id#63], [user_id#70], Inner
      :- *Sort [user_id#63 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(user_id#63, 200)
      :     +- *Filter isnotnull(user_id#63)
      :        +- HiveTableScan [user_id#63, name#64, ds#65], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#63, name#64], [ds#65], [isnotnull(ds#65), (ds#65 = 4444-44-44)]
      +- *Sort [user_id#70 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(user_id#70, 200)
            +- *Filter isnotnull(user_id#70)
               +- HiveTableScan [user_id#70, name#71, ds#72], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#70, name#71], [ds#72], [isnotnull(ds#72), (ds#72 = 4444-44-44)]
{noformat}
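Approach #2 can replace approach #1 only because `ds` is an equality key of the join: rows with different `ds` values can never match, so the join on `(ds, user_id)` splits into independent joins on `user_id` within each `ds` value, and their results can simply be unioned. The sketch below checks that identity on plain Scala collections. The `Rec` type and the `joinOnDsAndUser` / `perPartitionJoin` helpers are hypothetical stand-ins for the Hive tables and the two DataFrame pipelines, not Spark APIs.

```scala
// Hypothetical in-memory model of two tables partitioned by `ds`; not Spark code.
object PartitionWiseJoin {
  final case class Rec(ds: String, userId: Int, name: String)

  type Joined = (String, Int, String, String)

  // Analogue of approach #1: a single equi-join on the composite key (ds, userId).
  def joinOnDsAndUser(l: Seq[Rec], r: Seq[Rec]): Seq[Joined] =
    for {
      a <- l
      b <- r
      if a.ds == b.ds && a.userId == b.userId
    } yield (a.ds, a.userId, a.name, b.name)

  // Analogue of approach #2: restrict both sides to one ds value, join on userId only,
  // and concatenate the per-partition results (the role played by reduce(_ union _)).
  def perPartitionJoin(l: Seq[Rec], r: Seq[Rec], dsValues: Seq[String]): Seq[Joined] =
    dsValues.flatMap { ds =>
      val lf = l.filter(_.ds == ds)
      val rf = r.filter(_.ds == ds)
      for {
        a <- lf
        b <- rf
        if a.userId == b.userId
      } yield (ds, a.userId, a.name, b.name)
    }

  def main(args: Array[String]): Unit = {
    val t1 = Seq(Rec("1111-11-11", 1, "a"), Rec("1111-11-11", 2, "b"), Rec("4444-44-44", 1, "c"))
    val t2 = Seq(Rec("1111-11-11", 1, "x"), Rec("4444-44-44", 1, "y"), Rec("4444-44-44", 2, "z"))
    val whole = joinOnDsAndUser(t1, t2)
    val split = perPartitionJoin(t1, t2, Seq("1111-11-11", "4444-44-44"))
    println(whole.sorted == split.sorted) // prints true
  }
}
```

The equivalence holds only if `dsValues` lists every `ds` value that the range filter in approach #1 would select; any value left out of the list silently drops its rows from the union.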
> Possible lack of join optimization when partitions are in the join condition
> ----------------------------------------------------------------------------
>
> Key: SPARK-20313
> URL: https://issues.apache.org/jira/browse/SPARK-20313
> Project: Spark
> Issue Type: Improvement
> Components: Optimizer
> Affects Versions: 2.1.0
> Reporter: Albert Meltzer
>
> Given two tables T1 and T2, partitioned on column part1, the following have
> vastly different execution performance:
> // initial, slow
> {noformat}
> val df1 = // load data from T1
>   .filter(functions.col("part1").between("val1", "val2"))
> val df2 = // load data from T2
>   .filter(functions.col("part1").between("val1", "val2"))
> val df3 = df1.join(df2, Seq("part1", "col1"))
> {noformat}
> // manually optimized, considerably faster
> {noformat}
> val df1 = // load data from T1
> val df2 = // load data from T2
> val part1values = Seq(...) // a collection of values between val1 and val2
> val df3 = part1values
>   .map(part1value => {
>     val df1filtered = df1.filter(functions.col("part1") === part1value)
>     val df2filtered = df2.filter(functions.col("part1") === part1value)
>     df1filtered.join(df2filtered, Seq("col1")) // part1 removed from join
>   })
>   .reduce(_ union _)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)