This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 99a979df0de [SPARK-45130][CONNECT][ML][PYTHON] Avoid Spark connect ML model to change input pandas dataframe
99a979df0de is described below

commit 99a979df0de95a966e1b0d780aa5329d4e62cbf7
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Mon Sep 18 12:54:17 2023 +0800

    [SPARK-45130][CONNECT][ML][PYTHON] Avoid Spark connect ML model to change input pandas dataframe

    ### What changes were proposed in this pull request?

    Currently, to avoid copying data, the Spark Connect ML model mutates the input pandas
    dataframe in place when appending prediction columns. Instead, we can use
    `pandas_df.copy(deep=False)` to make a shallow copy and append the prediction columns
    to the copy. This is easier for users.

    ### Why are the changes needed?

    This makes the `pyspark.ml.connect` model `transform` method behave more like the
    `pyspark.ml` model, i.e., the input dataframe is left intact after `transform` is called.
    Otherwise users might be surprised by the new behavior and have to change more code
    to migrate their workloads to `pyspark.ml.connect`.

    ### Does this PR introduce _any_ user-facing change?

    Yes.

    Previous behavior: in `pyspark.ml.connect`, `model.transform` appends new columns to the
    input pandas dataframe and returns the input dataframe object.

    Changed behavior: in `pyspark.ml.connect`, `model.transform` makes a shallow copy of the
    input pandas dataframe, appends the new columns to the copy, and returns the copy.

    ### How was this patch tested?

    Unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42887 from WeichenXu123/spark-ml-connect-model-avoid-change-input-dataframe.

    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
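To illustrate the shallow-copy semantics the change relies on, here is a minimal standalone pandas sketch (the dataframe and column names are made up for illustration; this is not Spark code). `DataFrame.copy(deep=False)` returns a new DataFrame object that shares the existing column data with the original, so appending a new column to the copy leaves the caller's dataframe untouched while still avoiding a full copy of the values:

```python
import pandas as pd

df = pd.DataFrame({"features": [1.0, 2.0, 3.0]})

# Shallow copy: a new DataFrame object, but the existing column
# data is shared with `df` rather than duplicated.
out = df.copy(deep=False)

# Appending a new column to the copy does not touch the original.
out["prediction"] = [0, 1, 1]

print(list(df.columns))   # ['features'] -- the input stays intact
print(list(out.columns))  # ['features', 'prediction']
```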
---
 python/pyspark/ml/connect/base.py                            |  8 ++++----
 python/pyspark/ml/connect/util.py                            |  1 +
 .../ml/tests/connect/test_legacy_mode_classification.py      |  7 ++++++-
 python/pyspark/ml/tests/connect/test_legacy_mode_feature.py  | 10 ++++++++--
 python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py |  7 ++++++-
 5 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/ml/connect/base.py b/python/pyspark/ml/connect/base.py
index f8ce0cb6962..fdfcddf601c 100644
--- a/python/pyspark/ml/connect/base.py
+++ b/python/pyspark/ml/connect/base.py
@@ -151,11 +151,11 @@ class Transformer(Params, metaclass=ABCMeta):
     ) -> Union[DataFrame, pd.DataFrame]:
         """
         Transforms the input dataset.
-        The dataset can be either pandas dataframe or spark dataframe，
+        The dataset can be either pandas dataframe or spark dataframe,
         if it is a spark DataFrame, the result of transformation is a new spark DataFrame
-        that contains all existing columns and output columns with names.
-        if it is a pandas DataFrame, the input pandas dataframe is appended with output
-        columns in place.
+        that contains all existing columns and output columns with names,
+        If it is a pandas DataFrame, the result of transformation is a shallow copy
+        of the input pandas dataframe with output columns with names.
 
         Note: Transformers does not allow output column having the same name with existing
         columns.
diff --git a/python/pyspark/ml/connect/util.py b/python/pyspark/ml/connect/util.py
index a543a16d1fc..d05893ad2a0 100644
--- a/python/pyspark/ml/connect/util.py
+++ b/python/pyspark/ml/connect/util.py
@@ -147,6 +147,7 @@ def transform_dataframe_column(
     output_col_name, spark_udf_return_type = output_cols[0]
 
     if isinstance(dataframe, pd.DataFrame):
+        dataframe = dataframe.copy(deep=False)
         result_data = transform_fn(*[dataframe[col_name] for col_name in input_cols])
         if isinstance(result_data, pd.Series):
             assert len(output_cols) == 1
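For context, here is a simplified standalone sketch of the pattern the patched helper now follows (the helper name `transform_column` and its signature are made up for illustration; the real `transform_dataframe_column` also handles multiple input/output columns and Spark DataFrames). The final check is the same non-mutation assertion the updated tests below perform:

```python
import pandas as pd

def transform_column(dataframe, input_col, output_col, transform_fn):
    # Shallow-copy first so the caller's dataframe is never mutated;
    # the existing column data is still shared, avoiding a full copy.
    dataframe = dataframe.copy(deep=False)
    dataframe[output_col] = transform_fn(dataframe[input_col])
    return dataframe

df = pd.DataFrame({"x": [1.0, 2.0]})
snapshot = df.copy()
result = transform_column(df, "x", "y", lambda s: s * 2.0)

# Assert that the input dataframe was not mutated.
pd.testing.assert_frame_equal(df, snapshot)
assert list(result["y"]) == [2.0, 4.0]
```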
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
index 5e5f1b64a33..bc48b4bddd4 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
@@ -33,6 +33,7 @@ if should_test_connect:
         LogisticRegression as LORV2,
         LogisticRegressionModel as LORV2Model,
     )
+    import pandas as pd
 
 
 class ClassificationTestsMixin:
@@ -81,7 +82,11 @@ class ClassificationTestsMixin:
         result = model.transform(eval_df1).toPandas()
         self._check_result(result, expected_predictions, expected_probabilities)
 
-        local_transform_result = model.transform(eval_df1.toPandas())
+        pandas_eval_df1 = eval_df1.toPandas()
+        pandas_eval_df1_copy = pandas_eval_df1.copy()
+        local_transform_result = model.transform(pandas_eval_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(pandas_eval_df1, pandas_eval_df1_copy)
         self._check_result(local_transform_result, expected_predictions, expected_probabilities)
 
         model.set(model.probabilityCol, "")
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 4f8b74e1f70..7315590cdd3 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -25,6 +25,7 @@ import unittest
 from pyspark.sql import SparkSession
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
 
+
 if should_test_connect:
     from pyspark.ml.connect.feature import (
         MaxAbsScaler,
@@ -32,6 +33,7 @@ if should_test_connect:
         StandardScaler,
         StandardScalerModel,
     )
+    import pandas as pd
 
 
 class FeatureTestsMixin:
@@ -57,8 +59,10 @@ class FeatureTestsMixin:
         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
 
         assert list(local_transform_result.columns) == ["features", "scaled_features"]
 
         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
@@ -110,8 +114,10 @@ class FeatureTestsMixin:
         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
 
         assert list(local_transform_result.columns) == ["features", "scaled_features"]
 
         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
index 009c17e5b05..34a2ed851bb 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
@@ -26,6 +26,7 @@ if should_test_connect:
     from pyspark.ml.connect.feature import StandardScaler
     from pyspark.ml.connect.classification import LogisticRegression as LORV2
     from pyspark.ml.connect.pipeline import Pipeline
+    import pandas as pd
 
 
 class PipelineTestsMixin:
@@ -81,7 +82,11 @@ class PipelineTestsMixin:
         model2 = pipeline2.fit(train_dataset)
         result2 = model2.transform(eval_dataset).toPandas()
         self._check_result(result2, expected_predictions, expected_probabilities)
-        local_transform_result2 = model2.transform(eval_dataset.toPandas())
+        local_eval_dataset = eval_dataset.toPandas()
+        local_eval_dataset_copy = local_eval_dataset.copy()
+        local_transform_result2 = model2.transform(local_eval_dataset)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy)
         self._check_result(local_transform_result2, expected_predictions, expected_probabilities)
 
         with tempfile.TemporaryDirectory() as tmp_dir:

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org