[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-19468:
------------------------------------
Assignee: (was: Apache Spark)
> Dataset slow because of unnecessary shuffles
> --------------------------------------------
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: koert kuipers
>
> We noticed that some algorithms we ported from RDD to Dataset are
> significantly slower. The main reason seems to be extra shuffles, which we
> successfully avoid for RDDs by careful partitioning. This seems to be
> Dataset-specific, as it works fine for DataFrame.
> See also:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> It boils down to this: if I partition, sort, and persist DataFrames that
> are used repeatedly in joins, I can avoid shuffles:
> {noformat}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.storage.StorageLevel
>
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> :     +- InMemoryRelation [key#5, value#6], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(key#5, 4)
> :                 +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>       +- InMemoryRelation [key2#27, value2#28], true, 10000, StorageLevel(disk, 1 replicas)
>             +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key2#27, 4)
>                   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> Notice how the persisted DataFrames are not shuffled or sorted again before
> being used in the join. However, if I try the same with Datasets, I have
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> :     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :        +- InMemoryTableScan [_1#83, _2#84]
> :              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :                       +- Exchange hashpartitioning(_1#83, 4)
> :                          +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_2#106._1, 4)
>       +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>          +- InMemoryTableScan [_1#100, _2#101]
>                +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                      +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                         +- Exchange hashpartitioning(_1#83, 4)
>                            +- LocalTableScan [_1#83, _2#84]
> {noformat}
> Notice how my persisted Datasets are shuffled and sorted again. Part of the
> issue seems to be in joinWith: it wraps each side in a struct (the
> Project [named_struct(...)] nodes above), so the join keys (_1#105._1,
> _2#106._1) no longer match the columns the cached data was partitioned on,
> which seems to confuse the planner. If I change the joinWith to join (which
> returns a DataFrame), it looks a little better in that only one side gets
> shuffled again, but it is still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> :     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(_1#83, 4)
> :                 +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_1#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_1#100, 4)
>       +- InMemoryTableScan [_1#100, _2#101]
>             +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                      +- Exchange hashpartitioning(_1#83, 4)
>                         +- LocalTableScan [_1#83, _2#84]
> {noformat}
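
For anyone who wants to check this without reading explain output by eye, here is a minimal sketch that counts the Exchange operators the planner puts on top of the cached scans. The object name ExchangeCount and the helper countExchanges are hypothetical (they are not part of Spark), and the approach assumes the shuffle operator's nodeName starts with "Exchange", as it does in the plans quoted above. It also assumes a 2.1-style physical plan; on newer versions with adaptive query execution enabled the executed plan may be wrapped, and the count may not be meaningful.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

object ExchangeCount {
  // Hypothetical helper, not a Spark API. It walks the executed physical plan
  // and keeps every operator whose node name starts with "Exchange".
  // The plan cached inside InMemoryRelation is not a child of the scan
  // operator, so only exchanges added on top of the cached data are counted.
  def countExchanges(ds: Dataset[_]): Int =
    ds.queryExecution.executedPlan.collect {
      case p if p.nodeName.startsWith("Exchange") => p
    }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("SPARK-19468")
      // Disable broadcast joins so the planner picks a sort-merge join.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    // Same preparation as in the report: partition, sort, persist.
    def prepared(): Dataset[(Int, Int)] =
      Seq((0, 0), (1, 1)).toDS
        .repartition(col("_1")).sortWithinPartitions(col("_1"))
        .persist(StorageLevel.DISK_ONLY)

    val ds1 = prepared()
    val ds2 = prepared()

    val viaJoin = ds1.join(ds2, ds1("_1") === ds2("_1"))
    val viaJoinWith = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))

    println(s"exchanges added by join:     ${countExchanges(viaJoin)}")
    println(s"exchanges added by joinWith: ${countExchanges(viaJoinWith)}")

    spark.stop()
  }
}
```

A count of zero would mean the join reuses the existing partitioning; anything above zero means at least one side is shuffled again, which is the behaviour described in this issue.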