Repository: spark Updated Branches: refs/heads/master c68ec4e6a -> ed72badb0
[SPARK-23699][PYTHON][SQL] Raise same type of error caught with Arrow enabled ## What changes were proposed in this pull request? When using Arrow for createDataFrame or toPandas and an error is encountered with fallback disabled, this will raise the same type of error instead of a RuntimeError. This change also allows for the traceback of the error to be retained and prevents the accidental chaining of exceptions with Python 3. ## How was this patch tested? Updated existing tests to verify error type. Author: Bryan Cutler <[email protected]> Closes #20839 from BryanCutler/arrow-raise-same-error-SPARK-23699. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed72badb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed72badb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed72badb Branch: refs/heads/master Commit: ed72badb04a56d8046bbd185245abf5ae265ccfd Parents: c68ec4e Author: Bryan Cutler <[email protected]> Authored: Tue Mar 27 20:06:12 2018 -0700 Committer: Bryan Cutler <[email protected]> Committed: Tue Mar 27 20:06:12 2018 -0700 ---------------------------------------------------------------------- python/pyspark/sql/dataframe.py | 25 +++++++++++++------------ python/pyspark/sql/session.py | 13 +++++++------ python/pyspark/sql/tests.py | 10 +++++----- python/pyspark/sql/utils.py | 6 ++++++ 4 files changed, 31 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ed72badb/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3fc194d..16f8e52 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2007,7 +2007,7 @@ class DataFrame(object): "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " "failed by the reason below:\n %s\n" - "Attempts non-optimization as " + "Attempting non-optimization as " "'spark.sql.execution.arrow.fallback.enabled' is set to " "true." % _exception_message(e)) warnings.warn(msg) @@ -2015,11 +2015,12 @@ class DataFrame(object): else: msg = ( "toPandas attempted Arrow optimization because " - "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n %s\n" - "For fallback to non-optimization automatically, please set true to " - "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e)) - raise RuntimeError(msg) + "'spark.sql.execution.arrow.enabled' is set to true, but has reached " + "the error below and will not continue because automatic fallback " + "with 'spark.sql.execution.arrow.fallback.enabled' has been set to " + "false.\n %s" % _exception_message(e)) + warnings.warn(msg) + raise # Try to use Arrow optimization when the schema is supported and the required version # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled. @@ -2042,12 +2043,12 @@ class DataFrame(object): # be executed. So, simply fail in this case for now. msg = ( "toPandas attempted Arrow optimization because " - "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed unexpectedly:\n %s\n" - "Note that 'spark.sql.execution.arrow.fallback.enabled' does " - "not have an effect in such failure in the middle of " - "computation." % _exception_message(e)) - raise RuntimeError(msg) + "'spark.sql.execution.arrow.enabled' is set to true, but has reached " + "the error below and can not continue. Note that " + "'spark.sql.execution.arrow.fallback.enabled' does not have an effect " + "on failures in the middle of computation.\n %s" % _exception_message(e)) + warnings.warn(msg) + raise # Below is toPandas without Arrow optimization. pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) http://git-wip-us.apache.org/repos/asf/spark/blob/ed72badb/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index e82a975..13d6e2e 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -674,18 +674,19 @@ class SparkSession(object): "createDataFrame attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true; however, " "failed by the reason below:\n %s\n" - "Attempts non-optimization as " + "Attempting non-optimization as " "'spark.sql.execution.arrow.fallback.enabled' is set to " "true." % _exception_message(e)) warnings.warn(msg) else: msg = ( "createDataFrame attempted Arrow optimization because " - "'spark.sql.execution.arrow.enabled' is set to true; however, " - "failed by the reason below:\n %s\n" - "For fallback to non-optimization automatically, please set true to " - "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e)) - raise RuntimeError(msg) + "'spark.sql.execution.arrow.enabled' is set to true, but has reached " + "the error below and will not continue because automatic fallback " + "with 'spark.sql.execution.arrow.fallback.enabled' has been set to " + "false.\n %s" % _exception_message(e)) + warnings.warn(msg) + raise data = self._convert_from_pandas(data, schema, timezone) if isinstance(schema, StructType): http://git-wip-us.apache.org/repos/asf/spark/blob/ed72badb/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 967cc83..01c5dd6 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3559,7 +3559,7 @@ class ArrowTests(ReusedSQLTestCase): warn.message for warn in warns if isinstance(warn.message, UserWarning)] self.assertTrue(len(user_warns) > 0) self.assertTrue( - "Attempts non-optimization" in _exception_message(user_warns[-1])) + "Attempting non-optimization" in _exception_message(user_warns[-1])) self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): @@ -3682,7 +3682,7 @@ class ArrowTests(ReusedSQLTestCase): pdf = self.create_pandas_data_frame() wrong_schema = StructType(list(reversed(self.schema))) with QuietTest(self.sc): - with self.assertRaisesRegexp(RuntimeError, ".*No cast.*string.*timestamp.*"): + with self.assertRaisesRegexp(Exception, ".*No cast.*string.*timestamp.*"): self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): @@ -3707,7 +3707,7 @@ class ArrowTests(ReusedSQLTestCase): def test_createDataFrame_with_single_data_type(self): import pandas as pd with QuietTest(self.sc): - with self.assertRaisesRegexp(RuntimeError, ".*IntegerType.*not supported.*"): + with self.assertRaisesRegexp(ValueError, ".*IntegerType.*not supported.*"): self.spark.createDataFrame(pd.DataFrame({"a": [1]}), schema="int") def test_createDataFrame_does_not_modify_input(self): @@ -3775,14 +3775,14 @@ class ArrowTests(ReusedSQLTestCase): warn.message for warn in warns if isinstance(warn.message, UserWarning)] self.assertTrue(len(user_warns) > 0) self.assertTrue( - "Attempts non-optimization" in _exception_message(user_warns[-1])) + "Attempting non-optimization" in _exception_message(user_warns[-1])) self.assertEqual(df.collect(), [Row(a={u'a': 1})]) def test_createDataFrame_fallback_disabled(self): import pandas as pd with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 'Unsupported type'): + with self.assertRaisesRegexp(TypeError, 'Unsupported type'): self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>") http://git-wip-us.apache.org/repos/asf/spark/blob/ed72badb/python/pyspark/sql/utils.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 5782986..45363f0 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -121,7 +121,10 @@ def require_minimum_pandas_version(): from distutils.version import LooseVersion try: import pandas + have_pandas = True except ImportError: + have_pandas = False + if not have_pandas: raise ImportError("Pandas >= %s must be installed; however, " "it was not found." % minimum_pandas_version) if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version): @@ -138,7 +141,10 @@ def require_minimum_pyarrow_version(): from distutils.version import LooseVersion try: import pyarrow + have_arrow = True except ImportError: + have_arrow = False + if not have_arrow: raise ImportError("PyArrow >= %s must be installed; however, " "it was not found." % minimum_pyarrow_version) if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
