Repository: spark
Updated Branches:
  refs/heads/master 8a87f7d5c -> 699a4dfd8
[SPARK-14632] randomSplit method fails on dataframes with maps in schema

## What changes were proposed in this pull request?

This patch fixes an issue with the randomSplit method, which fails to split
DataFrames whose schemas contain maps. The bug was introduced in Spark 1.6.1.

## How was this patch tested?

Tested with unit tests.

Author: Subhobrata Dey <[email protected]>

Closes #12438 from sbcd90/randomSplitIssue.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/699a4dfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/699a4dfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/699a4dfd

Branch: refs/heads/master
Commit: 699a4dfd89dc598e79955cfd6f66c06b6bf74be6
Parents: 8a87f7d
Author: Subhobrata Dey <[email protected]>
Authored: Sun Apr 17 15:18:32 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Apr 17 15:18:32 2016 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/699a4dfd/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4edc90d..fb3e184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1502,7 +1502,9 @@ class Dataset[T] private[sql](
     // constituent partitions each time a split is materialized which could result in
     // overlapping splits. To prevent this, we explicitly sort each input partition to make the
     // ordering deterministic.
-    val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
+    // MapType cannot be sorted.
+    val sorted = Sort(logicalPlan.output.filterNot(_.dataType.isInstanceOf[MapType])
+      .map(SortOrder(_, Ascending)), global = false, logicalPlan)
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
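For context, the last three lines of the hunk show how randomSplit turns the caller's weights into per-split probability boundaries. A minimal standalone Scala sketch of that logic (outside Spark; `SplitBoundaries` and `boundaries` are illustrative names, not part of the patch):

```scala
// Sketch of the normalizedCumWeights computation visible in the diff:
// weights are normalized to sum to 1, accumulated with scanLeft, and
// paired up with sliding(2) to form [lower, upper) split boundaries.
object SplitBoundaries {
  def boundaries(weights: Array[Double]): List[(Double, Double)] = {
    val sum = weights.sum
    // e.g. for weights 1, 2, 1: 0.0, 0.25, 0.75, 1.0
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    // adjacent pairs become the bounds of each split
    normalizedCumWeights.sliding(2).map(x => (x(0), x(1))).toList
  }

  def main(args: Array[String]): Unit = {
    println(SplitBoundaries.boundaries(Array(1.0, 2.0, 1.0)))
    // List((0.0,0.25), (0.25,0.75), (0.75,1.0))
  }
}
```

Each row of the (now deterministically sorted) data is then assigned to the split whose bounds contain its sampled value, which is why overlapping or non-deterministic partition orderings would produce overlapping splits.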
