Github user aalexandrov commented on a diff in the pull request:
https://github.com/apache/flink/pull/730#discussion_r31114073
--- Diff:
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/TfIdfTransformer.scala
---
@@ -0,0 +1,107 @@
+package org.apache.flink.ml.feature
+
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.SparseVector
+
+import scala.math.log
+import scala.util.hashing.MurmurHash3
+import scala.collection.mutable.LinkedHashSet;
+
+/**
+ * This transformer calculates the term-frequency times inverse document
frequency for the given DataSet of documents.
+ * The DataSet will be treated as the corpus of documents.
+ * <p>
+ * The TF is the frequency of a word inside one document
+ * <p>
+ * The IDF for a word is calculated: log(total number of documents /
documents that contain the word) + 1
+ * <p>
+ * This transformer returns a SparseVector where the index is the hash of
the word and the value the tf-idf.
+ * @author Ronny Bräunlich
+ * @author Vassil Dimov
+ * @author Filip Perisic
+ */
+class TfIdfTransformer extends Transformer[(Int, Seq[String]), (Int,
SparseVector)] {
+
+ override def transform(input: DataSet[(Int /* docId */ , Seq[String]
/*The document */ )], transformParameters: ParameterMap): DataSet[(Int,
SparseVector)] = {
+
+ val params = transformParameters.get(StopWordParameter)
+
+ // Here we will store the words in the form (docId, word, count)
+ // Count represents the occurrence of "word" in document "docId"
+ val wordCounts = input
+ //count the words
+ .flatMap(t => {
+ //create tuples docId, word, 1
+ t._2.map(s => (t._1, s, 1))
+ })
+ .filter(t =>
!transformParameters.apply(StopWordParameter).contains(t._2))
+ //group by document and word
+ .groupBy(0, 1)
+ // calculate the occurrence count of each word in specific document
+ .sum(2)
+
+ val dictionary = wordCounts
+ .map(t => LinkedHashSet(t._2))
+ .reduce((set1, set2) => set1 ++ set2)
+ .map(set => set.zipWithIndex.toMap)
+ .flatMap(m => m.toList)
+
+ val numberOfWords = wordCounts
+ .map(t => (t._2))
+ .distinct(t => t)
+ .map(t => 1)
+ .reduce(_ + _);
+
+ val idf: DataSet[(String, Double)] = calculateIDF(wordCounts)
+ val tf: DataSet[(Int, String, Int)] = wordCounts
+
+ // docId, word, tfIdf
+ val tfIdf = tf.join(idf).where(1).equalTo(0) {
+ (t1, t2) => (t1._1, t1._2, t1._3.toDouble * t2._2)
+ }
+
+ val res = tfIdf.crossWithTiny(numberOfWords)
+ // docId, word, tfIdf, numberOfWords
+ .map(t => (t._1._1, t._1._2, t._1._3, t._2))
+ //assign every word its position
+ .joinWithHuge(dictionary).where(1).equalTo(0)
--- End diff --
Use `.joinWithTiny` or `.join` or a broadcast variable.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---