This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 85c9e8c [SPARK-32092][ML][PYSPARK] Fix parameters not being copied in
CrossValidatorModel.copy(), read() and write()
85c9e8c is described below
commit 85c9e8c54c30b69c39075e97cd3cac295be09303
Author: Louiszr <[email protected]>
AuthorDate: Sat Aug 22 09:27:31 2020 -0500
[SPARK-32092][ML][PYSPARK] Fix parameters not being copied in
CrossValidatorModel.copy(), read() and write()
### What changes were proposed in this pull request?
Changed the definitions of
`CrossValidatorModel.copy()/_to_java()/_from_java()` so that exposed parameters
(i.e. parameters with `get()` methods) are copied in these methods.
### Why are the changes needed?
Parameters are copied in the respective Scala interface for
`CrossValidatorModel.copy()`.
It fits the semantics to persist parameters when calling
`CrossValidatorModel.save()` and `CrossValidatorModel.load()`, so that the user
gets the same model back after saving and loading it. Not copying across
`numFolds` also causes bugs such as array index out of bounds and losing
sub-models, because this parameter will always default to 3 (as described in
the JIRA ticket).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tests for `CrossValidatorModel.copy()` and `save()`/`load()` are updated so
that they check parameters before and after function calls.
Closes #29445 from Louiszr/master.
Authored-by: Louiszr <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit d9eb06ea37cab185f1e49c641313be9707270252)
Signed-off-by: Sean Owen <[email protected]>
---
python/pyspark/ml/tests/test_tuning.py | 131 ++++++++++++++++++++++++++++++---
python/pyspark/ml/tuning.py | 67 +++++++++++++----
2 files changed, 172 insertions(+), 26 deletions(-)
diff --git a/python/pyspark/ml/tests/test_tuning.py
b/python/pyspark/ml/tests/test_tuning.py
index 6bcc3f9..b250740 100644
--- a/python/pyspark/ml/tests/test_tuning.py
+++ b/python/pyspark/ml/tests/test_tuning.py
@@ -89,15 +89,50 @@ class CrossValidatorTests(SparkSessionTestCase):
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
- cv = CrossValidator(estimator=iee, estimatorParamMaps=grid,
evaluator=evaluator)
+ cv = CrossValidator(
+ estimator=iee,
+ estimatorParamMaps=grid,
+ evaluator=evaluator,
+ collectSubModels=True,
+ numFolds=2
+ )
cvCopied = cv.copy()
- self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)
+ for param in [
+ lambda x: x.getEstimator().uid,
+ # SPARK-32092: CrossValidator.copy() needs to copy all existing
params
+ lambda x: x.getNumFolds(),
+ lambda x: x.getFoldCol(),
+ lambda x: x.getCollectSubModels(),
+ lambda x: x.getParallelism(),
+ lambda x: x.getSeed()
+ ]:
+ self.assertEqual(param(cv), param(cvCopied))
cvModel = cv.fit(dataset)
cvModelCopied = cvModel.copy()
for index in range(len(cvModel.avgMetrics)):
self.assertTrue(abs(cvModel.avgMetrics[index] -
cvModelCopied.avgMetrics[index])
< 0.0001)
+ # SPARK-32092: CrossValidatorModel.copy() needs to copy all existing
params
+ for param in [
+ lambda x: x.getNumFolds(),
+ lambda x: x.getFoldCol(),
+ lambda x: x.getSeed()
+ ]:
+ self.assertEqual(param(cvModel), param(cvModelCopied))
+
+ cvModel.avgMetrics[0] = 'foo'
+ self.assertNotEqual(
+ cvModelCopied.avgMetrics[0],
+ 'foo',
+ "Changing the original avgMetrics should not affect the copied
model"
+ )
+ cvModel.subModels[0] = 'foo'
+ self.assertNotEqual(
+ cvModelCopied.subModels[0],
+ 'foo',
+ "Changing the original subModels should not affect the copied
model"
+ )
def test_fit_minimize_metric(self):
dataset = self.spark.createDataFrame([
@@ -166,16 +201,39 @@ class CrossValidatorTests(SparkSessionTestCase):
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
- cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
evaluator=evaluator)
+ cv = CrossValidator(
+ estimator=lr,
+ estimatorParamMaps=grid,
+ evaluator=evaluator,
+ collectSubModels=True,
+ numFolds=4,
+ seed=42
+ )
cvModel = cv.fit(dataset)
lrModel = cvModel.bestModel
- cvModelPath = temp_path + "/cvModel"
- lrModel.save(cvModelPath)
- loadedLrModel = LogisticRegressionModel.load(cvModelPath)
+ lrModelPath = temp_path + "/lrModel"
+ lrModel.save(lrModelPath)
+ loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
+ # SPARK-32092: Saving and then loading CrossValidatorModel should not
change the params
+ cvModelPath = temp_path + "/cvModel"
+ cvModel.save(cvModelPath)
+ loadedCvModel = CrossValidatorModel.load(cvModelPath)
+ for param in [
+ lambda x: x.getNumFolds(),
+ lambda x: x.getFoldCol(),
+ lambda x: x.getSeed(),
+ lambda x: len(x.subModels)
+ ]:
+ self.assertEqual(param(cvModel), param(loadedCvModel))
+
+ self.assertTrue(all(
+ loadedCvModel.isSet(param) for param in loadedCvModel.params
+ ))
+
def test_save_load_simple_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
@@ -451,16 +509,35 @@ class TrainValidationSplitTests(SparkSessionTestCase):
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
- tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
evaluator=evaluator)
+ tvs = TrainValidationSplit(
+ estimator=lr,
+ estimatorParamMaps=grid,
+ evaluator=evaluator,
+ collectSubModels=True,
+ seed=42
+ )
tvsModel = tvs.fit(dataset)
lrModel = tvsModel.bestModel
- tvsModelPath = temp_path + "/tvsModel"
- lrModel.save(tvsModelPath)
- loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
+ lrModelPath = temp_path + "/lrModel"
+ lrModel.save(lrModelPath)
+ loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
+ tvsModelPath = temp_path + "/tvsModel"
+ tvsModel.save(tvsModelPath)
+ loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath)
+ for param in [
+ lambda x: x.getSeed(),
+ lambda x: x.getTrainRatio(),
+ ]:
+ self.assertEqual(param(tvsModel), param(loadedTvsModel))
+
+ self.assertTrue(all(
+ loadedTvsModel.isSet(param) for param in loadedTvsModel.params
+ ))
+
def test_save_load_simple_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
@@ -662,11 +739,30 @@ class TrainValidationSplitTests(SparkSessionTestCase):
grid = ParamGridBuilder() \
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
.build()
- tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid,
evaluator=evaluator)
+ tvs = TrainValidationSplit(
+ estimator=iee,
+ estimatorParamMaps=grid,
+ evaluator=evaluator,
+ collectSubModels=True
+ )
tvsModel = tvs.fit(dataset)
tvsCopied = tvs.copy()
tvsModelCopied = tvsModel.copy()
+ for param in [
+ lambda x: x.getCollectSubModels(),
+ lambda x: x.getParallelism(),
+ lambda x: x.getSeed(),
+ lambda x: x.getTrainRatio(),
+ ]:
+ self.assertEqual(param(tvs), param(tvsCopied))
+
+ for param in [
+ lambda x: x.getSeed(),
+ lambda x: x.getTrainRatio(),
+ ]:
+ self.assertEqual(param(tvsModel), param(tvsModelCopied))
+
self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
"Copied TrainValidationSplit has the same uid of
Estimator")
@@ -678,6 +774,19 @@ class TrainValidationSplitTests(SparkSessionTestCase):
self.assertEqual(tvsModel.validationMetrics[index],
tvsModelCopied.validationMetrics[index])
+ tvsModel.validationMetrics[0] = 'foo'
+ self.assertNotEqual(
+ tvsModelCopied.validationMetrics[0],
+ 'foo',
+ "Changing the original validationMetrics should not affect the
copied model"
+ )
+ tvsModel.subModels[0] = 'foo'
+ self.assertNotEqual(
+ tvsModelCopied.subModels[0],
+ 'foo',
+ "Changing the original subModels should not affect the copied
model"
+ )
+
if __name__ == "__main__":
from pyspark.ml.tests.test_tuning import *
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index e564ff7..91f34ef 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -479,9 +479,9 @@ class CrossValidatorModel(Model, _CrossValidatorParams,
MLReadable, MLWritable):
if extra is None:
extra = dict()
bestModel = self.bestModel.copy(extra)
- avgMetrics = self.avgMetrics
- subModels = self.subModels
- return CrossValidatorModel(bestModel, avgMetrics, subModels)
+ avgMetrics = list(self.avgMetrics)
+ subModels = [model.copy() for model in self.subModels]
+ return self._copyValues(CrossValidatorModel(bestModel, avgMetrics,
subModels), extra=extra)
@since("2.3.0")
def write(self):
@@ -505,8 +505,17 @@ class CrossValidatorModel(Model, _CrossValidatorParams,
MLReadable, MLWritable):
avgMetrics = _java2py(sc, java_stage.avgMetrics())
estimator, epms, evaluator = super(CrossValidatorModel,
cls)._from_java_impl(java_stage)
- py_stage = cls(bestModel=bestModel,
avgMetrics=avgMetrics)._set(estimator=estimator)
- py_stage =
py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
+ py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)
+ params = {
+ "evaluator": evaluator,
+ "estimator": estimator,
+ "estimatorParamMaps": epms,
+ "numFolds": java_stage.getNumFolds(),
+ "foldCol": java_stage.getFoldCol(),
+ "seed": java_stage.getSeed(),
+ }
+ for param_name, param_val in params.items():
+ py_stage = py_stage._set(**{param_name: param_val})
if java_stage.hasSubModels():
py_stage.subModels = [[JavaParams._from_java(sub_model)
@@ -530,9 +539,18 @@ class CrossValidatorModel(Model, _CrossValidatorParams,
MLReadable, MLWritable):
_py2java(sc, self.avgMetrics))
estimator, epms, evaluator = super(CrossValidatorModel,
self)._to_java_impl()
- _java_obj.set("evaluator", evaluator)
- _java_obj.set("estimator", estimator)
- _java_obj.set("estimatorParamMaps", epms)
+ params = {
+ "evaluator": evaluator,
+ "estimator": estimator,
+ "estimatorParamMaps": epms,
+ "numFolds": self.getNumFolds(),
+ "foldCol": self.getFoldCol(),
+ "seed": self.getSeed(),
+ }
+ for param_name, param_val in params.items():
+ java_param = _java_obj.getParam(param_name)
+ pair = java_param.w(param_val)
+ _java_obj.set(pair)
if self.subModels is not None:
java_sub_models = [[sub_model._to_java() for sub_model in
fold_sub_models]
@@ -818,8 +836,11 @@ class TrainValidationSplitModel(Model,
_TrainValidationSplitParams, MLReadable,
extra = dict()
bestModel = self.bestModel.copy(extra)
validationMetrics = list(self.validationMetrics)
- subModels = self.subModels
- return TrainValidationSplitModel(bestModel, validationMetrics,
subModels)
+ subModels = [model.copy() for model in self.subModels]
+ return self._copyValues(
+ TrainValidationSplitModel(bestModel, validationMetrics, subModels),
+ extra=extra
+ )
@since("2.3.0")
def write(self):
@@ -847,8 +868,16 @@ class TrainValidationSplitModel(Model,
_TrainValidationSplitParams, MLReadable,
cls)._from_java_impl(java_stage)
# Create a new instance of this stage.
py_stage = cls(bestModel=bestModel,
-
validationMetrics=validationMetrics)._set(estimator=estimator)
- py_stage =
py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
+ validationMetrics=validationMetrics)
+ params = {
+ "evaluator": evaluator,
+ "estimator": estimator,
+ "estimatorParamMaps": epms,
+ "trainRatio": java_stage.getTrainRatio(),
+ "seed": java_stage.getSeed(),
+ }
+ for param_name, param_val in params.items():
+ py_stage = py_stage._set(**{param_name: param_val})
if java_stage.hasSubModels():
py_stage.subModels = [JavaParams._from_java(sub_model)
@@ -871,9 +900,17 @@ class TrainValidationSplitModel(Model,
_TrainValidationSplitParams, MLReadable,
_py2java(sc, self.validationMetrics))
estimator, epms, evaluator = super(TrainValidationSplitModel,
self)._to_java_impl()
- _java_obj.set("evaluator", evaluator)
- _java_obj.set("estimator", estimator)
- _java_obj.set("estimatorParamMaps", epms)
+ params = {
+ "evaluator": evaluator,
+ "estimator": estimator,
+ "estimatorParamMaps": epms,
+ "trainRatio": self.getTrainRatio(),
+ "seed": self.getSeed(),
+ }
+ for param_name, param_val in params.items():
+ java_param = _java_obj.getParam(param_name)
+ pair = java_param.w(param_val)
+ _java_obj.set(pair)
if self.subModels is not None:
java_sub_models = [sub_model._to_java() for sub_model in
self.subModels]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]