[
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605595#comment-14605595
]
ASF GitHub Bot commented on FLINK-1731:
---------------------------------------
Github user thvasilo commented on a diff in the pull request:
https://github.com/apache/flink/pull/700#discussion_r33462036
--- Diff:
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids
based on set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean
distance, this is intuitively
+ * the "nearest" mean. (Mathematically, this means partitioning the
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[Vector] =
env.fromCollection(Clustering.trainingData)
+ * val initialCentroids: DataSet[LabledVector] =
env.fromCollection(Clustering.initCentroids)
+ *
+ * val kmeans = KMeans()
+ * .setInitialCentroids(initialCentroids)
+ * .setNumIterations(10)
+ *
+ * kmeans.fit(trainingDS)
+ *
+ * // getting the computed centroids
+ * val centroidsResult = kmeans.centroids.get.collect()
+ *
+ * // get matching clusters for new points
+ * val testDS: DataSet[Vector] =
env.fromCollection(Clustering.testData)
+ * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as
start off point of the
+ * algorithm for clustering the data set. The centroids are recalculated
as often as set in
+ * [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of
the initial centroids
+ * mainly affects the outcome of the algorithm.
+ *
+ */
+class KMeans extends Predictor[KMeans] {
+
+ import KMeans._
+
+ /** Stores the learned clusters after the fit operation */
+ var centroids: Option[DataSet[LabeledVector]] = None
+
+ /**
+ * Sets the number of iterations.
+ *
+ * @param numIterations
+ * @return itself
+ */
+ def setNumIterations(numIterations: Int): KMeans = {
+ parameters.add(NumIterations, numIterations)
+ this
+ }
+
+ /**
+ * Sets the initial centroids on which the algorithm will start
computing.
+ * These points should depend on the data and significantly influence
the resulting centroids.
+ *
+ * @param initialCentroids A sequence of labeled vectors.
+ * @return itself
+ */
+ def setInitialCentroids(initialCentroids: DataSet[LabeledVector]):
KMeans = {
+ parameters.add(InitialCentroids, initialCentroids)
+ this
+ }
+
+}
+
+/**
+ * Companion object of KMeans. Contains convenience functions, the
parameter type definitions
+ * of the algorithm and the [[FitOperation]] & [[PredictOperation]].
+ */
+object KMeans {
+ val CENTROIDS = "centroids"
+
+ case object NumIterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object InitialCentroids extends Parameter[DataSet[LabeledVector]] {
+ val defaultValue = None
+ }
+
+ // ========================================== Factory methods
====================================
+
+ def apply(): KMeans = {
+ new KMeans()
+ }
+
+ // ========================================== Operations
=========================================
+
+ /**
+ * [[PredictOperation]] for vector types. The result type is a
[[LabeledVector]].
+ */
+ implicit def predictValues = {
+ new PredictOperation[KMeans, Vector, LabeledVector] {
+ override def predict(
+ instance: KMeans,
+ predictParameters: ParameterMap,
+ input: DataSet[Vector])
+ : DataSet[LabeledVector] = {
+
+ instance.centroids match {
+ case Some(centroids) => {
+ input.map(new
SelectNearestCenterMapper).withBroadcastSet(centroids, CENTROIDS)
+ }
+
+ case None => {
+ throw new RuntimeException("The KMeans model has not been
trained. Call first fit" +
+ "before calling the predict operation.")
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * [[FitOperation]] which iteratively computes centroids that match the
given input DataSet by
+ * adjusting the given initial centroids.
+ */
+ implicit def fitKMeans = {
+ new FitOperation[KMeans, Vector] {
+ override def fit(
+ instance: KMeans,
+ fitParameters: ParameterMap,
+ input: DataSet[Vector])
+ : Unit = {
+ val resultingParameters = instance.parameters ++ fitParameters
+
+ val centroids: DataSet[LabeledVector] =
resultingParameters.get(InitialCentroids).get
+ val numIterations: Int = resultingParameters.get(NumIterations).get
+
+ val finalCentroids = centroids.iterate(numIterations) {
currentCentroids =>
+ val newCentroids: DataSet[LabeledVector] = input
+ .map(new
SelectNearestCenterMapper).withBroadcastSet(currentCentroids, CENTROIDS)
+ .map(x => (x.label, x.vector,
1.0)).withForwardedFields("label->_1; vector->_2")
+ .groupBy(x => x._1)
+ .reduce((p1, p2) => (p1._1,(p1._2.asBreeze +
p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
+ .withForwardedFields("_1")
+ .map(x => LabeledVector(x._1, (x._2.asBreeze :/
x._3).fromBreeze))
--- End diff --
Can use flink.ml.BLAS.scal(1.0/count, vector) here
> Add kMeans clustering algorithm to machine learning library
> -----------------------------------------------------------
>
> Key: FLINK-1731
> URL: https://issues.apache.org/jira/browse/FLINK-1731
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: Peter Schrott
> Labels: ML
>
> The Flink repository already contains a kMeans implementation but it is not
> yet ported to the machine learning library. I assume that only the used data
> types have to be adapted and then it can be more or less directly moved to
> flink-ml.
> The kMeans++ [1] and the kMeans|| [2] algorithm constitute a better
> implementation because the improve the initial seeding phase to achieve near
> optimal clustering. It might be worthwhile to implement kMeans||.
> Resources:
> [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)