Repository: spark
Updated Branches:
  refs/heads/branch-1.3 4e099d757 -> d71099133


[SPARK-5769] Set params in constructors and in setParams in Python ML pipelines

This PR allows Python users to set params in constructors and in setParams, 
where we use the `keyword_only` decorator to force keyword arguments. The 
trade-off is discussed in the design doc of SPARK-4586.
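
For illustration, a minimal sketch of the resulting user-facing API (class and
param names are taken from the diff below; assumes a SparkContext is available,
as in the example program further down):

    from pyspark.ml.classification import LogisticRegression

    # Params can be set in the constructor ...
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    # ... or afterwards via setParams, which returns self.
    lr.setParams(maxIter=5)
    # Positional arguments are rejected by the keyword_only decorator:
    # lr.setParams(5)  # TypeError: Method setParams forces keyword arguments.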

Generated doc:
![screen shot 2015-02-12 at 3 06 58 am](https://cloud.githubusercontent.com/assets/829644/6166491/9cfcd06a-b265-11e4-99ea-473d866634fc.png)

CC: davies rxin

Author: Xiangrui Meng <m...@databricks.com>

Closes #4564 from mengxr/py-pipeline-kw and squashes the following commits:

fedf720 [Xiangrui Meng] use toDF
d565f2c [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into py-pipeline-kw
cbc15d3 [Xiangrui Meng] fix style
5032097 [Xiangrui Meng] update pipeline signature
950774e [Xiangrui Meng] simplify keyword_only and update constructor/setParams signatures
fdde5fc [Xiangrui Meng] fix style
c9384b8 [Xiangrui Meng] fix sphinx doc
8e59180 [Xiangrui Meng] add setParams and make constructors take params, where we force keyword args

(cherry picked from commit cd4a15366244657c4b7936abe5054754534366f2)
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7109913
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7109913
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7109913

Branch: refs/heads/branch-1.3
Commit: d71099133b64a4b9e9ab430cf1b314ee7deaf08d
Parents: 4e099d7
Author: Xiangrui Meng <m...@databricks.com>
Authored: Sun Feb 15 20:29:26 2015 -0800
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sun Feb 15 20:29:36 2015 -0800

----------------------------------------------------------------------
 .../ml/simple_text_classification_pipeline.py   | 44 +++++-------
 python/docs/conf.py                             |  4 ++
 python/pyspark/ml/classification.py             | 44 +++++++++---
 python/pyspark/ml/feature.py                    | 72 ++++++++++++++++----
 python/pyspark/ml/param/__init__.py             |  8 +++
 python/pyspark/ml/pipeline.py                   | 19 +++++-
 python/pyspark/ml/util.py                       | 15 ++++
 7 files changed, 153 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/examples/src/main/python/ml/simple_text_classification_pipeline.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c7df3d7..b4d9355 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -36,43 +36,33 @@ if __name__ == "__main__":
     sqlCtx = SQLContext(sc)
 
     # Prepare training documents, which are labeled.
-    LabeledDocument = Row('id', 'text', 'label')
-    training = sqlCtx.inferSchema(
-        sc.parallelize([(0L, "a b c d e spark", 1.0),
-                        (1L, "b d", 0.0),
-                        (2L, "spark f g h", 1.0),
-                        (3L, "hadoop mapreduce", 0.0)])
-          .map(lambda x: LabeledDocument(*x)))
+    LabeledDocument = Row("id", "text", "label")
+    training = sc.parallelize([(0L, "a b c d e spark", 1.0),
+                               (1L, "b d", 0.0),
+                               (2L, "spark f g h", 1.0),
+                               (3L, "hadoop mapreduce", 0.0)]) \
+        .map(lambda x: LabeledDocument(*x)).toDF()
 
     # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
-    tokenizer = Tokenizer() \
-        .setInputCol("text") \
-        .setOutputCol("words")
-    hashingTF = HashingTF() \
-        .setInputCol(tokenizer.getOutputCol()) \
-        .setOutputCol("features")
-    lr = LogisticRegression() \
-        .setMaxIter(10) \
-        .setRegParam(0.01)
-    pipeline = Pipeline() \
-        .setStages([tokenizer, hashingTF, lr])
+    tokenizer = Tokenizer(inputCol="text", outputCol="words")
+    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+    lr = LogisticRegression(maxIter=10, regParam=0.01)
+    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
 
     # Fit the pipeline to training documents.
     model = pipeline.fit(training)
 
     # Prepare test documents, which are unlabeled.
-    Document = Row('id', 'text')
-    test = sqlCtx.inferSchema(
-        sc.parallelize([(4L, "spark i j k"),
-                        (5L, "l m n"),
-                        (6L, "mapreduce spark"),
-                        (7L, "apache hadoop")])
-          .map(lambda x: Document(*x)))
+    Document = Row("id", "text")
+    test = sc.parallelize([(4L, "spark i j k"),
+                           (5L, "l m n"),
+                           (6L, "mapreduce spark"),
+                           (7L, "apache hadoop")]) \
+        .map(lambda x: Document(*x)).toDF()
 
     # Make predictions on test documents and print columns of interest.
     prediction = model.transform(test)
-    prediction.registerTempTable("prediction")
-    selected = sqlCtx.sql("SELECT id, text, prediction from prediction")
+    selected = prediction.select("id", "text", "prediction")
     for row in selected.collect():
         print row
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/docs/conf.py
----------------------------------------------------------------------
diff --git a/python/docs/conf.py b/python/docs/conf.py
index b00dce9..cbbf7ff 100644
--- a/python/docs/conf.py
+++ b/python/docs/conf.py
@@ -97,6 +97,10 @@ pygments_style = 'sphinx'
 # If true, keep warnings as "system message" paragraphs in the built documents.
 #keep_warnings = False
 
+# -- Options for autodoc --------------------------------------------------
+
+# Look at the first line of the docstring for function and method signatures.
+autodoc_docstring_signature = True
 
 # -- Options for HTML output ----------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 6bd2aa8..b6de749 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
 from pyspark.ml.wrapper import JavaEstimator, JavaModel
 from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,\
     HasRegParam
@@ -32,22 +32,46 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
 
     >>> from pyspark.sql import Row
     >>> from pyspark.mllib.linalg import Vectors
-    >>> dataset = sqlCtx.inferSchema(sc.parallelize([ \
-            Row(label=1.0, features=Vectors.dense(1.0)), \
-            Row(label=0.0, features=Vectors.sparse(1, [], []))]))
-    >>> lr = LogisticRegression() \
-            .setMaxIter(5) \
-            .setRegParam(0.01)
-    >>> model = lr.fit(dataset)
-    >>> test0 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.dense(-1.0))]))
+    >>> df = sc.parallelize([
+    ...     Row(label=1.0, features=Vectors.dense(1.0)),
+    ...     Row(label=0.0, features=Vectors.sparse(1, [], []))]).toDF()
+    >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+    >>> model = lr.fit(df)
+    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
     >>> print model.transform(test0).head().prediction
     0.0
-    >>> test1 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]))
+    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
     >>> print model.transform(test1).head().prediction
     1.0
+    >>> lr.setParams("vector")
+    Traceback (most recent call last):
+        ...
+    TypeError: Method setParams forces keyword arguments.
     """
     _java_class = "org.apache.spark.ml.classification.LogisticRegression"
 
+    @keyword_only
+    def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+                 maxIter=100, regParam=0.1):
+        """
+        __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+                 maxIter=100, regParam=0.1)
+        """
+        super(LogisticRegression, self).__init__()
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+                  maxIter=100, regParam=0.1):
+        """
+        setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+                  maxIter=100, regParam=0.1)
+        Sets params for logistic regression.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set_params(**kwargs)
+
     def _create_model(self, java_model):
         return LogisticRegressionModel(java_model)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index e088acd..f1ddbb4 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -16,7 +16,7 @@
 #
 
 from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
 from pyspark.ml.wrapper import JavaTransformer
 
 __all__ = ['Tokenizer', 'HashingTF']
@@ -29,18 +29,45 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
     splits it by white spaces.
 
     >>> from pyspark.sql import Row
-    >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(text="a b c")]))
-    >>> tokenizer = Tokenizer() \
-            .setInputCol("text") \
-            .setOutputCol("words")
-    >>> print tokenizer.transform(dataset).head()
+    >>> df = sc.parallelize([Row(text="a b c")]).toDF()
+    >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
+    >>> print tokenizer.transform(df).head()
     Row(text=u'a b c', words=[u'a', u'b', u'c'])
-    >>> print tokenizer.transform(dataset, {tokenizer.outputCol: "tokens"}).head()
+    >>> # Change a parameter.
+    >>> print tokenizer.setParams(outputCol="tokens").transform(df).head()
     Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+    >>> # Temporarily modify a parameter.
+    >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
+    Row(text=u'a b c', words=[u'a', u'b', u'c'])
+    >>> print tokenizer.transform(df).head()
+    Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+    >>> # Must use keyword arguments to specify params.
+    >>> tokenizer.setParams("text")
+    Traceback (most recent call last):
+        ...
+    TypeError: Method setParams forces keyword arguments.
     """
 
     _java_class = "org.apache.spark.ml.feature.Tokenizer"
 
+    @keyword_only
+    def __init__(self, inputCol="input", outputCol="output"):
+        """
+        __init__(self, inputCol="input", outputCol="output")
+        """
+        super(Tokenizer, self).__init__()
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, inputCol="input", outputCol="output"):
+        """
+        setParams(self, inputCol="input", outputCol="output")
+        Sets params for this Tokenizer.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set_params(**kwargs)
+
 
 @inherit_doc
 class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
@@ -49,20 +76,37 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
     hashing trick.
 
     >>> from pyspark.sql import Row
-    >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(words=["a", "b", "c"])]))
-    >>> hashingTF = HashingTF() \
-            .setNumFeatures(10) \
-            .setInputCol("words") \
-            .setOutputCol("features")
-    >>> print hashingTF.transform(dataset).head().features
+    >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF()
+    >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
+    >>> print hashingTF.transform(df).head().features
+    (10,[7,8,9],[1.0,1.0,1.0])
+    >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
     (10,[7,8,9],[1.0,1.0,1.0])
     >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
-    >>> print hashingTF.transform(dataset, params).head().vector
+    >>> print hashingTF.transform(df, params).head().vector
     (5,[2,3,4],[1.0,1.0,1.0])
     """
 
     _java_class = "org.apache.spark.ml.feature.HashingTF"
 
+    @keyword_only
+    def __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+        """
+        __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+        """
+        super(HashingTF, self).__init__()
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+        """
+        setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+        Sets params for this HashingTF.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set_params(**kwargs)
+
 
 if __name__ == "__main__":
     import doctest

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/pyspark/ml/param/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 5566792..e3a53dd 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -80,3 +80,11 @@ class Params(Identifiable):
         dummy = Params()
         dummy.uid = "undefined"
         return dummy
+
+    def _set_params(self, **kwargs):
+        """
+        Sets params.
+        """
+        for param, value in kwargs.iteritems():
+            self.paramMap[getattr(self, param)] = value
+        return self
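
To make `_set_params` above concrete, here is a standalone sketch with
simplified stand-ins for `Param`/`Params` (not the real pyspark classes):

    class Param(object):
        """A named parameter owned by a parent Params instance."""
        def __init__(self, parent, name, doc):
            self.parent, self.name, self.doc = parent, name, doc

    class Params(object):
        def __init__(self):
            self.paramMap = {}

        def _set_params(self, **kwargs):
            # Each keyword must match a Param attribute on the instance
            # (e.g. self.maxIter); the value is recorded in paramMap.
            for name, value in kwargs.items():
                self.paramMap[getattr(self, name)] = value
            return self

    class Example(Params):
        def __init__(self):
            super(Example, self).__init__()
            self.maxIter = Param(self, "maxIter", "max number of iterations")

    e = Example()._set_params(maxIter=5)
    print(e.paramMap[e.maxIter])  # 5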

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/pyspark/ml/pipeline.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 2d239f8..18d8a58 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -18,7 +18,7 @@
 from abc import ABCMeta, abstractmethod
 
 from pyspark.ml.param import Param, Params
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
 
 
 __all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel']
@@ -89,10 +89,16 @@ class Pipeline(Estimator):
     identity transformer.
     """
 
-    def __init__(self):
+    @keyword_only
+    def __init__(self, stages=[]):
+        """
+        __init__(self, stages=[])
+        """
         super(Pipeline, self).__init__()
         #: Param for pipeline stages.
         self.stages = Param(self, "stages", "pipeline stages")
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
 
     def setStages(self, value):
         """
@@ -110,6 +116,15 @@ class Pipeline(Estimator):
         if self.stages in self.paramMap:
             return self.paramMap[self.stages]
 
+    @keyword_only
+    def setParams(self, stages=[]):
+        """
+        setParams(self, stages=[])
+        Sets params for Pipeline.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set_params(**kwargs)
+
     def fit(self, dataset, params={}):
         paramMap = self._merge_params(params)
         stages = paramMap[self.stages]

http://git-wip-us.apache.org/repos/asf/spark/blob/d7109913/python/pyspark/ml/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index b1caa84..81d3f08 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+from functools import wraps
 import uuid
 
 
@@ -32,6 +33,20 @@ def inherit_doc(cls):
     return cls
 
 
+def keyword_only(func):
+    """
+    A decorator that forces keyword arguments in the wrapped method
+    and saves actual input keyword arguments in `_input_kwargs`.
+    """
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        if len(args) > 1:
+            raise TypeError("Method %s forces keyword arguments." % func.__name__)
+        wrapper._input_kwargs = kwargs
+        return func(*args, **kwargs)
+    return wrapper
+
+
 class Identifiable(object):
     """
     Object with a unique ID.
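
For reference, a self-contained sketch (outside Spark) of how `keyword_only`
and `_input_kwargs` work together in a constructor; the Example class is
hypothetical:

    from functools import wraps

    def keyword_only(func):
        """Force keyword arguments and record them in `_input_kwargs`."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            if len(args) > 1:  # args[0] is always `self`
                raise TypeError("Method %s forces keyword arguments." % func.__name__)
            wrapper._input_kwargs = kwargs
            return func(*args, **kwargs)
        return wrapper

    class Example(object):
        @keyword_only
        def __init__(self, inputCol="input", outputCol="output"):
            # Only explicitly passed keywords appear in _input_kwargs, so the
            # signature defaults serve as documentation, not as overrides.
            kwargs = self.__init__._input_kwargs
            print(sorted(kwargs.items()))

    Example(outputCol="tokens")  # prints [('outputCol', 'tokens')]
    # Example("text")            # raises TypeError: Method __init__ forces keyword arguments.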

