This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 99a979df0de [SPARK-45130][CONNECT][ML][PYTHON] Avoid Spark Connect ML models mutating the input pandas DataFrame
99a979df0de is described below

commit 99a979df0de95a966e1b0d780aa5329d4e62cbf7
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Mon Sep 18 12:54:17 2023 +0800

    [SPARK-45130][CONNECT][ML][PYTHON] Avoid Spark Connect ML models mutating the input pandas DataFrame
    
    ### What changes were proposed in this pull request?
    
    Currently, to avoid copying data, the Spark Connect ML model appends prediction columns directly to the input pandas DataFrame, mutating it in place. Instead, we can use `pandas_df.copy(deep=False)` to create a shallow copy and append the prediction columns to that copy. This is easier for users to work with.
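    
    A minimal standalone sketch of the pandas shallow-copy behavior this change relies on (the dataframe and column names here are illustrative, not part of the patch):
    
    ```python
    import pandas as pd
    
    df = pd.DataFrame({"features": [[1.0, 2.0], [3.0, 4.0]]})
    
    # `deep=False` copies only the frame metadata; the existing column data
    # is shared with `df`, so no row data is duplicated.
    out = df.copy(deep=False)
    out["prediction"] = [0, 1]  # the new column lands on the copy only
    
    assert "prediction" not in df.columns  # the input dataframe is unchanged
    ```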
    
    ### Why are the changes needed?
    
    This makes the `transform` method of `pyspark.ml.connect` models behave more like that of `pyspark.ml` models, i.e., the input DataFrame is left intact after `transform` is called. Otherwise, users might be surprised by the new behavior and have to change more code to migrate their workloads to `pyspark.ml.connect`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    Previous behavior:
    In `pyspark.ml.connect`, `model.transform` appended new columns to the input pandas DataFrame and returned that same input DataFrame object.
    
    Changed behavior:
    In `pyspark.ml.connect`, `model.transform` will make a shallow copy of the input pandas DataFrame, append the new columns to the copy, and return the copy; the input DataFrame is left unchanged.
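    
    An illustrative sketch of the user-visible difference, using `MaxAbsScaler` (assuming a working `pyspark.ml.connect` environment; the data and column names are examples, not from this patch):
    
    ```python
    import pandas as pd
    from pyspark.ml.connect.feature import MaxAbsScaler
    
    pdf = pd.DataFrame({"features": [[1.0, -2.0], [2.0, 4.0]]})
    
    scaler = MaxAbsScaler(inputCol="features", outputCol="scaled_features")
    model = scaler.fit(pdf)
    
    result = model.transform(pdf)
    
    # Previously `result` was `pdf` itself, mutated in place; now it is a
    # shallow copy carrying the appended output column.
    assert result is not pdf
    assert "scaled_features" not in pdf.columns
    assert "scaled_features" in result.columns
    ```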
    
    ### How was this patch tested?
    
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42887 from WeichenXu123/spark-ml-connect-model-avoid-change-input-dataframe.
    
    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/ml/connect/base.py                              |  8 ++++----
 python/pyspark/ml/connect/util.py                              |  1 +
 .../ml/tests/connect/test_legacy_mode_classification.py        |  7 ++++++-
 python/pyspark/ml/tests/connect/test_legacy_mode_feature.py    | 10 ++++++++--
 python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py   |  7 ++++++-
 5 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/ml/connect/base.py b/python/pyspark/ml/connect/base.py
index f8ce0cb6962..fdfcddf601c 100644
--- a/python/pyspark/ml/connect/base.py
+++ b/python/pyspark/ml/connect/base.py
@@ -151,11 +151,11 @@ class Transformer(Params, metaclass=ABCMeta):
     ) -> Union[DataFrame, pd.DataFrame]:
         """
         Transforms the input dataset.
-        The dataset can be either pandas dataframe or spark dataframe,
+        The dataset can be either a pandas DataFrame or a Spark DataFrame;
         if it is a spark DataFrame, the result of transformation is a new spark DataFrame
-        that contains all existing columns and output columns with names.
-        if it is a pandas DataFrame, the input pandas dataframe is appended with output
-        columns in place.
+        that contains all existing columns and output columns with names.
+        If it is a pandas DataFrame, the result of transformation is a shallow copy
+        of the input pandas DataFrame with the output columns appended.
 
 Note: Transformers does not allow output column having the same name with
         existing columns.
diff --git a/python/pyspark/ml/connect/util.py b/python/pyspark/ml/connect/util.py
index a543a16d1fc..d05893ad2a0 100644
--- a/python/pyspark/ml/connect/util.py
+++ b/python/pyspark/ml/connect/util.py
@@ -147,6 +147,7 @@ def transform_dataframe_column(
         output_col_name, spark_udf_return_type = output_cols[0]
 
     if isinstance(dataframe, pd.DataFrame):
+        dataframe = dataframe.copy(deep=False)
         result_data = transform_fn(*[dataframe[col_name] for col_name in input_cols])
         if isinstance(result_data, pd.Series):
             assert len(output_cols) == 1
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
index 5e5f1b64a33..bc48b4bddd4 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
@@ -33,6 +33,7 @@ if should_test_connect:
         LogisticRegression as LORV2,
         LogisticRegressionModel as LORV2Model,
     )
+    import pandas as pd
 
 
 class ClassificationTestsMixin:
@@ -81,7 +82,11 @@ class ClassificationTestsMixin:
 
         result = model.transform(eval_df1).toPandas()
         self._check_result(result, expected_predictions, expected_probabilities)
-        local_transform_result = model.transform(eval_df1.toPandas())
+        pandas_eval_df1 = eval_df1.toPandas()
+        pandas_eval_df1_copy = pandas_eval_df1.copy()
+        local_transform_result = model.transform(pandas_eval_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(pandas_eval_df1, pandas_eval_df1_copy)
         self._check_result(local_transform_result, expected_predictions, expected_probabilities)
 
         model.set(model.probabilityCol, "")
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 4f8b74e1f70..7315590cdd3 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -25,6 +25,7 @@ import unittest
 from pyspark.sql import SparkSession
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
 
+
 if should_test_connect:
     from pyspark.ml.connect.feature import (
         MaxAbsScaler,
@@ -32,6 +33,7 @@ if should_test_connect:
         StandardScaler,
         StandardScalerModel,
     )
+    import pandas as pd
 
 
 class FeatureTestsMixin:
@@ -57,8 +59,10 @@ class FeatureTestsMixin:
 
         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
         assert list(local_transform_result.columns) == ["features", "scaled_features"]
 
         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
@@ -110,8 +114,10 @@ class FeatureTestsMixin:
 
         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
         assert list(local_transform_result.columns) == ["features", "scaled_features"]
 
         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
index 009c17e5b05..34a2ed851bb 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
@@ -26,6 +26,7 @@ if should_test_connect:
     from pyspark.ml.connect.feature import StandardScaler
     from pyspark.ml.connect.classification import LogisticRegression as LORV2
     from pyspark.ml.connect.pipeline import Pipeline
+    import pandas as pd
 
 
 class PipelineTestsMixin:
@@ -81,7 +82,11 @@ class PipelineTestsMixin:
         model2 = pipeline2.fit(train_dataset)
         result2 = model2.transform(eval_dataset).toPandas()
         self._check_result(result2, expected_predictions, expected_probabilities)
-        local_transform_result2 = model2.transform(eval_dataset.toPandas())
+        local_eval_dataset = eval_dataset.toPandas()
+        local_eval_dataset_copy = local_eval_dataset.copy()
+        local_transform_result2 = model2.transform(local_eval_dataset)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy)
         self._check_result(local_transform_result2, expected_predictions, expected_probabilities)
 
         with tempfile.TemporaryDirectory() as tmp_dir:

