This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2e86cb  [SPARK-26616][MLLIB] Expose document frequency in IDFModel
d2e86cb is described below

commit d2e86cb3cd8aaf5a2db639bb7402e41bba16e9fe
Author: Jatin Puri <purija...@gmail.com>
AuthorDate: Tue Jan 22 07:41:54 2019 -0600

    [SPARK-26616][MLLIB] Expose document frequency in IDFModel
    
    ## What changes were proposed in this pull request?
    
    This change exposes the `df` (document frequency) as a public val along 
with the number of documents (`m`) as part of the IDF model.
    
    * The document frequency is returned as an `Array[Long]`
    * If a minimum document frequency is set, it is applied to the df 
calculation: for terms whose count is less than minDocFreq, the df is 0
    * `numDocs` is not strictly required, but it would be useful if we later 
allow users to supply their own idf function instead of using the default 
(log((1+m)/(1+df))). In that case, the user could provide a function that 
takes `m` and `df` as input and returns the idf value
    * PySpark changes
    
    ## How was this patch tested?
    
    The existing test case was edited to also check for the document frequency 
values.
    
    I am not very good with Python or PySpark. I have committed and run tests 
based on my understanding. Kindly let me know if I have missed anything.
    
    Reviewer request: mengxr  zjffdu yinxusen
    
    Closes #23549 from purijatin/master.
    
    Authored-by: Jatin Puri <purija...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/ml/feature/IDF.scala    | 31 +++++++++++++++++-----
 .../scala/org/apache/spark/mllib/feature/IDF.scala | 19 ++++++++-----
 .../org/apache/spark/ml/feature/IDFSuite.scala     |  7 +++--
 .../org/apache/spark/mllib/feature/IDFSuite.scala  | 12 ++++++---
 project/MimaExcludes.scala                         |  6 ++++-
 python/pyspark/ml/feature.py                       | 20 ++++++++++++++
 python/pyspark/ml/tests/test_feature.py            |  2 ++
 python/pyspark/mllib/feature.py                    | 14 ++++++++++
 8 files changed, 91 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 58897cc..98a9674 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.VersionUtils.majorVersion
 
 /**
  * Params for [[IDF]] and [[IDFModel]].
@@ -151,6 +152,15 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   def idf: Vector = idfModel.idf.asML
 
+  /** Returns the document frequency */
+  @Since("3.0.0")
+  def docFreq: Array[Long] = idfModel.docFreq
+
+  /** Returns number of documents evaluated to compute idf */
+  @Since("3.0.0")
+  def numDocs: Long = idfModel.numDocs
+
+
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
 }
@@ -160,11 +170,11 @@ object IDFModel extends MLReadable[IDFModel] {
 
   private[IDFModel] class IDFModelWriter(instance: IDFModel) extends MLWriter {
 
-    private case class Data(idf: Vector)
+    private case class Data(idf: Vector, docFreq: Array[Long], numDocs: Long)
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.idf)
+      val data = Data(instance.idf, instance.docFreq, instance.numDocs)
       val dataPath = new Path(path, "data").toString
       
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }
@@ -178,10 +188,19 @@ object IDFModel extends MLReadable[IDFModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
-      val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
-        .select("idf")
-        .head()
-      val model = new IDFModel(metadata.uid, new 
feature.IDFModel(OldVectors.fromML(idf)))
+
+      val model = if (majorVersion(metadata.sparkVersion) >= 3) {
+        val Row(idf: Vector, df: Seq[_], numDocs: Long) = data.select("idf", 
"docFreq", "numDocs")
+          .head()
+        new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf),
+          df.asInstanceOf[Seq[Long]].toArray, numDocs))
+      } else {
+        val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
+          .select("idf")
+          .head()
+        new IDFModel(metadata.uid,
+          new feature.IDFModel(OldVectors.fromML(idf), new 
Array[Long](idf.size), 0L))
+      }
       metadata.getAndSetParams(model)
       model
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index bb4b37e..6407be6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
  * This implementation supports filtering out terms which do not appear in a 
minimum number
  * of documents (controlled by the variable `minDocFreq`). For terms that are 
not in
  * at least `minDocFreq` documents, the IDF is found as 0, resulting in 
TF-IDFs of 0.
+ * The document frequency is 0 as well for such terms
  *
  * @param minDocFreq minimum of documents in which a term
  *                   should appear for filtering
@@ -50,12 +51,12 @@ class IDF @Since("1.2.0") (@Since("1.2.0") val minDocFreq: 
Int) {
    */
   @Since("1.1.0")
   def fit(dataset: RDD[Vector]): IDFModel = {
-    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
-          minDocFreq = minDocFreq))(
+    val (idf: Vector, docFreq: Array[Long], numDocs: Long) = 
dataset.treeAggregate(
+      new IDF.DocumentFrequencyAggregator(minDocFreq = minDocFreq))(
       seqOp = (df, v) => df.add(v),
       combOp = (df1, df2) => df1.merge(df2)
     ).idf()
-    new IDFModel(idf)
+    new IDFModel(idf, docFreq, numDocs)
   }
 
   /**
@@ -128,13 +129,14 @@ private object IDF {
 
     private def isEmpty: Boolean = m == 0L
 
-    /** Returns the current IDF vector. */
-    def idf(): Vector = {
+    /** Returns the current IDF vector, docFreq, number of documents */
+    def idf(): (Vector, Array[Long], Long) = {
       if (isEmpty) {
         throw new IllegalStateException("Haven't seen any document yet.")
       }
       val n = df.length
       val inv = new Array[Double](n)
+      val dfv = new Array[Long](n)
       var j = 0
       while (j < n) {
         /*
@@ -148,10 +150,11 @@ private object IDF {
          */
         if (df(j) >= minDocFreq) {
           inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+          dfv(j) = df(j)
         }
         j += 1
       }
-      Vectors.dense(inv)
+      (Vectors.dense(inv), dfv, m)
     }
   }
 }
@@ -160,7 +163,9 @@ private object IDF {
  * Represents an IDF model that can transform term frequency vectors.
  */
 @Since("1.1.0")
-class IDFModel private[spark] (@Since("1.1.0") val idf: Vector) extends 
Serializable {
+class IDFModel private[spark](@Since("1.1.0") val idf: Vector,
+                              @Since("3.0.0") val docFreq: Array[Long],
+                              @Since("3.0.0") val numDocs: Long) extends 
Serializable {
 
   /**
    * Transforms term frequency (TF) vectors to TF-IDF vectors.
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index cdd62be..73b2b82 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -44,7 +44,7 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   test("params") {
     ParamsSuite.checkParams(new IDF)
-    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
+    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0), 
Array(1L), 1))
     ParamsSuite.checkParams(model)
   }
 
@@ -112,10 +112,13 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("IDFModel read/write") {
-    val instance = new IDFModel("myIDFModel", new 
OldIDFModel(Vectors.dense(1.0, 2.0)))
+    val instance = new IDFModel("myIDFModel",
+      new OldIDFModel(Vectors.dense(1.0, 2.0), Array(1, 2), 2))
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
     val newInstance = testDefaultReadWrite(instance)
     assert(newInstance.idf === instance.idf)
+    assert(newInstance.docFreq === instance.docFreq)
+    assert(newInstance.numDocs === instance.numDocs)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
index 5c938a6..1049730 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -39,9 +39,11 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       math.log((m + 1.0) / (x + 1.0))
     })
     assert(model.idf ~== expected absTol 1e-12)
+    assert(model.numDocs === 3)
+    assert(model.docFreq === Array(0, 3, 1, 2))
 
     val assertHelper = (tfidf: Array[Vector]) => {
-      assert(tfidf.size === 3)
+      assert(tfidf.length === 3)
       val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
       assert(tfidf0.indices === Array(1, 3))
       assert(Vectors.dense(tfidf0.values) ~==
@@ -70,19 +72,21 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     )
     val m = localTermFrequencies.size
     val termFrequencies = sc.parallelize(localTermFrequencies, 2)
-    val idf = new IDF(minDocFreq = 1)
+    val idf = new IDF(minDocFreq = 2)
     val model = idf.fit(termFrequencies)
     val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
-      if (x > 0) {
+      if (x >= 2) {
         math.log((m + 1.0) / (x + 1.0))
       } else {
         0
       }
     })
     assert(model.idf ~== expected absTol 1e-12)
+    assert(model.numDocs === 3)
+    assert(model.docFreq === Array(0, 3, 0, 2))
 
     val assertHelper = (tfidf: Array[Vector]) => {
-      assert(tfidf.size === 3)
+      assert(tfidf.length === 3)
       val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
       assert(tfidf0.indices === Array(1, 3))
       assert(Vectors.dense(tfidf0.values) ~==
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3e232ba..4cf312d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -273,7 +273,11 @@ object MimaExcludes {
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$1"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productIterator"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productPrefix"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3")
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3"),
+
+    // [SPARK-26616][MLlib] Expose document frequency in IDFModel
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
+    
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
   )
 
   // Exclude rules for 2.4.x
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 08ae582..23d56c8 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -966,6 +966,10 @@ class IDF(JavaEstimator, HasInputCol, HasOutputCol, 
JavaMLReadable, JavaMLWritab
     >>> model = idf.fit(df)
     >>> model.idf
     DenseVector([0.0, 0.0])
+    >>> model.docFreq
+    [0, 3]
+    >>> model.numDocs == df.count()
+    True
     >>> model.transform(df).head().idf
     DenseVector([0.0, 0.0])
     >>> 
idf.setParams(outputCol="freqs").fit(df).transform(df).collect()[1].freqs
@@ -1045,6 +1049,22 @@ class IDFModel(JavaModel, JavaMLReadable, 
JavaMLWritable):
         """
         return self._call_java("idf")
 
+    @property
+    @since("3.0.0")
+    def docFreq(self):
+        """
+        Returns the document frequency.
+        """
+        return self._call_java("docFreq")
+
+    @property
+    @since("3.0.0")
+    def numDocs(self):
+        """
+        Returns number of documents evaluated to compute idf
+        """
+        return self._call_java("numDocs")
+
 
 @inherit_doc
 class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable):
diff --git a/python/pyspark/ml/tests/test_feature.py 
b/python/pyspark/ml/tests/test_feature.py
index 325feab..53d3ff9 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -67,6 +67,8 @@ class FeatureTests(SparkSessionTestCase):
                          "Model should inherit the UID from its parent 
estimator.")
         output = idf0m.transform(dataset)
         self.assertIsNotNone(output.head().idf)
+        self.assertIsNotNone(idf0m.docFreq)
+        self.assertEqual(idf0m.numDocs, 3)
         # Test that parameters transferred to Python Model
         check_params(self, idf0m)
 
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index b1bcdb9..905c4da 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -518,6 +518,20 @@ class IDFModel(JavaVectorTransformer):
         """
         return self.call('idf')
 
+    @since('3.0.0')
+    def docFreq(self):
+        """
+        Returns the document frequency.
+        """
+        return self.call('docFreq')
+
+    @since('3.0.0')
+    def numDocs(self):
+        """
+        Returns number of documents evaluated to compute idf
+        """
+        return self.call('numDocs')
+
 
 class IDF(object):
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to