[ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601455#comment-15601455 ]
Herman van Hovell commented on SPARK-18067:
-------------------------------------------
[~tejasp] This makes sense to me. However, there are a few potential problems:
- You generally have a better chance of getting nicely distributed data if you
hash by multiple values. If the `key` in your example has a relatively low
cardinality, we can hit significant performance problems and out-of-memory
errors (OOMs) if we need to buffer a lot of rows.
- I am pretty sure this will break
{{outputPartitioning/requiredChildDistribution}}. This would allow
EnsureRequirements to give us a different distribution than the one we asked for.
This can be extremely problematic for shuffle joins, since we need to
make sure that both the left and the right relation have exactly the same
distribution.
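A minimal plain-Scala sketch of the first point, under a simplified model in which a row's partition is its hash code modulo the partition count (Scala's `hashCode` stands in for Spark's hash function; `SkewSketch` and its helpers are hypothetical names). With 100,000 rows but only five distinct keys, hashing on `key` alone can fill at most five of 200 partitions, so at least one partition holds 20,000 or more rows; hashing on `(key, value)` spreads the same rows over many more partitions.

```scala
// Simplified model (assumption): partition = non-negative (hashCode mod n).
// Scala's hashCode stands in for Spark's own hash function.
object SkewSketch {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Number of rows that land in each non-empty partition.
  def partitionSizes[K](keys: Seq[K], numPartitions: Int): Map[Int, Int] =
    keys
      .groupBy(k => nonNegativeMod(k.hashCode, numPartitions))
      .map { case (p, rows) => p -> rows.size }

  def main(args: Array[String]): Unit = {
    // 100,000 rows but only 5 distinct keys: a low-cardinality join key.
    val rows = (1 to 100000).map(i => (s"k${i % 5}", i))
    val byKey = partitionSizes(rows.map(_._1), 200) // hash(key)
    val byKeyAndValue = partitionSizes(rows, 200)   // hash(key, value)
    println(s"hash(key):        ${byKey.size} non-empty partitions, largest ${byKey.values.max} rows")
    println(s"hash(key, value): ${byKeyAndValue.size} non-empty partitions, largest ${byKeyAndValue.values.max} rows")
  }
}
```

The pigeonhole bound (100,000 rows over at most 5 partitions) holds regardless of which hash function is used; only the exact sizes depend on the stand-in hash.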
> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --------------------------------------------------------------------------
>
> Key: SPARK-18067
> URL: https://issues.apache.org/jira/browse/SPARK-18067
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Paul Jones
> Priority: Minor
>
> Basic setup
> {code}
> scala> import org.apache.spark.sql.functions.col
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x)).toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x)).toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
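The setup relies on two properties of `repartition(col("key")).sortWithinPartitions(col("key"))`: all rows with a given key land in the same partition, and each partition is sorted by key. A rough plain-Scala model of those two properties, assuming `String.hashCode` modulo the partition count as the partitioner (this is not Spark's actual hash function; `SetupSketch` and `repartitionAndSort` are hypothetical names):

```scala
// Hypothetical model of repartition(col("key")).sortWithinPartitions(col("key")):
// hash each row by key into n partitions, then sort every partition by key.
// String.hashCode stands in for Spark's hash function (an assumption).
object SetupSketch {
  final case class Data1(key: String, value1: Int)

  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def repartitionAndSort(rows: Seq[Data1], n: Int): Vector[Vector[Data1]] = {
    // Co-locate every row with the same key in one bucket.
    val buckets = rows.groupBy(r => nonNegativeMod(r.key.hashCode, n))
    // Emit n partitions (possibly empty), each sorted by key.
    Vector.tabulate(n)(p => buckets.getOrElse(p, Seq.empty).sortBy(_.key).toVector)
  }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 1000).map(x => Data1(s"$x", x))
    val parts = repartitionAndSort(rows, 8)
    parts.zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i: ${p.size} rows, first key ${p.headOption.map(_.key).getOrElse("-")}")
    }
  }
}
```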
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>    :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
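Because both cached inputs already have the distribution and ordering that `SortMergeJoin [key#0], [key#12]` needs, the plan contains no exchange or sort, and each pair of partitions can be joined with a single linear merge pass. A simplified sketch of that merge for one pair of key-sorted partitions (hypothetical `MergeJoinSketch.mergeJoin`, not Spark's implementation):

```scala
// Simplified sort-merge inner join of two sequences already sorted by key
// (think: one partition from each side).
object MergeJoinSketch {
  def mergeJoin[L, R](left: IndexedSeq[L], right: IndexedSeq[R])(
      lk: L => String, rk: R => String): Vector[(L, R)] = {
    val out = Vector.newBuilder[(L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = lk(left(i)).compareTo(rk(right(j)))
      if (cmp < 0) i += 1      // left key is smaller: advance left
      else if (cmp > 0) j += 1 // right key is smaller: advance right
      else {
        val key = lk(left(i))
        // Find the run of rows with this key on the right side.
        var jEnd = j
        while (jEnd < right.length && rk(right(jEnd)) == key) jEnd += 1
        // Pair every left row with this key with every right row in the run.
        while (i < left.length && lk(left(i)) == key) {
          var k = j
          while (k < jEnd) { out += ((left(i), right(k))); k += 1 }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val left = Vector(("a", 1), ("b", 2), ("b", 3), ("d", 4))
    val right = Vector(("b", 10), ("b", 11), ("c", 12), ("d", 13))
    mergeJoin(left, right)(_._1, _._1).foreach(println)
  }
}
```

This version indexes the right-hand run of equal keys in place; a streaming implementation typically has to buffer that run in memory, which is where a low-cardinality key becomes expensive.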
> But if we add a filter, our join gets less efficient and we end up with a
> shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>    :- Sort [value1#1 ASC,key#0 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>    :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- Sort [value2#13 ASC,key#12 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>          +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
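In this plan both sides are re-hashed on the same ordered expressions, `(value, key)`, into the same number of partitions (200), which guarantees that matching rows meet in the same partition. The plain-Scala sketch below models why that guarantee matters, which is also the concern behind the second comment point: if one side were hashed on `(value, key)` and the other on `key` alone, a partition-by-partition join would silently miss matches. Tuple and `String` `hashCode` stand in for Spark's hash function, and all names are hypothetical.

```scala
// Simplified model (assumption): partition = non-negative (hashCode mod n);
// tuple and String hashCode stand in for Spark's hash function.
object CoPartitionSketch {
  type Row = (String, Int) // (key, value)

  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Joins partition p of the left only with partition p of the right, as a
  // shuffle join does, and counts pairs that match on both key and value.
  def partitionLocalMatches(left: Seq[Row], right: Seq[Row], n: Int)(
      leftHash: Row => Int, rightHash: Row => Int): Int = {
    val leftParts = left.groupBy(row => nonNegativeMod(leftHash(row), n))
    val rightParts = right.groupBy(row => nonNegativeMod(rightHash(row), n))
    (0 until n).map { p =>
      val rs = rightParts.getOrElse(p, Seq.empty)
      leftParts.getOrElse(p, Seq.empty).map(a => rs.count(_ == a)).sum
    }.sum
  }

  def main(args: Array[String]): Unit = {
    val left = (1 to 1000).map(x => (s"$x", x))
    val right = (1 to 1000).map(x => (s"$x", x))
    val byValueKey: Row => Int = r => (r._2, r._1).hashCode // like hashpartitioning(value, key, 200)
    val byKeyOnly: Row => Int = r => r._1.hashCode          // a different distribution
    println(s"same partitioning: ${partitionLocalMatches(left, right, 200)(byValueKey, byValueKey)} of 1000 matches found")
    println(s"mismatched partitioning: ${partitionLocalMatches(left, right, 200)(byValueKey, byKeyOnly)} of 1000 matches found")
  }
}
```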
> And we can avoid the shuffle if we use a filter predicate that can't be pushed
> into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
>    +- SortMergeJoin [key#0], [key#12]
>       :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>       +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
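The two filters behave differently because an equality between a column from each side is itself an equi-join condition. The optimizer pushes `value1 = value2` into the join, where it becomes an extra join key (`SortMergeJoin [value1#1,key#0], [value2#13,key#12]`), so both sides must now be distributed on `(value, key)` and the exchanges appear. A `>=` comparison cannot serve as a hash or merge key, so it stays in a `Filter` above the key-only join. A naive plain-Scala sketch of the equivalence that makes the equality rewrite legal (hypothetical names, not Spark's planner code):

```scala
// Naive joins used only to compare results; not how Spark executes them.
object EquiKeySketch {
  type Row = (String, Int) // (key, value)

  // Join on key, then apply the extra predicate as a post-join filter.
  def joinThenFilter(l: Seq[Row], r: Seq[Row])(pred: (Int, Int) => Boolean): Seq[(String, Int, Int)] =
    for {
      (k1, v1) <- l
      (k2, v2) <- r
      if k1 == k2 && pred(v1, v2)
    } yield (k1, v1, v2)

  // value1 = value2 folded into the join key: a hash join on the composite key (key, value).
  def compositeKeyJoin(l: Seq[Row], r: Seq[Row]): Seq[(String, Int, Int)] = {
    val byCompositeKey = r.groupBy(identity)
    for {
      (k, v) <- l
      (_, v2) <- byCompositeKey.getOrElse((k, v), Seq.empty)
    } yield (k, v, v2)
  }

  def main(args: Array[String]): Unit = {
    val l = Seq(("1", 1), ("2", 5), ("3", 3), ("3", 4))
    val r = Seq(("1", 1), ("2", 2), ("3", 3), ("3", 9))
    println(joinThenFilter(l, r)(_ == _)) // equality applied after the key-only join
    println(compositeKeyJoin(l, r))       // equality used as part of the join key
    println(joinThenFilter(l, r)(_ >= _)) // no composite key exists for >=
  }
}
```

With `_ == _` as the predicate, `joinThenFilter` and `compositeKeyJoin` return the same rows; with `_ >= _` there is no composite key to look up, so only the join-then-filter form applies.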
> What's the best way to avoid the filter pushdown here?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)