Repository: spark
Updated Branches:
  refs/heads/master 48ea64bf5 -> 35c551635
[SPARK-26024][SQL] Update documentation for repartitionByRange

Following [SPARK-26024](https://issues.apache.org/jira/browse/SPARK-26024), I noticed that the number of elements in each partition after repartitioning with `df.repartitionByRange` can vary for the same setup:

```scala
// Shuffle numbers from 0 to 1000, and make a DataFrame
val df = Random.shuffle(0.to(1000)).toDF("val")

// Repartition it using 3 partitions,
// sum up the number of elements in each partition, and collect it.
// Repeat several times.
for (i <- 0 to 9) {
  val counts = df.repartitionByRange(3, col("val"))
    .mapPartitions { part => Iterator(part.size) }
    .collect()
  println(counts.toList)
}
// -> the number of elements in each partition varies
```

This is expected: for performance reasons, this method uses sampling to estimate the ranges (with a default sample size of 100 per partition). Hence, the output may not be consistent, since sampling can return different values. But the documentation did not mention this at all, leading to misunderstanding.

## What changes were proposed in this pull request?

Update the documentation (Spark & PySpark) to mention the impact of `spark.sql.execution.rangeExchange.sampleSizePerPartition` on the resulting partitioned DataFrame.

Closes #23025 from JulienPeloton/SPARK-26024.
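To see why the counts vary, here is a minimal, Spark-free sketch of the mechanism: like Spark's range partitioner, it estimates range boundaries from a random sample, so two runs over the same data can pick different boundaries. All helper names are illustrative, not Spark APIs.

```python
# Sketch of sampling-based range partitioning (illustrative, not Spark's code).
import random

def estimate_bounds(values, num_partitions, sample_size):
    """Estimate (num_partitions - 1) range boundaries from a random sample."""
    sample = sorted(random.sample(values, min(sample_size, len(values))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

def partition_counts(values, bounds):
    """Count how many values fall into each range delimited by bounds."""
    counts = [0] * (len(bounds) + 1)
    for v in values:
        idx = sum(1 for b in bounds if v >= b)  # linear scan; Spark bisects
        counts[idx] += 1
    return counts

values = list(range(1001))
for _ in range(3):
    bounds = estimate_bounds(values, num_partitions=3, sample_size=100)
    print(partition_counts(values, bounds))  # counts differ from run to run
```

Each run draws a fresh sample, so the estimated boundaries (and therefore the per-partition counts) shift, even though every element is always assigned to exactly one partition.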
Authored-by: Julien <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35c55163
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35c55163
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35c55163

Branch: refs/heads/master
Commit: 35c55163555f3671edd02ed0543785af82de07ca
Parents: 48ea64b
Author: Julien <[email protected]>
Authored: Mon Nov 19 22:24:53 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Nov 19 22:24:53 2018 +0800

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                                   |  8 ++++++++
 python/pyspark/sql/dataframe.py                       |  5 +++++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala | 11 +++++++++++
 3 files changed, 24 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/35c55163/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index c99ad76..52e7657 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -767,6 +767,14 @@ setMethod("repartition",
 #'  using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #'
+#' At least one partition-by expression must be specified.
+#' When no explicit sort order is specified, "ascending nulls first" is assumed.
+#'
+#' Note that due to performance reasons this method uses sampling to estimate the ranges.
+#' Hence, the output may not be consistent, since sampling can return different values.
+#' The sample size can be controlled by the config
+#' \code{spark.sql.execution.rangeExchange.sampleSizePerPartition}.
+#'
 #' @param x a SparkDataFrame.
 #' @param numPartitions the number of partitions to use.
 #' @param col the column by which the range partitioning will be performed.
http://git-wip-us.apache.org/repos/asf/spark/blob/35c55163/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5748f6c..c4f4d81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -732,6 +732,11 @@ class DataFrame(object):
         At least one partition-by expression must be specified.
         When no explicit sort order is specified, "ascending nulls first" is assumed.
 
+        Note that due to performance reasons this method uses sampling to estimate the ranges.
+        Hence, the output may not be consistent, since sampling can return different values.
+        The sample size can be controlled by the config
+        `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+
         >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
         2
         >>> df.show()

http://git-wip-us.apache.org/repos/asf/spark/blob/35c55163/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f5caaf3..0e77ec0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2787,6 +2787,12 @@ class Dataset[T] private[sql](
    * When no explicit sort order is specified, "ascending nulls first" is assumed.
    * Note, the rows are not sorted in each partition of the resulting Dataset.
    *
+   *
+   * Note that due to performance reasons this method uses sampling to estimate the ranges.
+   * Hence, the output may not be consistent, since sampling can return different values.
+   * The sample size can be controlled by the config
+   * `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+   *
    * @group typedrel
    * @since 2.3.0
    */
@@ -2811,6 +2817,11 @@ class Dataset[T] private[sql](
    * When no explicit sort order is specified, "ascending nulls first" is assumed.
    * Note, the rows are not sorted in each partition of the resulting Dataset.
    *
+   * Note that due to performance reasons this method uses sampling to estimate the ranges.
+   * Hence, the output may not be consistent, since sampling can return different values.
+   * The sample size can be controlled by the config
+   * `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+   *
    * @group typedrel
    * @since 2.3.0
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
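The config the patch documents trades sampling cost for boundary stability. A small, Spark-free sketch of that trade-off (the helper below mimics only the idea; it is not Spark's range-partitioner implementation):

```python
# Illustration: larger samples give more stable range-boundary estimates,
# at the cost of sampling more rows. Illustrative code, not a Spark API.
import random
import statistics

def estimated_median(values, sample_size):
    """Estimate the 2-partition boundary (the median) from a random sample."""
    sample = sorted(random.sample(values, sample_size))
    return sample[len(sample) // 2]

values = list(range(10001))
for sample_size in (10, 100, 1000):
    estimates = [estimated_median(values, sample_size) for _ in range(200)]
    spread = statistics.pstdev(estimates)
    print(f"sample_size={sample_size:5d}  boundary spread={spread:8.1f}")
# The spread shrinks as the sample grows, i.e. partition boundaries (and
# hence partition sizes) become more consistent across runs.
```

This is the behavior `spark.sql.execution.rangeExchange.sampleSizePerPartition` controls: raising it makes repartitioning more deterministic in practice, at the cost of a larger sampling pass.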
