Repository: spark
Updated Branches:
  refs/heads/master 46c63417c -> fae095bc7


[SPARK-3961] [MLlib] [PySpark] Python API for mllib.feature

Added completed Python API for MLlib.feature

Normalizer
StandardScalerModel
StandardScaler
HashingTF
IDFModel
IDF

cc mengxr

Author: Davies Liu <[email protected]>
Author: Davies Liu <[email protected]>

Closes #2819 from davies/feature and squashes the following commits:

4f48f48 [Davies Liu] add a note for HashingTF
67f6d21 [Davies Liu] address comments
b628693 [Davies Liu] rollback changes in Word2Vec
efb4f4f [Davies Liu] Merge branch 'master' into feature
806c7c2 [Davies Liu] address comments
3abb8c2 [Davies Liu] address comments
59781b9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
feature
a405ae7 [Davies Liu] fix tests
7a1891a [Davies Liu] fix tests
486795f [Davies Liu] update programming guide, HashTF -> HashingTF
8a50584 [Davies Liu] Python API for mllib.feature


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fae095bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fae095bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fae095bc

Branch: refs/heads/master
Commit: fae095bc7c4097859af522ced77f09cf6be17691
Parents: 46c6341
Author: Davies Liu <[email protected]>
Authored: Tue Oct 28 03:50:22 2014 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Tue Oct 28 03:50:22 2014 -0700

----------------------------------------------------------------------
 docs/mllib-feature-extraction.md                |  85 ++++
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  49 ++-
 .../spark/mllib/feature/VectorTransformer.scala |  11 +
 .../apache/spark/mllib/feature/Word2Vec.scala   |   4 +-
 python/pyspark/mllib/feature.py                 | 395 ++++++++++++++++---
 python/pyspark/mllib/linalg.py                  |  16 +-
 6 files changed, 499 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/docs/mllib-feature-extraction.md
