[
https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15808507#comment-15808507
]
Tejas Patil commented on SPARK-18067:
-------------------------------------
[~hvanhovell] :
* You are right about the data distribution. Both approaches are prone to OOMs
but my approach is more likely to OOM based on the data. If the tables are
bucketed + sorted on a single column, both approaches will distribute the data
in same manner. When I checked at Hive behavior, its doing the same thing that
I mentioned and it works across all our workloads.
* Adding shuffle makes the job to be less performant in general irrespective of
data distribution. I assume that "significant performance problems" will only
happen if the data is heavily skewed on the join key ?
* I have seen OOMs happen with current spark approach (which hashes
everything). I personally feel that the row buffering should be backed by disk
spill to be more reliable. It will run a bit slower but at least will be
reliable and not page people in the middle of the night due to OOM.
I have seen some other scenario in SPARK-19122 but the proposed solution there
will help this issue as well.
> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --------------------------------------------------------------------------
>
> Key: SPARK-18067
> URL: https://issues.apache.org/jira/browse/SPARK-18067
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Paul Jones
> Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
> :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter our join gets less efficient and we end up with a
> shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" ===
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
> :- Sort [value1#1 ASC,key#0 ASC], false, 0
> : +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
> : +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- Sort [value2#13 ASC,key#12 ASC], false, 0
> +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if use a filter statement that can't be pushed
> in the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >=
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
> +- SortMergeJoin [key#0], [key#12]
> :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here??
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]