Repository: spark
Updated Branches:
refs/heads/master cf842d42a -> 94761485b
[SPARK-6258] [MLLIB] GaussianMixture Python API parity check
Implement the Python APIs that were missing for the GaussianMixture clustering
algorithm, closing the major gaps between the Scala and Python APIs
```scala
GaussianMixture
setInitialModel
GaussianMixtureModel
k
```
Author: Yanbo Liang <[email protected]>
Closes #6087 from yanboliang/spark-6258 and squashes the following commits:
b3af21c [Yanbo Liang] fix typo
2b645c1 [Yanbo Liang] fix doc
638b4b7 [Yanbo Liang] address comments
b5bcade [Yanbo Liang] GaussianMixture Python API parity check
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94761485
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94761485
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94761485
Branch: refs/heads/master
Commit: 94761485b207fa1f12a8410a68920300d851bf61
Parents: cf842d4
Author: Yanbo Liang <[email protected]>
Authored: Fri May 15 00:18:39 2015 -0700
Committer: Joseph K. Bradley <[email protected]>
Committed: Fri May 15 00:18:39 2015 -0700
----------------------------------------------------------------------
.../spark/mllib/api/python/PythonMLLibAPI.scala | 24 +++++--
.../mllib/clustering/GaussianMixtureModel.scala | 9 ++-
python/pyspark/mllib/clustering.py | 67 ++++++++++++++++----
3 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/94761485/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f4c4775..2fa54df 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable
{
* Returns a list containing weights, mean and covariance of each mixture
component.
*/
def trainGaussianMixture(
- data: JavaRDD[Vector],
- k: Int,
- convergenceTol: Double,
+ data: JavaRDD[Vector],
+ k: Int,
+ convergenceTol: Double,
maxIterations: Int,
- seed: java.lang.Long): JList[Object] = {
+ seed: java.lang.Long,
+ initialModelWeights: java.util.ArrayList[Double],
+ initialModelMu: java.util.ArrayList[Vector],
+ initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
val gmmAlg = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setMaxIterations(maxIterations)
+ if (initialModelWeights != null && initialModelMu != null &&
initialModelSigma != null) {
+ val gaussians =
initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
+ case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector],
y.asInstanceOf[Matrix])
+ }
+ val initialModel = new GaussianMixtureModel(
+ initialModelWeights.asScala.toArray, gaussians.toArray)
+ gmmAlg.setInitialModel(initialModel)
+ }
+
if (seed != null) gmmAlg.setSeed(seed)
try {
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
var wt = ArrayBuffer.empty[Double]
- var mu = ArrayBuffer.empty[Vector]
+ var mu = ArrayBuffer.empty[Vector]
var sigma = ArrayBuffer.empty[Matrix]
for (i <- 0 until model.k) {
wt += model.weights(i)
mu += model.gaussians(i).mu
sigma += model.gaussians(i).sigma
- }
+ }
List(Vectors.dense(wt.toArray), mu.toArray,
sigma.toArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
http://git-wip-us.apache.org/repos/asf/spark/blob/94761485/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
----------------------------------------------------------------------
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index ec65a3d..c22862c 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and
sigma(i) are
* the respective mean and covariance for each Gaussian distribution i=1..k.
*
- * @param weight Weights for each Gaussian distribution in the mixture, where
weight(i) is
- * the weight for Gaussian i, and weight.sum == 1
- * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean
for Gaussian i
- * @param sigma Covariance maxtrix for each Gaussian in the mixture, where
sigma(i) is the
- * covariance matrix for Gaussian i
+ * @param weights Weights for each Gaussian distribution in the mixture, where
weights(i) is
+ * the weight for Gaussian i, and weights.sum == 1
+ * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
+ * the Multivariate Gaussian (Normal) Distribution for
Gaussian i
*/
@Experimental
class GaussianMixtureModel(
http://git-wip-us.apache.org/repos/asf/spark/blob/94761485/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py
b/python/pyspark/mllib/clustering.py
index 04e6715..a53333d 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -142,6 +142,7 @@ class GaussianMixtureModel(object):
"""A clustering model derived from the Gaussian Mixture Model method.
+ >>> from pyspark.mllib.linalg import Vectors, DenseMatrix
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
... 0.9,0.8,0.75,0.935,
... -0.83,-0.68,-0.91,-0.76
]).reshape(6, 2))
@@ -154,11 +155,12 @@ class GaussianMixtureModel(object):
True
>>> labels[4]==labels[5]
True
- >>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220,
- ... -5.2211, -5.0602, 4.7118,
- ... 6.8989, 3.4592, 4.6322,
- ... 5.7048, 4.6567, 5.5026,
- ... 4.5605, 5.2043,
6.2734]).reshape(5, 3))
+ >>> data = array([-5.1971, -2.5359, -3.8220,
+ ... -5.2211, -5.0602, 4.7118,
+ ... 6.8989, 3.4592, 4.6322,
+ ... 5.7048, 4.6567, 5.5026,
+ ... 4.5605, 5.2043, 6.2734])
+ >>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
... maxIterations=150, seed=10)
>>> labels = model.predict(clusterdata_2).collect()
@@ -166,12 +168,38 @@ class GaussianMixtureModel(object):
True
>>> labels[3]==labels[4]
True
+ >>> clusterdata_3 = sc.parallelize(data.reshape(15, 1))
+ >>> im = GaussianMixtureModel([0.5, 0.5],
+ ... [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1,
[1.0])),
+ ... MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1,
[1.0]))])
+ >>> model = GaussianMixture.train(clusterdata_3, 2, initialModel=im)
"""
def __init__(self, weights, gaussians):
- self.weights = weights
- self.gaussians = gaussians
- self.k = len(self.weights)
+ self._weights = weights
+ self._gaussians = gaussians
+ self._k = len(self._weights)
+
+ @property
+ def weights(self):
+ """
+ Weights for each Gaussian distribution in the mixture, where
weights[i] is
+ the weight for Gaussian i, and weights.sum == 1.
+ """
+ return self._weights
+
+ @property
+ def gaussians(self):
+ """
+ Array of MultivariateGaussian where gaussians[i] represents
+ the Multivariate Gaussian (Normal) Distribution for Gaussian i.
+ """
+ return self._gaussians
+
+ @property
+ def k(self):
+ """Number of gaussians in mixture."""
+ return self._k
def predict(self, x):
"""
@@ -193,9 +221,9 @@ class GaussianMixtureModel(object):
:return: membership_matrix. RDD of array of double values.
"""
if isinstance(x, RDD):
- means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
+ means, sigmas = zip(*[(g.mu, g.sigma) for g in self._gaussians])
membership_matrix = callMLlibFunc("predictSoftGMM",
x.map(_convert_to_vector),
-
_convert_to_vector(self.weights), means, sigmas)
+
_convert_to_vector(self._weights), means, sigmas)
return membership_matrix.map(lambda x: pyarray.array('d', x))
@@ -208,13 +236,24 @@ class GaussianMixture(object):
:param convergenceTol: Threshold value to check the convergence criteria.
Defaults to 1e-3
:param maxIterations: Number of iterations. Default to 100
:param seed: Random Seed
+ :param initialModel: GaussianMixtureModel for initializing learning
"""
@classmethod
- def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
+ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None,
initialModel=None):
"""Train a Gaussian Mixture clustering model."""
- weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
- rdd.map(_convert_to_vector), k,
- convergenceTol, maxIterations, seed)
+ initialModelWeights = None
+ initialModelMu = None
+ initialModelSigma = None
+ if initialModel is not None:
+ if initialModel.k != k:
+ raise Exception("Mismatched cluster count, initialModel.k =
%s, however k = %s"
+ % (initialModel.k, k))
+ initialModelWeights = initialModel.weights
+ initialModelMu = [initialModel.gaussians[i].mu for i in
range(initialModel.k)]
+ initialModelSigma = [initialModel.gaussians[i].sigma for i in
range(initialModel.k)]
+ weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
rdd.map(_convert_to_vector), k,
+ convergenceTol, maxIterations, seed,
initialModelWeights,
+ initialModelMu, initialModelSigma)
mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
return GaussianMixtureModel(weight, mvg_obj)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]