Repository: spark
Updated Branches:
  refs/heads/master f86f71763 -> 5886b6217


[SPARK-14533][MLLIB] RowMatrix.computeCovariance inaccurate when values are 
very large (partial fix)

## What changes were proposed in this pull request?

Fix for part of SPARK-14533: a trivial simplification and a more accurate 
computation of column means. See also 
https://github.com/apache/spark/pull/12299, which contained a complete fix that 
was very slow. This PR does _not_ resolve SPARK-14533 entirely.

## How was this patch tested?

Existing tests.

Author: Sean Owen <so...@cloudera.com>

Closes #12779 from srowen/SPARK-14533.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5886b621
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5886b621
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5886b621

Branch: refs/heads/master
Commit: 5886b6217b7ac783ec605e38f5d960048d448976
Parents: f86f717
Author: Sean Owen <so...@cloudera.com>
Authored: Sat Apr 30 00:15:41 2016 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sat Apr 30 00:15:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/linalg/BLAS.scala    |  5 ++--
 .../mllib/linalg/distributed/RowMatrix.scala    | 24 +++++++-------------
 .../stat/correlation/PearsonCorrelation.scala   |  2 +-
 .../spark/mllib/stat/CorrelationSuite.scala     | 22 ++++++++++++------
 4 files changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5886b621/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
index 19cc942..6a85608 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -237,7 +237,7 @@ private[spark] object BLAS extends Serializable with 
Logging {
   }
 
   /**
-   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's 
?SPR.
+   * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's 
?SPR.
    *
    * @param U the upper triangular part of the matrix in a 
[[DenseVector]](column major)
    */
@@ -246,7 +246,7 @@ private[spark] object BLAS extends Serializable with 
Logging {
   }
 
   /**
-   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's 
?SPR.
+   * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's 
?SPR.
    *
    * @param U the upper triangular part of the matrix packed in an array 
(column major)
    */
@@ -267,7 +267,6 @@ private[spark] object BLAS extends Serializable with 
Logging {
           col = indices(j)
           // Skip empty columns.
           colStartIdx += (col - prevCol) * (col + prevCol + 1) / 2
-          col = indices(j)
           av = alpha * values(j)
           i = 0
           while (i <= j) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5886b621/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index f6183a5..4b8ed30 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -115,10 +115,10 @@ class RowMatrix @Since("1.0.0") (
     checkNumColumns(n)
     // Computes n*(n+1)/2, avoiding overflow in the multiplication.
     // This succeeds when n <= 65535, which is checked above
-    val nt: Int = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
+    val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
 
     // Compute the upper triangular part of the gram matrix.
-    val GU = rows.treeAggregate(new BDV[Double](new Array[Double](nt)))(
+    val GU = rows.treeAggregate(new BDV[Double](nt))(
       seqOp = (U, v) => {
         BLAS.spr(1.0, v, U.data)
         U
@@ -328,25 +328,17 @@ class RowMatrix @Since("1.0.0") (
     val n = numCols().toInt
     checkNumColumns(n)
 
-    val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, 
BDV.zeros[Double](n)))(
-      seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += 
v.toBreeze),
-      combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) =>
-        (s1._1 + s2._1, s1._2 += s2._2)
-    )
-
-    if (m <= 1) {
-      sys.error(s"RowMatrix.computeCovariance called on matrix with only $m 
rows." +
-        "  Cannot compute the covariance of a RowMatrix with <= 1 row.")
-    }
-    updateNumRows(m)
-
-    mean :/= m.toDouble
+    val summary = computeColumnSummaryStatistics()
+    val m = summary.count
+    require(m > 1, s"RowMatrix.computeCovariance called on matrix with only $m 
rows." +
+      "  Cannot compute the covariance of a RowMatrix with <= 1 row.")
+    val mean = summary.mean
 
     // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not 
accurate if E[X * Y] is
     // large but Cov(X, Y) is small, but it is good for sparse computation.
     // TODO: find a fast and stable way for sparse data.
 
-    val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+    val G = computeGramianMatrix().toBreeze
 
     var i = 0
     var j = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/5886b621/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
index f131f69..515be0b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
@@ -52,7 +52,7 @@ private[stat] object PearsonCorrelation extends Correlation 
with Logging {
 
   /**
    * Compute the Pearson correlation matrix from the covariance matrix.
-   * 0 covariance results in a correlation value of Double.NaN.
+   * 0 variance results in a correlation value of Double.NaN.
    */
   def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): Matrix 
= {
     val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]

http://git-wip-us.apache.org/repos/asf/spark/blob/5886b621/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
index eaa819c..700f803 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
@@ -22,6 +22,7 @@ import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.random.RandomRDDs
 import org.apache.spark.mllib.stat.correlation.{Correlations, 
PearsonCorrelation,
   SpearmanCorrelation}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -42,10 +43,10 @@ class CorrelationSuite extends SparkFunSuite with 
MLlibTestSparkContext with Log
   test("corr(x, y) pearson, 1 value in data") {
     val x = sc.parallelize(Array(1.0))
     val y = sc.parallelize(Array(4.0))
-    intercept[RuntimeException] {
+    intercept[IllegalArgumentException] {
       Statistics.corr(x, y, "pearson")
     }
-    intercept[RuntimeException] {
+    intercept[IllegalArgumentException] {
       Statistics.corr(x, y, "spearman")
     }
   }
@@ -127,15 +128,22 @@ class CorrelationSuite extends SparkFunSuite with 
MLlibTestSparkContext with Log
     assert(Correlations.getCorrelationFromName("pearson") === pearson)
     assert(Correlations.getCorrelationFromName("spearman") === spearman)
 
-    // Should throw IllegalArgumentException
-    try {
+    intercept[IllegalArgumentException] {
       Correlations.getCorrelationFromName("kendall")
-      assert(false)
-    } catch {
-      case ie: IllegalArgumentException =>
     }
   }
 
+  ignore("Pearson correlation of very large uncorrelated values 
(SPARK-14533)") {
+    // The two RDDs should have 0 correlation because they're random;
+    // this should stay the same after shifting them by any amount
+    // In practice a large shift produces very large values which can reveal
+    // round-off problems
+    val a = RandomRDDs.normalRDD(sc, 100000, 10).map(_ + 1000000000.0)
+    val b = RandomRDDs.normalRDD(sc, 100000, 10).map(_ + 1000000000.0)
+    val p = Statistics.corr(a, b, method = "pearson")
+    assert(approxEqual(p, 0.0, 0.01))
+  }
+
   def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = 
{
     if (v1.isNaN) {
       v2.isNaN


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to