[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15859364#comment-15859364
 ] 

ASF GitHub Bot commented on FLINK-1731:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3192#discussion_r100282403
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.clustering
    +
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
    +import org.apache.flink.api.scala.{DataSet, _}
    +import org.apache.flink.ml._
    +import org.apache.flink.ml.common.{LabeledVector, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BLAS, Vector}
    +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
    +import org.apache.flink.ml.pipeline._
    +
    +
    +/**
    +  * Implements the KMeans algorithm, which calculates cluster centroids based on a set of
    +  * training data points and a set of k initial centroids.
    +  *
    +  * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can
    +  * then be used to assign new points to the learned cluster centroids.
    +  *
    +  * The KMeans algorithm works as described on Wikipedia
    +  * (http://en.wikipedia.org/wiki/K-means_clustering):
    +  *
    +  * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by
    +  * alternating between two steps:
    +  *
    +  * ===Assignment step:===
    +  *
    +  * Assign each observation to the cluster whose mean yields the least within-cluster sum of
    +  * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is
    +  * intuitively the "nearest" mean. (Mathematically, this means partitioning the observations
    +  * according to the Voronoi diagram generated by the means.)
    +  *
    +  * `S_i^{(t)} = { x_p : || x_p - m_i^{(t)} ||^2 ≤ || x_p - m_j^{(t)} ||^2 \forall j, 1 ≤ j ≤ k}`,
    +  * where each `x_p` is assigned to exactly one `S_i^{(t)}`, even if it could be assigned to
    +  * two or more of them.
    +  *
    +  * ===Update step:===
    +  *
    +  * Calculate the new means to be the centroids of the observations in the new clusters.
    +  *
    +  * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
    +  *
    +  * Since the arithmetic mean is a least-squares estimator, this also minimizes the
    +  * within-cluster sum of squares (WCSS) objective.
    +  *
    +  * @example
    +  * {{{
    +  *       val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
    +  *       val initialCentroids: Seq[LabeledVector] = Clustering.initCentroids
    +  *
    +  *       val kmeans = KMeans()
    +  *         .setInitialCentroids(initialCentroids)
    +  *         .setNumIterations(10)
    +  *
    +  *       kmeans.fit(trainingDS)
    +  *
    +  *       // getting the computed centroids
    +  *       val centroidsResult = kmeans.centroids.get.collect()
    +  *
    +  *       // get matching clusters for new points
    +  *       val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
    +  *       val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
    +  * }}}
    +  *
    +  * =Parameters=
    +  *
    +  * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
    +  * Defines the number of iterations in which the centroids of the clusters are recalculated.
    +  * As KMeans is a heuristic algorithm, there is no guarantee that it converges to the global
    +  * optimum. The recalculation of the centroids and the reassignment of the data points are
    +  * repeated until the given number of iterations is reached.
    +  * (Default value: '''10''')
    +  *
    +  * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
    +  * Defines the initial k centroids of the k clusters. They are used as the starting point of
    +  * the algorithm for clustering the data set. The centroids are recalculated as often as set
    +  * in [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the initial
    +  * centroids strongly affects the outcome of the algorithm.
    +  *
    +  */
    +class KMeans extends Predictor[KMeans] {
    +
    +  import KMeans._
    +
    +  /**
    +    * Stores the learned clusters after the fit operation
    +    */
    +  var centroids: Option[DataSet[Seq[LabeledVector]]] = None
    +
    +  /**
    +    * Sets the maximum number of iterations.
    +    *
    +    * @param numIterations The maximum number of iterations.
    +    * @return itself
    +    */
    +  def setNumIterations(numIterations: Int): KMeans = {
    +    parameters.add(NumIterations, numIterations)
    +    this
    +  }
    +
    +  /**
    +    * Sets the initial centroids from which the algorithm starts computing. These points
    +    * should be chosen based on the data, as they significantly influence the resulting
    +    * centroids.
    +    *
    +    * @param initialCentroids A set of labeled vectors.
    +    * @return itself
    +    */
    +  def setInitialCentroids(initialCentroids: Seq[LabeledVector]): KMeans = {
    +    parameters.add(InitialCentroids, initialCentroids)
    +    this
    +  }
    +}
    +
    +/**
    +  * Companion object of KMeans. Contains convenience functions, the parameter type definitions
    +  * of the algorithm, and the [[FitOperation]] & [[PredictOperation]].
    +  */
    +object KMeans {
    +
    +  /** Euclidean Distance Metric */
    +  val euclidean = EuclideanDistanceMetric()
    +
    +  case object NumIterations extends Parameter[Int] {
    +    val defaultValue = Some(10)
    +  }
    +
    +  case object InitialCentroids extends Parameter[Seq[LabeledVector]] {
    +    val defaultValue = None
    +  }
    +
    +  // ========================================== Factory methods ====================================
    +
    +  def apply(): KMeans = {
    +    new KMeans()
    +  }
    +
    +  // ========================================== Operations =========================================
    +
    +  /**
    +    * [[PredictOperation]] for vector types. The result type is a [[LabeledVector]].
    +    *
    +    * @return A new [[PredictDataSetOperation]] to predict the labels of a [[DataSet]] of
    +    *         [[Vector]]s.
    +    */
    +  implicit def predictDataSet = {
    +    new PredictDataSetOperation[KMeans, Vector, LabeledVector] {
    +
    +      /** Calculates the predictions for all elements in the input [[DataSet]].
    +        *
    +        * @param instance          Reference to the current KMeans instance.
    +        * @param predictParameters Container for the prediction parameters.
    +        * @param testDS            Data set to predict on.
    +        * @return A [[DataSet]] of [[LabeledVector]]s containing the nearest centroids.
    +        */
    +      override def predictDataSet(
    +        instance: KMeans,
    +        predictParameters: ParameterMap,
    +        testDS: DataSet[Vector])
    +      : DataSet[LabeledVector] = {
    +        instance.centroids match {
    +          case Some(centroids) =>
    +            testDS.mapWithBcVariable(centroids) {
    +              (dataPoint, centroids) => selectNearestCentroid(dataPoint, centroids)
    +            }
    +          case None =>
    +            throw new RuntimeException("The KMeans model has not been trained. " +
    +              "Call fit first before calling the predict operation.")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +    * [[FitOperation]] which iteratively computes centroids matching the given input
    +    * [[DataSet]] by adjusting the given initial centroids.
    +    *
    +    * @return A new [[FitOperation]] to train the model using the training data set.
    +    */
    +  implicit def fitKMeans = {
    +    new FitOperation[KMeans, Vector] {
    +      override def fit(
    +        instance: KMeans,
    +        fitParameters: ParameterMap,
    +        trainingDS: DataSet[Vector])
    +      : Unit = {
    +        val resultingParameters = instance.parameters ++ fitParameters
    +
    +        val centroids: DataSet[Seq[LabeledVector]] = trainingDS
    +          .getExecutionEnvironment
    +          .fromElements(resultingParameters.get(InitialCentroids).get)
    +        val numIterations: Int = resultingParameters.get(NumIterations).get
    +
    +        val finalCentroids = centroids.iterate(numIterations) {
    +          currentCentroids =>
    +            val newCentroids: DataSet[LabeledVector] = trainingDS
    +              .mapWithBcVariable(currentCentroids) {
    +                (dataPoint, centroids) => selectNearestCentroid(dataPoint, centroids)
    +              }
    +              .map(x => (x.label, x.vector, 1.0))
    +              .withForwardedFields("label->_1; vector->_2")
    +              .groupBy(x => x._1)
    +              .reduce(
    +                (p1, p2) => (p1._1, (p1._2.asBreeze + p2._2.asBreeze).fromBreeze, p1._3 + p2._3)
    +              )
    +              .withForwardedFields("_1")
    +              .map(
    +                x => {
    +                  BLAS.scal(1.0 / x._3, x._2)
    +                  LabeledVector(x._1, x._2)
    +                })
    +              .withForwardedFields("_1->label")
    +
    +          // currentCentroids contains only one element, so this is output only once
    +          currentCentroids.mapWithBcSet(newCentroids) {
    +            (_, newCenters) => newCenters
    +          }
    +        }
    +
    +        instance.centroids = Some(finalCentroids)
    +      }
    +    }
    +  }
    +
    +  /**
    +    * Converts a given vector into a labeled vector where the label denotes the label of the
    +    * closest centroid.
    +    *
    +    * @param dataPoint The vector for which to determine the nearest centroid.
    +    * @param centroids A collection of the centroids.
    +    * @return A [[LabeledVector]] consisting of the input vector and the label of the closest
    +    *         centroid.
    +    */
    +  @ForwardedFields(Array("*->vector"))
    --- End diff --
    
    These annotations don't work on functions. Instead, you have to attach the forwarded-fields
    hint to the `mapWithBcVariable` call:
    ```
    mapWithBcVariable(...) {}.withForwardedFields("*->vector")
    ```
    
    As a side remark, I don't think that `LabeledVector` is the right return type here. Why not
    return a `(closestCentroidLabel, dataPoint, 1)`? Then we could also save the following
    mapper.


> Add kMeans clustering algorithm to machine learning library
> -----------------------------------------------------------
>
>                 Key: FLINK-1731
>                 URL: https://issues.apache.org/jira/browse/FLINK-1731
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Peter Schrott
>              Labels: ML
>
> The Flink repository already contains a kMeans implementation but it is not 
> yet ported to the machine learning library. I assume that only the used data 
> types have to be adapted and then it can be more or less directly moved to 
> flink-ml.
> The kMeans++ [1] and the kMeans|| [2] algorithms constitute a better 
> implementation because they improve the initial seeding phase to achieve 
> near-optimal clustering. It might be worthwhile to implement kMeans||.
> Resources:
> [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
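To illustrate why the seeding phase matters, kMeans++ [1] picks each new centroid with probability proportional to D(x)^2, the squared distance of x to its nearest already-chosen centroid, which spreads the initial centroids out. A minimal plain-Scala sketch (not Flink code; `KMeansPlusPlusSketch` and its helpers are illustrative names):

```scala
import scala.util.Random

// Plain-Scala sketch of kMeans++ seeding: sample each new centroid with
// probability proportional to D(x)^2, the squared distance of x to the
// nearest centroid chosen so far.
object KMeansPlusPlusSketch {
  type Vec = Array[Double]

  private def sqDist(a: Vec, b: Vec): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def seed(points: Seq[Vec], k: Int, rng: Random): Seq[Vec] = {
    // First centroid: uniformly at random
    var centroids = Vector(points(rng.nextInt(points.length)))
    while (centroids.length < k) {
      // D(x)^2 for every point: distance to its nearest chosen centroid
      val d2 = points.map(p => centroids.map(c => sqDist(p, c)).min)
      // Sample index i with probability d2(i) / d2.sum
      var r = rng.nextDouble() * d2.sum
      var i = 0
      while (i < points.length - 1 && r > d2(i)) { r -= d2(i); i += 1 }
      centroids = centroids :+ points(i)
    }
    centroids
  }
}
```

kMeans|| [2] parallelizes this idea by oversampling several candidates per round instead of one, which maps better onto a distributed engine like Flink.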