----------------------------------------------------------------------
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 1162241..886d71d 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -95,8 +95,49 @@ tf.cache()
 val idf = new IDF(minDocFreq = 2).fit(tf)
 val tfidf: RDD[Vector] = idf.transform(tf)
 {% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+
+TF and IDF are implemented in 
[HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
+and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
+`HashingTF` takes an RDD of lists as the input.
+Each record could be an iterable of strings or other types.
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.mllib.feature import HashingTF
+
+sc = SparkContext()
 
+# Load documents (one per line).
+documents = sc.textFile("...").map(lambda line: line.split(" "))
+
+hashingTF = HashingTF()
+tf = hashingTF.transform(documents)
+{% endhighlight %}
+
+While applying `HashingTF` only needs a single pass to the data, applying 
`IDF` needs two passes: 
+first to compute the IDF vector and second to scale the term frequencies by 
IDF.
+
+{% highlight python %}
+from pyspark.mllib.feature import IDF
+
+# ... continue from the previous example
+tf.cache()
+idf = IDF().fit(tf)
+tfidf = idf.transform(tf)
+{% endhighlight %}
+
+MLlib's IDF implementation provides an option for ignoring terms which occur 
in fewer than a
+minimum number of documents.  In such cases, the IDF for these terms is set to 
0.  This feature
+can be used by passing the `minDocFreq` value to the IDF constructor.
 
+{% highlight python %}
+# ... continue from the previous example
+tf.cache()
+idf = IDF(minDocFreq=2).fit(tf)
+tfidf = idf.transform(tf)
+{% endhighlight %}
 </div>
 </div>
 
@@ -223,6 +264,29 @@ val data1 = data.map(x => (x.label, 
scaler1.transform(x.features)))
 val data2 = data.map(x => (x.label, 
scaler2.transform(Vectors.dense(x.features.toArray))))
 {% endhighlight %}
 </div>
+
+<div data-lang="python">
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.feature import StandardScaler
+
+data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+label = data.map(lambda x: x.label)
+features = data.map(lambda x: x.features)
+
+scaler1 = StandardScaler().fit(features)
+scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
+
+# data1 will be unit variance.
+data1 = label.zip(scaler1.transform(features))
+
+# Without converting the features into dense vectors, transformation with zero 
mean will raise
+# exception on sparse vector.
+# data2 will be unit variance and zero mean.
+data2 = label.zip(scaler2.transform(features.map(lambda x: 
Vectors.dense(x.toArray()))))
+{% endhighlight %}
+</div>
 </div>
 
 ## Normalizer
@@ -267,4 +331,25 @@ val data1 = data.map(x => (x.label, 
normalizer1.transform(x.features)))
 val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
 {% endhighlight %}
 </div>
+
+<div data-lang="python">
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.feature import Normalizer
+
+data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+labels = data.map(lambda x: x.label)
+features = data.map(lambda x: x.features)
+
+normalizer1 = Normalizer()
+normalizer2 = Normalizer(p=float("inf"))
+
+# Each sample in data1 will be normalized using $L^2$ norm.
+data1 = labels.zip(normalizer1.transform(features))
+
+# Each sample in data2 will be normalized using $L^\infty$ norm.
+data2 = labels.zip(normalizer2.transform(features))
+{% endhighlight %}
+</div>
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b478c21..485abe2 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -31,8 +31,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.feature.Word2Vec
-import org.apache.spark.mllib.feature.Word2VecModel
+import org.apache.spark.mllib.feature._
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
@@ -292,6 +291,43 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
+   * Java stub for Normalizer.transform()
+   */
+  def normalizeVector(p: Double, vector: Vector): Vector = {
+    new Normalizer(p).transform(vector)
+  }
+
+  /**
+   * Java stub for Normalizer.transform()
+   */
+  def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
+    new Normalizer(p).transform(rdd)
+  }
+
+  /**
+   * Java stub for StandardScaler.fit(). This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def fitStandardScaler(
+      withMean: Boolean,
+      withStd: Boolean,
+      data: JavaRDD[Vector]): StandardScalerModel = {
+    new StandardScaler(withMean, withStd).fit(data.rdd)
+  }
+
+  /**
+   * Java stub for IDF.fit(). This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def fitIDF(minDocFreq: Int, dataset: JavaRDD[Vector]): IDFModel = {
+    new IDF(minDocFreq).fit(dataset)
+  }
+
+  /**
    * Java stub for Python mllib Word2Vec fit(). This stub returns a
    * handle to the Java object instead of the content of the Java object.
    * Extra care needs to be taken in the Python code to ensure it gets freed on
@@ -328,6 +364,15 @@ class PythonMLLibAPI extends Serializable {
       model.transform(word)
     }
 
+    /**
+     * Transforms an RDD of words to its vector representation
+     * @param rdd an RDD of words
+     * @return an RDD of vector representations of words
+     */
+    def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = {
+      rdd.rdd.map(model.transform)
+    }
+
     def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] 
