This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 32259c9  [SPARK-31492][ML] flatten the result dataframe of FValueTest
32259c9 is described below

commit 32259c9733bfeed76a164a4d53b7452467569a19
Author: zhengruifeng <[email protected]>
AuthorDate: Tue Apr 21 11:09:05 2020 +0800

    [SPARK-31492][ML] flatten the result dataframe of FValueTest
    
    ### What changes were proposed in this pull request?
    Add a new method to `FValueTest`:
    `def test(dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame`
    
    ### Why are the changes needed?
    
    Similar to the new test method in ChiSquareTest, it will:
    1. support DataFrame operations on the returned DataFrame;
    2. keep the driver from being a bottleneck when `numFeatures` is large.
    
    ### Does this PR introduce any user-facing change?
    Yes, it adds a new method.
    
    ### How was this patch tested?
    Existing test suites.
    
    Closes #28268 from zhengruifeng/flatten_fvalue.
    
    Authored-by: zhengruifeng <[email protected]>
    Signed-off-by: zhengruifeng <[email protected]>
---
 .../apache/spark/ml/feature/FValueSelector.scala   |  76 ++++++++-------
 .../org/apache/spark/ml/stat/FValueTest.scala      | 105 +++++++++++++--------
 2 files changed, 107 insertions(+), 74 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
index 7019590..b43954b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.{FValueTest, SelectionTestResult}
+import org.apache.spark.ml.stat.FValueTest
 import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
