[ 
https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119976#comment-15119976
 ] 

Seth Hendrickson commented on SPARK-12944:
------------------------------------------

Based on what I have seen, I think the fix is more involved... could you 
elaborate on your proposed solution? 

We could "fix" it in {{addGrid}} by checking that all elements of {{values}} are 
instances of {{param.expectedType}}, but I don't think that gets at the root of 
the problem. First, it would still throw an error for ints passed where floats 
are expected, which is unsatisfactory since [PR 
9581|https://github.com/apache/spark/pull/9581] adds a type conversion 
mechanism specifically for this purpose. Second, the real problem is that 
params passed through the {{Estimator.fit}} and {{Transformer.transform}} 
methods go through {{Params.copy}}, which bypasses the set methods and type 
checking entirely. By modifying the {{_paramMap}} dictionary directly, {{copy}} 
allows ML pipeline elements to contain params that have no meaning for them 
(like a HashingTF containing a Naive Bayes smoothing parameter in its param 
map, as in the example above).
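
To make the second point concrete, here is a minimal sketch of the failure mode. It uses toy stand-ins ({{Param}}, {{Params}}, {{HashingTF}} are simplified classes written for this illustration, not the real pyspark.ml code) and assumes only what is described above: that {{copy}} merges extra params straight into {{_paramMap}}.

```python
# Toy stand-ins for illustration only; these are NOT the real pyspark.ml classes.
class Param:
    def __init__(self, parent, name, expected_type):
        self.parent = parent                # name of the class that owns the param
        self.name = name
        self.expected_type = expected_type  # type the owning class expects

    def __repr__(self):
        return "%s__%s" % (self.parent, self.name)


class Params:
    def __init__(self):
        self._paramMap = {}

    def copy(self, extra=None):
        # Mimics the behaviour described above: extra params are merged
        # directly into _paramMap, with no ownership check and no type check.
        that = type(self)()
        that._paramMap = dict(self._paramMap)
        that._paramMap.update(extra or {})
        return that


class HashingTF(Params):
    pass


# A param that belongs to NaiveBayes and expects a float.
smoothing = Param("NaiveBayes", "smoothing", float)

# The HashingTF copy ends up holding a NaiveBayes param, and the int 0
# is stored as-is rather than being converted to 0.0.
tf_copy = HashingTF().copy({smoothing: 0})
print(tf_copy._paramMap)  # {NaiveBayes__smoothing: 0}
```

In this toy version nothing fails at copy time; the bad value only surfaces later, which matches the stack trace below where the error appears when params are transferred to the JVM.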

I think the correct way to do this is to change the {{copy}} method to call the 
{{_set}} function instead of directly modifying {{_paramMap}}. That way we 
ensure that the {{_paramMap}} only contains parameters that belong to that 
class and that type checking is performed. I am working on a PR with the 
approach I described, but I am open to feedback. I would appreciate any 
thoughts on this.
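
A rough sketch of that idea, again with toy stand-ins rather than the real pyspark.ml classes. The {{_owns}} helper, the choice to silently skip params owned by another class, and the conversion via {{expected_type(value)}} are all assumptions made for this illustration; the actual PR may handle these cases differently.

```python
# Toy stand-ins for illustration only; these are NOT the real pyspark.ml classes.
class Param:
    def __init__(self, parent, name, expected_type):
        self.parent = parent                # name of the class that owns the param
        self.name = name
        self.expected_type = expected_type  # type the owning class expects


class Params:
    def __init__(self):
        self._paramMap = {}

    def _owns(self, param):
        # Hypothetical helper: a param belongs to the class named as its parent.
        return param.parent == type(self).__name__

    def _set(self, param, value):
        # Single entry point for writes, so type handling happens in one place.
        if not isinstance(value, param.expected_type):
            # Assumption: convert where possible (e.g. int -> float).
            value = param.expected_type(value)
        self._paramMap[param] = value
        return self

    def copy(self, extra=None):
        that = type(self)()
        that._paramMap = dict(self._paramMap)
        for param, value in (extra or {}).items():
            # Assumption: params owned by another class are skipped, not rejected.
            if that._owns(param):
                that._set(param, value)
        return that


class HashingTF(Params):
    pass


class NaiveBayes(Params):
    pass


smoothing = Param("NaiveBayes", "smoothing", float)
extra = {smoothing: 0}

tf_copy = HashingTF().copy(extra)
nb_copy = NaiveBayes().copy(extra)

print(tf_copy._paramMap)             # {}
print(nb_copy._paramMap[smoothing])  # 0.0
```

Routing every write through {{_set}} keeps the ownership and type logic in one place, so {{copy}} cannot drift out of sync with the setters.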

> CrossValidator doesn't accept a Pipeline as an estimator
> --------------------------------------------------------
>
>                 Key: SPARK-12944
>                 URL: https://issues.apache.org/jira/browse/SPARK-12944
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 1.6.0
>         Environment: spark-1.6.0-bin-hadoop2.6
> Python 3.4.4 :: Anaconda 2.4.1
>            Reporter: John Hogue
>            Priority: Minor
>
> Pipeline is supposed to act as an Estimator, but CrossValidator currently 
> throws an error when a Pipeline is passed as its estimator.
> {code}
> from pyspark.ml import Pipeline
> from pyspark.ml.classification import NaiveBayes
> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
> from pyspark.ml.feature import HashingTF, Tokenizer
> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
> # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
> tokenizer = Tokenizer(inputCol="text", outputCol="words")
> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
> nb = NaiveBayes()
> pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])
> paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()
> cv = CrossValidator(estimator=pipeline, 
>                     estimatorParamMaps=paramGrid, 
>                     evaluator=MulticlassClassificationEvaluator(), 
>                     numFolds=4)
> cvModel = cv.fit(training_df)
> {code}
> Sample dataset can be found here:
> https://github.com/dreyco676/nlp_spark/blob/master/data.zip
> The file can be converted to a DataFrame with:
> {code}
> # Load precleaned training set
> training_rdd = sc.textFile("data/clean_training.txt")
> parts_rdd = training_rdd.map(lambda l: l.split("\t"))
> # Filter bad rows out
> guarantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
> typed_rdd = guarantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
> # Create DataFrame
> training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
> {code}
> Running the pipeline throws the following stack trace:
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-3-34e9e27acada> in <module>()
>      17                     numFolds=4)
>      18 
> ---> 19 cvModel = cv.fit(training_df)
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in _fit(self, dataset)
>     237             train = df.filter(~condition)
>     238             for j in range(numModels):
> --> 239                 model = est.fit(train, epm[j])
>     240                 # TODO: duplicate evaluator to take extra params from input
>     241                 metric = eva.evaluate(model.transform(validation, epm[j]))
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      65         elif isinstance(params, dict):
>      66             if params:
> ---> 67                 return self.copy(params)._fit(dataset)
>      68             else:
>      69                 return self._fit(dataset)
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in _fit(self, dataset)
>     211                     dataset = stage.transform(dataset)
>     212                 else:  # must be an Estimator
> --> 213                     model = stage.fit(dataset)
>     214                     transformers.append(model)
>     215                     if i < indexOfLastEstimator:
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit(self, dataset)
>     130 
>     131     def _fit(self, dataset):
> --> 132         java_model = self._fit_java(dataset)
>     133         return self._create_model(java_model)
>     134 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
>     126         :return: fitted Java model
>     127         """
> --> 128         self._transfer_params_to_java()
>     129         return self._java_obj.fit(dataset._jdf)
>     130 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _transfer_params_to_java(self)
>      80         for param in self.params:
>      81             if param in paramMap:
> ---> 82                 pair = self._make_java_param_pair(param, paramMap[param])
>      83                 self._java_obj.set(pair)
>      84 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _make_java_param_pair(self, param, value)
>      71         java_param = self._java_obj.getParam(param.name)
>      72         java_value = _py2java(sc, value)
> ---> 73         return java_param.w(java_value)
>      74 
>      75     def _transfer_params_to_java(self):
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     811         answer = self.gateway_client.send_command(command)
>     812         return_value = get_return_value(
> --> 813             answer, self.gateway_client, self.target_id, self.name)
>     814 
>     815         for temp_arg in temp_args:
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
>      43     def deco(*a, **kw):
>      44         try:
> ---> 45             return f(*a, **kw)
>      46         except py4j.protocol.Py4JJavaError as e:
>      47             s = e.java_exception.toString()
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     306                 raise Py4JJavaError(
>     307                     "An error occurred while calling {0}{1}{2}.\n".
> --> 308                     format(target_id, ".", name), value)
>     309             else:
>     310                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o113.w.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
>       at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
>       at org.apache.spark.ml.param.DoubleParam.w(params.scala:223)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>       at py4j.Gateway.invoke(Gateway.java:259)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:209)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The workaround is to run the Transformers outside of the Pipeline, which 
> defeats the purpose of Pipelines.


