This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4f71231  [SPARK-33073][PYTHON] Improve error handling on Pandas to Arrow conversion failures
4f71231 is described below

commit 4f71231af51a3da5d7964a218a878c0cf3037c10
Author: Bryan Cutler <cutl...@gmail.com>
AuthorDate: Tue Oct 6 18:11:24 2020 +0900

    [SPARK-33073][PYTHON] Improve error handling on Pandas to Arrow conversion failures
    
    ### What changes were proposed in this pull request?
    
    This improves error handling when a failure occurs in the conversion from Pandas to Arrow. It also fixes tests to be compatible with the upcoming Arrow 2.0.0 release.
    
    ### Why are the changes needed?
    
    Current tests will fail with Arrow 2.0.0 because of a change in the error message produced when the schema is invalid. For these cases, the current error message also includes information on disabling the safe conversion config, which is mainly meant for floating-point truncation and overflow. The tests have been updated to match a message that is shown by both past and upcoming Arrow versions.
    
    If the user enters an invalid schema, the error produced by pyarrow is not consistent: it is either a `TypeError` or an `ArrowInvalid`. Only the latter was being caught and re-raised as a `RuntimeError` with the extra info.
    
    The error handling is improved by the following (see the sketch after this list):
    
    - narrowing the caught exception type to `ValueError`, of which `ArrowInvalid` is a subclass and which is what is raised on safe conversion failures;
    - raising the exception with the additional information on disabling "spark.sql.execution.pandas.convertToArrowArraySafely" only if that config is enabled in the first place;
    - chaining the original exception so it is better surfaced to the user.
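    
    A minimal standalone sketch of this pattern (plain pandas/pyarrow, no Spark; the helper name `series_to_arrow` and the message text are illustrative, not the patched code):
    
        import pandas as pd
        import pyarrow as pa
    
        def series_to_arrow(s, arrow_type, safecheck=True):
            # pa.ArrowInvalid subclasses ValueError, so catching ValueError
            # also covers Arrow's safe-cast failures.
            try:
                return pa.Array.from_pandas(s, type=arrow_type, safe=safecheck)
            except ValueError as e:
                if safecheck:
                    # Chain with "from e" so the traceback shows both the
                    # config hint and the underlying Arrow error.
                    raise ValueError(
                        "Could not convert pandas.Series (%s) to Arrow Array (%s); "
                        "safe type checks can be disabled with "
                        "`spark.sql.execution.pandas.convertToArrowArraySafely`."
                        % (s.dtype, arrow_type)) from e
                raise
    
        # A float -> int32 truncation trips Arrow's safe type check:
        try:
            series_to_arrow(pd.Series([1.5]), pa.int32())
        except ValueError as err:
            print(err)            # the hint added above
            print(err.__cause__)  # the original pa.ArrowInvalid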
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the re-raised error changes from a `RuntimeError` to a `ValueError`, which better categorizes this type of error and is in line with the original Arrow error.
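    
    For reference, a quick standalone check (plain Python, assuming pyarrow is installed) of why `ValueError` is the consistent category here:
    
        import pyarrow as pa
    
        # ArrowInvalid is declared as a ValueError subclass in pyarrow, so the
        # re-raised error stays in the same category as Arrow's own exception.
        print(issubclass(pa.ArrowInvalid, ValueError))    # True
        print(issubclass(pa.ArrowInvalid, RuntimeError))  # False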
    
    ### How was this patch tested?
    
    Existing tests, using pyarrow 1.0.1 and 2.0.0-snapshot
    
    Closes #29951 from BryanCutler/arrow-better-handle-pandas-errors-SPARK-33073.
    
    Authored-by: Bryan Cutler <cutl...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 0812d6c17cc4876bb87a9d1fec35ec8c7b2365f0)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/serializers.py            | 17 ++++++++++-------
 python/pyspark/sql/tests/test_arrow.py              |  9 +++++----
 python/pyspark/sql/tests/test_pandas_grouped_map.py | 15 ++++++++-------
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 4dd15d1..b164a38 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -156,13 +156,16 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
                 s = _check_series_convert_timestamps_internal(s, self._timezone)
             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
-            except pa.ArrowException as e:
-                error_msg = "Exception thrown when converting pandas.Series (%s) to Arrow " + \
-                            "Array (%s). It can be caused by overflows or other unsafe " + \
-                            "conversions warned by Arrow. Arrow safe type check can be " + \
-                            "disabled by using SQL config " + \
-                            "`spark.sql.execution.pandas.convertToArrowArraySafely`."
-                raise RuntimeError(error_msg % (s.dtype, t), e)
+            except ValueError as e:
+                if self._safecheck:
+                    error_msg = "Exception thrown when converting pandas.Series (%s) to " + \
+                                "Arrow Array (%s). It can be caused by overflows or other " + \
+                                "unsafe conversions warned by Arrow. Arrow safe type check " + \
+                                "can be disabled by using SQL config " + \
+                                "`spark.sql.execution.pandas.convertToArrowArraySafely`."
+                    raise ValueError(error_msg % (s.dtype, t)) from e
+                else:
+                    raise e
             return array
 
         arrs = []
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 15c5cf1..2c6231d 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -266,11 +266,12 @@ class ArrowTests(ReusedSQLTestCase):
     def test_createDataFrame_with_incorrect_schema(self):
         pdf = self.create_pandas_data_frame()
         fields = list(self.schema)
-        fields[0], fields[1] = fields[1], fields[0]  # swap str with int
+        fields[5], fields[6] = fields[6], fields[5]  # swap decimal with date
         wrong_schema = StructType(fields)
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, "integer.*required"):
-                self.spark.createDataFrame(pdf, schema=wrong_schema)
+        with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"):
+                    self.spark.createDataFrame(pdf, schema=wrong_schema)
 
     def test_createDataFrame_with_names(self):
         pdf = self.create_pandas_data_frame()
diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py
index cc6167e..8e02b29 100644
--- a/python/pyspark/sql/tests/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -450,15 +450,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase):
         def column_name_typo(pdf):
             return pd.DataFrame({'iid': pdf.id, 'v': pdf.v})
 
-        @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
+        @pandas_udf('id long, v decimal', PandasUDFType.GROUPED_MAP)
         def invalid_positional_types(pdf):
-            return pd.DataFrame([(u'a', 1.2)])
+            return pd.DataFrame([(1, datetime.date(2020, 10, 5))])
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
-                grouped_df.apply(column_name_typo).collect()
-            with self.assertRaisesRegexp(Exception, "an integer is required"):
-                grouped_df.apply(invalid_positional_types).collect()
+        with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
+                    grouped_df.apply(column_name_typo).collect()
+                with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"):
+                    grouped_df.apply(invalid_positional_types).collect()
 
     def test_positional_assignment_conf(self):
         with self.sql_conf({