@@ -200,46 +200,54 @@ final class FValueSelector @Since("3.1.0") (override val 
uid: String)
   @Since("3.1.0")
   override def fit(dataset: Dataset[_]): FValueSelectorModel = {
     transformSchema(dataset.schema, logging = true)
+    val spark = dataset.sparkSession
+    import spark.implicits._
+
+    val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
+    val resultDF = FValueTest.test(dataset.toDF, $(featuresCol), $(labelCol), 
true)
+
+    def getTopIndices(k: Int): Array[Int] = {
+      resultDF.sort("pValue", "featureIndex")
+        .select("featureIndex")
+        .limit(k)
+        .as[Int]
+        .collect()
+    }
 
-    val testResult = FValueTest.testRegression(dataset, getFeaturesCol, 
getLabelCol)
-      .zipWithIndex
-    val features = $(selectorType) match {
+    val indices = $(selectorType) match {
       case "numTopFeatures" =>
-        testResult
-          .sortBy { case (res, _) => res.pValue }
-          .take(getNumTopFeatures)
+        getTopIndices($(numTopFeatures))
       case "percentile" =>
-        testResult
-          .sortBy { case (res, _) => res.pValue }
-          .take((testResult.length * getPercentile).toInt)
+        getTopIndices((numFeatures * getPercentile).toInt)
       case "fpr" =>
-        testResult
-          .filter { case (res, _) => res.pValue < getFpr }
+        resultDF.select("featureIndex")
+          .where(col("pValue") < $(fpr))
+          .as[Int].collect()
       case "fdr" =>
         // This uses the Benjamini-Hochberg procedure.
         // 
https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure
-        val tempRes = testResult
-          .sortBy { case (res, _) => res.pValue }
-        val selected = tempRes
+        val f = $(fdr) / numFeatures
+        val maxIndex = resultDF.sort("pValue", "featureIndex")
+          .select("pValue")
+          .as[Double].rdd
           .zipWithIndex
-          .filter { case ((res, _), index) =>
-            res.pValue <= getFdr * (index + 1) / testResult.length
-          }
-        if (selected.isEmpty) {
-          Array.empty[(SelectionTestResult, Int)]
-        } else {
-          val maxIndex = selected.map(_._2).max
-          tempRes.take(maxIndex + 1)
-        }
+          .flatMap { case (pValue, index) =>
+            if (pValue <= f * (index + 1)) {
+              Iterator.single(index.toInt)
+            } else Iterator.empty
+          }.fold(-1)(math.max)
+        if (maxIndex >= 0) {
+          getTopIndices(maxIndex + 1)
+        } else Array.emptyIntArray
       case "fwe" =>
-        testResult
-          .filter { case (res, _) => res.pValue < getFwe / testResult.length }
+        resultDF.select("featureIndex")
+          .where(condition = col("pValue") < $(fwe) / numFeatures)
+          .as[Int].collect()
       case errorType =>
         throw new IllegalStateException(s"Unknown Selector Type: $errorType")
     }
-    val indices = features.map { case (_, index) => index }
-    copyValues(new FValueSelectorModel(uid, indices.sorted)
-      .setParent(this))
+
+    copyValues(new FValueSelectorModel(uid, indices.sorted).setParent(this))
   }
 
   @Since("3.1.0")
@@ -269,10 +277,12 @@ class FValueSelectorModel private[ml](
     val selectedFeatures: Array[Int])
   extends Model[FValueSelectorModel] with FValueSelectorParams with MLWritable 
{
 
-  var prev = -1
-  selectedFeatures.foreach { i =>
-    require(prev < i, s"Index $i follows $prev and is not strictly increasing")
-    prev = i
+  {
+    var prev = -1
+    selectedFeatures.foreach { i =>
+      require(prev < i, s"Index $i follows $prev and is not strictly 
increasing")
+      prev = i
+    }
   }
 
   /** @group setParam */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
index 2c83c98..ad506ab 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
@@ -22,6 +22,7 @@ import org.apache.commons.math3.distribution.FDistribution
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 
@@ -50,31 +51,60 @@ object FValueTest {
    */
   @Since("3.1.0")
   def test(dataset: DataFrame, featuresCol: String, labelCol: String): 
DataFrame = {
+    test(dataset, featuresCol, labelCol, false)
+  }
+
+  /**
+   * @param dataset     DataFrame of continuous labels and continuous features.
+   * @param featuresCol Name of features column in dataset, of type `Vector` 
(`VectorUDT`)
+   * @param labelCol    Name of label column in dataset, of any numerical type
+   * @param flatten     If false, the returned DataFrame contains only a 
single Row, otherwise, one
+   *                    row per feature.
+   */
+  @Since("3.1.0")
+  def test(
+      dataset: DataFrame,
+      featuresCol: String,
+      labelCol: String,
+      flatten: Boolean): DataFrame = {
     val spark = dataset.sparkSession
-    val testResults = testRegression(dataset, featuresCol, labelCol)
-    val pValues = Vectors.dense(testResults.map(_.pValue))
-    val degreesOfFreedom = testResults.map(_.degreesOfFreedom)
-    val fValues = Vectors.dense(testResults.map(_.statistic))
-    spark.createDataFrame(Seq(FValueResult(pValues, degreesOfFreedom, 
fValues)))
+    import spark.implicits._
+
+    val resultDF = testRegression(dataset, featuresCol, labelCol)
+      .toDF("featureIndex", "pValue", "degreesOfFreedom", "fValue")
+
+    if (flatten) {
+      resultDF
+    } else {
+      resultDF.groupBy()
+        .agg(collect_list(struct("*")))
+        .as[Seq[(Int, Double, Long, Double)]]
+        .map { seq =>
+          val results = seq.toArray.sortBy(_._1)
+          val pValues = Vectors.dense(results.map(_._2))
+          val degreesOfFreedom = results.map(_._3)
+          val fValues = Vectors.dense(results.map(_._4))
+          (pValues, degreesOfFreedom, fValues)
+        }.toDF("pValues", "degreesOfFreedom", "fValues")
+    }
   }
 
   /**
    * @param dataset  DataFrame of continuous labels and continuous features.
    * @param featuresCol  Name of features column in dataset, of type `Vector` 
(`VectorUDT`)
    * @param labelCol  Name of label column in dataset, of any numerical type
-   * @return Array containing the FValueTestResult for every feature against 
the label.
+   * @return RDD containing test result of each feature, one row per feature.
    */
   private[ml] def testRegression(
       dataset: Dataset[_],
       featuresCol: String,
-      labelCol: String): Array[SelectionTestResult] = {
+      labelCol: String): RDD[(Int, Double, Long, Double)] = {
+    SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
+    SchemaUtils.checkNumericType(dataset.schema, labelCol)
 
     val spark = dataset.sparkSession
     import spark.implicits._
 
-    SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
-    SchemaUtils.checkNumericType(dataset.schema, labelCol)
-
     val Row(xMeans: Vector, xStd: Vector, yMean: Double, yStd: Double, count: 
Long) = dataset
       .select(Summarizer.metrics("mean", "std", 
"count").summary(col(featuresCol)).as("summary"),
         avg(col(labelCol)).as("yMean"),
@@ -82,9 +112,6 @@ object FValueTest {
       .select("summary.mean", "summary.std", "yMean", "yStd", "summary.count")
       .first()
 
-    val labeledPointRdd = dataset.select(col(labelCol).cast("double"), 
col(featuresCol))
-      .as[(Double, Vector)].rdd
-
     val numFeatures = xMeans.size
     val numSamples = count
     val degreesOfFreedom = numSamples - 2
@@ -92,36 +119,32 @@ object FValueTest {
     // Use two pass equation Cov[X,Y] = E[(X - E[X]) * (Y - E[Y])] to compute 
covariance because
     // one pass equation Cov[X,Y] = E[XY] - E[X]E[Y] is susceptible to 
catastrophic cancellation
     // sumForCov = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y)))
-    val sumForCov = labeledPointRdd.mapPartitions { iter =>
-      if (iter.hasNext) {
-        val array = Array.ofDim[Double](numFeatures)
-        while (iter.hasNext) {
-          val (label, features) = iter.next
-          val yDiff = label - yMean
-          if (yDiff != 0) {
-            features.iterator.zip(xMeans.iterator)
-              .foreach { case ((col, x), (_, xMean)) => array(col) += yDiff * 
(x - xMean) }
+    dataset.select(col(labelCol).cast("double"), col(featuresCol))
+      .as[(Double, Vector)].rdd
+      .mapPartitions { iter =>
+        if (iter.hasNext) {
+          val array = Array.ofDim[Double](numFeatures)
+          while (iter.hasNext) {
+            val (label, features) = iter.next
+            val yDiff = label - yMean
+            if (yDiff != 0) {
+              features.iterator.zip(xMeans.iterator)
+                .foreach { case ((col, x), (_, xMean)) => array(col) += yDiff 
* (x - xMean) }
+            }
           }
+          Iterator.tabulate(numFeatures)(col => (col, array(col)))
+        } else Iterator.empty
+      }.reduceByKey(_ + _
+      ).mapPartitions { iter =>
+        val fd = new FDistribution(1, degreesOfFreedom)
+        iter.map { case (col, sumForCov) =>
+          // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
+          val covariance = sumForCov / (numSamples - 1)
+          val corr = covariance / (yStd * xStd(col))
+          val fValue = corr * corr / (1 - corr * corr) * degreesOfFreedom
+          val pValue = 1.0 - fd.cumulativeProbability(fValue)
+          (col, pValue, degreesOfFreedom, fValue)
         }
-        Iterator.single(array)
-      } else Iterator.empty
-    }.treeReduce { case (array1, array2) =>
-      var i = 0
-      while (i < numFeatures) {
-        array1(i) += array2(i)
-        i += 1
       }
-      array1
-    }
-
-    val fd = new FDistribution(1, degreesOfFreedom)
-    Array.tabulate(numFeatures) { i =>
-      // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
-      val covariance = sumForCov (i) / (numSamples - 1)
-      val corr = covariance / (yStd * xStd(i))
-      val fValue = corr * corr / (1 - corr * corr) * degreesOfFreedom
-      val pValue = 1.0 - fd.cumulativeProbability(fValue)
-      new FValueTestResult(pValue, degreesOfFreedom, fValue)
-    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to