Repository: spark
Updated Branches:
  refs/heads/master 3e5b4ae63 -> a99d284c1


[SPARK-19826][ML][PYTHON] add spark.ml Python API for PIC

## What changes were proposed in this pull request?

add spark.ml Python API for PIC

## How was this patch tested?

add doctest
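
For reference, a minimal end-to-end sketch of the new API, mirroring the doctest added in this patch (the SparkSession setup and app name are illustrative, not part of the change):

```python
from pyspark.sql import SparkSession
from pyspark.ml.clustering import PowerIterationClustering

# Illustrative session setup; any existing SparkSession works.
spark = SparkSession.builder.appName("pic-example").getOrCreate()

# Affinity matrix given as (src, dst, weight) triples. The matrix is
# symmetric, so only one of (i, j, s_ij) or (j, i, s_ji) is needed.
data = [(1, 0, 0.5),
        (2, 0, 0.5), (2, 1, 0.7),
        (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9)]
df = spark.createDataFrame(data, ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")

# PIC is not yet an Estimator/Transformer: call assignClusters() directly.
# The result is a DataFrame with columns id (long) and cluster (int).
assignments = pic.assignClusters(df)
assignments.sort("id").show()
```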

Author: Huaxin Gao <huax...@us.ibm.com>

Closes #21513 from huaxingao/spark--19826.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a99d284c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a99d284c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a99d284c

Branch: refs/heads/master
Commit: a99d284c16cc4e00ce7c83ecdc3db6facd467552
Parents: 3e5b4ae
Author: Huaxin Gao <huax...@us.ibm.com>
Authored: Mon Jun 11 12:15:14 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Mon Jun 11 12:15:14 2018 -0700

----------------------------------------------------------------------
 python/pyspark/ml/clustering.py | 184 ++++++++++++++++++++++++++++++++++-
 1 file changed, 179 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a99d284c/python/pyspark/ml/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b3d5fb1..4aa1cf8 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,14 +19,15 @@ import sys
 
 from pyspark import since, keyword_only
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
+from pyspark.sql import DataFrame
 
 __all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
            'KMeans', 'KMeansModel',
            'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
-           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
 
 
 class ClusteringSummary(JavaWrapper):
@@ -836,7 +837,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
 
     Terminology:
 
-     - "term" = "word": an el
+     - "term" = "word": an element of the vocabulary
      - "token": instance of a term appearing in a document
      - "topic": multinomial distribution over terms representing some concept
      - "document": one piece of text, corresponding to one row in the input 
data
@@ -938,7 +939,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
                  k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                  subsamplingRate=0.05, optimizeDocConcentration=True,\
                  docConcentration=None, topicConcentration=None,\
-                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
         """
         super(LDA, self).__init__()
         self._java_obj = 
self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +968,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
                  k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                  subsamplingRate=0.05, optimizeDocConcentration=True,\
                  docConcentration=None, topicConcentration=None,\
-                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
 
         Sets params for LDA.
         """
@@ -1156,6 +1157,179 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
         return self.getOrDefault(self.keepLastCheckpoint)
 
 
+@inherit_doc
+class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
+                               JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+    <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+    PIC finds a very low-dimensional embedding of a dataset using truncated power
+    iteration on a normalized pair-wise similarity matrix of the data.
+
+    This class is not yet an Estimator/Transformer, use the :py:func:`assignClusters` method
+    to run the PowerIterationClustering algorithm.
+
+    .. seealso:: `Wikipedia on Spectral clustering \
+    <http://en.wikipedia.org/wiki/Spectral_clustering>`_
+
+    >>> data = [(1, 0, 0.5), \
+               (2, 0, 0.5), (2, 1, 0.7), \
+               (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9), \
+               (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1), \
+               (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
+    >>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
+    >>> pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
+    >>> assignments = pic.assignClusters(df)
+    >>> assignments.sort(assignments.id).show(truncate=False)
+    +---+-------+
+    |id |cluster|
+    +---+-------+
+    |0  |1      |
+    |1  |1      |
+    |2  |1      |
+    |3  |1      |
+    |4  |1      |
+    |5  |0      |
+    +---+-------+
+    ...
+    >>> pic_path = temp_path + "/pic"
+    >>> pic.save(pic_path)
+    >>> pic2 = PowerIterationClustering.load(pic_path)
+    >>> pic2.getK()
+    2
+    >>> pic2.getMaxIter()
+    40
+
+    .. versionadded:: 2.4.0
+    """
+
+    k = Param(Params._dummy(), "k",
+              "The number of clusters to create. Must be > 1.",
+              typeConverter=TypeConverters.toInt)
+    initMode = Param(Params._dummy(), "initMode",
+                     "The initialization algorithm. This can be either " +
+                     "'random' to use a random vector as vertex properties, or 
'degree' to use " +
+                     "a normalized sum of similarities with other vertices.  
Supported options: " +
+                     "'random' and 'degree'.",
+                     typeConverter=TypeConverters.toString)
+    srcCol = Param(Params._dummy(), "srcCol",
+                   "Name of the input column for source vertex IDs.",
+                   typeConverter=TypeConverters.toString)
+    dstCol = Param(Params._dummy(), "dstCol",
+                   "Name of the input column for destination vertex IDs.",
+                   typeConverter=TypeConverters.toString)
+
+    @keyword_only
+    def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+                 weightCol=None):
+        """
+        __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+                 weightCol=None)
+        """
+        super(PowerIterationClustering, self).__init__()
+        self._java_obj = self._new_java_obj(
+            "org.apache.spark.ml.clustering.PowerIterationClustering", 
self.uid)
+        self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.4.0")
+    def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+                  weightCol=None):
+        """
+        setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+                  weightCol=None)
+        Sets params for PowerIterationClustering.
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.4.0")
+    def setK(self, value):
+        """
+        Sets the value of :py:attr:`k`.
+        """
+        return self._set(k=value)
+
+    @since("2.4.0")
+    def getK(self):
+        """
+        Gets the value of :py:attr:`k` or its default value.
+        """
+        return self.getOrDefault(self.k)
+
+    @since("2.4.0")
+    def setInitMode(self, value):
+        """
+        Sets the value of :py:attr:`initMode`.
+        """
+        return self._set(initMode=value)
+
+    @since("2.4.0")
+    def getInitMode(self):
+        """
+        Gets the value of :py:attr:`initMode` or its default value.
+        """
+        return self.getOrDefault(self.initMode)
+
+    @since("2.4.0")
+    def setSrcCol(self, value):
+        """
+        Sets the value of :py:attr:`srcCol`.
+        """
+        return self._set(srcCol=value)
+
+    @since("2.4.0")
+    def getSrcCol(self):
+        """
+        Gets the value of :py:attr:`srcCol` or its default value.
+        """
+        return self.getOrDefault(self.srcCol)
+
+    @since("2.4.0")
+    def setDstCol(self, value):
+        """
+        Sets the value of :py:attr:`dstCol`.
+        """
+        return self._set(dstCol=value)
+
+    @since("2.4.0")
+    def getDstCol(self):
+        """
+        Gets the value of :py:attr:`dstCol` or its default value.
+        """
+        return self.getOrDefault(self.dstCol)
+
+    @since("2.4.0")
+    def assignClusters(self, dataset):
+        """
+        Runs the PIC algorithm and returns a cluster assignment for each input vertex.
+
+        :param dataset:
+          A dataset with columns src, dst, weight representing the affinity matrix,
+          which is the matrix A in the PIC paper. Suppose the src column value is i,
+          the dst column value is j, the weight column value is similarity s,,ij,,
+          which must be nonnegative. This is a symmetric matrix and hence
+          s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there should be
+          either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
+          ignored, because we assume s,,ij,, = 0.0.
+
+        :return:
+          A dataset that contains columns of vertex id and the corresponding cluster for
+          the id. The schema of it will be:
+          - id: Long
+          - cluster: Int
+
+        .. versionadded:: 2.4.0
+        """
+        self._transfer_params_to_java()
+        jdf = self._java_obj.assignClusters(dataset._jdf)
+        return DataFrame(jdf, dataset.sql_ctx)
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.clustering

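Since the new class mixes in JavaMLReadable and JavaMLWritable, its params can be persisted and reloaded. A short sketch following the doctest above (`pic` as constructed in the earlier example; the temp path is illustrative):

```python
import tempfile

# Illustrative writable location for the saved params.
path = tempfile.mkdtemp() + "/pic"
pic.save(path)

# Reload and confirm the params round-trip.
pic2 = PowerIterationClustering.load(path)
assert pic2.getK() == 2
assert pic2.getMaxIter() == 40
```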
