Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0ba99f061 -> c0bb974a4


[SPARK-6258] [MLLIB] GaussianMixture Python API parity check

Implement Python API for major disparities of GaussianMixture cluster algorithm 
between Scala & Python
```scala
GaussianMixture
    setInitialModel
GaussianMixtureModel
    k
```

Author: Yanbo Liang <[email protected]>

Closes #6087 from yanboliang/spark-6258 and squashes the following commits:

b3af21c [Yanbo Liang] fix typo
2b645c1 [Yanbo Liang] fix doc
638b4b7 [Yanbo Liang] address comments
b5bcade [Yanbo Liang] GaussianMixture Python API parity check

(cherry picked from commit 94761485b207fa1f12a8410a68920300d851bf61)
Signed-off-by: Joseph K. Bradley <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0bb974a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0bb974a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0bb974a

Branch: refs/heads/branch-1.4
Commit: c0bb974a462432f1b8ff730605e6dc57d76a14e9
Parents: 0ba99f0
Author: Yanbo Liang <[email protected]>
Authored: Fri May 15 00:18:39 2015 -0700
Committer: Joseph K. Bradley <[email protected]>
Committed: Fri May 15 00:19:20 2015 -0700

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 24 +++++--
 .../mllib/clustering/GaussianMixtureModel.scala |  9 ++-
 python/pyspark/mllib/clustering.py              | 67 ++++++++++++++++----
 3 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0bb974a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f4c4775..2fa54df 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable 
{
    * Returns a list containing weights, mean and covariance of each mixture 
component.
    */
   def trainGaussianMixture(
-      data: JavaRDD[Vector], 
-      k: Int, 
-      convergenceTol: Double, 
+      data: JavaRDD[Vector],
+      k: Int,
+      convergenceTol: Double,
       maxIterations: Int,
-      seed: java.lang.Long): JList[Object] = {
+      seed: java.lang.Long,
+      initialModelWeights: java.util.ArrayList[Double],
+      initialModelMu: java.util.ArrayList[Vector],
+      initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
     val gmmAlg = new GaussianMixture()
       .setK(k)
       .setConvergenceTol(convergenceTol)
       .setMaxIterations(maxIterations)
 
+    if (initialModelWeights != null && initialModelMu != null && 
initialModelSigma != null) {
+      val gaussians = 
initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
+        case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], 
y.asInstanceOf[Matrix])
+      }
+      val initialModel = new GaussianMixtureModel(
+        initialModelWeights.asScala.toArray, gaussians.toArray)
+      gmmAlg.setInitialModel(initialModel)
+    }
+
     if (seed != null) gmmAlg.setSeed(seed)
 
     try {
       val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
       var wt = ArrayBuffer.empty[Double]
-      var mu = ArrayBuffer.empty[Vector]      
+      var mu = ArrayBuffer.empty[Vector]
       var sigma = ArrayBuffer.empty[Matrix]
       for (i <- 0 until model.k) {
           wt += model.weights(i)
           mu += model.gaussians(i).mu
           sigma += model.gaussians(i).sigma
-      }    
+      }
       List(Vectors.dense(wt.toArray), mu.toArray, 
sigma.toArray).map(_.asInstanceOf[Object]).asJava
     } finally {
       data.rdd.unpersist(blocking = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/c0bb974a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index ec65a3d..c22862c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
  * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and 
sigma(i) are 
  * the respective mean and covariance for each Gaussian distribution i=1..k. 
  * 
- * @param weight Weights for each Gaussian distribution in the mixture, where 
weight(i) is
- *               the weight for Gaussian i, and weight.sum == 1
- * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean 
for Gaussian i
- * @param sigma Covariance maxtrix for each Gaussian in the mixture, where 
sigma(i) is the
- *              covariance matrix for Gaussian i
+ * @param weights Weights for each Gaussian distribution in the mixture, where 
weights(i) is
+ *                the weight for Gaussian i, and weights.sum == 1
+ * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
+ *                  the Multivariate Gaussian (Normal) Distribution for 
Gaussian i
  */
 @Experimental
 class GaussianMixtureModel(

http://git-wip-us.apache.org/repos/asf/spark/blob/c0bb974a/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py
index 04e6715..a53333d 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -142,6 +142,7 @@ class GaussianMixtureModel(object):
 
     """A clustering model derived from the Gaussian Mixture Model method.
 
+    >>> from pyspark.mllib.linalg import Vectors, DenseMatrix
     >>> clusterdata_1 =  sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
     ...                                         0.9,0.8,0.75,0.935,
     ...                                        -0.83,-0.68,-0.91,-0.76 
]).reshape(6, 2))
@@ -154,11 +155,12 @@ class GaussianMixtureModel(object):
     True
     >>> labels[4]==labels[5]
     True
-    >>> clusterdata_2 =  sc.parallelize(array([-5.1971, -2.5359, -3.8220,
-    ...                                        -5.2211, -5.0602,  4.7118,
-    ...                                         6.8989, 3.4592,  4.6322,
-    ...                                         5.7048,  4.6567, 5.5026,
-    ...                                         4.5605,  5.2043,  
6.2734]).reshape(5, 3))
+    >>> data =  array([-5.1971, -2.5359, -3.8220,
+    ...                -5.2211, -5.0602,  4.7118,
+    ...                 6.8989, 3.4592,  4.6322,
+    ...                 5.7048,  4.6567, 5.5026,
+    ...                 4.5605,  5.2043,  6.2734])
+    >>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
     >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
     ...                               maxIterations=150, seed=10)
     >>> labels = model.predict(clusterdata_2).collect()
@@ -166,12 +168,38 @@ class GaussianMixtureModel(object):
     True
     >>> labels[3]==labels[4]
     True
+    >>> clusterdata_3 = sc.parallelize(data.reshape(15, 1))
+    >>> im = GaussianMixtureModel([0.5, 0.5],
+    ...      [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1, 
[1.0])),
+    ...      MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1, 
[1.0]))])
+    >>> model = GaussianMixture.train(clusterdata_3, 2, initialModel=im)
     """
 
     def __init__(self, weights, gaussians):
-        self.weights = weights
-        self.gaussians = gaussians
-        self.k = len(self.weights)
+        self._weights = weights
+        self._gaussians = gaussians
+        self._k = len(self._weights)
+
+    @property
+    def weights(self):
+        """
+        Weights for each Gaussian distribution in the mixture, where 
weights[i] is
+        the weight for Gaussian i, and weights.sum == 1.
+        """
+        return self._weights
+
+    @property
+    def gaussians(self):
+        """
+        Array of MultivariateGaussian where gaussians[i] represents
+        the Multivariate Gaussian (Normal) Distribution for Gaussian i.
+        """
+        return self._gaussians
+
+    @property
+    def k(self):
+        """Number of gaussians in mixture."""
+        return self._k
 
     def predict(self, x):
         """
@@ -193,9 +221,9 @@ class GaussianMixtureModel(object):
         :return:     membership_matrix. RDD of array of double values.
         """
         if isinstance(x, RDD):
-            means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
+            means, sigmas = zip(*[(g.mu, g.sigma) for g in self._gaussians])
             membership_matrix = callMLlibFunc("predictSoftGMM", 
x.map(_convert_to_vector),
-                                              
_convert_to_vector(self.weights), means, sigmas)
+                                              
_convert_to_vector(self._weights), means, sigmas)
             return membership_matrix.map(lambda x: pyarray.array('d', x))
 
 
@@ -208,13 +236,24 @@ class GaussianMixture(object):
     :param convergenceTol:  Threshold value to check the convergence criteria. 
Defaults to 1e-3
     :param maxIterations:   Number of iterations. Default to 100
     :param seed:            Random Seed
+    :param initialModel:    GaussianMixtureModel for initializing learning
     """
     @classmethod
-    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
+    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, 
initialModel=None):
         """Train a Gaussian Mixture clustering model."""
-        weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
-                                          rdd.map(_convert_to_vector), k,
-                                          convergenceTol, maxIterations, seed)
+        initialModelWeights = None
+        initialModelMu = None
+        initialModelSigma = None
+        if initialModel is not None:
+            if initialModel.k != k:
+                raise Exception("Mismatched cluster count, initialModel.k = 
%s, however k = %s"
+                                % (initialModel.k, k))
+            initialModelWeights = initialModel.weights
+            initialModelMu = [initialModel.gaussians[i].mu for i in 
range(initialModel.k)]
+            initialModelSigma = [initialModel.gaussians[i].sigma for i in 
range(initialModel.k)]
+        weight, mu, sigma = callMLlibFunc("trainGaussianMixture", 
rdd.map(_convert_to_vector), k,
+                                          convergenceTol, maxIterations, seed, 
initialModelWeights,
+                                          initialModelMu, initialModelSigma)
         mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
         return GaussianMixtureModel(weight, mvg_obj)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to