[
https://issues.apache.org/jira/browse/FLINK-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908176#comment-14908176
]
ASF GitHub Bot commented on FLINK-1719:
---------------------------------------
Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1156#discussion_r40442722
--- Diff:
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/MultinomialNaiveBayes.scala
---
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import java.{lang, util}
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.ml.common.{ParameterMap, Parameter}
+import org.apache.flink.ml.pipeline.{PredictDataSetOperation,
FitOperation, Predictor}
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.Map
+
+/**
+ * While building the model different approaches need to be compared.
+ * For that purpose the fitParameters are used. Every possibility that
might enhance
+ * the implementation can be chosen separately by using the following list
of parameters:
+ *
+ * Possibility 1: way of calculating document count
+ * P1 = 0 -> use .count() to get count of all documents
+ * P1 = 1 -> use a reducer and a mapper to create a broadcast data set
containing the count of
+ * all documents
+ *
+ * Possibility 2: all words in class (order of operators)
+ * If p2 = 1 improves the speed, many other calculations must switch
their operators, too.
+ * P2 = 0 -> first the reducer, than the mapper
+ * P2 = 1 -> first the mapper, than the reducer
+ *
+ * Possibility 3: way of calculating pwc
+ * P2 = 0 -> join singleWordsInClass and allWordsInClass to wordsInClass
data set
+ * P3 = 1 -> work on singleWordsInClass data set and broadcast
allWordsInClass data set
+ *
+ * Schneider/Rennie 1: ignore/reduce word frequency information
+ * SR1 = 0 -> word frequency information is not ignored
+ * SR1 = 1 -> word frequency information is ignored (Schneiders approach)
+ * SR1 = 2 -> word frequency information is reduced (Rennies approach)
+ *
+ * Schneider1: ignore P(c_j) in cMAP formula
+ * S1 = 0 -> normal cMAP formula
+ * S2 = 1 -> cMAP without P(c_j)
+ *
+ * Rennie1: transform document frequency
+ * R1 = 0 -> normal formula
+ * R1 = 1 -> apply inverse document frequecy
+ * Note: if R1 = 1 and SR1 = 2, both approaches get applied.
+ *
+ */
+class MultinomialNaiveBayes extends Predictor[MultinomialNaiveBayes] {
+
+ import MultinomialNaiveBayes._
+
+ //The model, that stores all needed information that are related to one
specific word
+ var wordRelatedModelData: Option[DataSet[(String, String, Double)]] =
+ None // (class name -> word -> log P(w|c))
+
+ //The model, that stores all needed information that are related to one
specifc class+
+ var classRelatedModelData: Option[DataSet[(String, Double, Double)]] =
+ None // (class name -> p(c) -> log p(w|c) not in class)
+
+ //A data set that stores additional needed information for some of the
improvements
+ var improvementData: Option[DataSet[(String, Double)]] =
+ None // (word -> log number of documents in all classes / word
frequency in all classes
+
+ // ============================== Parameter configuration
========================================
+
+ def setP1(value: Int): MultinomialNaiveBayes = {
+ parameters.add(P1, value)
+ this
+ }
+
+ def setP2(value: Int): MultinomialNaiveBayes = {
+ parameters.add(P2, value)
+ this
+ }
+
+ def setP3(value: Int): MultinomialNaiveBayes = {
+ parameters.add(P3, value)
+ this
+ }
+
+ def setSR1(value: Int): MultinomialNaiveBayes = {
+ parameters.add(SR1, value)
+ this
+ }
+
+ def setS1(value: Int): MultinomialNaiveBayes = {
+ parameters.add(S1, value)
+ this
+ }
+
+ def setR1(value: Int): MultinomialNaiveBayes = {
+ parameters.add(R1, value)
+ this
+ }
+
+ // =============================================== Methods
=======================================
+
+ /**
+ * Save already existing model data created by the NaiveBayes algorithm.
Requires the designated
+ * locations. The saved data is a representation of the
[[wordRelatedModelData]] and
+ * [[classRelatedModelData]].
+ * @param wordRelated, the save location for the wordRelated data
+ * @param classRelated, the save location for the classRelated data
+ */
+ def saveModelDataSet(wordRelated: String, classRelated: String) : Unit =
{
+ wordRelatedModelData.get.writeAsCsv(wordRelated, "\n", "|",
WriteMode.OVERWRITE)
+ classRelatedModelData.get.writeAsCsv(classRelated, "\n", "|",
WriteMode.OVERWRITE)
+ }
+
+ /**
+ * Save the improvment data set. Requires the designated save location.
The saved data is a
+ * representation of the [[improvementData]] data set.
+ * @param path, the save location for the improvment data
+ */
+ def saveImprovementDataSet(path: String) : Unit = {
+ improvementData.get.writeAsCsv(path, "\n", "|", WriteMode.OVERWRITE)
+ }
+
+ /**
+ * Sets the [[wordRelatedModelData]] and the [[classRelatedModelData]]
to the given data sets.
+ * @param wordRelated, the data set representing the wordRelated model
+ * @param classRelated, the data set representing the classRelated model
+ */
+ def setModelDataSet(wordRelated : DataSet[(String, String, Double)],
+ classRelated: DataSet[(String, Double, Double)]) :
Unit = {
+ this.wordRelatedModelData = Some(wordRelated)
+ this.classRelatedModelData = Some(classRelated)
+ }
+
+ def setImprovementDataSet(impSet : DataSet[(String, Double)]) : Unit = {
+ this.improvementData = Some(impSet)
+ }
+
+}
+
+object MultinomialNaiveBayes {
+
+ // ========================================== Parameters
=========================================
+ case object P1 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ case object P2 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ case object P3 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ case object SR1 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ case object S1 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ case object R1 extends Parameter[Int] {
+ override val defaultValue: Option[Int] = Some(0)
+ }
+
+ // ======================================== Factory Methods
======================================
+ def apply(): MultinomialNaiveBayes = {
+ new MultinomialNaiveBayes()
+ }
+
+ // ====================================== Operations
=============================================
+ /**
+ * Trains the models to fit the training data. The resulting
+ * [[MultinomialNaiveBayes.wordRelatedModelData]] and
+ * [[MultinomialNaiveBayes.classRelatedModelData]] are stored in the
[[MultinomialNaiveBayes]]
+ * instance.
+ */
+
+ implicit val fitNNB = new FitOperation[MultinomialNaiveBayes, (String,
String)] {
+ /**
+ * The [[FitOperation]] used to create the model. Requires an instance
of
+ * [[MultinomialNaiveBayes]], a [[ParameterMap]] and the input data
set. This data set
+ * maps (string -> string) containing (label -> text, words separated
by ",")
+ * @param instance of [[MultinomialNaiveBayes]]
+ * @param fitParameters, additional parameters
+ * @param input, the to processed data set
+ */
+ override def fit(instance: MultinomialNaiveBayes,
+ fitParameters: ParameterMap,
+ input: DataSet[(String, String)]): Unit = {
+
+ val resultingParameters = instance.parameters ++ fitParameters
+
+ //Count the amount of documents for each class.
+ // 1. Map: replace the document text by a 1
+ // 2. Group-Reduce: sum the 1s by class
+ val documentsPerClass: DataSet[(String, Int)] = input.map { input =>
(input._1, 1)}
+ .groupBy(0).sum(1) // (class name -> count of documents)
+
+ //Count the amount of occurrences of each word for each class.
+ // 1. FlatMap: split the document into its words and add a 1 to each
tuple
+ // 2. Group-Reduce: sum the 1s by class, word
+ var singleWordsInClass: DataSet[(String, String, Int)] = input
+ .flatMap(new SingleWordSplitter())
+ .groupBy(0, 1).sum(2) // (class name -> word -> count of that word)
+
+ //POSSIBILITY 2: all words in class (order of operators)
+ //SCHNEIDER/RENNIE 1: ignore/reduce word frequency information
+ //the allWordsInClass data set does only contain distinct
+ //words for schneiders approach: ndw(cj), nothing changes for
rennies approach
+
+ val p2 = resultingParameters(P2)
+
+ val sr1 = resultingParameters(SR1)
+
+ var allWordsInClass: DataSet[(String, Int)] =
+ null // (class name -> count of all words in that class)
+
+ if (p2 == 0) {
+ if (sr1 == 0 || sr1 == 2) {
+ //Count all the words for each class.
+ // 1. Reduce: add the count for each word in a class together
+ // 2. Map: remove the field that contains the word
+ allWordsInClass = singleWordsInClass.groupBy(0).reduce {
+ (singleWords1, singleWords2) =>
+ (singleWords1._1, singleWords1._2, singleWords1._3 +
singleWords2._3)
+ }.map(singleWords =>
+ (singleWords._1, singleWords._3)) // (class name -> count of
all words in that class)
+ } else if (sr1 == 1) {
+ //Count all distinct words for each class.
+ // 1. Map: set the word count to 1
+ // 2. Reduce: add the count for each word in a class together
+ // 3. Map: remove the field that contains the word
+ allWordsInClass = singleWordsInClass
+ .map(singleWords => (singleWords._1, singleWords._2, 1))
+ .groupBy(0).reduce {
+ (singleWords1, singleWords2) =>
+ (singleWords1._1, singleWords1._2, singleWords1._3 +
singleWords2._3)
+ }.map(singleWords =>
+ (singleWords._1, singleWords._3))//(class name -> count of
distinct words in that class)
+ }
+ } else if (p2 == 1) {
+ if (sr1 == 0 || sr1 == 2) {
+ //Count all the words for each class.
+ // 1. Map: remove the field that contains the word
+ // 2. Reduce: add the count for each word in a class together
+ allWordsInClass = singleWordsInClass.map(singleWords =>
(singleWords._1, singleWords._3))
+ .groupBy(0).reduce {
+ (singleWords1, singleWords2) => (singleWords1._1,
singleWords1._2 + singleWords2._2)
+ } // (class name -> count of all words in that class)
+ } else if (sr1 == 1) {
+ //Count all distinct words for each class.
+ // 1. Map: remove the field that contains the word, set the word
count to 1
+ // 2. Reduce: add the count for each word in a class together
+ allWordsInClass = singleWordsInClass.map(singleWords =>
(singleWords._1, 1))
+ .groupBy(0).reduce {
+ (singleWords1, singleWords2) => (singleWords1._1,
singleWords1._2 + singleWords2._2)
+ } // (class name -> count of distinct words in that class)
+ }
+
+ }
+
+ //END SCHNEIDER/RENNIE 1
+ //END POSSIBILITY 2
+
+ //POSSIBILITY 1: way of calculating document count
+ val p1 = resultingParameters(P1)
+
+ var pc: DataSet[(String, Double)] = null // (class name -> P(c) in
class)
+
+ if (p1 == 0) {
+ val documentsCount: Double = input.count() //count of all documents
+ //Calculate P(c)
+ // 1. Map: divide count of documents for a class through total
count of documents
+ pc = documentsPerClass.map(line => (line._1, line._2 /
documentsCount))
+
+ } else if (p1 == 1) {
+ //Create a data set that contains only one double value: the count
of all documents
+ // 1. Reduce: At the count of documents together
+ // 2. Map: Remove field that contains document identifier
+ val documentCount: DataSet[(Double)] = documentsPerClass
+ .reduce((line1, line2) => (line1._1, line1._2 + line2._2))
+ .map(line => line._2) //(count of all documents)
+
+ //calculate P(c)
+ // 1. Map: divide count of documents for a class through total
count of documents
+ // (only element in documentCount data set)
+ pc = documentsPerClass.map(new RichMapFunction[(String, Int),
(String, Double)] {
+
+ var broadcastSet: util.List[Double] = null
+
+ override def open(config: Configuration): Unit = {
+ broadcastSet =
getRuntimeContext.getBroadcastVariable[Double]("documentCount")
+ if (broadcastSet.size() != 1) {
+ throw new RuntimeException("The document count data set
used by p1 = 1 has the " +
+ "wrong size! Please use p1 = 0 if the problem can not be
solved.")
+ }
+ }
+
+ override def map(value: (String, Int)): (String, Double) = {
+ (value._1, value._2 / broadcastSet.get(0))
+ }
+ }).withBroadcastSet(documentCount, "documentCount")
+ }
+ //END POSSIBILITY 1
+
+ // (list of all words, but distinct)
+ val vocabulary = singleWordsInClass.map(tuple => (tuple._2,
1)).distinct(0)
+ // (count of items in vocabulary list)
+ val vocabularyCount: Double = vocabulary.count()
+
+ //calculate the P(w|c) value for words, that are not part of a
class, needed for smoothing
+ // 1. Map: use P(w|c) formula with smoothing with n(c_j, w_t) = 0
+ val pwcNotInClass: DataSet[(String, Double)] = allWordsInClass
+ .map(line =>
+ (line._1, 1 / (line._2 + vocabularyCount))) // (class name ->
P(w|c) word not in class)
+
+ //SCHNEIDER/RENNIE 1: ignore/reduce word frequency information
+ //The singleWordsInClass data set must be changed before, the
calculation of pwc starts for
+ //schneider, it needs this form classname -> word -> number of
documents containing wt in cj
+
+ if (sr1 == 1) {
+ //Calculate the required data set (see above)
+ // 1. FlatMap: class -> word -> 1 (one tuple for each document in
which this word occurs)
+ // 2. Group-Reduce: sum all 1s where the first two fields equal
+ // 3. Map: Remove unesseccary count of word and replace with 1
+ singleWordsInClass = input
+ .flatMap(new SingleDistinctWordSplitter())
+ .groupBy(0, 1)
+ .reduce((line1, line2) => (line1._1, line1._2, line1._3 +
line2._3))
+ }
+
+ //END SCHNEIDER/RENNIE 1
+
+ //POSSIBILITY 3: way of calculating pwc
+
+ val p3 = resultingParameters(P3)
+
--- End diff --
line breaks
> Add naive Bayes classification algorithm to machine learning library
> --------------------------------------------------------------------
>
> Key: FLINK-1719
> URL: https://issues.apache.org/jira/browse/FLINK-1719
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: Jonathan Hasenburg
> Labels: ML
>
> Add naive Bayes algorithm to Flink's machine learning library as a basic
> classification algorithm. Maybe we can incorporate some of the improvements
> developed by [Karl-Michael
> Schneider|http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.59.2085&rep=rep1&type=pdf],
> [Sang-Bum Kim et
> al.|http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=1704799] or
> [Jason Rennie et
> al.|http://people.csail.mit.edu/jrennie/papers/icml03-nb.pdf] into the
> implementation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)