This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 32259c9 [SPARK-31492][ML] flatten the result dataframe of FValueTest
32259c9 is described below
commit 32259c9733bfeed76a164a4d53b7452467569a19
Author: zhengruifeng <[email protected]>
AuthorDate: Tue Apr 21 11:09:05 2020 +0800
[SPARK-31492][ML] flatten the result dataframe of FValueTest
### What changes were proposed in this pull request?
add a new method `def test(dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame`
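A minimal usage sketch of the new overload (`df`, `"features"`, and `"label"` below are hypothetical input names, not part of this change):
```scala
import org.apache.spark.ml.stat.FValueTest

// flatten = true: one row per feature with columns
// (featureIndex, pValue, degreesOfFreedom, fValue)
val flatDF = FValueTest.test(df, "features", "label", flatten = true)

// default behaviour (flatten = false): a single row with the aggregated
// columns (pValues, degreesOfFreedom, fValues)
val aggDF = FValueTest.test(df, "features", "label")
```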
### Why are the changes needed?
Similar to the new test method in ChiSquareTest, it will:
1. support DataFrame operations on the returned DataFrame (see the sketch below);
2. keep the driver from becoming a bottleneck when `numFeatures` is large.
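For example, with the flattened output the top-k features by p-value can be selected with plain DataFrame operations, so only k rows ever reach the driver; the sketch below assumes an input `df` with columns `features` and `label`, and k = 10:
```scala
import spark.implicits._  // `spark` is the active SparkSession, needed for .as[Int]

val topFeatureIndices = FValueTest.test(df, "features", "label", flatten = true)
  .sort("pValue", "featureIndex")
  .select("featureIndex")
  .limit(10)
  .as[Int]
  .collect()
```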
### Does this PR introduce any user-facing change?
Yes, it adds a new method.
### How was this patch tested?
existing test suites
Closes #28268 from zhengruifeng/flatten_fvalue.
Authored-by: zhengruifeng <[email protected]>
Signed-off-by: zhengruifeng <[email protected]>
---
.../apache/spark/ml/feature/FValueSelector.scala | 76 ++++++++-------
.../org/apache/spark/ml/stat/FValueTest.scala | 105 +++++++++++++--------
2 files changed, 107 insertions(+), 74 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
index 7019590..b43954b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.{FValueTest, SelectionTestResult}
+import org.apache.spark.ml.stat.FValueTest
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -200,46 +200,54 @@ final class FValueSelector @Since("3.1.0") (override val uid: String)
@Since("3.1.0")
override def fit(dataset: Dataset[_]): FValueSelectorModel = {
transformSchema(dataset.schema, logging = true)
+ val spark = dataset.sparkSession
+ import spark.implicits._
+
+ val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
+ val resultDF = FValueTest.test(dataset.toDF, $(featuresCol), $(labelCol), true)
+
+ def getTopIndices(k: Int): Array[Int] = {
+ resultDF.sort("pValue", "featureIndex")
+ .select("featureIndex")
+ .limit(k)
+ .as[Int]
+ .collect()
+ }
- val testResult = FValueTest.testRegression(dataset, getFeaturesCol, getLabelCol)
- .zipWithIndex
- val features = $(selectorType) match {
+ val indices = $(selectorType) match {
case "numTopFeatures" =>
- testResult
- .sortBy { case (res, _) => res.pValue }
- .take(getNumTopFeatures)
+ getTopIndices($(numTopFeatures))
case "percentile" =>
- testResult
- .sortBy { case (res, _) => res.pValue }
- .take((testResult.length * getPercentile).toInt)
+ getTopIndices((numFeatures * getPercentile).toInt)
case "fpr" =>
- testResult
- .filter { case (res, _) => res.pValue < getFpr }
+ resultDF.select("featureIndex")
+ .where(col("pValue") < $(fpr))
+ .as[Int].collect()
case "fdr" =>
// This uses the Benjamini-Hochberg procedure.
// https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure
- val tempRes = testResult
- .sortBy { case (res, _) => res.pValue }
- val selected = tempRes
+ val f = $(fdr) / numFeatures
+ val maxIndex = resultDF.sort("pValue", "featureIndex")
+ .select("pValue")
+ .as[Double].rdd
.zipWithIndex
- .filter { case ((res, _), index) =>
- res.pValue <= getFdr * (index + 1) / testResult.length
- }
- if (selected.isEmpty) {
- Array.empty[(SelectionTestResult, Int)]
- } else {
- val maxIndex = selected.map(_._2).max
- tempRes.take(maxIndex + 1)
- }
+ .flatMap { case (pValue, index) =>
+ if (pValue <= f * (index + 1)) {
+ Iterator.single(index.toInt)
+ } else Iterator.empty
+ }.fold(-1)(math.max)
+ if (maxIndex >= 0) {
+ getTopIndices(maxIndex + 1)
+ } else Array.emptyIntArray
case "fwe" =>
- testResult
- .filter { case (res, _) => res.pValue < getFwe / testResult.length }
+ resultDF.select("featureIndex")
+ .where(condition = col("pValue") < $(fwe) / numFeatures)
+ .as[Int].collect()
case errorType =>
throw new IllegalStateException(s"Unknown Selector Type: $errorType")
}
- val indices = features.map { case (_, index) => index }
- copyValues(new FValueSelectorModel(uid, indices.sorted)
- .setParent(this))
+
+ copyValues(new FValueSelectorModel(uid, indices.sorted).setParent(this))
}
@Since("3.1.0")
@@ -269,10 +277,12 @@ class FValueSelectorModel private[ml](
val selectedFeatures: Array[Int])
extends Model[FValueSelectorModel] with FValueSelectorParams with MLWritable {
- var prev = -1
- selectedFeatures.foreach { i =>
- require(prev < i, s"Index $i follows $prev and is not strictly increasing")
- prev = i
+ {
+ var prev = -1
+ selectedFeatures.foreach { i =>
+ require(prev < i, s"Index $i follows $prev and is not strictly increasing")
+ prev = i
+ }
}
/** @group setParam */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
index 2c83c98..ad506ab 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
@@ -22,6 +22,7 @@ import org.apache.commons.math3.distribution.FDistribution
import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@@ -50,31 +51,60 @@ object FValueTest {
*/
@Since("3.1.0")
def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame = {
+ test(dataset, featuresCol, labelCol, false)
+ }
+
+ /**
+ * @param dataset DataFrame of continuous labels and continuous features.
+ * @param featuresCol Name of features column in dataset, of type `Vector` (`VectorUDT`)
+ * @param labelCol Name of label column in dataset, of any numerical type
+ * @param flatten If false, the returned DataFrame contains only a single Row, otherwise, one
+ * row per feature.
+ */
+ @Since("3.1.0")
+ def test(
+ dataset: DataFrame,
+ featuresCol: String,
+ labelCol: String,
+ flatten: Boolean): DataFrame = {
val spark = dataset.sparkSession
- val testResults = testRegression(dataset, featuresCol, labelCol)
- val pValues = Vectors.dense(testResults.map(_.pValue))
- val degreesOfFreedom = testResults.map(_.degreesOfFreedom)
- val fValues = Vectors.dense(testResults.map(_.statistic))
- spark.createDataFrame(Seq(FValueResult(pValues, degreesOfFreedom, fValues)))
+ import spark.implicits._
+
+ val resultDF = testRegression(dataset, featuresCol, labelCol)
+ .toDF("featureIndex", "pValue", "degreesOfFreedom", "fValue")
+
+ if (flatten) {
+ resultDF
+ } else {
+ resultDF.groupBy()
+ .agg(collect_list(struct("*")))
+ .as[Seq[(Int, Double, Long, Double)]]
+ .map { seq =>
+ val results = seq.toArray.sortBy(_._1)
+ val pValues = Vectors.dense(results.map(_._2))
+ val degreesOfFreedom = results.map(_._3)
+ val fValues = Vectors.dense(results.map(_._4))
+ (pValues, degreesOfFreedom, fValues)
+ }.toDF("pValues", "degreesOfFreedom", "fValues")
+ }
}
/**
* @param dataset DataFrame of continuous labels and continuous features.
* @param featuresCol Name of features column in dataset, of type `Vector` (`VectorUDT`)
* @param labelCol Name of label column in dataset, of any numerical type
- * @return Array containing the FValueTestResult for every feature against the label.
+ * @return RDD containing test result of each feature, one row per feature.
*/
private[ml] def testRegression(
dataset: Dataset[_],
featuresCol: String,
- labelCol: String): Array[SelectionTestResult] = {
+ labelCol: String): RDD[(Int, Double, Long, Double)] = {
+ SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
+ SchemaUtils.checkNumericType(dataset.schema, labelCol)
val spark = dataset.sparkSession
import spark.implicits._
- SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
- SchemaUtils.checkNumericType(dataset.schema, labelCol)
-
val Row(xMeans: Vector, xStd: Vector, yMean: Double, yStd: Double, count: Long) = dataset
.select(Summarizer.metrics("mean", "std", "count").summary(col(featuresCol)).as("summary"),
avg(col(labelCol)).as("yMean"),
@@ -82,9 +112,6 @@ object FValueTest {
.select("summary.mean", "summary.std", "yMean", "yStd", "summary.count")
.first()
- val labeledPointRdd = dataset.select(col(labelCol).cast("double"), col(featuresCol))
- .as[(Double, Vector)].rdd
-
val numFeatures = xMeans.size
val numSamples = count
val degreesOfFreedom = numSamples - 2
@@ -92,36 +119,32 @@ object FValueTest {
// Use two pass equation Cov[X,Y] = E[(X - E[X]) * (Y - E[Y])] to compute covariance because
// one pass equation Cov[X,Y] = E[XY] - E[X]E[Y] is susceptible to catastrophic cancellation
// sumForCov = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y)))
- val sumForCov = labeledPointRdd.mapPartitions { iter =>
- if (iter.hasNext) {
- val array = Array.ofDim[Double](numFeatures)
- while (iter.hasNext) {
- val (label, features) = iter.next
- val yDiff = label - yMean
- if (yDiff != 0) {
- features.iterator.zip(xMeans.iterator)
- .foreach { case ((col, x), (_, xMean)) => array(col) += yDiff * (x - xMean) }
+ dataset.select(col(labelCol).cast("double"), col(featuresCol))
+ .as[(Double, Vector)].rdd
+ .mapPartitions { iter =>
+ if (iter.hasNext) {
+ val array = Array.ofDim[Double](numFeatures)
+ while (iter.hasNext) {
+ val (label, features) = iter.next
+ val yDiff = label - yMean
+ if (yDiff != 0) {
+ features.iterator.zip(xMeans.iterator)
+ .foreach { case ((col, x), (_, xMean)) => array(col) += yDiff * (x - xMean) }
+ }
}
+ Iterator.tabulate(numFeatures)(col => (col, array(col)))
+ } else Iterator.empty
+ }.reduceByKey(_ + _
+ ).mapPartitions { iter =>
+ val fd = new FDistribution(1, degreesOfFreedom)
+ iter.map { case (col, sumForCov) =>
+ // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
+ val covariance = sumForCov / (numSamples - 1)
+ val corr = covariance / (yStd * xStd(col))
+ val fValue = corr * corr / (1 - corr * corr) * degreesOfFreedom
+ val pValue = 1.0 - fd.cumulativeProbability(fValue)
+ (col, pValue, degreesOfFreedom, fValue)
}
- Iterator.single(array)
- } else Iterator.empty
- }.treeReduce { case (array1, array2) =>
- var i = 0
- while (i < numFeatures) {
- array1(i) += array2(i)
- i += 1
}
- array1
- }
-
- val fd = new FDistribution(1, degreesOfFreedom)
- Array.tabulate(numFeatures) { i =>
- // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
- val covariance = sumForCov (i) / (numSamples - 1)
- val corr = covariance / (yStd * xStd(i))
- val fValue = corr * corr / (1 - corr * corr) * degreesOfFreedom
- val pValue = 1.0 - fd.cumulativeProbability(fValue)
- new FValueTestResult(pValue, degreesOfFreedom, fValue)
- }
}
}
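For reference, the `fdr` branch of `FValueSelector.fit` above applies the Benjamini-Hochberg cutoff over the sorted p-values. Below is a minimal driver-side sketch of that cutoff; the committed code keeps the computation distributed via an RDD, and the function name here is illustrative only:
```scala
// Benjamini-Hochberg: with p-values sorted ascending, find the largest index i
// with p(i) <= fdr * (i + 1) / numFeatures and keep the first i + 1 features
// (in p-value order). Returns the number of features to select (0 means none).
def benjaminiHochbergCount(sortedPValues: Array[Double], fdr: Double): Int = {
  val n = sortedPValues.length
  var maxIndex = -1
  var i = 0
  while (i < n) {
    if (sortedPValues(i) <= fdr * (i + 1) / n) maxIndex = i
    i += 1
  }
  maxIndex + 1
}
```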