This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c115bff0cc [SPARK-55502][PYTHON] Unify UDF and UDTF Arrow conversion error handling
35c115bff0cc is described below

commit 35c115bff0cca987ccc03f33ef0534a2f0205d0d
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Mar 10 09:12:41 2026 +0800

    [SPARK-55502][PYTHON] Unify UDF and UDTF Arrow conversion error handling
    
    ## What changes were proposed in this pull request?
    
    Remove the `is_udtf` parameter from `PandasToArrowConversion.convert()` and add a new `is_legacy` parameter to unify error handling for both UDF and UDTF Arrow conversions.
    
    **Key changes**:
    - Removed the `is_udtf` parameter and added `is_legacy` for clarity: the flag controls exception catch breadth and error message style, not UDTF-specific behavior
    - Removed the UDTF-specific error condition `UDTF_ARROW_TYPE_CAST_ERROR` and replaced it with unified `PySparkTypeError`/`PySparkValueError`
    - Legacy path (broad `ArrowException` catch): keeps the original `"Exception thrown when converting pandas.Series..."` error format
    - Non-legacy path (narrow `ArrowInvalid` catch): uses new user-friendly error messages, with separate messages for `TypeError` and `ValueError`
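
    The catch-breadth difference can be sketched with stand-in exception classes that mirror pyarrow's hierarchy (in pyarrow, `ArrowTypeError` also subclasses `TypeError`, `ArrowInvalid` also subclasses `ValueError`, and `ArrowException` is their common base). The `convert` helper below is purely illustrative, not the actual Spark code:

```python
# Stand-ins mirroring pyarrow's exception hierarchy (illustrative only).
class ArrowException(Exception):
    pass

class ArrowTypeError(ArrowException, TypeError):
    pass

class ArrowInvalid(ArrowException, ValueError):
    pass


def convert(exc, is_legacy, cast_fallback):
    """Sketch of the two catch strategies; not Spark's implementation."""
    if is_legacy:
        # Legacy path: broad catch, so BOTH ArrowInvalid and
        # ArrowTypeError trigger the cast fallback.
        try:
            raise exc
        except ArrowException:
            return cast_fallback()
    else:
        # Non-legacy path: only ArrowInvalid triggers the fallback;
        # ArrowTypeError propagates as a plain TypeError.
        try:
            raise exc
        except ArrowInvalid:
            return cast_fallback()


assert convert(ArrowTypeError("t"), True, lambda: "cast") == "cast"
assert convert(ArrowInvalid("v"), True, lambda: "cast") == "cast"
assert convert(ArrowInvalid("v"), False, lambda: "cast") == "cast"
try:
    convert(ArrowTypeError("t"), False, lambda: "cast")
except TypeError:
    print("non-legacy: ArrowTypeError is not silently cast")
```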
    
    ## Why are the changes needed?
    
    The UDTF-specific `UDTF_ARROW_TYPE_CAST_ERROR` error condition was unnecessary: the same conversion errors occur in both UDF and UDTF contexts. Unifying error handling provides:
    - Clearer parameter semantics
    - Simpler, more maintainable code
    - Consistent, user-friendly error messages across UDF/UDTF
    
    ## Does this PR introduce any user-facing change?
    
    Yes, error messages change for the non-legacy path (UDF/UDTF with `spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled=false`):
    
    **TypeError** (e.g. int → struct type mismatch):
    
    Before:
    ```
    PySparkTypeError: Exception thrown when converting pandas.Series (int64) with name 'x' to Arrow Array (struct<a: int32>).
    ```
    
    After:
    ```
    PySparkTypeError: Cannot convert the output value of the column 'x' with type 'int64' to the specified return type of the column: 'struct<a: int32>'. Please check if the data types match and try again.
    ```
    
    **ValueError** (e.g. string → double value error):
    
    Before:
    ```
    PySparkValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double).
    ```
    
    After:
    ```
    PySparkValueError: Failed to convert the value of the column 'val' with type 'object' to Arrow type 'double'.
    ```
    
    Legacy UDTF path error messages remain unchanged.
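
    The non-legacy `ValueError` message shown above is assembled conditionally: the safe-cast hint is appended only when `spark.sql.execution.pandas.convertToArrowArraySafely` is enabled. A minimal sketch of that assembly (the helper name is hypothetical; the strings are taken from the diff below):

```python
def non_legacy_value_error_msg(field_name, dtype, arrow_type, safecheck):
    # Sketch of the new non-legacy ValueError message built in
    # PandasToArrowConversion.convert; helper name is illustrative.
    msg = (
        f"Failed to convert the value of the column '{field_name}' "
        f"with type '{dtype}' to Arrow type '{arrow_type}'."
    )
    if safecheck:
        # Hint appended only when Arrow safe type checking is on.
        msg += (
            " It can be caused by overflows or other unsafe "
            "conversions warned by Arrow. Arrow safe type "
            "check can be disabled by using SQL config "
            "`spark.sql.execution.pandas.convertToArrowArraySafely`."
        )
    return msg


print(non_legacy_value_error_msg("val", "object", "double", safecheck=False))
```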
    
    ## How was this patch tested?
    
    Updated existing unit tests.
    
    ## Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54398 from Yicong-Huang/SPARK-55502/refactor/eliminate-is-udtf-flag.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/errors/error-conditions.json        |  7 +-
 python/pyspark/sql/conversion.py                   | 81 +++++++++++++---------
 python/pyspark/sql/pandas/serializers.py           | 12 ++--
 .../streaming/test_pandas_transform_with_state.py  |  4 +-
 .../sql/tests/pandas/test_pandas_cogrouped_map.py  | 19 +++--
 .../sql/tests/pandas/test_pandas_grouped_map.py    | 19 +++--
 python/pyspark/sql/tests/pandas/test_pandas_map.py | 24 +++----
 python/pyspark/sql/tests/pandas/test_pandas_udf.py | 10 +--
 python/pyspark/sql/tests/test_conversion.py        | 22 +++---
 python/pyspark/sql/tests/test_udtf.py              | 18 ++---
 10 files changed, 110 insertions(+), 106 deletions(-)

diff --git a/python/pyspark/errors/error-conditions.json 
b/python/pyspark/errors/error-conditions.json
index b0cf4c084e20..3111fbd54077 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1258,12 +1258,7 @@
       "Cannot convert UDTF output to Arrow. Data: <data>. Schema: <schema>. 
Arrow Schema: <arrow_schema>."
     ]
   },
-  "UDTF_ARROW_TYPE_CAST_ERROR": {
-    "message": [
-      "Cannot convert the output value of the column '<col_name>' with type 
'<col_type>' to the specified return type of the column: '<arrow_type>'. Please 
check if the data types match and try again."
-    ]
-  },
-  "UDTF_ARROW_TYPE_CONVERSION_ERROR": {
+  "UDTF_ARROW_TYPE_CONVERSION_ERROR": {
     "message": [
       "PyArrow UDTF must return an iterator of pyarrow.Table or 
pyarrow.RecordBatch objects."
     ]
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 7e6287fce07e..249bd2ffcb20 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -232,7 +232,7 @@ class PandasToArrowConversion:
         assign_cols_by_name: bool = False,
         int_to_decimal_coercion_enabled: bool = False,
         ignore_unexpected_complex_type_values: bool = False,
-        is_udtf: bool = False,
+        is_legacy: bool = False,
     ) -> "pa.RecordBatch":
         """
         Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
@@ -259,14 +259,13 @@ class PandasToArrowConversion:
             Whether to enable int to decimal coercion (default False)
         ignore_unexpected_complex_type_values : bool
             Whether to ignore unexpected complex type values in converter 
(default False)
-        is_udtf : bool
-            Whether this conversion is for a UDTF. UDTFs use broader Arrow 
exception
-            handling to allow more type coercions (e.g., struct field casting 
via
-            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
-            # TODO(SPARK-55502): Unify UDTF and regular UDF conversion paths to
-            #   eliminate the is_udtf flag.
-            Regular UDFs only catch ArrowInvalid to preserve legacy behavior 
where
-            e.g. string->decimal must raise an error. (default False)
+        is_legacy : bool
+            Whether to use the legacy pandas-to-Arrow conversion path. The 
legacy
+            path uses broader Arrow exception handling (ArrowException) to 
allow
+            more implicit type coercions (e.g., int->boolean, dict->struct via
+            ArrowTypeError). The non-legacy path only catches ArrowInvalid for
+            the cast fallback, so type mismatches like string->decimal raise
+            immediately. (default False)
 
         Returns
         -------
@@ -275,7 +274,7 @@ class PandasToArrowConversion:
         import pyarrow as pa
         import pandas as pd
 
-        from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
+        from pyspark.errors import PySparkTypeError, PySparkValueError
         from pyspark.sql.pandas.types import to_arrow_type, 
_create_converter_from_pandas
 
         # Handle empty schema (0 columns)
@@ -322,7 +321,7 @@ class PandasToArrowConversion:
                     assign_cols_by_name=assign_cols_by_name,
                     
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
                     
ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
-                    is_udtf=is_udtf,
+                    is_legacy=is_legacy,
                 )
                 # Wrap the nested RecordBatch as a single StructArray column
                 return 
ArrowBatchTransformer.wrap_struct(nested_batch).column(0)
@@ -347,9 +346,10 @@ class PandasToArrowConversion:
 
             mask = None if hasattr(series.array, "__arrow_array__") else 
series.isnull()
 
-            if is_udtf:
-                # UDTF path: broad ArrowException catch so that both 
ArrowInvalid
-                # AND ArrowTypeError (e.g. dict→struct) trigger the cast 
fallback.
+            if is_legacy:
+                # Legacy pandas conversion path: broad ArrowException catch so
+                # that both ArrowInvalid AND ArrowTypeError (e.g. dict->struct)
+                # trigger the cast fallback.
                 try:
                     try:
                         return pa.Array.from_pandas(
@@ -361,18 +361,26 @@ class PandasToArrowConversion:
                                 target_type=arrow_type, safe=safecheck
                             )
                         raise
-                except pa.lib.ArrowException:  # convert any Arrow error to 
user-friendly message
-                    raise PySparkRuntimeError(
-                        errorClass="UDTF_ARROW_TYPE_CAST_ERROR",
-                        messageParameters={
-                            "col_name": field_name,
-                            "col_type": str(series.dtype),
-                            "arrow_type": str(arrow_type),
-                        },
-                    ) from None
+                except pa.lib.ArrowException as e:
+                    error_msg = (
+                        "Exception thrown when converting pandas.Series (%s) "
+                        "with name '%s' to Arrow Array (%s)."
+                        % (series.dtype, field_name, arrow_type)
+                    )
+                    if isinstance(e, TypeError):
+                        raise PySparkTypeError(error_msg) from e
+                    if safecheck:
+                        error_msg += (
+                            " It can be caused by overflows or other "
+                            "unsafe conversions warned by Arrow. Arrow safe "
+                            "type check can be disabled by using SQL config "
+                            "`spark.sql.execution.pandas."
+                            "convertToArrowArraySafely`."
+                        )
+                    raise PySparkValueError(error_msg) from e
             else:
-                # UDF path: only ArrowInvalid triggers the cast fallback.
-                # ArrowTypeError (e.g. string→decimal) must NOT be silently 
cast.
+                # Non-legacy path: only ArrowInvalid triggers the cast 
fallback.
+                # ArrowTypeError (e.g. string->decimal) must NOT be silently 
cast.
                 try:
                     try:
                         return pa.Array.from_pandas(
@@ -384,21 +392,26 @@ class PandasToArrowConversion:
                                 target_type=arrow_type, safe=safecheck
                             )
                         raise
-                except TypeError as e:  # includes pa.lib.ArrowTypeError
+                except TypeError as e:
                     raise PySparkTypeError(
-                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
-                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                        f"Cannot convert the output value of the column "
+                        f"'{field_name}' with type '{series.dtype}' to the "
+                        f"specified return type of the column: '{arrow_type}'."
+                        f" Please check if the data types match and try again."
                     ) from e
-                except ValueError as e:  # includes pa.lib.ArrowInvalid
+                except ValueError as e:
                     error_msg = (
-                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
-                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                        f"Failed to convert the value of the column "
+                        f"'{field_name}' with type '{series.dtype}' to Arrow "
+                        f"type '{arrow_type}'."
                     )
                     if safecheck:
                         error_msg += (
-                            " It can be caused by overflows or other unsafe 
conversions "
-                            "warned by Arrow. Arrow safe type check can be 
disabled by using "
-                            "SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`."
+                            " It can be caused by overflows or other unsafe "
+                            "conversions warned by Arrow. Arrow safe type "
+                            "check can be disabled by using SQL config "
+                            "`spark.sql.execution.pandas."
+                            "convertToArrowArraySafely`."
                         )
                     raise PySparkValueError(error_msg) from e
 
diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 84f25a7bb9f5..9806f5ac0849 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -521,7 +521,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
         int_to_decimal_coercion_enabled: bool = False,
         prefers_large_types: bool = False,
         ignore_unexpected_complex_type_values: bool = False,
-        is_udtf: bool = False,
+        is_legacy: bool = False,
     ):
         super().__init__(
             timezone=timezone,
@@ -537,7 +537,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
         )
         self._assign_cols_by_name = assign_cols_by_name
         self._ignore_unexpected_complex_type_values = 
ignore_unexpected_complex_type_values
-        self._is_udtf = is_udtf
+        self._is_legacy = is_legacy
 
     def dump_stream(self, iterator, stream):
         """
@@ -576,7 +576,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
                 assign_cols_by_name=self._assign_cols_by_name,
                 
int_to_decimal_coercion_enabled=self._int_to_decimal_coercion_enabled,
                 
ignore_unexpected_complex_type_values=self._ignore_unexpected_complex_type_values,
-                is_udtf=self._is_udtf,
+                is_legacy=self._is_legacy,
             )
 
         batches = self._write_stream_start(
@@ -787,9 +787,9 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
             int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
             # UDTF-specific: ignore unexpected complex type values in converter
             ignore_unexpected_complex_type_values=True,
-            # UDTF-specific: enables broader Arrow exception handling and
-            # converts errors to UDTF_ARROW_TYPE_CAST_ERROR
-            is_udtf=True,
+            # Legacy UDTF pandas conversion: enables broader Arrow exception
+            # handling to allow more implicit type coercions
+            is_legacy=True,
         )
 
     def __repr__(self):
diff --git 
a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py 
b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py
index 77cbe84f57e7..7a34c5ddf61b 100644
--- 
a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py
+++ 
b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py
@@ -1382,9 +1382,7 @@ class TransformWithStateTestsMixin:
         with self.sql_conf(
             
{"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False}
         ):
-            with self.assertRaisesRegex(
-                Exception, "Exception thrown when converting pandas.Series"
-            ):
+            with self.assertRaisesRegex(Exception, "Failed to convert the 
value"):
                 (
                     df.groupBy("id")
                     .transformWithStateInPandas(
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index cfaa7e850d57..b4a6b957f937 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -256,14 +256,14 @@ class CogroupedApplyInPandasTestsMixin:
                 with self.subTest(convert="string to double"):
                     pandas_type_name = "object" if 
LooseVersion(pd.__version__) < "3.0.0" else "str"
                     expected = (
-                        rf"ValueError: Exception thrown when converting 
pandas.Series \({pandas_type_name}\) "
-                        r"with name 'k' to Arrow Array \(double\)."
+                        rf"ValueError: Failed to convert the value of the 
column 'k' "
+                        rf"with type '{pandas_type_name}' to Arrow type 
'double'\."
                     )
                     if safely:
                         expected = expected + (
-                            " It can be caused by overflows or other "
-                            "unsafe conversions warned by Arrow. Arrow safe 
type check "
-                            "can be disabled by using SQL config "
+                            " It can be caused by overflows or other unsafe "
+                            "conversions warned by Arrow. Arrow safe type "
+                            "check can be disabled by using SQL config "
                             
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
                         )
                     self._test_merge_error(
@@ -276,8 +276,9 @@ class CogroupedApplyInPandasTestsMixin:
                 # sometimes we see TypeErrors
                 with self.subTest(convert="double to string"):
                     expected = (
-                        r"TypeError: Exception thrown when converting 
pandas.Series \(float64\) "
-                        r"with name 'k' to Arrow Array \(string\)."
+                        r"TypeError: Cannot convert the output value of the 
column 'k' "
+                        r"with type 'float64' to the specified return type of 
the column: "
+                        r"'string'\. Please check if the data types match and 
try again\."
                     )
                     self._test_merge_error(
                         fn=lambda lft, rgt: pd.DataFrame({"id": [1], "k": 
[2.0]}),
@@ -321,9 +322,7 @@ class CogroupedApplyInPandasTestsMixin:
         with self.sql_conf(
             
{"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False}
         ):
-            with self.assertRaisesRegex(
-                PythonException, "Exception thrown when converting 
pandas.Series"
-            ):
+            with self.assertRaisesRegex(PythonException, "Failed to convert 
the value"):
                 (
                     left.groupby("id")
                     .cogroup(right.groupby("id"))
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 3367d6a3ae02..ae27cb2bc70f 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -349,14 +349,14 @@ class ApplyInPandasTestsMixin:
                 with self.subTest(convert="string to double"):
                     pandas_type_name = "object" if 
LooseVersion(pd.__version__) < "3.0.0" else "str"
                     expected = (
-                        rf"ValueError: Exception thrown when converting 
pandas.Series \({pandas_type_name}\) "
-                        r"with name 'mean' to Arrow Array \(double\)."
+                        rf"ValueError: Failed to convert the value of the 
column 'mean' "
+                        rf"with type '{pandas_type_name}' to Arrow type 
'double'\."
                     )
                     if safely:
                         expected = expected + (
-                            " It can be caused by overflows or other "
-                            "unsafe conversions warned by Arrow. Arrow safe 
type check "
-                            "can be disabled by using SQL config "
+                            " It can be caused by overflows or other unsafe "
+                            "conversions warned by Arrow. Arrow safe type "
+                            "check can be disabled by using SQL config "
                             
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
                         )
                     with self.assertRaisesRegex(PythonException, expected):
@@ -369,8 +369,9 @@ class ApplyInPandasTestsMixin:
                 with self.subTest(convert="double to string"):
                     with self.assertRaisesRegex(
                         PythonException,
-                        r"TypeError: Exception thrown when converting 
pandas.Series \(float64\) "
-                        r"with name 'mean' to Arrow Array \(string\).",
+                        r"TypeError: Cannot convert the output value of the 
column 'mean' "
+                        r"with type 'float64' to the specified return type of 
the column: "
+                        r"'string'\. Please check if the data types match and 
try again\.",
                     ):
                         self._test_apply_in_pandas(
                             lambda key, pdf: pd.DataFrame([key + 
(pdf.v.mean(),)]),
@@ -397,9 +398,7 @@ class ApplyInPandasTestsMixin:
         with self.sql_conf(
             
{"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False}
         ):
-            with self.assertRaisesRegex(
-                PythonException, "Exception thrown when converting 
pandas.Series"
-            ):
+            with self.assertRaisesRegex(PythonException, "Failed to convert 
the value"):
                 (
                     self.data.groupby("id")
                     .applyInPandas(
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_map.py
index 1df1828d2fb2..43f316810474 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py
@@ -303,14 +303,14 @@ class MapInPandasTestsMixin:
                     pandas_type_name = "object" if 
LooseVersion(pd.__version__) < "3.0.0" else "str"
 
                     expected = (
-                        r"ValueError: Exception thrown when converting 
pandas.Series "
-                        rf"\({pandas_type_name}\) with name 'id' to Arrow 
Array \(double\)."
+                        rf"ValueError: Failed to convert the value of the 
column 'id' "
+                        rf"with type '{pandas_type_name}' to Arrow type 
'double'\."
                     )
                     if safely:
                         expected = expected + (
-                            " It can be caused by overflows or other "
-                            "unsafe conversions warned by Arrow. Arrow safe 
type check "
-                            "can be disabled by using SQL config "
+                            " It can be caused by overflows or other unsafe "
+                            "conversions warned by Arrow. Arrow safe type "
+                            "check can be disabled by using SQL config "
                             
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
                         )
                     with self.assertRaisesRegex(PythonException, expected):
@@ -333,11 +333,11 @@ class MapInPandasTestsMixin:
                     )
                     if safely:
                         expected = (
-                            r"ValueError: Exception thrown when converting 
pandas.Series "
-                            r"\(float64\) with name 'id' to Arrow Array 
\(int32\)."
-                            " It can be caused by overflows or other "
-                            "unsafe conversions warned by Arrow. Arrow safe 
type check "
-                            "can be disabled by using SQL config "
+                            r"ValueError: Failed to convert the value of the 
column 'id' "
+                            r"with type 'float64' to Arrow type 'int32'\."
+                            " It can be caused by overflows or other unsafe "
+                            "conversions warned by Arrow. Arrow safe type "
+                            "check can be disabled by using SQL config "
                             
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
                         )
                         with self.assertRaisesRegex(PythonException, expected):
@@ -489,8 +489,8 @@ class MapInPandasTestsMixin:
         pandas_type_name = "object" if LooseVersion(pd.__version__) < "3.0.0" 
else "str"
         with self.assertRaisesRegex(
             PythonException,
-            f"PySparkValueError: Exception thrown when converting 
pandas.Series \\({pandas_type_name}\\) "
-            "with name 'id' to Arrow Array \\(int32\\)\\.",
+            f"PySparkValueError: Failed to convert the value of the column 
'id' "
+            f"with type '{pandas_type_name}' to Arrow type 'int32'\\.",
         ):
             df.collect()
 
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf.py
index db5d2072a4bf..752ec743d412 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf.py
@@ -322,9 +322,7 @@ class PandasUDFTestsMixin:
 
         # Since 0.11.0, PyArrow supports the feature to raise an error for 
unsafe cast.
         with 
self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
-            with self.assertRaisesRegex(
-                Exception, "Exception thrown when converting pandas.Series"
-            ):
+            with self.assertRaisesRegex(Exception, "Failed to convert the 
value"):
                 df.select(["A"]).withColumn("udf", udf("A")).collect()
 
         # Disabling Arrow safe type check.
@@ -342,9 +340,7 @@ class PandasUDFTestsMixin:
 
         # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
         with 
self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
-            with self.assertRaisesRegex(
-                Exception, "Exception thrown when converting pandas.Series"
-            ):
+            with self.assertRaisesRegex(Exception, "Failed to convert the 
value"):
                 df.withColumn("udf", udf("id")).collect()
 
         # Disabling safe type check, let Arrow do the cast anyway.
@@ -375,7 +371,7 @@ class PandasUDFTestsMixin:
         ):
             self.assertRaisesRegex(
                 PythonException,
-                "Exception thrown when converting pandas.Series",
+                "Failed to convert the value",
                 df.withColumn("decimal_val", int_to_decimal_udf("id")).collect,
             )
 
diff --git a/python/pyspark/sql/tests/test_conversion.py 
b/python/pyspark/sql/tests/test_conversion.py
index 261b81a407b5..9ac6bcbd0537 100644
--- a/python/pyspark/sql/tests/test_conversion.py
+++ b/python/pyspark/sql/tests/test_conversion.py
@@ -18,7 +18,7 @@ import datetime
 import unittest
 from zoneinfo import ZoneInfo
 
-from pyspark.errors import PySparkRuntimeError, PySparkTypeError, 
PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError
 from pyspark.sql.conversion import (
     ArrowArrayToPandasConversion,
     ArrowTableToRowsConversion,
@@ -298,20 +298,21 @@ class PandasToArrowConversionTests(unittest.TestCase):
         data = [pd.Series(["not_int", "bad"]), pd.Series(["a", "b"])]
         with self.assertRaises((PySparkValueError, PySparkTypeError)) as ctx:
             PandasToArrowConversion.convert(data, schema)
-        # Error message should reference the schema field name, not the 
positional index
+        # Error message should use the new format and reference the schema 
field name
         self.assertIn("age", str(ctx.exception))
 
-    def test_convert_is_udtf(self):
-        """Test is_udtf=True produces PySparkRuntimeError with 
UDTF_ARROW_TYPE_CAST_ERROR."""
+    def test_convert_is_legacy(self):
+        """Test is_legacy=True uses the legacy error format."""
         import pandas as pd
 
         schema = StructType([StructField("val", DoubleType())])
         data = [pd.Series(["not_a_number", "bad"])]
 
         # ValueError path (string -> double)
-        with self.assertRaises(PySparkRuntimeError) as ctx:
-            PandasToArrowConversion.convert(data, schema, is_udtf=True)
-        self.assertIn("UDTF_ARROW_TYPE_CAST_ERROR", str(ctx.exception))
+        with self.assertRaises(PySparkValueError) as ctx:
+            PandasToArrowConversion.convert(data, schema, is_legacy=True)
+        self.assertIn("Exception thrown when converting pandas.Series", 
str(ctx.exception))
+        self.assertIn("val", str(ctx.exception))
 
         # TypeError path (int -> struct): ArrowTypeError inherits from 
TypeError.
         # ignore_unexpected_complex_type_values=True lets the bad value pass 
through
@@ -320,14 +321,15 @@ class PandasToArrowConversionTests(unittest.TestCase):
             [StructField("x", StructType([StructField("a", IntegerType())]))]
         )
         data = [pd.Series([0, 1])]
-        with self.assertRaises(PySparkRuntimeError) as ctx:
+        with self.assertRaises(PySparkTypeError) as ctx:
             PandasToArrowConversion.convert(
                 data,
                 struct_schema,
-                is_udtf=True,
+                is_legacy=True,
                 ignore_unexpected_complex_type_values=True,
             )
-        self.assertIn("UDTF_ARROW_TYPE_CAST_ERROR", str(ctx.exception))
+        self.assertIn("Exception thrown when converting pandas.Series", 
str(ctx.exception))
+        self.assertIn("x", str(ctx.exception))
 
     def test_convert_prefers_large_types(self):
         """Test prefers_large_types produces large Arrow types."""
diff --git a/python/pyspark/sql/tests/test_udtf.py 
b/python/pyspark/sql/tests/test_udtf.py
index a18911815a9d..0626addf778c 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -3494,7 +3494,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield 1,
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", [Row(x=True)]),
@@ -3521,7 +3521,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield "1",
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", [Row(x=True)]),
@@ -3550,7 +3550,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield "hello",
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", err),
@@ -3579,7 +3579,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield [0, 1.1, 2],
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", err),
@@ -3612,7 +3612,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield {"a": 0, "b": 1.1, "c": 2},
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", err),
@@ -3644,7 +3644,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield {"a": 0, "b": 1.1, "c": 2},
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", err),
@@ -3675,7 +3675,7 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             def eval(self):
                 yield Row(a=0, b=1.1, c=2),
 
-        err = "UDTF_ARROW_TYPE_CAST_ERROR"
+        err = "Exception thrown when converting pandas.Series"
 
         for ret_type, expected in [
             ("x: boolean", err),
@@ -3712,7 +3712,9 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
             "x: array<int>",
         ]:
             with self.subTest(ret_type=ret_type):
-                with self.assertRaisesRegex(PythonException, 
"UDTF_ARROW_TYPE_CAST_ERROR"):
+                with self.assertRaisesRegex(
+                    PythonException, "Exception thrown when converting 
pandas.Series"
+                ):
                     udtf(TestUDTF, returnType=ret_type)().collect()
 
 

