Repository: spark
Updated Branches:
  refs/heads/master 76fb173dd -> 655f6f86f


[SPARK-22208][SQL] Improve percentile_approx by not rounding up targetError and 
starting from index 0

## What changes were proposed in this pull request?

Currently percentile_approx never returns the first element when the percentile is 
in (relativeError, 1/N], where relativeError defaults to 1/10000 and N is the 
total number of elements. But ideally, percentiles in [0, 1/N] should all 
return the first element as the answer.

For example, given input data 1 to 10, if a user queries 10% (or even less) 
percentile, it should return 1, because the first value 1 already reaches 10%. 
Currently it returns 2.

Based on the paper, targetError should not be rounded up, and the search index 
should start from 0 instead of 1. By following the paper, we should be able to 
fix the cases mentioned above.

## How was this patch tested?

Added a new test case and fixed existing test cases.

Author: Zhenhua Wang <[email protected]>

Closes #19438 from wzhfy/improve_percentile_approx.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/655f6f86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/655f6f86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/655f6f86

Branch: refs/heads/master
Commit: 655f6f86f84ff5241d1d20766e1ef83bb32ca5e0
Parents: 76fb173
Author: Zhenhua Wang <[email protected]>
Authored: Wed Oct 11 00:16:12 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Oct 11 00:16:12 2017 -0700

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R              |  8 ++++----
 .../org/apache/spark/ml/feature/ImputerSuite.scala |  2 +-
 python/pyspark/sql/dataframe.py                    |  6 +++---
 .../sql/catalyst/util/QuantileSummaries.scala      |  4 ++--
 .../sql/catalyst/util/QuantileSummariesSuite.scala | 10 ++++++++--
 .../sql/ApproximatePercentileQuerySuite.scala      | 17 ++++++++++++++++-
 .../org/apache/spark/sql/DataFrameStatSuite.scala  |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala      |  2 +-
 8 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index bbea25b..4382ef2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2538,7 +2538,7 @@ test_that("describe() and summary() on a DataFrame", {
 
   stats2 <- summary(df)
   expect_equal(collect(stats2)[5, "summary"], "25%")
-  expect_equal(collect(stats2)[5, "age"], "30")
+  expect_equal(collect(stats2)[5, "age"], "19")
 
   stats3 <- summary(df, "min", "max", "55.1%")
 
@@ -2738,7 +2738,7 @@ test_that("sampleBy() on a DataFrame", {
 })
 
 test_that("approxQuantile() on a DataFrame", {
-  l <- lapply(c(0:99), function(i) { list(i, 99 - i) })
+  l <- lapply(c(0:100), function(i) { list(i, 100 - i) })
   df <- createDataFrame(l, list("a", "b"))
   quantiles <- approxQuantile(df, "a", c(0.5, 0.8), 0.0)
   expect_equal(quantiles, list(50, 80))
@@ -2749,8 +2749,8 @@ test_that("approxQuantile() on a DataFrame", {
   dfWithNA <- createDataFrame(data.frame(a = c(NA, 30, 19, 11, 28, 15),
                                          b = c(-30, -19, NA, -11, -28, -15)))
   quantiles3 <- approxQuantile(dfWithNA, c("a", "b"), c(0.5), 0.0)
-  expect_equal(quantiles3[[1]], list(28))
-  expect_equal(quantiles3[[2]], list(-15))
+  expect_equal(quantiles3[[1]], list(19))
+  expect_equal(quantiles3[[2]], list(-19))
 })
 
 test_that("SQL error message is returned from JVM", {

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index ee2ba73..c08b35b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -43,7 +43,7 @@ class ImputerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Default
       (0, 1.0, 1.0, 1.0),
       (1, 3.0, 3.0, 3.0),
       (2, Double.NaN, Double.NaN, Double.NaN),
-      (3, -1.0, 2.0, 3.0)
+      (3, -1.0, 2.0, 1.0)
     )).toDF("id", "value", "expected_mean_value", "expected_median_value")
     val imputer = new 
Imputer().setInputCols(Array("value")).setOutputCols(Array("out"))
       .setMissingValue(-1.0)

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2d59622..38b01f0 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1038,8 +1038,8 @@ class DataFrame(object):
         |   mean|               3.5| null|
         | stddev|2.1213203435596424| null|
         |    min|                 2|Alice|
-        |    25%|                 5| null|
-        |    50%|                 5| null|
+        |    25%|                 2| null|
+        |    50%|                 2| null|
         |    75%|                 5| null|
         |    max|                 5|  Bob|
         +-------+------------------+-----+
@@ -1050,7 +1050,7 @@ class DataFrame(object):
         +-------+---+-----+
         |  count|  2|    2|
         |    min|  2|Alice|
-        |    25%|  5| null|
+        |    25%|  2| null|
         |    75%|  5| null|
         |    max|  5|  Bob|
         +-------+---+-----+

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index af543b0..eb7941c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -193,10 +193,10 @@ class QuantileSummaries(
 
     // Target rank
     val rank = math.ceil(quantile * count).toInt
-    val targetError = math.ceil(relativeError * count)
+    val targetError = relativeError * count
     // Minimum rank at current sample
     var minRank = 0
-    var i = 1
+    var i = 0
     while (i < sampled.length - 1) {
       val curSample = sampled(i)
       minRank += curSample.g

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index df579d5..6508139 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -57,8 +57,14 @@ class QuantileSummariesSuite extends SparkFunSuite {
   private def checkQuantile(quant: Double, data: Seq[Double], summary: 
QuantileSummaries): Unit = {
     if (data.nonEmpty) {
       val approx = summary.query(quant).get
-      // The rank of the approximation.
-      val rank = data.count(_ < approx) // has to be <, not <= to be exact
+      // Get the rank of the approximation.
+      val rankOfValue = data.count(_ <= approx)
+      val rankOfPreValue = data.count(_ < approx)
+      // `rankOfValue` is the last position of the quantile value. If the 
input repeats the value
+      // chosen as the quantile, e.g. in (1,2,2,2,2,2,3), the 50% quantile is 
2, then it's
+      // improper to choose the last position as its rank. Instead, we get the 
rank by averaging
+      // `rankOfValue` and `rankOfPreValue`.
+      val rank = math.ceil((rankOfValue + rankOfPreValue) / 2.0)
       val lower = math.floor((quant - summary.relativeError) * data.size)
       val upper = math.ceil((quant + summary.relativeError) * data.size)
       val msg =

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
index 1aea337..137c5be 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -53,6 +53,21 @@ class ApproximatePercentileQuerySuite extends QueryTest with 
SharedSQLContext {
     }
   }
 
+  test("percentile_approx, the first element satisfies small percentages") {
+    withTempView(table) {
+      (1 to 10).toDF("col").createOrReplaceTempView(table)
+      checkAnswer(
+        spark.sql(
+          s"""
+             |SELECT
+             |  percentile_approx(col, array(0.01, 0.1, 0.11))
+             |FROM $table
+           """.stripMargin),
+        Row(Seq(1, 1, 2))
+      )
+    }
+  }
+
   test("percentile_approx, array of percentile value") {
     withTempView(table) {
       (1 to 1000).toDF("col").createOrReplaceTempView(table)
@@ -130,7 +145,7 @@ class ApproximatePercentileQuerySuite extends QueryTest 
with SharedSQLContext {
       (1 to 1000).toDF("col").createOrReplaceTempView(table)
       checkAnswer(
         spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 
800D) FROM $table"),
-        Row(Seq(500D))
+        Row(Seq(499))
       )
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 247c30e..46b21c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -141,7 +141,7 @@ class DataFrameStatSuite extends QueryTest with 
SharedSQLContext {
 
   test("approximate quantile") {
     val n = 1000
-    val df = Seq.tabulate(n)(i => (i, 2.0 * i)).toDF("singles", "doubles")
+    val df = Seq.tabulate(n + 1)(i => (i, 2.0 * i)).toDF("singles", "doubles")
 
     val q1 = 0.5
     val q2 = 0.8

http://git-wip-us.apache.org/repos/asf/spark/blob/655f6f86/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index dd8f54b..ad461fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -855,7 +855,7 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
       Row("mean", null, "33.0", "178.0"),
       Row("stddev", null, "19.148542155126762", "11.547005383792516"),
       Row("min", "Alice", "16", "164"),
-      Row("25%", null, "24", "176"),
+      Row("25%", null, "16", "164"),
       Row("50%", null, "24", "176"),
       Row("75%", null, "32", "180"),
       Row("max", "David", "60", "192"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to