This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 131cc30479e [SPARK-46148][PS] Fix pyspark.pandas.mlflow.load_model test (Python 3.12)
131cc30479e is described below

commit 131cc30479ee8dc17f4c5f1f9bad1de97371b924
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Nov 29 12:10:24 2023 +0900

[SPARK-46148][PS] Fix pyspark.pandas.mlflow.load_model test (Python 3.12)

### What changes were proposed in this pull request?

This PR proposes to explicitly name the columns of the DataFrame used when predicting in mlflow (according to the mlflow spec), in order to fix the test failure below:

```
File "/__w/spark/spark/python/pyspark/pandas/mlflow.py", line 172, in pyspark.pandas.mlflow.load_model
Failed example:
    prediction_df
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.10/doctest.py", line 1350, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.pandas.mlflow.load_model[18]>", line 1, in <module>
        prediction_df
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13291, in __repr__
        pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13282, in _get_or_create_repr_pandas_cache
        self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13277, in _to_internal_pandas
        return self._internal.to_pandas_frame
      File "/__w/spark/spark/python/pyspark/pandas/utils.py", line 599, in wrapped_lazy_property
        setattr(self, attr_name, fn(self))
      File "/__w/spark/spark/python/pyspark/pandas/internal.py", line 1110, in to_pandas_frame
        pdf = sdf.toPandas()
      File "/__w/spark/spark/python/pyspark/sql/pandas/conversion.py", line 213, in toPandas
        rows = self.collect()
      File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 1369, in collect
        sock_info = self._jdf.collectToPython()
      File "/__w/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
        return_value = get_return_value(
      File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 188, in deco
        raise converted from None
    pyspark.errors.exceptions.captured.PythonException:
      An exception was thrown from the Python worker. Please see the stack trace below.
    Traceback (most recent call last):
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1523, in main
        process()
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1515, in process
        serializer.dump_stream(out_iter, outfile)
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 485, in dump_stream
        return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 101, in dump_stream
        for batch in iterator:
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 478, in init_stream_yield_batches
        for series in iterator:
      File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1284, in func
        for result_batch, result_type in result_iter:
      File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1619, in udf
        yield _predict_row_batch(batch_predict_fn, row_batch_args)
      File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1383, in _predict_row_batch
        result = predict_fn(pdf, params)
      File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1601, in batch_predict_fn
        return loaded_model.predict(pdf, params=params)
      File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 491, in predict
        return _predict()
      File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 477, in _predict
        return self._predict_fn(data, params=params)
      File "/usr/local/lib/python3.10/dist-packages/mlflow/sklearn/__init__.py", line 517, in predict
        return self.sklearn_model.predict(data)
      File "/usr/local/lib/python3.10/dist-packages/sklearn/linear_model/_base.py", line 386, in predict
        return self._decision_function(X)
      File "/usr/local/lib/python3.10/dist-packages/sklearn/linear_model/_base.py", line 369, in _decision_function
        X = self._validate_data(X, accept_sparse=["csr", "csc", "coo"], reset=False)
      File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 580, in _validate_data
        self._check_feature_names(X, reset=reset)
      File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 507, in _check_feature_names
        raise ValueError(message)
    ValueError: The feature names should match those that were passed during fit.
    Feature names unseen at fit time:
    - 0
    - 1
    Feature names seen at fit time, yet now missing:
    - x1
    - x2
```

### Why are the changes needed?

To make the test pass properly on Python 3.12. It is currently failing; see https://github.com/apache/spark/actions/runs/7020654429/job/19100965399

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a real bug for Python 3.12 support.

### How was this patch tested?

Fixed unittests. Manually tested via:

```bash
./python/run-tests --python-executables=python3 --testnames 'pyspark.pandas.mlflow'
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44064 from HyukjinKwon/SPARK-46148.
Authored-by: Hyukjin Kwon <gurwls...@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/mlflow.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/pandas/mlflow.py b/python/pyspark/pandas/mlflow.py
index a609f9b7069..9db8d628527 100644
--- a/python/pyspark/pandas/mlflow.py
+++ b/python/pyspark/pandas/mlflow.py
@@ -25,6 +25,7 @@ import pandas as pd
 import numpy as np
 
 from pyspark.sql.types import DataType
+from pyspark.sql.functions import struct
 from pyspark.pandas._typing import Label, Dtype
 from pyspark.pandas.utils import lazy_property, default_session
 from pyspark.pandas.frame import DataFrame
@@ -93,11 +94,8 @@ class PythonModelWrapper:
         if isinstance(data, pd.DataFrame):
             return self._model.predict(data)
         elif isinstance(data, DataFrame):
-            return_col = self._model_udf(*data._internal.data_spark_columns)
-            # TODO: the columns should be named according to the mlflow spec
-            # However, this is only possible with spark >= 3.0
-            # s = F.struct(*data.columns)
-            # return_col = self._model_udf(s)
+            s = struct(*data.columns)
+            return_col = self._model_udf(s)
             column_labels: List[Label] = [
                 (col,) for col in data._internal.spark_frame.select(return_col).columns
             ]
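For context, the failure above comes from scikit-learn's feature-name validation at predict time, not from Spark itself. A minimal standalone sketch (not part of the patch; the column names and data are made up) that reproduces the same ValueError:

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

# Fit with named columns: scikit-learn records "x1" and "x2" as feature names.
train = pd.DataFrame({"x1": [1.0, 2.0], "x2": [3.0, 4.0]})
model = LinearRegression().fit(train, [0.0, 1.0])

# Predict with differently named columns, as happens when the Spark column
# names are lost on the way into the UDF. Recent scikit-learn raises:
#   ValueError: The feature names should match those that were passed during fit.
test = pd.DataFrame([[1.5, 3.5]], columns=["0", "1"])
model.predict(test)
```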
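The fix relies on mlflow's documented handling of struct inputs: a pyfunc UDF applied to a single struct column rebuilds the pandas DataFrame inside the worker using the struct's field names, so the fit-time feature names survive. A hypothetical end-to-end sketch of the same pattern outside pyspark.pandas ("models:/my_model/1" is a placeholder URI, not from this commit):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

import mlflow.pyfunc

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 3.0)], ["x1", "x2"])

# Placeholder model URI; substitute a real run or model-registry path.
udf = mlflow.pyfunc.spark_udf(spark, "models:/my_model/1")

# Passing one struct column (instead of the columns positionally) keeps the
# field names "x1"/"x2", matching what the model saw at fit time.
sdf.withColumn("prediction", udf(struct(*sdf.columns))).show()
```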