This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 131cc30479e [SPARK-46148][PS] Fix pyspark.pandas.mlflow.load_model test (Python 3.12)
131cc30479e is described below

commit 131cc30479ee8dc17f4c5f1f9bad1de97371b924
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Nov 29 12:10:24 2023 +0900

    [SPARK-46148][PS] Fix pyspark.pandas.mlflow.load_model test (Python 3.12)
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to explicitly name the columns of the DataFrame used for prediction in MLflow (according to the MLflow spec) in order to fix the test failure below:
    
    ```
    File "/__w/spark/spark/python/pyspark/pandas/mlflow.py", line 172, in pyspark.pandas.mlflow.load_model
    Failed example:
        prediction_df
    Exception raised:
        Traceback (most recent call last):
          File "/usr/lib/python3.10/doctest.py", line 1350, in __run
            exec(compile(example.source, filename, "single",
          File "<doctest pyspark.pandas.mlflow.load_model[18]>", line 1, in <module>
            prediction_df
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13291, in __repr__
            pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13282, in _get_or_create_repr_pandas_cache
            self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13277, in _to_internal_pandas
            return self._internal.to_pandas_frame
          File "/__w/spark/spark/python/pyspark/pandas/utils.py", line 599, in wrapped_lazy_property
            setattr(self, attr_name, fn(self))
          File "/__w/spark/spark/python/pyspark/pandas/internal.py", line 1110, in to_pandas_frame
            pdf = sdf.toPandas()
          File "/__w/spark/spark/python/pyspark/sql/pandas/conversion.py", line 213, in toPandas
            rows = self.collect()
          File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 1369, in collect
            sock_info = self._jdf.collectToPython()
          File "/__w/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
            return_value = get_return_value(
          File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 188, in deco
            raise converted from None
        pyspark.errors.exceptions.captured.PythonException:
          An exception was thrown from the Python worker. Please see the stack trace below.
        Traceback (most recent call last):
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1523, in main
            process()
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1515, in process
            serializer.dump_stream(out_iter, outfile)
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 485, in dump_stream
            return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 101, in dump_stream
            for batch in iterator:
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 478, in init_stream_yield_batches
            for series in iterator:
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1284, in func
            for result_batch, result_type in result_iter:
          File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1619, in udf
            yield _predict_row_batch(batch_predict_fn, row_batch_args)
          File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1383, in _predict_row_batch
            result = predict_fn(pdf, params)
          File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 1601, in batch_predict_fn
            return loaded_model.predict(pdf, params=params)
          File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 491, in predict
            return _predict()
          File "/usr/local/lib/python3.10/dist-packages/mlflow/pyfunc/__init__.py", line 477, in _predict
            return self._predict_fn(data, params=params)
          File "/usr/local/lib/python3.10/dist-packages/mlflow/sklearn/__init__.py", line 517, in predict
            return self.sklearn_model.predict(data)
          File "/usr/local/lib/python3.10/dist-packages/sklearn/linear_model/_base.py", line 386, in predict
            return self._decision_function(X)
          File "/usr/local/lib/python3.10/dist-packages/sklearn/linear_model/_base.py", line 369, in _decision_function
            X = self._validate_data(X, accept_sparse=["csr", "csc", "coo"], reset=False)
          File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 580, in _validate_data
            self._check_feature_names(X, reset=reset)
          File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 507, in _check_feature_names
            raise ValueError(message)
        ValueError: The feature names should match those that were passed during fit.
        Feature names unseen at fit time:
        - 0
        - 1
        Feature names seen at fit time, yet now missing:
        - x1
        - x2
    ```
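
    For context, recent scikit-learn estimators record the feature names seen when fit on a pandas DataFrame and validate them again at predict time, which is what fails above. A minimal sketch of the mismatch outside Spark (illustrative only, not part of this patch; the x1/x2 names follow the doctest):

    ```python
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression

    # Fit on named columns, as the load_model doctest does.
    train = pd.DataFrame({"x1": [1.0, 2.0, 3.0], "x2": [2.0, 4.0, 6.0]})
    model = LinearRegression().fit(train, np.array([0.0, 1.0, 2.0]))

    # Without explicit names, the mlflow UDF path ends up passing positionally
    # named columns, which fails scikit-learn's feature-name check.
    try:
        model.predict(pd.DataFrame({"0": [1.0], "1": [2.0]}))
    except ValueError as e:
        print(e)  # The feature names should match those that were passed during fit.

    # Predicting with the fit-time names succeeds.
    print(model.predict(pd.DataFrame({"x1": [1.0], "x2": [2.0]})))
    ```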
    
    ### Why are the changes needed?
    
    To make the test pass properly on Python 3.12. It is currently failing, see https://github.com/apache/spark/actions/runs/7020654429/job/19100965399
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a real bug affecting Python 3.12 support.
    
    ### How was this patch tested?
    
    Fixed unit tests. Manually tested via:
    
    ```bash
    ./python/run-tests --python-executables=python3 --testnames 'pyspark.pandas.mlflow'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44064 from HyukjinKwon/SPARK-46148.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/mlflow.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/pandas/mlflow.py b/python/pyspark/pandas/mlflow.py
index a609f9b7069..9db8d628527 100644
--- a/python/pyspark/pandas/mlflow.py
+++ b/python/pyspark/pandas/mlflow.py
@@ -25,6 +25,7 @@ import pandas as pd
 import numpy as np
 
 from pyspark.sql.types import DataType
+from pyspark.sql.functions import struct
 from pyspark.pandas._typing import Label, Dtype
 from pyspark.pandas.utils import lazy_property, default_session
 from pyspark.pandas.frame import DataFrame
@@ -93,11 +94,8 @@ class PythonModelWrapper:
         if isinstance(data, pd.DataFrame):
             return self._model.predict(data)
         elif isinstance(data, DataFrame):
-            return_col = self._model_udf(*data._internal.data_spark_columns)
-            # TODO: the columns should be named according to the mlflow spec
-            # However, this is only possible with spark >= 3.0
-            # s = F.struct(*data.columns)
-            # return_col = self._model_udf(s)
+            s = struct(*data.columns)
+            return_col = self._model_udf(s)
             column_labels: List[Label] = [
                 (col,) for col in data._internal.spark_frame.select(return_col).columns
             ]
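
The replacement `struct(*data.columns)` packs the input columns into a single struct column whose field names match the pandas-on-Spark column names; mlflow's pyfunc UDF rebuilds a pandas DataFrame with those names before calling the model, so the scikit-learn feature-name check passes. A sketch of the same pattern in plain PySpark (illustrative, not part of the patch; an active `spark` session and a `model_uri` are assumed to exist):

```python
from pyspark.sql.functions import struct
import mlflow.pyfunc

# Assumed for this sketch: `spark` is an active session and `model_uri`
# points at a logged sklearn model trained on columns x1 and x2.
df = spark.createDataFrame([(2.0, -4.0), (8.0, -1.5)], ["x1", "x2"])
udf = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double")

# struct(*df.columns) carries the field names x1 and x2, so mlflow can
# reconstruct a correctly named pandas DataFrame before predict().
df.withColumn("prediction", udf(struct(*df.columns))).show()
```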

