This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 41cac935c45e [SPARK-54600][PYTHON][ML] Don't use pickle to save/load models in `pyspark.ml.connect`
41cac935c45e is described below
commit 41cac935c45e6ca6da6aeefc243a649ddf546c9c
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 5 10:21:14 2025 +0800
[SPARK-54600][PYTHON][ML] Don't use pickle to save/load models in `pyspark.ml.connect`
### What changes were proposed in this pull request?
Save and load models in `pyspark.ml.connect` with an Arrow-based Parquet format instead of pickle.
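For illustration, a minimal standalone sketch of the pattern adopted below (the columns `scale`, `max_abs`, and `n_samples` mirror the patch; the file path is hypothetical):

```python
# Minimal sketch of the Arrow-based Parquet round trip used in this change:
# each numeric vector is stored as a single list<float64> cell in a
# one-row table, and scalars as plain columns.
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

scale_values = np.array([0.5, 2.0])
max_abs_values = np.array([2.0, 0.5])
n_samples_seen = 10

table = pa.Table.from_arrays(
    [
        pa.array([scale_values], pa.list_(pa.float64())),
        pa.array([max_abs_values], pa.list_(pa.float64())),
        pa.array([n_samples_seen], pa.int64()),
    ],
    names=["scale", "max_abs", "n_samples"],
)
pq.write_table(table, "/tmp/MaxAbsScalerModel.arrow.parquet")

loaded = pq.read_table("/tmp/MaxAbsScalerModel.arrow.parquet")
assert np.allclose(np.array(loaded.column("scale")[0].as_py()), scale_values)
assert loaded.column("n_samples")[0].as_py() == n_samples_seen
```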
### Why are the changes needed?
`pickle.load` can run arbitrary code and poses a security risk.
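To illustrate the risk (a toy example, not code from this repository): unpickling invokes whatever callable an object's `__reduce__` returns, so loading an untrusted model file can execute attacker-chosen code:

```python
# Toy demonstration of why pickle.load on untrusted data is unsafe.
import os
import pickle

class Malicious:
    def __reduce__(self):
        # Executed during unpickling.
        return (os.system, ("echo pwned",))

payload = pickle.dumps(Malicious())
pickle.loads(payload)  # runs `echo pwned` -- arbitrary code execution
```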
### Does this PR introduce _any_ user-facing change?
No. The `pyspark.ml.connect` module has never been documented, so this should not affect any end users.
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53269 from zhengruifeng/ml_connect_model_ser.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit eaaa7ffe73391bf360891a964cfe76ecb3f88273)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/ml/connect/feature.py | 74 ++++++++++++----------
.../ml/tests/connect/test_legacy_mode_feature.py | 13 ----
2 files changed, 39 insertions(+), 48 deletions(-)
diff --git a/python/pyspark/ml/connect/feature.py b/python/pyspark/ml/connect/feature.py
index b0e2028e43fa..2184b3c7f332 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -15,11 +15,11 @@
# limitations under the License.
#
-import pickle
from typing import Any, Union, List, Tuple, Callable, Dict, Optional
import numpy as np
import pandas as pd
+import pyarrow as pa
from pyspark import keyword_only
from pyspark.sql import DataFrame
@@ -133,27 +133,29 @@ class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreM
return transform_fn
def _get_core_model_filename(self) -> str:
- return self.__class__.__name__ + ".sklearn.pkl"
+ return self.__class__.__name__ + ".arrow.parquet"
def _save_core_model(self, path: str) -> None:
- from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
- sk_model = sk_MaxAbsScaler()
- sk_model.scale_ = self.scale_values
- sk_model.max_abs_ = self.max_abs_values
- sk_model.n_features_in_ = len(self.max_abs_values) # type: ignore[arg-type]
- sk_model.n_samples_seen_ = self.n_samples_seen
-
- with open(path, "wb") as fp:
- pickle.dump(sk_model, fp)
+ import pyarrow.parquet as pq
+
+ table = pa.Table.from_arrays(
+ [
+ pa.array([self.scale_values], pa.list_(pa.float64())),
+ pa.array([self.max_abs_values], pa.list_(pa.float64())),
+ pa.array([self.n_samples_seen], pa.int64()),
+ ],
+ names=["scale", "max_abs", "n_samples"],
+ )
+ pq.write_table(table, path)
def _load_core_model(self, path: str) -> None:
- with open(path, "rb") as fp:
- sk_model = pickle.load(fp)
+ import pyarrow.parquet as pq
+
+ table = pq.read_table(path)
- self.max_abs_values = sk_model.max_abs_
- self.scale_values = sk_model.scale_
- self.n_samples_seen = sk_model.n_samples_seen_
+ self.max_abs_values = np.array(table.column("max_abs")[0].as_py())
+ self.scale_values = np.array(table.column("scale")[0].as_py())
+ self.n_samples_seen = table.column("n_samples")[0].as_py()
class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -254,29 +256,31 @@ class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, Cor
return transform_fn
def _get_core_model_filename(self) -> str:
- return self.__class__.__name__ + ".sklearn.pkl"
+ return self.__class__.__name__ + ".arrow.parquet"
def _save_core_model(self, path: str) -> None:
- from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
- sk_model = sk_StandardScaler(with_mean=True, with_std=True)
- sk_model.scale_ = self.scale_values
- sk_model.var_ = self.std_values * self.std_values # type: ignore[operator]
- sk_model.mean_ = self.mean_values
- sk_model.n_features_in_ = len(self.std_values) # type: ignore[arg-type]
- sk_model.n_samples_seen_ = self.n_samples_seen
-
- with open(path, "wb") as fp:
- pickle.dump(sk_model, fp)
+ import pyarrow.parquet as pq
+
+ table = pa.Table.from_arrays(
+ [
+ pa.array([self.scale_values], pa.list_(pa.float64())),
+ pa.array([self.mean_values], pa.list_(pa.float64())),
+ pa.array([self.std_values], pa.list_(pa.float64())),
+ pa.array([self.n_samples_seen], pa.int64()),
+ ],
+ names=["scale", "mean", "std", "n_samples"],
+ )
+ pq.write_table(table, path)
def _load_core_model(self, path: str) -> None:
- with open(path, "rb") as fp:
- sk_model = pickle.load(fp)
+ import pyarrow.parquet as pq
+
+ table = pq.read_table(path)
- self.std_values = np.sqrt(sk_model.var_)
- self.scale_values = sk_model.scale_
- self.mean_values = sk_model.mean_
- self.n_samples_seen = sk_model.n_samples_seen_
+ self.scale_values = np.array(table.column("scale")[0].as_py())
+ self.mean_values = np.array(table.column("mean")[0].as_py())
+ self.std_values = np.array(table.column("std")[0].as_py())
+ self.n_samples_seen = table.column("n_samples")[0].as_py()
class ArrayAssembler(
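As a design note: the saved core-model file is now a plain Parquet file, so it can be inspected with any Parquet reader and no longer requires scikit-learn at load time. A hedged sketch, assuming a model was saved under a hypothetical `model_dir`:

```python
# Hypothetical inspection of a saved StandardScalerModel core file.
import pyarrow.parquet as pq

table = pq.read_table("model_dir/StandardScalerModel.arrow.parquet")
print(table.schema)  # scale/mean/std: list<double>; n_samples: int64
print(table.column("mean")[0].as_py())
```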
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 2d0a37aca5c8..9fbf24d25342 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
#
import os
-import pickle
import tempfile
import unittest
@@ -85,12 +84,6 @@ class FeatureTestsMixin:
np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
assert model.n_samples_seen == loaded_model.n_samples_seen
- # Test loading core model as scikit-learn model
- with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
- sk_model = pickle.load(f)
- sk_result = sk_model.transform(np.stack(list(local_df1.features)))
- np.testing.assert_allclose(sk_result, expected_result)
-
def test_standard_scaler(self):
df1 = self.spark.createDataFrame(
[
@@ -141,12 +134,6 @@ class FeatureTestsMixin:
np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
assert model.n_samples_seen == loaded_model.n_samples_seen
- # Test loading core model as scikit-learn model
- with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
- sk_model = pickle.load(f)
- sk_result = sk_model.transform(np.stack(list(local_df1.features)))
- np.testing.assert_allclose(sk_result, expected_result)
-
def test_array_assembler(self):
spark_df = self.spark.createDataFrame(
[
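The removed assertions checked the pickled file by round-tripping through scikit-learn; an analogous check against the new format (a sketch reusing the test's `model_path` and `model`, with column names from the diff above) could read the Parquet file directly:

```python
# Hypothetical direct check of the saved Parquet core-model file,
# reusing `model_path` and `model` from the surrounding test.
import os
import numpy as np
import pyarrow.parquet as pq

table = pq.read_table(os.path.join(model_path, "MaxAbsScalerModel.arrow.parquet"))
np.testing.assert_allclose(
    np.array(table.column("max_abs")[0].as_py()), model.max_abs_values
)
assert table.column("n_samples")[0].as_py() == model.n_samples_seen
```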
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]