= {
       val vec = transform(word)
       findSynonyms(vec, num)

http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
index 415a845..7358c1c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.feature
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
 
@@ -48,4 +49,14 @@ trait VectorTransformer extends Serializable {
     data.map(x => this.transform(x))
   }
 
+  /**
+   * Applies transformation on a JavaRDD[Vector].
+   *
+   * @param data JavaRDD[Vector] to be transformed.
+   * @return transformed JavaRDD[Vector].
+   */
+  def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = {
+    transform(data.rdd)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index d321994..f5f7ad6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -432,7 +432,7 @@ class Word2VecModel private[mllib] (
         throw new IllegalStateException(s"$word not in vocabulary")
     }
   }
-  
+
   /**
    * Find synonyms of a word
    * @param word a word
@@ -443,7 +443,7 @@ class Word2VecModel private[mllib] (
     val vector = transform(word)
     findSynonyms(vector,num)
   }
-  
+
   /**
    * Find synonyms of the vector representation of a word
    * @param vector vector representation of a word

http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/python/pyspark/mllib/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index b5a3f22..3243434 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -18,59 +18,357 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+           'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
+
+
+# Hack to support float('inf') in Py4J
+_old_smart_decode = py4j.protocol.smart_decode
+
+_float_str_mapping = {
+    u'nan': u'NaN',
+    u'inf': u'Infinity',
+    u'-inf': u'-Infinity',
+}
+
+
+def _new_smart_decode(obj):
+    if isinstance(obj, float):
+        s = unicode(obj)
+        return _float_str_mapping.get(s, s)
+    return _old_smart_decode(obj)
+
+py4j.protocol.smart_decode = _new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+    'LinkedList',
+    'SparseVector',
+    'DenseVector',
+    'DenseMatrix',
+    'Rating',
+    'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+    """ Convert Python object into Java """
+    if isinstance(a, RDD):
+        a = _to_java_object_rdd(a)
+    elif not isinstance(a, (int, long, float, bool, basestring)):
+        bytes = bytearray(PickleSerializer().dumps(a))
+        a = sc._jvm.SerDe.loads(bytes)
+    return a
+
+
+def _java2py(sc, r):
+    if isinstance(r, JavaObject):
+        clsName = r.getClass().getSimpleName()
+        if clsName in ("RDD", "JavaRDD"):
+            if clsName == "RDD":
+                r = r.toJavaRDD()
+            jrdd = sc._jvm.SerDe.javaToPython(r)
+            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+        elif clsName in _picklable_classes:
+            r = sc._jvm.SerDe.dumps(r)
 
+    if isinstance(r, bytearray):
+        r = PickleSerializer().loads(str(r))
+    return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+    """ Call Java Function
     """
-    class for Word2Vec model
+    args = [_py2java(sc, a) for a in args]
+    return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+    """ Call API in PythonMLLibAPI
     """
-    def __init__(self, sc, java_model):
+    api = getattr(sc._jvm.PythonMLLibAPI(), name)
+    return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+    """
+    :: DeveloperApi ::
+
+    Base class for transformation of a vector or RDD of vector
+    """
+    def transform(self, vector):
+        """
+        Applies transformation on a vector.
+
+        :param vector: vector to be transformed.
+        """
+        raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+    """
+    :: Experimental ::
+
+    Normalizes samples individually to unit L\ :sup:`p`\ norm
+
+    For any 1 <= `p` < float('inf'), normalizes samples using
+    sum(abs(vector). :sup:`p`) :sup:`(1/p)` as norm.
+
+    For `p` = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+    >>> v = Vectors.dense(range(3))
+    >>> nor = Normalizer(1)
+    >>> nor.transform(v)
+    DenseVector([0.0, 0.3333, 0.6667])
+
+    >>> rdd = sc.parallelize([v])
+    >>> nor.transform(rdd).collect()
+    [DenseVector([0.0, 0.3333, 0.6667])]
+
+    >>> nor2 = Normalizer(float("inf"))
+    >>> nor2.transform(v)
+    DenseVector([0.0, 0.5, 1.0])
+    """
+    def __init__(self, p=2.0):
         """
-        :param sc:  Spark context
-        :param java_model:  Handle to Java model object
+        :param p: Normalization in L^p^ space, p = 2 by default.
         """
+        assert p >= 1.0, "p should be greater than 1.0"
+        self.p = float(p)
+
+    def transform(self, vector):
+        """
+        Applies unit length normalization on a vector.
+
+        :param vector: vector to be normalized.
+        :return: normalized vector. If the norm of the input is zero, it
+                will return the input vector.
+        """
+        sc = SparkContext._active_spark_context
+        assert sc is not None, "SparkContext should be initialized first"
+        return _callAPI(sc, "normalizeVector", self.p, vector)
+
+
+class JavaModelWrapper(VectorTransformer):
+    """
+    Wrapper for the model in JVM
+    """
+    def __init__(self, sc, java_model):
         self._sc = sc
         self._java_model = java_model
 
     def __del__(self):
         self._sc._gateway.detach(self._java_model)
 
-    def transform(self, word):
+    def transform(self, dataset):
+        return _callJavaFunc(self._sc, self._java_model.transform, dataset)
+
+
+class StandardScalerModel(JavaModelWrapper):
+    """
+    :: Experimental ::
+
+    Represents a StandardScaler model that can transform vectors.
+    """
+    def transform(self, vector):
         """
-        :param word: a word
-        :return: vector representation of word
+        Applies standardization transformation on a vector.
+
+        :param vector: Vector to be standardized.
+        :return: Standardized vector. If the variance of a column is zero,
+                it will return default `0.0` for the column with zero variance.
+        """
+        return JavaModelWrapper.transform(self, vector)
+
+
+class StandardScaler(object):
+    """
+    :: Experimental ::
+
+    Standardizes features by removing the mean and scaling to unit
+    variance using column summary statistics on the samples in the
+    training set.
 
+    >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
+    >>> dataset = sc.parallelize(vs)
+    >>> standardizer = StandardScaler(True, True)
+    >>> model = standardizer.fit(dataset)
+    >>> result = model.transform(dataset)
+    >>> for r in result.collect(): r
+    DenseVector([-0.7071, 0.7071, -0.7071])
+    DenseVector([0.7071, -0.7071, 0.7071])
+    """
+    def __init__(self, withMean=False, withStd=True):
+        """
+        :param withMean: False by default. Centers the data with mean
+                 before scaling. It will build a dense output, so this
+                 does not work on sparse input and will raise an exception.
+        :param withStd: True by default. Scales the data to unit standard
+                 deviation.
+        """
+        if not (withMean or withStd):
+            warnings.warn("Both withMean and withStd are false. The model does 
nothing.")
+        self.withMean = withMean
+        self.withStd = withStd
+
+    def fit(self, dataset):
+        """
+        Computes the mean and variance and stores as a model to be used for 
later scaling.
+
+        :param dataset: The data used to compute the mean and variance to build
+                    the transformation model.
+        :return: a StandardScalerModel
+        """
+        sc = dataset.context
+        jmodel = _callAPI(sc, "fitStandardScaler", self.withMean, 
self.withStd, dataset)
+        return StandardScalerModel(sc, jmodel)
+
+
+class HashingTF(object):
+    """
+    :: Experimental ::
+
+    Maps a sequence of terms to their term frequencies using the hashing trick.
+
+    Note: the terms must be hashable (can not be dict/set/list...).
+
+    >>> htf = HashingTF(100)
+    >>> doc = "a a b b c d".split(" ")
+    >>> htf.transform(doc)
+    SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
+    """
+    def __init__(self, numFeatures=1 << 20):
+        """
+        :param numFeatures: number of features (default: 2^20)
+        """
+        self.numFeatures = numFeatures
+
+    def indexOf(self, term):
+        """ Returns the index of the input term. """
+        return hash(term) % self.numFeatures
+
+    def transform(self, document):
+        """
+        Transforms the input document (list of terms) to term frequency 
vectors,
+        or transform the RDD of document to RDD of term frequency vectors.
+        """
+        if isinstance(document, RDD):
+            return document.map(self.transform)
+
+        freq = {}
+        for term in document:
+            i = self.indexOf(term)
+            freq[i] = freq.get(i, 0) + 1.0
+        return Vectors.sparse(self.numFeatures, freq.items())
+
+
+class IDFModel(JavaModelWrapper):
+    """
+    Represents an IDF model that can transform term frequency vectors.
+    """
+    def transform(self, dataset):
+        """
+        Transforms term frequency (TF) vectors to TF-IDF vectors.
+
+        If `minDocFreq` was set for the IDF calculation,
+        the terms which occur in fewer than `minDocFreq`
+        documents will have an entry of 0.
+
+        :param dataset: an RDD of term frequency vectors
+        :return: an RDD of TF-IDF vectors
+        """
+        return JavaModelWrapper.transform(self, dataset)
+
+
+class IDF(object):
+    """
+    :: Experimental ::
+
+    Inverse document frequency (IDF).
+
+    The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`,
+    where `m` is the total number of documents and `d(t)` is the number
+    of documents that contain term `t`.
+
+    This implementation supports filtering out terms which do not appear
+    in a minimum number of documents (controlled by the variable `minDocFreq`).
+    For terms that are not in at least `minDocFreq` documents, the IDF is
+    found as 0, resulting in TF-IDFs of 0.
+
+    >>> n = 4
+    >>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
+    ...          Vectors.dense([0.0, 1.0, 2.0, 3.0]),
+    ...          Vectors.sparse(n, [1], [1.0])]
+    >>> data = sc.parallelize(freqs)
+    >>> idf = IDF()
+    >>> model = idf.fit(data)
+    >>> tfidf = model.transform(data)
+    >>> for r in tfidf.collect(): r
+    SparseVector(4, {1: 0.0, 3: 0.5754})
+    DenseVector([0.0, 0.0, 1.3863, 0.863])
+    SparseVector(4, {1: 0.0})
+    """
+    def __init__(self, minDocFreq=0):
+        """
+        :param minDocFreq: minimum number of documents in which a term
+                           should appear for filtering
+        """
+        self.minDocFreq = minDocFreq
+
+    def fit(self, dataset):
+        """
+        Computes the inverse document frequency.
+
+        :param dataset: an RDD of term frequency vectors
+        """
+        sc = dataset.context
+        jmodel = _callAPI(sc, "fitIDF", self.minDocFreq, dataset)
+        return IDFModel(sc, jmodel)
+
+
+class Word2VecModel(JavaModelWrapper):
+    """
+    class for Word2Vec model
+    """
+    def transform(self, word):
+        """
         Transforms a word to its vector representation
 
         Note: local use only
+
+        :param word: a word
+        :return: vector representation of word(s)
         """
-        # TODO: make transform usable in RDD operations from python side
-        result = self._java_model.transform(word)
-        return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result)))
+        try:
+            return _callJavaFunc(self._sc, self._java_model.transform, word)
+        except Py4JJavaError:
+            raise ValueError("%s not found" % word)
 
-    def findSynonyms(self, x, num):
+    def findSynonyms(self, word, num):
         """
-        :param x: a word or a vector representation of word
+        Find synonyms of a word
+
+        :param word: a word or a vector representation of word
         :param num: number of synonyms to find
         :return: array of (word, cosineSimilarity)
 
-        Find synonyms of a word
-
         Note: local use only
         """
-        # TODO: make findSynonyms usable in RDD operations from python side
-        ser = PickleSerializer()
-        if type(x) == str:
-            jlist = self._java_model.findSynonyms(x, num)
-        else:
-            bytes = bytearray(ser.dumps(_convert_to_vector(x)))
-            vec = self._sc._jvm.SerDe.loads(bytes)
-            jlist = self._java_model.findSynonyms(vec, num)
-        words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist)))
+        words, similarity = _callJavaFunc(self._sc, 
self._java_model.findSynonyms, word, num)
         return zip(words, similarity)
 
 
@@ -85,6 +383,7 @@ class Word2Vec(object):
     We used skip-gram model in our implementation and hierarchical softmax
     method to train the model. The variable names in the implementation
     matches the original C implementation.
+
     For original C implementation, see https://code.google.com/p/word2vec/
     For research papers, see
     Efficient Estimation of Word Representations in Vector Space
@@ -95,33 +394,26 @@ class Word2Vec(object):
     >>> localDoc = [sentence, sentence]
     >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
     >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
     >>> syms = model.findSynonyms("a", 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
+    >>> [s[0] for s in syms]
+    [u'b', u'c']
     >>> vec = model.transform("a")
-    >>> len(vec)
-    10
     >>> syms = model.findSynonyms(vec, 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
+    >>> [s[0] for s in syms]
+    [u'b', u'c']
     """
     def __init__(self):
         """
         Construct Word2Vec instance
         """
+        import random  # this can't be on the top because of mllib.random
+
         self.vectorSize = 100
         self.learningRate = 0.025
         self.numPartitions = 1
         self.numIterations = 1
-        self.seed = 42L
+        self.seed = random.randint(0, sys.maxint)
 
     def setVectorSize(self, vectorSize):
         """
@@ -164,20 +456,13 @@ class Word2Vec(object):
         Computes the vector representation of each word in vocabulary.
 
         :param data: training data. RDD of subtype of Iterable[String]
-        :return: python Word2VecModel instance
+        :return: Word2VecModel instance
         """
         sc = data.context
-        ser = PickleSerializer()
-        vectorSize = self.vectorSize
-        learningRate = self.learningRate
-        numPartitions = self.numPartitions
-        numIterations = self.numIterations
-        seed = self.seed
-
-        model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
-            _to_java_object_rdd(data), vectorSize,
-            learningRate, numPartitions, numIterations, seed)
-        return Word2VecModel(sc, model)
+        jmodel = _callAPI(sc, "trainWord2Vec", data, int(self.vectorSize),
+                          float(self.learningRate), int(self.numPartitions),
+                          int(self.numIterations), long(self.seed))
+        return Word2VecModel(sc, jmodel)
 
 
 def _test():
@@ -191,4 +476,8 @@ def _test():
         exit(-1)
 
 if __name__ == "__main__":
+    # remove current path from list of search paths to avoid importing 
mllib.random
+    # for C{import random}, which is done in an external dependency of pyspark 
during doctests.
+    import sys
+    sys.path.pop(0)
     _test()

http://git-wip-us.apache.org/repos/asf/spark/blob/fae095bc/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 773d8d3..1b9bf59 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -111,6 +111,13 @@ def _vector_size(v):
         raise TypeError("Cannot treat type %s as a vector" % type(v))
 
 
+def _format_float(f, digits=4):
+    s = str(round(f, digits))
+    if '.' in s:
+        s = s[:s.index('.') + 1 + digits]
+    return s
+
+
 class Vector(object):
     """
     Abstract class for DenseVector and SparseVector
@@ -228,7 +235,7 @@ class DenseVector(Vector):
         return "[" + ",".join([str(v) for v in self.array]) + "]"
 
     def __repr__(self):
-        return "DenseVector(%r)" % self.array
+        return "DenseVector([%s])" % (', '.join(_format_float(i) for i in 
self.array))
 
     def __eq__(self, other):
         return isinstance(other, DenseVector) and self.array == other.array
@@ -416,7 +423,7 @@ class SparseVector(Vector):
         Returns a copy of this SparseVector as a 1-dimensional NumPy array.
         """
         arr = np.zeros((self.size,), dtype=np.float64)
-        for i in xrange(self.indices.size):
+        for i in xrange(len(self.indices)):
             arr[self.indices[i]] = self.values[i]
         return arr
 
@@ -431,7 +438,8 @@ class SparseVector(Vector):
     def __repr__(self):
         inds = self.indices
         vals = self.values
-        entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in 
xrange(len(inds))])
+        entries = ", ".join(["{0}: {1}".format(inds[i], _format_float(vals[i]))
+                             for i in xrange(len(inds))])
         return "SparseVector({0}, {{{1}}})".format(self.size, entries)
 
     def __eq__(self, other):
@@ -491,7 +499,7 @@ class Vectors(object):
         returns a NumPy array.
 
         >>> Vectors.dense([1, 2, 3])
-        DenseVector(array('d', [1.0, 2.0, 3.0]))
+        DenseVector([1.0, 2.0, 3.0])
         """
         return DenseVector(elements)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to