This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new f2ef7c493961 [SPARK-54600][PYTHON][ML][3.5] Don't use pickle to 
save/load models in pyspark.ml.connect
f2ef7c493961 is described below

commit f2ef7c4939619857d32d7e29e19f1aebf3084ebe
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 5 12:49:57 2025 +0800

    [SPARK-54600][PYTHON][ML][3.5] Don't use pickle to save/load models in 
pyspark.ml.connect
    
    backport https://github.com/apache/spark/pull/53269 to branch-3.5
    
    Closes #53332 from zhengruifeng/ml_connect_model_ser_35.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/ml/connect/feature.py               | 75 ++++++++++++----------
 .../ml/tests/connect/test_legacy_mode_feature.py   | 13 ----
 2 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/python/pyspark/ml/connect/feature.py 
b/python/pyspark/ml/connect/feature.py
index 42b470246d50..b3832db4668c 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -17,7 +17,8 @@
 
 import numpy as np
 import pandas as pd
-import pickle
+import pyarrow as pa
+
 from typing import Any, Union, List, Tuple, Callable, Dict, Optional
 
 from pyspark import keyword_only
@@ -118,27 +119,29 @@ class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, 
ParamsReadWrite, CoreM
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
-        sk_model = sk_MaxAbsScaler()
-        sk_model.scale_ = self.scale_values
-        sk_model.max_abs_ = self.max_abs_values
-        sk_model.n_features_in_ = len(self.max_abs_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.max_abs_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "max_abs", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.max_abs_values = sk_model.max_abs_
-        self.scale_values = sk_model.scale_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.max_abs_values = np.array(table.column("max_abs")[0].as_py())
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -233,26 +236,28 @@ class StandardScalerModel(Model, HasInputCol, 
HasOutputCol, ParamsReadWrite, Cor
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
-        sk_model = sk_StandardScaler(with_mean=True, with_std=True)
-        sk_model.scale_ = self.scale_values
-        sk_model.var_ = self.std_values * self.std_values  # type: ignore[operator]
-        sk_model.mean_ = self.mean_values
-        sk_model.n_features_in_ = len(self.std_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.mean_values], pa.list_(pa.float64())),
+                pa.array([self.std_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "mean", "std", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.std_values = np.sqrt(sk_model.var_)
-        self.scale_values = sk_model.scale_
-        self.mean_values = sk_model.mean_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.mean_values = np.array(table.column("mean")[0].as_py())
+        self.std_values = np.array(table.column("std")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py 
b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 3aac4a0e0972..328effef233b 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
 #
 
 import os
-import pickle
 import numpy as np
 import tempfile
 import unittest
@@ -75,12 +74,6 @@ class FeatureTestsMixin:
             np.testing.assert_allclose(model.max_abs_values, 
loaded_model.max_abs_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, 
"MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-                sk_result = 
sk_model.transform(np.stack(list(local_df1.features)))
-                np.testing.assert_allclose(sk_result, expected_result)
-
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
@@ -129,12 +122,6 @@ class FeatureTestsMixin:
             np.testing.assert_allclose(model.scale_values, 
loaded_model.scale_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, 
"StandardScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-                sk_result = 
sk_model.transform(np.stack(list(local_df1.features)))
-                np.testing.assert_allclose(sk_result, expected_result)
-
 
 class FeatureTests(FeatureTestsMixin, unittest.TestCase):
     def setUp(self) -> None:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to