This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new f2ef7c493961 [SPARK-54600][PYTHON][ML][3.5] Don't use pickle to save/load models in pyspark.ml.connect
f2ef7c493961 is described below
commit f2ef7c4939619857d32d7e29e19f1aebf3084ebe
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 5 12:49:57 2025 +0800
[SPARK-54600][PYTHON][ML][3.5] Don't use pickle to save/load models in pyspark.ml.connect
backport https://github.com/apache/spark/pull/53269 to branch-3.5
Closes #53332 from zhengruifeng/ml_connect_model_ser_35.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/ml/connect/feature.py | 75 ++++++++++++----------
.../ml/tests/connect/test_legacy_mode_feature.py | 13 ----
2 files changed, 40 insertions(+), 48 deletions(-)
diff --git a/python/pyspark/ml/connect/feature.py b/python/pyspark/ml/connect/feature.py
index 42b470246d50..b3832db4668c 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -17,7 +17,8 @@
import numpy as np
import pandas as pd
-import pickle
+import pyarrow as pa
+
from typing import Any, Union, List, Tuple, Callable, Dict, Optional
from pyspark import keyword_only
@@ -118,27 +119,29 @@ class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreM
return transform_fn
def _get_core_model_filename(self) -> str:
- return self.__class__.__name__ + ".sklearn.pkl"
+ return self.__class__.__name__ + ".arrow.parquet"
def _save_core_model(self, path: str) -> None:
- from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
- sk_model = sk_MaxAbsScaler()
- sk_model.scale_ = self.scale_values
- sk_model.max_abs_ = self.max_abs_values
- sk_model.n_features_in_ = len(self.max_abs_values) # type: ignore[arg-type]
- sk_model.n_samples_seen_ = self.n_samples_seen
-
- with open(path, "wb") as fp:
- pickle.dump(sk_model, fp)
+ import pyarrow.parquet as pq
+
+ table = pa.Table.from_arrays(
+ [
+ pa.array([self.scale_values], pa.list_(pa.float64())),
+ pa.array([self.max_abs_values], pa.list_(pa.float64())),
+ pa.array([self.n_samples_seen], pa.int64()),
+ ],
+ names=["scale", "max_abs", "n_samples"],
+ )
+ pq.write_table(table, path)
def _load_core_model(self, path: str) -> None:
- with open(path, "rb") as fp:
- sk_model = pickle.load(fp)
+ import pyarrow.parquet as pq
+
+ table = pq.read_table(path)
- self.max_abs_values = sk_model.max_abs_
- self.scale_values = sk_model.scale_
- self.n_samples_seen = sk_model.n_samples_seen_
+ self.max_abs_values = np.array(table.column("scale")[0].as_py())
+ self.scale_values = np.array(table.column("max_abs")[0].as_py())
+ self.n_samples_seen = table.column("n_samples")[0].as_py()
class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -233,26 +236,28 @@ class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, Cor
return transform_fn
def _get_core_model_filename(self) -> str:
- return self.__class__.__name__ + ".sklearn.pkl"
+ return self.__class__.__name__ + ".arrow.parquet"
def _save_core_model(self, path: str) -> None:
- from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
- sk_model = sk_StandardScaler(with_mean=True, with_std=True)
- sk_model.scale_ = self.scale_values
- sk_model.var_ = self.std_values * self.std_values # type: ignore[operator]
- sk_model.mean_ = self.mean_values
- sk_model.n_features_in_ = len(self.std_values) # type: ignore[arg-type]
- sk_model.n_samples_seen_ = self.n_samples_seen
-
- with open(path, "wb") as fp:
- pickle.dump(sk_model, fp)
+ import pyarrow.parquet as pq
+
+ table = pa.Table.from_arrays(
+ [
+ pa.array([self.scale_values], pa.list_(pa.float64())),
+ pa.array([self.mean_values], pa.list_(pa.float64())),
+ pa.array([self.std_values], pa.list_(pa.float64())),
+ pa.array([self.n_samples_seen], pa.int64()),
+ ],
+ names=["scale", "mean", "std", "n_samples"],
+ )
+ pq.write_table(table, path)
def _load_core_model(self, path: str) -> None:
- with open(path, "rb") as fp:
- sk_model = pickle.load(fp)
+ import pyarrow.parquet as pq
+
+ table = pq.read_table(path)
- self.std_values = np.sqrt(sk_model.var_)
- self.scale_values = sk_model.scale_
- self.mean_values = sk_model.mean_
- self.n_samples_seen = sk_model.n_samples_seen_
+ self.scale_values = np.array(table.column("scale")[0].as_py())
+ self.mean_values = np.array(table.column("mean")[0].as_py())
+ self.std_values = np.array(table.column("std")[0].as_py())
+ self.n_samples_seen = table.column("n_samples")[0].as_py()
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 3aac4a0e0972..328effef233b 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
#
import os
-import pickle
import numpy as np
import tempfile
import unittest
@@ -75,12 +74,6 @@ class FeatureTestsMixin:
np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
assert model.n_samples_seen == loaded_model.n_samples_seen
- # Test loading core model as scikit-learn model
- with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
- sk_model = pickle.load(f)
- sk_result = sk_model.transform(np.stack(list(local_df1.features)))
- np.testing.assert_allclose(sk_result, expected_result)
-
def test_standard_scaler(self):
df1 = self.spark.createDataFrame(
[
@@ -129,12 +122,6 @@ class FeatureTestsMixin:
np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
assert model.n_samples_seen == loaded_model.n_samples_seen
- # Test loading core model as scikit-learn model
- with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
- sk_model = pickle.load(f)
- sk_result = sk_model.transform(np.stack(list(local_df1.features)))
- np.testing.assert_allclose(sk_result, expected_result)
-
class FeatureTests(FeatureTestsMixin, unittest.TestCase):
def setUp(self) -> None:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]