Repository: spark Updated Branches: refs/heads/master 76fb173dd -> 655f6f86f
[SPARK-22208][SQL] Improve percentile_approx by not rounding up targetError and starting from index 0 ## What changes were proposed in this pull request? Currently percentile_approx never returns the first element when percentile is in (relativeError, 1/N], where relativeError default 1/10000, and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer. For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2. Based on the paper, targetError is not rounded up, and searching index should start from 0 instead of 1. By following the paper, we should be able to fix the cases mentioned above. ## How was this patch tested? Added a new test case and fix existing test cases. Author: Zhenhua Wang <[email protected]> Closes #19438 from wzhfy/improve_percentile_approx. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/655f6f86 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/655f6f86 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/655f6f86 Branch: refs/heads/master Commit: 655f6f86f84ff5241d1d20766e1ef83bb32ca5e0 Parents: 76fb173 Author: Zhenhua Wang <[email protected]> Authored: Wed Oct 11 00:16:12 2017 -0700 Committer: gatorsmile <[email protected]> Committed: Wed Oct 11 00:16:12 2017 -0700 ---------------------------------------------------------------------- R/pkg/tests/fulltests/test_sparkSQL.R | 8 ++++---- .../org/apache/spark/ml/feature/ImputerSuite.scala | 2 +- python/pyspark/sql/dataframe.py | 6 +++--- .../sql/catalyst/util/QuantileSummaries.scala | 4 ++-- .../sql/catalyst/util/QuantileSummariesSuite.scala | 10 ++++++++-- .../sql/ApproximatePercentileQuerySuite.scala | 17 ++++++++++++++++- .../org/apache/spark/sql/DataFrameStatSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- 8 files changed, 36 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index bbea25b..4382ef2 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2538,7 +2538,7 @@ test_that("describe() and summary() on a DataFrame", { stats2 <- summary(df) expect_equal(collect(stats2)[5, "summary"], "25%") - expect_equal(collect(stats2)[5, "age"], "30") + expect_equal(collect(stats2)[5, "age"], "19") stats3 <- summary(df, "min", "max", "55.1%") @@ -2738,7 +2738,7 @@ test_that("sampleBy() on a DataFrame", { }) test_that("approxQuantile() on a DataFrame", { - l <- lapply(c(0:99), function(i) { list(i, 99 - i) }) + l <- lapply(c(0:100), function(i) { list(i, 100 - i) }) df <- createDataFrame(l, list("a", "b")) quantiles <- approxQuantile(df, "a", c(0.5, 0.8), 0.0) expect_equal(quantiles, list(50, 80)) @@ -2749,8 +2749,8 @@ test_that("approxQuantile() on a DataFrame", { dfWithNA <- createDataFrame(data.frame(a = c(NA, 30, 19, 11, 28, 15), b = c(-30, -19, NA, -11, -28, -15))) quantiles3 <- approxQuantile(dfWithNA, c("a", "b"), c(0.5), 0.0) - expect_equal(quantiles3[[1]], list(28)) - expect_equal(quantiles3[[2]], list(-15)) + expect_equal(quantiles3[[1]], list(19)) + expect_equal(quantiles3[[2]], list(-19)) }) test_that("SQL error message is returned from JVM", { http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index ee2ba73..c08b35b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -43,7 +43,7 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default (0, 1.0, 1.0, 1.0), (1, 3.0, 3.0, 3.0), (2, Double.NaN, Double.NaN, Double.NaN), - (3, -1.0, 2.0, 3.0) + (3, -1.0, 2.0, 1.0) )).toDF("id", "value", "expected_mean_value", "expected_median_value") val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out")) .setMissingValue(-1.0) http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 2d59622..38b01f0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1038,8 +1038,8 @@ class DataFrame(object): | mean| 3.5| null| | stddev|2.1213203435596424| null| | min| 2|Alice| - | 25%| 5| null| - | 50%| 5| null| + | 25%| 2| null| + | 50%| 2| null| | 75%| 5| null| | max| 5| Bob| +-------+------------------+-----+ @@ -1050,7 +1050,7 @@ class DataFrame(object): +-------+---+-----+ | count| 2| 2| | min| 2|Alice| - | 25%| 5| null| + | 25%| 2| null| | 75%| 5| null| | max| 5| Bob| +-------+---+-----+ http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index af543b0..eb7941c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -193,10 +193,10 @@ class QuantileSummaries( // Target rank val rank = math.ceil(quantile * count).toInt - val targetError = math.ceil(relativeError * count) + val targetError = relativeError * count // Minimum rank at current sample var minRank = 0 - var i = 1 + var i = 0 while (i < sampled.length - 1) { val curSample = sampled(i) minRank += curSample.g http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala index df579d5..6508139 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala @@ -57,8 +57,14 @@ class QuantileSummariesSuite extends SparkFunSuite { private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { if (data.nonEmpty) { val approx = summary.query(quant).get - // The rank of the approximation. - val rank = data.count(_ < approx) // has to be <, not <= to be exact + // Get the rank of the approximation. + val rankOfValue = data.count(_ <= approx) + val rankOfPreValue = data.count(_ < approx) + // `rankOfValue` is the last position of the quantile value. If the input repeats the value + // chosen as the quantile, e.g. in (1,2,2,2,2,2,3), the 50% quantile is 2, then it's + // improper to choose the last position as its rank. Instead, we get the rank by averaging + // `rankOfValue` and `rankOfPreValue`. + val rank = math.ceil((rankOfValue + rankOfPreValue) / 2.0) val lower = math.floor((quant - summary.relativeError) * data.size) val upper = math.ceil((quant + summary.relativeError) * data.size) val msg = http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala index 1aea337..137c5be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala @@ -53,6 +53,21 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext { } } + test("percentile_approx, the first element satisfies small percentages") { + withTempView(table) { + (1 to 10).toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s""" + |SELECT + | percentile_approx(col, array(0.01, 0.1, 0.11)) + |FROM $table + """.stripMargin), + Row(Seq(1, 1, 2)) + ) + } + } + test("percentile_approx, array of percentile value") { withTempView(table) { (1 to 1000).toDF("col").createOrReplaceTempView(table) @@ -130,7 +145,7 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext { (1 to 1000).toDF("col").createOrReplaceTempView(table) checkAnswer( spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"), - Row(Seq(500D)) + Row(Seq(499)) ) } } http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 247c30e..46b21c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -141,7 +141,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { test("approximate quantile") { val n = 1000 - val df = Seq.tabulate(n)(i => (i, 2.0 * i)).toDF("singles", "doubles") + val df = Seq.tabulate(n + 1)(i => (i, 2.0 * i)).toDF("singles", "doubles") val q1 = 0.5 val q2 = 0.8 http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index dd8f54b..ad461fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -855,7 +855,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row("mean", null, "33.0", "178.0"), Row("stddev", null, "19.148542155126762", "11.547005383792516"), Row("min", "Alice", "16", "164"), - Row("25%", null, "24", "176"), + Row("25%", null, "16", "164"), Row("50%", null, "24", "176"), Row("75%", null, "32", "180"), Row("max", "David", "60", "192")) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
