This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 41cac935c45e [SPARK-54600][PYTHON][ML] Don't use pickle to save/load models in `pyspark.ml.connect`
41cac935c45e is described below

commit 41cac935c45e6ca6da6aeefc243a649ddf546c9c
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 5 10:21:14 2025 +0800

    [SPARK-54600][PYTHON][ML] Don't use pickle to save/load models in `pyspark.ml.connect`
    
    ### What changes were proposed in this pull request?
    Save/load models in `pyspark.ml.connect` with an Arrow-based Parquet format instead of pickle.
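
    For illustration only (not part of the patch), a minimal standalone
    sketch of the Arrow/Parquet round-trip pattern the diff below adopts;
    the file path and values here are made up for the example:

        import numpy as np
        import pyarrow as pa
        import pyarrow.parquet as pq

        # Save: pack each numpy array into a single-row list column, plus
        # scalar metadata, so one row holds the whole model state.
        scale = np.array([1.0, 2.0, 3.0])
        n_samples = 42
        table = pa.Table.from_arrays(
            [
                pa.array([scale], pa.list_(pa.float64())),
                pa.array([n_samples], pa.int64()),
            ],
            names=["scale", "n_samples"],
        )
        pq.write_table(table, "/tmp/model.arrow.parquet")

        # Load: read the single row back and rebuild the numpy array.
        loaded = pq.read_table("/tmp/model.arrow.parquet")
        restored_scale = np.array(loaded.column("scale")[0].as_py())
        restored_n = loaded.column("n_samples")[0].as_py()
        assert np.allclose(scale, restored_scale) and restored_n == n_samples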
    
    ### Why are the changes needed?
    `pickle.load` can execute arbitrary code, so loading untrusted model files is a security risk.
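
    For context, a minimal sketch (not from the patch) of why this matters:
    `pickle.load` calls whatever callable an object's `__reduce__` returns,
    so deserializing an untrusted file can execute arbitrary code:

        import pickle

        class Exploit:
            # Serialized as the instruction "call os.system('echo pwned')";
            # pickle.loads executes it during deserialization.
            def __reduce__(self):
                import os
                return (os.system, ("echo pwned",))

        payload = pickle.dumps(Exploit())
        pickle.loads(payload)  # prints "pwned" -- code ran at load time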
    
    ### Does this PR introduce _any_ user-facing change?
    No. The module `pyspark.ml.connect` was never documented, so this change should not affect any end users.
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53269 from zhengruifeng/ml_connect_model_ser.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit eaaa7ffe73391bf360891a964cfe76ecb3f88273)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/ml/connect/feature.py               | 74 ++++++++++++----------
 .../ml/tests/connect/test_legacy_mode_feature.py   | 13 ----
 2 files changed, 39 insertions(+), 48 deletions(-)

diff --git a/python/pyspark/ml/connect/feature.py b/python/pyspark/ml/connect/feature.py
index b0e2028e43fa..2184b3c7f332 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -15,11 +15,11 @@
 # limitations under the License.
 #
 
-import pickle
 from typing import Any, Union, List, Tuple, Callable, Dict, Optional
 
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 
 from pyspark import keyword_only
 from pyspark.sql import DataFrame
@@ -133,27 +133,29 @@ class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreM
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
-        sk_model = sk_MaxAbsScaler()
-        sk_model.scale_ = self.scale_values
-        sk_model.max_abs_ = self.max_abs_values
-        sk_model.n_features_in_ = len(self.max_abs_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.max_abs_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "max_abs", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.max_abs_values = sk_model.max_abs_
-        self.scale_values = sk_model.scale_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.max_abs_values = np.array(table.column("max_abs")[0].as_py())
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -254,29 +256,31 @@ class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, Cor
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
-        sk_model = sk_StandardScaler(with_mean=True, with_std=True)
-        sk_model.scale_ = self.scale_values
-        sk_model.var_ = self.std_values * self.std_values  # type: ignore[operator]
-        sk_model.mean_ = self.mean_values
-        sk_model.n_features_in_ = len(self.std_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.mean_values], pa.list_(pa.float64())),
+                pa.array([self.std_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "mean", "std", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.std_values = np.sqrt(sk_model.var_)
-        self.scale_values = sk_model.scale_
-        self.mean_values = sk_model.mean_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.mean_values = np.array(table.column("mean")[0].as_py())
+        self.std_values = np.array(table.column("std")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class ArrayAssembler(
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 2d0a37aca5c8..9fbf24d25342 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
 #
 
 import os
-import pickle
 import tempfile
 import unittest
 
@@ -85,12 +84,6 @@ class FeatureTestsMixin:
             np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-                sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-                np.testing.assert_allclose(sk_result, expected_result)
-
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
@@ -141,12 +134,6 @@ class FeatureTestsMixin:
             np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-                sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-                np.testing.assert_allclose(sk_result, expected_result)
-
     def test_array_assembler(self):
         spark_df = self.spark.createDataFrame(
             [


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
