[ 
https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705309#comment-14705309
 ] 

ASF GitHub Bot commented on FLINK-2030:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r37555199
  
    --- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
    @@ -119,4 +123,65 @@ object MLUtils {
     
         stringRepresentation.writeAsText(filePath)
       }
    +
    +  /** Create a [[ContinuousHistogram]] from the input data
    +    *
    +    * @param bins Number of bins required
    +    * @param data input [[DataSet]] of [[Double]]
    +    * @return [[ContinuousHistogram]] over the data
    +    */
    +  def createContinuousHistogram(data: DataSet[Double], bins: Int): 
DataSet[ContinuousHistogram] = {
    +    val min = data.reduce((x, y) => Math.min(x, y))
    +    val max = data.reduce((x, y) => Math.max(x, y))
    +
    +    val stats = min.mapWithBcVariable(max) {
    +      (minimum, maximum) => (minimum - 2 * (maximum - minimum), maximum + 
2 * (maximum - minimum))
    +    }
    +
    +    data.mapPartition(new RichMapPartitionFunction[Double, 
ContinuousHistogram] {
    +      var statistics: (Double, Double) = _
    +
    +      override def open(configuration: Configuration): Unit = {
    +        statistics = 
getRuntimeContext.getBroadcastVariable(HISTOGRAM_STATS).get(0)
    +        val minimum = statistics._1
    +        val maximum = statistics._2
    +        statistics = (minimum - 2 * (maximum - minimum), maximum + 2 * 
(maximum - minimum))
    +      }
    +
    +      override def mapPartition(
    +          values: java.lang.Iterable[Double],
    +          out: Collector[ContinuousHistogram])
    +        : Unit = {
    +        val localHistogram = new ContinuousHistogram(bins, statistics._1, 
statistics._2)
    +        val iterator = values.iterator()
    +        while (iterator.hasNext) {
    +          localHistogram.add(iterator.next())
    +        }
    +        out.collect(localHistogram)
    +      }
    +    })
    +      .withBroadcastSet(stats, HISTOGRAM_STATS)
    +      .reduce((x, y) => x.merge(y, bins))
    +  }
    +
    +  /** Create a [[DiscreteHistogram]] from the input data
    +    *
    +    * @param data input [[DataSet]] of [[Double]]
    +    * @return [[DiscreteHistogram]] over the data
    +    */
    +  def createDiscreteHistogram(data: DataSet[Double]): 
DataSet[DiscreteHistogram] = {
    +    data.mapPartition(new RichMapPartitionFunction[Double, 
DiscreteHistogram] {
    --- End diff --
    
    I don't understand. You can write something like `data.mapPartition{ 
iterator => val myHistogram = ... ; do something with myHistogram; 
Seq(myHistogram) }`. This also only creates a single histogram.


> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
>                 Key: FLINK-2030
>                 URL: https://issues.apache.org/jira/browse/FLINK-2030
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Machine Learning Library
>            Reporter: Sachin Goel
>            Assignee: Sachin Goel
>            Priority: Minor
>              Labels: ML
>
> For the implementation of the decision tree in 
> https://issues.apache.org/jira/browse/FLINK-1727, we need to implement an 
> histogram with online updates, merging and equalization features. A reference 
> implementation is provided in [1]
> [1].http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to