Repository: spark
Updated Branches:
  refs/heads/master 183242382 -> 71ad945bb


[SPARK-16426][MLLIB] Fix bug that caused NaNs in IsotonicRegression

## What changes were proposed in this pull request?

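Fixed a bug that caused `NaN`s in `IsotonicRegression`. The problem occurs when
training rows with the same feature value but different labels end up in
different partitions. The previous `sortBy(x => (x._2, x._1))` range-partitioned
on the (feature, label) pair, so a partition boundary could fall between two
rows that share a feature value. This patch replaces that `sortBy` call with a
`partitionBy(RangePartitioner)` keyed on the feature value alone, followed by a
per-partition sort (`mapPartitions` + `sortBy`), so that all rows with the same
feature value end up in the same partition.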

## How was this patch tested?

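Added a unit test to `IsotonicRegressionSuite`
(`SPARK-16426 isotonic regression with duplicate features that produce NaNs`)
that trains on an RDD with two partitions and duplicate feature values.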

Author: z001qdp <nicholas.egg...@target.com>

Closes #14140 from neggert/SPARK-16426-isotonic-nan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71ad945b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71ad945b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71ad945b

Branch: refs/heads/master
Commit: 71ad945bbbdd154eae852cd7f841e98f7a83e8d4
Parents: 1832423
Author: z001qdp <nicholas.egg...@target.com>
Authored: Fri Jul 15 12:30:22 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 15 12:30:22 2016 +0100

----------------------------------------------------------------------
 .../spark/mllib/regression/IsotonicRegression.scala      |  9 ++++++---
 .../spark/mllib/regression/IsotonicRegressionSuite.scala | 11 +++++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71ad945b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 1cd6f2a..377326f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.RangePartitioner
 
 /**
  * Regression model for isotonic regression.
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
    */
   private def parallelPoolAdjacentViolators(
       input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = 
{
-    val parallelStepResult = input
-      .sortBy(x => (x._2, x._1))
-      .glom()
+    val keyedInput = input.keyBy(_._2)
+    val parallelStepResult = keyedInput
+      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
+      .values
+      .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
       .flatMap(poolAdjacentViolators)
       .collect()
       .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't 
promise ordering.

http://git-wip-us.apache.org/repos/asf/spark/blob/71ad945b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index ea4f286..94da626 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
     assert(model.predictions === Array(1, 2, 2))
   }
 
+  test("SPARK-16426 isotonic regression with duplicate features that produce 
NaNs") {
+    val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 
1, 1), (0, 2, 1),
+                                                                (1, 2, 1), 
(0.5, 3, 1), (0, 3, 1)),
+                                  2)
+
+    val model = new IsotonicRegression().run(trainRDD)
+
+    assert(model.boundaries === Array(1.0, 3.0))
+    assert(model.predictions === Array(0.75, 0.75))
+  }
+
   test("isotonic regression prediction") {
     val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to