[
https://issues.apache.org/jira/browse/FLINK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15537374#comment-15537374
]
ASF GitHub Bot commented on FLINK-2131:
---------------------------------------
Github user skonto commented on a diff in the pull request:
https://github.com/apache/flink/pull/757#discussion_r81432816
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml._
+import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BLAS, Vector}
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids based on a set of training
+ * data points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m^{(1)}_1, …, m^{(1)}_k` (see below), the algorithm proceeds by
+ * alternating between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively
+ * the "nearest" mean. (Mathematically, this means partitioning the observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S^{(t)}_i = { x_p : || x_p - m^{(t)}_i ||^2 ≤ || x_p - m^{(t)}_j ||^2 \forall j, 1 ≤ j ≤ k }`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
+ * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
+ *
+ * val kmeans = KMeans()
+ * .setInitialCentroids(initialCentroids)
+ * .setNumIterations(10)
+ *
+ * kmeans.fit(trainingDS)
+ *
+ * // getting the computed centroids
+ * val centroidsResult = kmeans.centroids.get.collect()
+ *
+ * // get matching clusters for new points
+ * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
+ * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The
+ * recalculation of the centroids and the reassignment of the data points are repeated until the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated as often as set in
+ * [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the initial centroids
+ * strongly affects the outcome of the algorithm.
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialStrategy]]:
+ * Defines the initialization strategy to be used for initializing the KMeans algorithm in case
+ * the initial centroids are not provided. Allowed values are "random", "kmeans++" and "kmeans||".
+ * (Default Value: '''random''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumClusters]]:
+ * Defines the number of clusters required. This must be provided when only the
+ * initialization strategy is specified, not the initial centroids themselves.
+ * (Default Value: '''0''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.OversamplingFactor]]:
+ * Defines the oversampling rate for the kmeans|| initialization.
+ * (Default Value: '''2k'''), where k is the number of clusters.
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.KMeansParRounds]]:
+ * Defines the number of rounds for the kmeans|| initialization.
+ * (Default Value: '''5''')
+ *
+ */
+class KMeans extends Predictor[KMeans] {
+
+ import KMeans._
+
+ /**
+ * Stores the learned clusters after the fit operation
+ */
+ var centroids: Option[DataSet[Seq[LabeledVector]]] = None
+
+ /**
+ * Sets the maximum number of iterations.
+ *
+ * @param numIterations The maximum number of iterations.
+ * @return itself
+ */
+ def setNumIterations(numIterations: Int): KMeans = {
+ parameters.add(NumIterations, numIterations)
+ this
+ }
+
+ /**
+ * Sets the number of clusters.
+ *
+ * @param numClusters The number of clusters
+ * @return itself
+ */
+ def setNumClusters(numClusters: Int): KMeans = {
+ parameters.add(NumClusters, numClusters)
+ this
+ }
+
+ /**
+ * Sets the initial centroids on which the algorithm will start computing. These points should
+ * depend on the data and will significantly influence the resulting centroids.
+ * Note that this setting will override [[setInitializationStrategy()]] and the size of
+ * initialCentroids will override the value set by [[setNumClusters()]], if any.
+ *
+ * @param initialCentroids A set of labeled vectors.
+ * @return itself
+ */
+ def setInitialCentroids(initialCentroids: Seq[LabeledVector]): KMeans = {
+ parameters.add(InitialCentroids, initialCentroids)
+ this
+ }
+
+ /**
+ * Automatically initialize the KMeans algorithm. Allowed options are "random", "kmeans++" and
+ * "kmeans||"
+ *
+ * @param initialStrategy The name of the initialization strategy
+ * @return itself
+ */
+ def setInitializationStrategy(initialStrategy: String): KMeans = {
+ require(Array("random", "kmeans++", "kmeans||").contains(initialStrategy), s"$initialStrategy" +
+ s" is not supported")
+ parameters.add(InitialStrategy, initialStrategy)
+ this
+ }
+
+ /**
+ * Oversampling factor to be used in case the initialization strategy is set to be "kmeans||"
+ *
+ * @param oversamplingFactor Oversampling factor(\ell)
+ * @return this
+ */
+ def setOversamplingFactor(oversamplingFactor: Double): KMeans = {
+ require(oversamplingFactor > 0, "Oversampling factor must be positive.")
+ parameters.add(OversamplingFactor, oversamplingFactor)
+ this
+ }
+
+ /**
+ * Number of initialization rounds to be done when the initialization strategy is set to be
+ * "kmeans||"
+ *
+ * @param numRounds Number of rounds(r)
+ * @return this
+ */
+ def setNumRounds(numRounds: Int): KMeans = {
+ require(numRounds > 0, "Number of rounds must be positive")
+ parameters.add(KMeansParRounds, numRounds)
+ this
+ }
+
+}
+
+/**
+ * Companion object of KMeans. Contains convenience functions, the parameter type definitions
+ * of the algorithm and the [[FitOperation]] & [[PredictOperation]].
+ */
+object KMeans {
+
+ private val RANDOM_FRACTION = "random_sample_fraction"
+ private val PARINIT_SET = "par_init_solution_set"
+ private val PARINIT_COST = "par_init_solution_cost"
+ private val PARINIT_SAMPLE = "par_init_oversample_factor"
+
+ /** Euclidean Distance Metric */
+ val euclidean = EuclideanDistanceMetric()
+
+ case object NumIterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object InitialCentroids extends Parameter[Seq[LabeledVector]] {
+ val defaultValue = None
+ }
+
+ case object InitialStrategy extends Parameter[String]{
+ val defaultValue = Some("kmeans||")
--- End diff --
"kmeans||" seems awkward. I would expect something like KMeansParallel,
at least in the code if not in the description.
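
One possible way to do this, as a rough sketch only: keep "kmeans||" as the user-facing string from the documentation, but represent the strategy in code as a typed value. The names `InitStrategy`, `KMeansParallel`, `all` and `fromName` are hypothetical and do not appear in the pull request.

```scala
// Hypothetical sketch: none of these names exist in the pull request.
sealed trait InitStrategy { def name: String }

object InitStrategy {
  case object Random extends InitStrategy { val name = "random" }
  case object KMeansPlusPlus extends InitStrategy { val name = "kmeans++" }
  case object KMeansParallel extends InitStrategy { val name = "kmeans||" }

  // Every supported strategy, in the order used in the documentation.
  val all: Seq[InitStrategy] = Seq(Random, KMeansPlusPlus, KMeansParallel)

  // Map the documented string to a typed value; None for unsupported names.
  def fromName(name: String): Option[InitStrategy] = all.find(_.name == name)
}
```

With something like this, setInitializationStrategy could still accept a String and reject unknown names through `fromName`, while the rest of the code matches on `InitStrategy.KMeansParallel` instead of comparing against "kmeans||".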
> Add Initialization schemes for K-means clustering
> -------------------------------------------------
>
> Key: FLINK-2131
> URL: https://issues.apache.org/jira/browse/FLINK-2131
> Project: Flink
> Issue Type: Task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
>
> Lloyd's [KMeans] algorithm takes initial centroids as its input. However,
> in case the user doesn't provide the initial centers, they may ask for a
> particular initialization scheme to be followed. The most commonly used are
> these:
> 1. Random initialization: Self-explanatory
> 2. kmeans++ initialization: http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> 3. kmeans|| : http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
> For very large data sets, or for large values of k, the kmeans|| method is
> preferred as it provides the same approximation guarantees as kmeans++ and
> requires fewer passes over the input data.
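
For illustration, the kmeans++ scheme listed in the issue can be sketched on a local collection as follows. This is a minimal single-machine sketch, not the DataSet-based code from the pull request; the object `KMeansPlusPlusSketch` and its methods `sqDist` and `seed` are hypothetical names, and points are assumed to be plain `Array[Double]` values of equal dimension.

```scala
import scala.util.Random

// Hypothetical local sketch of kmeans++ seeding; not the Flink implementation.
object KMeansPlusPlusSketch {
  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Array[Double], b: Array[Double]): Double = {
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      val d = a(i) - b(i)
      sum += d * d
      i += 1
    }
    sum
  }

  // Pick k initial centers: the first uniformly at random, each later one with
  // probability proportional to its squared distance to the nearest chosen center.
  def seed(points: IndexedSeq[Array[Double]], k: Int, rng: Random): IndexedSeq[Array[Double]] = {
    require(k > 0 && k <= points.length, "k must be in [1, number of points]")
    var centers = Vector(points(rng.nextInt(points.length)))
    while (centers.length < k) {
      val weights = points.map(p => centers.map(c => sqDist(p, c)).min)
      val total = weights.sum
      val next =
        if (total == 0.0) {
          // All points coincide with a chosen center; fall back to a uniform pick.
          points(rng.nextInt(points.length))
        } else {
          // Walk the cumulative weights until the random threshold is crossed.
          val threshold = rng.nextDouble() * total
          var acc = 0.0
          var idx = 0
          while (idx < points.length - 1 && acc + weights(idx) <= threshold) {
            acc += weights(idx)
            idx += 1
          }
          points(idx)
        }
      centers = centers :+ next
    }
    centers
  }
}
```

Each new center needs one pass over the data, so this sketch makes k passes in total. That cost is what kmeans|| reduces by sampling several candidates per round.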