This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4f71231 [SPARK-33073][PYTHON] Improve error handling on Pandas to Arrow conversion failures 4f71231 is described below commit 4f71231af51a3da5d7964a218a878c0cf3037c10 Author: Bryan Cutler <cutl...@gmail.com> AuthorDate: Tue Oct 6 18:11:24 2020 +0900 [SPARK-33073][PYTHON] Improve error handling on Pandas to Arrow conversion failures ### What changes were proposed in this pull request? This improves error handling when a failure in conversion from Pandas to Arrow occurs. And fixes tests to be compatible with upcoming Arrow 2.0.0 release. ### Why are the changes needed? Current tests will fail with Arrow 2.0.0 because of a change in error message when the schema is invalid. For these cases, the current error message also includes information on disabling safe conversion config, which is mainly meant for floating point truncation and overflow. The tests have been updated to use a message that is show for past Arrow versions, and upcoming. If the user enters an invalid schema, the error produced by pyarrow is not consistent and either `TypeError` or `ArrowInvalid`, with the latter being caught, and raised as a `RuntimeError` with the extra info. The error handling is improved by: - narrowing the exception type to `TypeError`s, which `ArrowInvalid` is a subclass and what is raised on safe conversion failures. - The exception is only raised with additional information on disabling "spark.sql.execution.pandas.convertToArrowArraySafely" if it is enabled in the first place. - The original exception is chained to better show it to the user. ### Does this PR introduce _any_ user-facing change? Yes, the error re-raised changes from a RuntimeError to a ValueError, which better categorizes this type of error and in-line with the original Arrow error. ### How was this patch tested? Existing tests, using pyarrow 1.0.1 and 2.0.0-snapshot Closes #29951 from BryanCutler/arrow-better-handle-pandas-errors-SPARK-33073. Authored-by: Bryan Cutler <cutl...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit 0812d6c17cc4876bb87a9d1fec35ec8c7b2365f0) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- python/pyspark/sql/pandas/serializers.py | 17 ++++++++++------- python/pyspark/sql/tests/test_arrow.py | 9 +++++---- python/pyspark/sql/tests/test_pandas_grouped_map.py | 15 ++++++++------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 4dd15d1..b164a38 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -156,13 +156,16 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): s = _check_series_convert_timestamps_internal(s, self._timezone) try: array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck) - except pa.ArrowException as e: - error_msg = "Exception thrown when converting pandas.Series (%s) to Arrow " + \ - "Array (%s). It can be caused by overflows or other unsafe " + \ - "conversions warned by Arrow. Arrow safe type check can be " + \ - "disabled by using SQL config " + \ - "`spark.sql.execution.pandas.convertToArrowArraySafely`." - raise RuntimeError(error_msg % (s.dtype, t), e) + except ValueError as e: + if self._safecheck: + error_msg = "Exception thrown when converting pandas.Series (%s) to " + \ + "Arrow Array (%s). It can be caused by overflows or other " + \ + "unsafe conversions warned by Arrow. Arrow safe type check " + \ + "can be disabled by using SQL config " + \ + "`spark.sql.execution.pandas.convertToArrowArraySafely`." + raise ValueError(error_msg % (s.dtype, t)) from e + else: + raise e return array arrs = [] diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 15c5cf1..2c6231d 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -266,11 +266,12 @@ class ArrowTests(ReusedSQLTestCase): def test_createDataFrame_with_incorrect_schema(self): pdf = self.create_pandas_data_frame() fields = list(self.schema) - fields[0], fields[1] = fields[1], fields[0] # swap str with int + fields[5], fields[6] = fields[6], fields[5] # swap decimal with date wrong_schema = StructType(fields) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, "integer.*required"): - self.spark.createDataFrame(pdf, schema=wrong_schema) + with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"): + self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): pdf = self.create_pandas_data_frame() diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index cc6167e..8e02b29 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -450,15 +450,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase): def column_name_typo(pdf): return pd.DataFrame({'iid': pdf.id, 'v': pdf.v}) - @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP) + @pandas_udf('id long, v decimal', PandasUDFType.GROUPED_MAP) def invalid_positional_types(pdf): - return pd.DataFrame([(u'a', 1.2)]) + return pd.DataFrame([(1, datetime.date(2020, 10, 5))]) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, "KeyError: 'id'"): - grouped_df.apply(column_name_typo).collect() - with self.assertRaisesRegexp(Exception, "an integer is required"): - grouped_df.apply(invalid_positional_types).collect() + with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, "KeyError: 'id'"): + grouped_df.apply(column_name_typo).collect() + with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"): + grouped_df.apply(invalid_positional_types).collect() def test_positional_assignment_conf(self): with self.sql_conf({ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org