This is an automated email from the ASF dual-hosted git repository.
meng pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4421178 [SPARK-31497][ML][PYSPARK] Fix Pyspark
CrossValidator/TrainValidationSplit with pipeline estimator cannot save and
load model
4421178 is described below
commit 442117812ca6edc6e0ab271da829032b9637e89e
Author: Weichen Xu <[email protected]>
AuthorDate: Sun Apr 26 21:04:14 2020 -0700
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit
with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator
cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some
estimators are special (in order to support pure python implemented nested
estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, the model readers/writers of the
estimators listed above do NOT support pure python implemented nested
estimators/transformers, because they use the java reader/writer wrapper as
the python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all
estimators to define the `_transfer_param_map_to_java` and
`_transfer_param_map_from_java` methods (used in model read/write).
The OneVsRest class already defines the two methods, but Pipeline does not,
which leads to this bug.
In this PR I add `_transfer_param_map_to_java` and
`_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually tested in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel,
ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer,
# hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with a Pipeline estimator which includes a OneVsRest
estimator stage, where the OneVsRest estimator nests a LogisticRegression
estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression,
LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel,
ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so I do not paste it here.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>
(cherry picked from commit 4a21c4cc92805b034ade0593eea3c4a9b6122083)
Signed-off-by: Xiangrui Meng <[email protected]>
---
python/pyspark/ml/pipeline.py | 53 ++++++++++++-
python/pyspark/ml/tests/test_tuning.py | 139 ++++++++++++++++++++++++++++++++-
2 files changed, 189 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 09e0748..53d07ec 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
from pyspark.ml.base import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaParams
-from pyspark.ml.common import inherit_doc
+from pyspark.ml.wrapper import JavaParams, JavaWrapper
+from pyspark.ml.common import inherit_doc, _java2py, _py2java
@inherit_doc
@@ -174,6 +174,55 @@ class Pipeline(Estimator, MLReadable, MLWritable):
return _java_obj
+ def _make_java_param_pair(self, param, value):
+ """
+ Makes a Java param pair.
+ """
+ sc = SparkContext._active_spark_context
+ param = self._resolveParam(param)
+ java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent,
param.name, param.doc)
+ if isinstance(value, Params) and hasattr(value, "_to_java"):
+ # Convert JavaEstimator/JavaTransformer object or
Estimator/Transformer object which
+ # implements `_to_java` method (such as OneVsRest, Pipeline
object) to java object.
+ # used in the case of an estimator having another estimator as a
parameter
+ # the reason why this is not in _py2java in common.py is that
importing
+ # Estimator and Model in common.py results in a circular import
with inherit_doc
+ java_value = value._to_java()
+ else:
+ java_value = _py2java(sc, value)
+ return java_param.w(java_value)
+
+ def _transfer_param_map_to_java(self, pyParamMap):
+ """
+ Transforms a Python ParamMap into a Java ParamMap.
+ """
+ paramMap =
JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
+ for param in self.params:
+ if param in pyParamMap:
+ pair = self._make_java_param_pair(param, pyParamMap[param])
+ paramMap.put([pair])
+ return paramMap
+
+ def _transfer_param_map_from_java(self, javaParamMap):
+ """
+ Transforms a Java ParamMap into a Python ParamMap.
+ """
+ sc = SparkContext._active_spark_context
+ paramMap = dict()
+ for pair in javaParamMap.toList():
+ param = pair.param()
+ if self.hasParam(str(param.name())):
+ java_obj = pair.value()
+ if
sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
+ # Note: JavaParams._from_java support both
JavaEstimator/JavaTransformer class
+ # and Estimator/Transformer class which implements
`_from_java` static method
+ # (such as OneVsRest, Pipeline class).
+ py_obj = JavaParams._from_java(java_obj)
+ else:
+ py_obj = _java2py(sc, java_obj)
+ paramMap[self.getParam(param.name())] = py_obj
+ return paramMap
+
@inherit_doc
class PipelineWriter(MLWriter):
diff --git a/python/pyspark/ml/tests/test_tuning.py
b/python/pyspark/ml/tests/test_tuning.py
index 9d8ba37..6bcc3f9 100644
--- a/python/pyspark/ml/tests/test_tuning.py
+++ b/python/pyspark/ml/tests/test_tuning.py
@@ -18,7 +18,8 @@
import tempfile
import unittest
-from pyspark.ml import Estimator, Model
+from pyspark.ml.feature import HashingTF, Tokenizer
+from pyspark.ml import Estimator, Pipeline, Model
from pyspark.ml.classification import LogisticRegression,
LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
@@ -310,6 +311,75 @@ class CrossValidatorTests(SparkSessionTestCase):
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
+ def test_save_load_pipeline_estimator(self):
+ temp_path = tempfile.mkdtemp()
+ training = self.spark.createDataFrame([
+ (0, "a b c d e spark", 1.0),
+ (1, "b d", 0.0),
+ (2, "spark f g h", 1.0),
+ (3, "hadoop mapreduce", 0.0),
+ (4, "b spark who", 1.0),
+ (5, "g d a y", 0.0),
+ (6, "spark fly", 1.0),
+ (7, "was mapreduce", 0.0),
+ ], ["id", "text", "label"])
+
+ # Configure an ML pipeline, which consists of tree stages: tokenizer,
hashingTF, and lr.
+ tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
outputCol="features")
+
+ ova = OneVsRest(classifier=LogisticRegression())
+ lr1 = LogisticRegression().setMaxIter(5)
+ lr2 = LogisticRegression().setMaxIter(10)
+
+ pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+ paramGrid = ParamGridBuilder() \
+ .addGrid(hashingTF.numFeatures, [10, 100]) \
+ .addGrid(ova.classifier, [lr1, lr2]) \
+ .build()
+
+ crossval = CrossValidator(estimator=pipeline,
+ estimatorParamMaps=paramGrid,
+
evaluator=MulticlassClassificationEvaluator(),
+ numFolds=2) # use 3+ folds in practice
+
+ # Run cross-validation, and choose the best set of parameters.
+ cvModel = crossval.fit(training)
+
+ # test save/load of CrossValidatorModel
+ cvModelPath = temp_path + "/cvModel"
+ cvModel.save(cvModelPath)
+ loadedModel = CrossValidatorModel.load(cvModelPath)
+ self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
+ self.assertEqual(len(loadedModel.bestModel.stages),
len(cvModel.bestModel.stages))
+ for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+ cvModel.bestModel.stages):
+ self.assertEqual(loadedStage.uid, originalStage.uid)
+
+ # Test nested pipeline
+ nested_pipeline = Pipeline(stages=[tokenizer,
Pipeline(stages=[hashingTF, ova])])
+ crossval2 = CrossValidator(estimator=nested_pipeline,
+ estimatorParamMaps=paramGrid,
+
evaluator=MulticlassClassificationEvaluator(),
+ numFolds=2) # use 3+ folds in practice
+
+ # Run cross-validation, and choose the best set of parameters.
+ cvModel2 = crossval2.fit(training)
+ # test save/load of CrossValidatorModel
+ cvModelPath2 = temp_path + "/cvModel2"
+ cvModel2.save(cvModelPath2)
+ loadedModel2 = CrossValidatorModel.load(cvModelPath2)
+ self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
+ loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+ original_nested_pipeline_model = cvModel2.bestModel.stages[1]
+ self.assertEqual(loaded_nested_pipeline_model.uid,
original_nested_pipeline_model.uid)
+ self.assertEqual(len(loaded_nested_pipeline_model.stages),
+ len(original_nested_pipeline_model.stages))
+ for loadedStage, originalStage in
zip(loaded_nested_pipeline_model.stages,
+
original_nested_pipeline_model.stages):
+ self.assertEqual(loadedStage.uid, originalStage.uid)
+
class TrainValidationSplitTests(SparkSessionTestCase):
@@ -511,6 +581,73 @@ class TrainValidationSplitTests(SparkSessionTestCase):
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
+ def test_save_load_pipeline_estimator(self):
+ temp_path = tempfile.mkdtemp()
+ training = self.spark.createDataFrame([
+ (0, "a b c d e spark", 1.0),
+ (1, "b d", 0.0),
+ (2, "spark f g h", 1.0),
+ (3, "hadoop mapreduce", 0.0),
+ (4, "b spark who", 1.0),
+ (5, "g d a y", 0.0),
+ (6, "spark fly", 1.0),
+ (7, "was mapreduce", 0.0),
+ ], ["id", "text", "label"])
+
+ # Configure an ML pipeline, which consists of tree stages: tokenizer,
hashingTF, and lr.
+ tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
outputCol="features")
+
+ ova = OneVsRest(classifier=LogisticRegression())
+ lr1 = LogisticRegression().setMaxIter(5)
+ lr2 = LogisticRegression().setMaxIter(10)
+
+ pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+ paramGrid = ParamGridBuilder() \
+ .addGrid(hashingTF.numFeatures, [10, 100]) \
+ .addGrid(ova.classifier, [lr1, lr2]) \
+ .build()
+
+ tvs = TrainValidationSplit(estimator=pipeline,
+ estimatorParamMaps=paramGrid,
+
evaluator=MulticlassClassificationEvaluator())
+
+ # Run train validation split, and choose the best set of parameters.
+ tvsModel = tvs.fit(training)
+
+ # test save/load of CrossValidatorModel
+ tvsModelPath = temp_path + "/tvsModel"
+ tvsModel.save(tvsModelPath)
+ loadedModel = TrainValidationSplitModel.load(tvsModelPath)
+ self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
+ self.assertEqual(len(loadedModel.bestModel.stages),
len(tvsModel.bestModel.stages))
+ for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+ tvsModel.bestModel.stages):
+ self.assertEqual(loadedStage.uid, originalStage.uid)
+
+ # Test nested pipeline
+ nested_pipeline = Pipeline(stages=[tokenizer,
Pipeline(stages=[hashingTF, ova])])
+ tvs2 = TrainValidationSplit(estimator=nested_pipeline,
+ estimatorParamMaps=paramGrid,
+
evaluator=MulticlassClassificationEvaluator())
+
+ # Run train validation split, and choose the best set of parameters.
+ tvsModel2 = tvs2.fit(training)
+ # test save/load of CrossValidatorModel
+ tvsModelPath2 = temp_path + "/tvsModel2"
+ tvsModel2.save(tvsModelPath2)
+ loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
+ self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
+ loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+ original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
+ self.assertEqual(loaded_nested_pipeline_model.uid,
original_nested_pipeline_model.uid)
+ self.assertEqual(len(loaded_nested_pipeline_model.stages),
+ len(original_nested_pipeline_model.stages))
+ for loadedStage, originalStage in
zip(loaded_nested_pipeline_model.stages,
+
original_nested_pipeline_model.stages):
+ self.assertEqual(loadedStage.uid, originalStage.uid)
+
def test_copy(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]