This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new bca1ee54e38 [SPARK-42342][PYTHON][CONNECT] Introduce base hierarchy to exceptions bca1ee54e38 is described below commit bca1ee54e38258c4ea0def067883fa67071b7d3d Author: Takuya UESHIN <ues...@databricks.com> AuthorDate: Wed Feb 8 20:46:13 2023 +0900 [SPARK-42342][PYTHON][CONNECT] Introduce base hierarchy to exceptions ### What changes were proposed in this pull request? Introduces a base hierarchy to exceptions. As a common hierarchy for users, base exception classes are subclasses of `PySparkException`. The concrete classes for both PySpark and Spark Connect inherit the base classes, which should not be exposed to users. ### Why are the changes needed? Currently the exception class hierarchy is separated between PySpark and Spark Connect. If users want to check the exception type, they need to switch the error classes based on whether they are running on PySpark or Spark Connect, which is not ideal. ### Does this PR introduce _any_ user-facing change? No. Users can still use the existing exception classes to check the exception type. ### How was this patch tested? Updated tests. Closes #39882 from ueshin/issues/SPARK-42342/exceptions. 
Authored-by: Takuya UESHIN <ues...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit bd34b162d4774bcc19371096a04972b03f423bca) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/docs/source/reference/pyspark.errors.rst | 7 +- python/pyspark/errors/__init__.py | 16 +- python/pyspark/errors/exceptions/__init__.py | 16 ++ python/pyspark/errors/exceptions/base.py | 162 ++++++++++++++++++ .../{exceptions.py => exceptions/captured.py} | 188 +++------------------ python/pyspark/errors/exceptions/connect.py | 105 ++++++++++++ python/pyspark/sql/catalog.py | 6 +- python/pyspark/sql/connect/client.py | 23 ++- python/pyspark/sql/connect/functions.py | 5 +- python/pyspark/sql/context.py | 2 +- python/pyspark/sql/functions.py | 5 +- python/pyspark/sql/session.py | 2 +- python/pyspark/sql/streaming/query.py | 5 +- .../sql/tests/connect/test_connect_basic.py | 34 ++-- .../sql/tests/connect/test_connect_column.py | 2 +- .../sql/tests/connect/test_connect_function.py | 16 +- .../sql/tests/connect/test_parity_column.py | 2 +- .../sql/tests/connect/test_parity_dataframe.py | 5 + .../sql/tests/connect/test_parity_functions.py | 7 + .../sql/tests/connect/test_parity_pandas_udf.py | 25 +-- .../pyspark/sql/tests/connect/test_parity_udf.py | 23 --- python/pyspark/sql/tests/pandas/test_pandas_udf.py | 20 ++- .../pyspark/sql/tests/streaming/test_streaming.py | 2 +- python/pyspark/sql/tests/test_dataframe.py | 19 +-- python/pyspark/sql/tests/test_functions.py | 16 +- python/setup.py | 1 + 26 files changed, 410 insertions(+), 304 deletions(-) diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst index 1d54c6babe0..13db9bd01fa 100644 --- a/python/docs/source/reference/pyspark.errors.rst +++ b/python/docs/source/reference/pyspark.errors.rst @@ -30,6 +30,7 @@ Classes PySparkException AnalysisException + TempTableAlreadyExistsException ParseException IllegalArgumentException 
StreamingQueryException @@ -37,12 +38,6 @@ Classes PythonException UnknownException SparkUpgradeException - SparkConnectAnalysisException - SparkConnectException - SparkConnectGrpcException - SparkConnectParseException - SparkConnectTempTableAlreadyExistsException - SparkConnectIllegalArgumentException Methods diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py index 7faa0768a24..95da7ca2aa8 100644 --- a/python/pyspark/errors/__init__.py +++ b/python/pyspark/errors/__init__.py @@ -18,9 +18,10 @@ """ PySpark exceptions. """ -from pyspark.errors.exceptions import ( # noqa: F401 +from pyspark.errors.exceptions.base import ( # noqa: F401 PySparkException, AnalysisException, + TempTableAlreadyExistsException, ParseException, IllegalArgumentException, StreamingQueryException, @@ -30,18 +31,13 @@ from pyspark.errors.exceptions import ( # noqa: F401 SparkUpgradeException, PySparkTypeError, PySparkValueError, - SparkConnectException, - SparkConnectGrpcException, - SparkConnectAnalysisException, - SparkConnectParseException, - SparkConnectTempTableAlreadyExistsException, - SparkConnectIllegalArgumentException, ) __all__ = [ "PySparkException", "AnalysisException", + "TempTableAlreadyExistsException", "ParseException", "IllegalArgumentException", "StreamingQueryException", @@ -51,10 +47,4 @@ __all__ = [ "SparkUpgradeException", "PySparkTypeError", "PySparkValueError", - "SparkConnectException", - "SparkConnectGrpcException", - "SparkConnectAnalysisException", - "SparkConnectParseException", - "SparkConnectTempTableAlreadyExistsException", - "SparkConnectIllegalArgumentException", ] diff --git a/python/pyspark/errors/exceptions/__init__.py b/python/pyspark/errors/exceptions/__init__.py new file mode 100644 index 00000000000..cce3acad34a --- /dev/null +++ b/python/pyspark/errors/exceptions/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py new file mode 100644 index 00000000000..6e67039374d --- /dev/null +++ b/python/pyspark/errors/exceptions/base.py @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Dict, Optional, cast + +from pyspark.errors.utils import ErrorClassesReader + + +class PySparkException(Exception): + """ + Base Exception for handling errors generated from PySpark. 
+ """ + + def __init__( + self, + message: Optional[str] = None, + error_class: Optional[str] = None, + message_parameters: Optional[Dict[str, str]] = None, + ): + # `message` vs `error_class` & `message_parameters` are mutually exclusive. + assert (message is not None and (error_class is None and message_parameters is None)) or ( + message is None and (error_class is not None and message_parameters is not None) + ) + + self.error_reader = ErrorClassesReader() + + if message is None: + self.message = self.error_reader.get_error_message( + cast(str, error_class), cast(Dict[str, str], message_parameters) + ) + else: + self.message = message + + self.error_class = error_class + self.message_parameters = message_parameters + + def getErrorClass(self) -> Optional[str]: + """ + Returns an error class as a string. + + .. versionadded:: 3.4.0 + + See Also + -------- + :meth:`PySparkException.getMessageParameters` + :meth:`PySparkException.getSqlState` + """ + return self.error_class + + def getMessageParameters(self) -> Optional[Dict[str, str]]: + """ + Returns a message parameters as a dictionary. + + .. versionadded:: 3.4.0 + + See Also + -------- + :meth:`PySparkException.getErrorClass` + :meth:`PySparkException.getSqlState` + """ + return self.message_parameters + + def getSqlState(self) -> None: + """ + Returns an SQLSTATE as a string. + + Errors generated in Python have no SQLSTATE, so it always returns None. + + .. versionadded:: 3.4.0 + + See Also + -------- + :meth:`PySparkException.getErrorClass` + :meth:`PySparkException.getMessageParameters` + """ + return None + + def __str__(self) -> str: + if self.getErrorClass() is not None: + return f"[{self.getErrorClass()}] {self.message}" + else: + return self.message + + +class AnalysisException(PySparkException): + """ + Failed to analyze a SQL query plan. + """ + + +class TempTableAlreadyExistsException(AnalysisException): + """ + Failed to create temp view since it is already exists. 
+ """ + + +class ParseException(PySparkException): + """ + Failed to parse a SQL command. + """ + + +class IllegalArgumentException(PySparkException): + """ + Passed an illegal or inappropriate argument. + """ + + +class StreamingQueryException(PySparkException): + """ + Exception that stopped a :class:`StreamingQuery`. + """ + + +class QueryExecutionException(PySparkException): + """ + Failed to execute a query. + """ + + +class PythonException(PySparkException): + """ + Exceptions thrown from Python workers. + """ + + +class UnknownException(PySparkException): + """ + None of the above exceptions. + """ + + +class SparkUpgradeException(PySparkException): + """ + Exception thrown because of Spark upgrade. + """ + + +class PySparkValueError(PySparkException, ValueError): + """ + Wrapper class for ValueError to support error classes. + """ + + +class PySparkTypeError(PySparkException, TypeError): + """ + Wrapper class for TypeError to support error classes. + """ diff --git a/python/pyspark/errors/exceptions.py b/python/pyspark/errors/exceptions/captured.py similarity index 59% rename from python/pyspark/errors/exceptions.py rename to python/pyspark/errors/exceptions/captured.py index a799f4522de..1764ed7d02c 100644 --- a/python/pyspark/errors/exceptions.py +++ b/python/pyspark/errors/exceptions/captured.py @@ -22,83 +22,17 @@ from py4j.protocol import Py4JJavaError from py4j.java_gateway import is_instance_of from pyspark import SparkContext -from pyspark.errors.utils import ErrorClassesReader - - -class PySparkException(Exception): - """ - Base Exception for handling errors generated from PySpark. - """ - - def __init__( - self, - message: Optional[str] = None, - error_class: Optional[str] = None, - message_parameters: Optional[Dict[str, str]] = None, - ): - # `message` vs `error_class` & `message_parameters` are mutually exclusive. 
- assert (message is not None and (error_class is None and message_parameters is None)) or ( - message is None and (error_class is not None and message_parameters is not None) - ) - - self.error_reader = ErrorClassesReader() - - if message is None: - self.message = self.error_reader.get_error_message( - cast(str, error_class), cast(Dict[str, str], message_parameters) - ) - else: - self.message = message - - self.error_class = error_class - self.message_parameters = message_parameters - - def getErrorClass(self) -> Optional[str]: - """ - Returns an error class as a string. - - .. versionadded:: 3.4.0 - - See Also - -------- - :meth:`PySparkException.getMessageParameters` - :meth:`PySparkException.getSqlState` - """ - return self.error_class - - def getMessageParameters(self) -> Optional[Dict[str, str]]: - """ - Returns a message parameters as a dictionary. - - .. versionadded:: 3.4.0 - - See Also - -------- - :meth:`PySparkException.getErrorClass` - :meth:`PySparkException.getSqlState` - """ - return self.message_parameters - - def getSqlState(self) -> None: - """ - Returns an SQLSTATE as a string. - - Errors generated in Python have no SQLSTATE, so it always returns None. - - .. 
versionadded:: 3.4.0 - - See Also - -------- - :meth:`PySparkException.getErrorClass` - :meth:`PySparkException.getMessageParameters` - """ - return None - - def __str__(self) -> str: - if self.getErrorClass() is not None: - return f"[{self.getErrorClass()}] {self.message}" - else: - return self.message +from pyspark.errors.exceptions.base import ( + AnalysisException as BaseAnalysisException, + IllegalArgumentException as BaseIllegalArgumentException, + ParseException as BaseParseException, + PySparkException, + PythonException as BasePythonException, + QueryExecutionException as BaseQueryExecutionException, + SparkUpgradeException as BaseSparkUpgradeException, + StreamingQueryException as BaseStreamingQueryException, + UnknownException as BaseUnknownException, +) class CapturedException(PySparkException): @@ -247,133 +181,49 @@ def install_exception_handler() -> None: py4j.java_gateway.get_return_value = patched -class AnalysisException(CapturedException): +class AnalysisException(CapturedException, BaseAnalysisException): """ Failed to analyze a SQL query plan. """ -class ParseException(CapturedException): +class ParseException(CapturedException, BaseParseException): """ Failed to parse a SQL command. """ -class IllegalArgumentException(CapturedException): +class IllegalArgumentException(CapturedException, BaseIllegalArgumentException): """ Passed an illegal or inappropriate argument. """ -class StreamingQueryException(CapturedException): +class StreamingQueryException(CapturedException, BaseStreamingQueryException): """ Exception that stopped a :class:`StreamingQuery`. """ -class QueryExecutionException(CapturedException): +class QueryExecutionException(CapturedException, BaseQueryExecutionException): """ Failed to execute a query. """ -class PythonException(CapturedException): +class PythonException(CapturedException, BasePythonException): """ Exceptions thrown from Python workers. 
""" -class UnknownException(CapturedException): +class UnknownException(CapturedException, BaseUnknownException): """ None of the above exceptions. """ -class SparkUpgradeException(CapturedException): +class SparkUpgradeException(CapturedException, BaseSparkUpgradeException): """ Exception thrown because of Spark upgrade. """ - - -class SparkConnectException(PySparkException): - """ - Exception thrown from Spark Connect. - """ - - -class SparkConnectGrpcException(SparkConnectException): - """ - Base class to handle the errors from GRPC. - """ - - def __init__( - self, - message: Optional[str] = None, - error_class: Optional[str] = None, - message_parameters: Optional[Dict[str, str]] = None, - reason: Optional[str] = None, - ) -> None: - self.message = message # type: ignore[assignment] - if reason is not None: - self.message = f"({reason}) {self.message}" - - super().__init__( - message=self.message, - error_class=error_class, - message_parameters=message_parameters, - ) - - -class SparkConnectAnalysisException(SparkConnectGrpcException): - """ - Failed to analyze a SQL query plan from Spark Connect server. - """ - - def __init__( - self, - message: Optional[str] = None, - error_class: Optional[str] = None, - message_parameters: Optional[Dict[str, str]] = None, - plan: Optional[str] = None, - reason: Optional[str] = None, - ) -> None: - self.message = message # type: ignore[assignment] - if plan is not None: - self.message = f"{self.message}\nPlan: {plan}" - - super().__init__( - message=self.message, - error_class=error_class, - message_parameters=message_parameters, - reason=reason, - ) - - -class SparkConnectParseException(SparkConnectGrpcException): - """ - Failed to parse a SQL command from Spark Connect server. - """ - - -class SparkConnectTempTableAlreadyExistsException(SparkConnectAnalysisException): - """ - Failed to create temp view since it is already exists. 
- """ - - -class PySparkValueError(PySparkException, ValueError): - """ - Wrapper class for ValueError to support error classes. - """ - - -class PySparkTypeError(PySparkException, TypeError): - """ - Wrapper class for TypeError to support error classes. - """ - - -class SparkConnectIllegalArgumentException(SparkConnectGrpcException): - """ - Passed an illegal or inappropriate argument from Spark Connect server. - """ diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py new file mode 100644 index 00000000000..ba3bc9f7576 --- /dev/null +++ b/python/pyspark/errors/exceptions/connect.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Dict, Optional + +from pyspark.errors.exceptions.base import ( + AnalysisException as BaseAnalysisException, + IllegalArgumentException as BaseIllegalArgumentException, + ParseException as BaseParseException, + PySparkException, + PythonException as BasePythonException, + TempTableAlreadyExistsException as BaseTempTableAlreadyExistsException, +) + + +class SparkConnectException(PySparkException): + """ + Exception thrown from Spark Connect. 
+ """ + + +class SparkConnectGrpcException(SparkConnectException): + """ + Base class to handle the errors from GRPC. + """ + + def __init__( + self, + message: Optional[str] = None, + error_class: Optional[str] = None, + message_parameters: Optional[Dict[str, str]] = None, + reason: Optional[str] = None, + ) -> None: + self.message = message # type: ignore[assignment] + if reason is not None: + self.message = f"({reason}) {self.message}" + + super().__init__( + message=self.message, + error_class=error_class, + message_parameters=message_parameters, + ) + + +class AnalysisException(SparkConnectGrpcException, BaseAnalysisException): + """ + Failed to analyze a SQL query plan from Spark Connect server. + """ + + def __init__( + self, + message: Optional[str] = None, + error_class: Optional[str] = None, + message_parameters: Optional[Dict[str, str]] = None, + plan: Optional[str] = None, + reason: Optional[str] = None, + ) -> None: + self.message = message # type: ignore[assignment] + if plan is not None: + self.message = f"{self.message}\nPlan: {plan}" + + super().__init__( + message=self.message, + error_class=error_class, + message_parameters=message_parameters, + reason=reason, + ) + + +class TempTableAlreadyExistsException(AnalysisException, BaseTempTableAlreadyExistsException): + """ + Failed to create temp view from Spark Connect server since it is already exists. + """ + + +class ParseException(SparkConnectGrpcException, BaseParseException): + """ + Failed to parse a SQL command from Spark Connect server. + """ + + +class IllegalArgumentException(SparkConnectGrpcException, BaseIllegalArgumentException): + """ + Passed an illegal or inappropriate argument from Spark Connect server. + """ + + +class PythonException(SparkConnectGrpcException, BasePythonException): + """ + Exceptions thrown from Spark Connect server. 
+ """ diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index a417f754a36..6deee786164 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -959,7 +959,7 @@ class Catalog: Throw an analysis exception when the table does not exist. - >>> spark.catalog.isCached("not_existing_table") # doctest: +SKIP + >>> spark.catalog.isCached("not_existing_table") Traceback (most recent call last): ... AnalysisException: ... @@ -997,7 +997,7 @@ class Catalog: Throw an analysis exception when the table does not exist. - >>> spark.catalog.cacheTable("not_existing_table") # doctest: +SKIP + >>> spark.catalog.cacheTable("not_existing_table") Traceback (most recent call last): ... AnalysisException: ... @@ -1037,7 +1037,7 @@ class Catalog: Throw an analysis exception when the table does not exist. - >>> spark.catalog.uncacheTable("not_existing_table") # doctest: +SKIP + >>> spark.catalog.uncacheTable("not_existing_table") Traceback (most recent call last): ... AnalysisException: ... 
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index 36f1328b1bd..8cf5fa50693 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -58,13 +58,14 @@ from google.rpc import error_details_pb2 import pyspark.sql.connect.proto as pb2 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib import pyspark.sql.connect.types as types -from pyspark.errors import ( +from pyspark.errors.exceptions.connect import ( + AnalysisException, + ParseException, + PythonException, SparkConnectException, SparkConnectGrpcException, - SparkConnectAnalysisException, - SparkConnectParseException, - SparkConnectTempTableAlreadyExistsException, - SparkConnectIllegalArgumentException, + TempTableAlreadyExistsException, + IllegalArgumentException, ) from pyspark.sql.types import ( DataType, @@ -672,22 +673,26 @@ class SparkConnectClient(object): d.Unpack(info) reason = info.reason if reason == "org.apache.spark.sql.AnalysisException": - raise SparkConnectAnalysisException( + raise AnalysisException( info.metadata["message"], plan=info.metadata["plan"] ) from None elif reason == "org.apache.spark.sql.catalyst.parser.ParseException": - raise SparkConnectParseException(info.metadata["message"]) from None + raise ParseException(info.metadata["message"]) from None elif ( reason == "org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException" ): - raise SparkConnectTempTableAlreadyExistsException( + raise TempTableAlreadyExistsException( info.metadata["message"], plan=info.metadata["plan"] ) from None elif reason == "java.lang.IllegalArgumentException": message = info.metadata["message"] message = message if message != "" else status.message - raise SparkConnectIllegalArgumentException(message) from None + raise IllegalArgumentException(message) from None + elif reason == "org.apache.spark.api.python.PythonException": + message = info.metadata["message"] + message = message if message != "" else 
status.message + raise PythonException(message) from None else: raise SparkConnectGrpcException( status.message, reason=info.reason diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index 1c9e740474b..d4984b1ba67 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -37,10 +37,7 @@ from typing import ( import numpy as np -from pyspark.errors.exceptions import ( - PySparkTypeError, - PySparkValueError, -) +from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.sql.connect.column import Column from pyspark.sql.connect.expressions import ( CaseWhen, diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index c3c83d48bd9..3c47ebfb973 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -41,7 +41,7 @@ from pyspark.sql.dataframe import DataFrame from pyspark.sql.readwriter import DataFrameReader from pyspark.sql.streaming import DataStreamReader from pyspark.sql.udf import UDFRegistration # noqa: F401 -from pyspark.errors.exceptions import install_exception_handler +from pyspark.errors.exceptions.captured import install_exception_handler from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.sql.types import AtomicType, DataType, StructType diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 8bee517de6a..39a3f036cc3 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -38,10 +38,7 @@ from typing import ( ) from pyspark import SparkContext -from pyspark.errors.exceptions import ( - PySparkTypeError, - PySparkValueError, -) +from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.rdd import PythonEvalType from pyspark.sql.column import Column, _to_java_column, _to_seq, _create_column_from_literal from pyspark.sql.dataframe import DataFrame diff --git a/python/pyspark/sql/session.py 
b/python/pyspark/sql/session.py index 36ad1500687..942e1da95c8 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -60,7 +60,7 @@ from pyspark.sql.types import ( _parse_datatype_string, _from_numpy_type, ) -from pyspark.errors.exceptions import install_exception_handler +from pyspark.errors.exceptions.captured import install_exception_handler from pyspark.sql.utils import is_timestamp_ntz_preferred, to_str if TYPE_CHECKING: diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py index a577e99d0c2..3c43628bf37 100644 --- a/python/pyspark/sql/streaming/query.py +++ b/python/pyspark/sql/streaming/query.py @@ -22,6 +22,9 @@ from typing import Any, Dict, List, Optional from py4j.java_gateway import JavaObject, java_import from pyspark.errors import StreamingQueryException +from pyspark.errors.exceptions.captured import ( + StreamingQueryException as CapturedStreamingQueryException, +) from pyspark.sql.streaming.listener import StreamingQueryListener __all__ = ["StreamingQuery", "StreamingQueryManager"] @@ -387,7 +390,7 @@ class StreamingQuery: je = self._jsq.exception().get() msg = je.toString().split(": ", 1)[1] # Drop the Java StreamingQueryException type info stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace())) - return StreamingQueryException(msg, stackTrace, je.getCause()) + return CapturedStreamingQueryException(msg, stackTrace, je.getCause()) else: return None diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index a9beb71545d..eebfaaa39d8 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -49,11 +49,11 @@ from pyspark.testing.connectutils import ( connect_requirement_message, ) from pyspark.testing.pandasutils import PandasOnSparkTestUtils -from pyspark.errors import ( +from pyspark.errors.exceptions.connect import ( + 
AnalysisException, + ParseException, SparkConnectException, - SparkConnectAnalysisException, - SparkConnectParseException, - SparkConnectTempTableAlreadyExistsException, + TempTableAlreadyExistsException, ) if should_test_connect: @@ -223,7 +223,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): def test_error_handling(self): # SPARK-41533 Proper error handling for Spark Connect df = self.connect.range(10).select("id2") - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): df.collect() def test_simple_read(self): @@ -472,7 +472,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ): self.connect.createDataFrame(data, ["a", "b", "c", "d", "e"]) - with self.assertRaises(SparkConnectParseException): + with self.assertRaises(ParseException): self.connect.createDataFrame( data, "col1 magic_type, col2 int, col3 int, col4 int" ).show() @@ -518,7 +518,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ): self.connect.createDataFrame(data, ["a", "b", "c", "d", "e"]) - with self.assertRaises(SparkConnectParseException): + with self.assertRaises(ParseException): self.connect.createDataFrame( data, "col1 magic_type, col2 int, col3 int, col4 int" ).show() @@ -1005,7 +1005,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): # incompatible field nullability schema = StructType([StructField("id", LongType(), False)]) self.assertRaisesRegex( - SparkConnectAnalysisException, + AnalysisException, "NULLABLE_COLUMN_OR_FIELD", lambda: cdf.to(schema).toPandas(), ) @@ -1013,7 +1013,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): # field cannot upcast schema = StructType([StructField("name", LongType())]) self.assertRaisesRegex( - SparkConnectAnalysisException, + AnalysisException, "INVALID_COLUMN_OR_FIELD_DATA_TYPE", lambda: cdf.to(schema).toPandas(), ) @@ -1025,7 +1025,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ] ) self.assertRaisesRegex( - SparkConnectAnalysisException, + 
AnalysisException, "INVALID_COLUMN_OR_FIELD_DATA_TYPE", lambda: cdf.to(schema).toPandas(), ) @@ -1244,7 +1244,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): # Test when creating a view which is already exists but self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1")) - with self.assertRaises(SparkConnectTempTableAlreadyExistsException): + with self.assertRaises(TempTableAlreadyExistsException): self.connect.sql("SELECT 1 AS X LIMIT 0").createGlobalTempView("view_1") def test_create_session_local_temp_view(self): @@ -1256,7 +1256,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): self.assertEqual(self.connect.sql("SELECT * FROM view_local_temp").count(), 0) # Test when creating a view which is already exists but - with self.assertRaises(SparkConnectTempTableAlreadyExistsException): + with self.assertRaises(TempTableAlreadyExistsException): self.connect.sql("SELECT 1 AS X LIMIT 0").createTempView("view_local_temp") def test_to_pandas(self): @@ -1499,7 +1499,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): self.connect.sql(query).replace({None: 1}, subset="a").toPandas() self.assertTrue("Mixed type replacements are not supported" in str(context.exception)) - with self.assertRaises(SparkConnectAnalysisException) as context: + with self.assertRaises(AnalysisException) as context: self.connect.sql(query).replace({1: 2, 3: -1}, subset=("a", "x")).toPandas() self.assertIn( """Cannot resolve column name "x" among (a, b, c)""", str(context.exception) @@ -1607,7 +1607,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ) # Hint with unsupported parameter values - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): self.connect.read.table(self.tbl_name).hint("REPARTITION", "id+1").toPandas() # Hint with unsupported parameter types @@ -1621,7 +1621,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ).toPandas() # Hint with wrong combination - with 
self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): self.connect.read.table(self.tbl_name).hint("REPARTITION", "id", 3).toPandas() def test_join_hint(self): @@ -1950,7 +1950,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ) # repartition with unsupported parameter values - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): self.connect.read.table(self.tbl_name).repartition("id+1").toPandas() def test_repartition_by_range(self) -> None: @@ -1974,7 +1974,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): ) # repartitionByRange with unsupported parameter values - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): self.connect.read.table(self.tbl_name).repartitionByRange("id+1").toPandas() def test_agg_with_two_agg_exprs(self) -> None: diff --git a/python/pyspark/sql/tests/connect/test_connect_column.py b/python/pyspark/sql/tests/connect/test_connect_column.py index 624bdf4f539..a2c786db180 100644 --- a/python/pyspark/sql/tests/connect/test_connect_column.py +++ b/python/pyspark/sql/tests/connect/test_connect_column.py @@ -41,7 +41,7 @@ from pyspark.sql.types import ( DecimalType, BooleanType, ) -from pyspark.errors import SparkConnectException +from pyspark.errors.exceptions.connect import SparkConnectException from pyspark.testing.connectutils import should_test_connect from pyspark.sql.tests.connect.test_connect_basic import SparkConnectSQLTestCase diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index e3e668eb835..243153be083 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -23,7 +23,7 @@ from pyspark.sql.types import StringType, StructType, StructField, ArrayType, In from pyspark.testing.pandasutils import PandasOnSparkTestUtils from 
pyspark.testing.connectutils import ReusedConnectTestCase from pyspark.testing.sqlutils import SQLTestUtils -from pyspark.errors import SparkConnectAnalysisException, SparkConnectException +from pyspark.errors.exceptions.connect import AnalysisException, SparkConnectException class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, SQLTestUtils): @@ -899,7 +899,7 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S cdf.select(CF.rank().over(cdf.a)) # invalid window function - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select(cdf.b.over(CW.orderBy("b"))).show() # invalid window frame @@ -913,34 +913,34 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S CF.lead("c", 1), CF.ntile(1), ]: - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.currentRow, CW.currentRow + 123)) ).show() - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.currentRow, CW.currentRow + 123)) ).show() - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.unboundedPreceding, CW.currentRow)) ).show() # Function 'cume_dist' requires Windowframe(RangeFrame, UnboundedPreceding, CurrentRow) ccol = CF.cume_dist() - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rangeBetween(CW.currentRow, CW.currentRow + 123)) ).show() - with self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.currentRow, CW.currentRow + 123)) ).show() - with 
self.assertRaises(SparkConnectAnalysisException): + with self.assertRaises(AnalysisException): cdf.select( ccol.over(CW.orderBy("b").rowsBetween(CW.unboundedPreceding, CW.currentRow)) ).show() diff --git a/python/pyspark/sql/tests/connect/test_parity_column.py b/python/pyspark/sql/tests/connect/test_parity_column.py index e1576389351..5cce063871a 100644 --- a/python/pyspark/sql/tests/connect/test_parity_column.py +++ b/python/pyspark/sql/tests/connect/test_parity_column.py @@ -32,7 +32,7 @@ from pyspark.testing.connectutils import ReusedConnectTestCase class ColumnParityTests(ColumnTestsMixin, ReusedConnectTestCase): - # TODO(SPARK-42017): Different error type AnalysisException vs SparkConnectAnalysisException + # TODO(SPARK-42017): df["bad_key"] does not raise AnalysisException @unittest.skip("Fails in Spark Connect, should enable.") def test_access_column(self): super().test_access_column() diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py index d3807285f3e..7e6735cb7cd 100644 --- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py +++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py @@ -85,6 +85,11 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase): def test_same_semantics_error(self): super().test_same_semantics_error() + # TODO(SPARK-42338): Different exception in DataFrame.sample + @unittest.skip("Fails in Spark Connect, should enable.") + def test_sample(self): + super().test_sample() + @unittest.skip("Spark Connect does not support RDD but the tests depend on them.") def test_toDF_with_schema_string(self): super().test_toDF_with_schema_string() diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py b/python/pyspark/sql/tests/connect/test_parity_functions.py index b151986cb24..3d390c13913 100644 --- a/python/pyspark/sql/tests/connect/test_parity_functions.py +++ b/python/pyspark/sql/tests/connect/test_parity_functions.py 
@@ -17,11 +17,15 @@ import unittest +from pyspark.errors.exceptions.connect import SparkConnectException from pyspark.sql.tests.test_functions import FunctionsTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase): + def test_assert_true(self): + self.check_assert_true(SparkConnectException) + @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.") def test_basic_functions(self): super().test_basic_functions() @@ -49,6 +53,9 @@ class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase): def test_lit_np_scalar(self): super().test_lit_np_scalar() + def test_raise_error(self): + self.check_assert_true(SparkConnectException) + # Comparing column type of connect and pyspark @unittest.skip("Fails in Spark Connect, should enable.") def test_sorting_functions_with_column(self): diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py index b4d1a9dd31a..4b1ce0a9587 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py @@ -19,9 +19,6 @@ import unittest from pyspark.sql.tests.pandas.test_pandas_udf import PandasUDFTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase -from pyspark.errors.exceptions import SparkConnectGrpcException -from pyspark.sql.connect.functions import udf -from pyspark.sql.functions import pandas_udf, PandasUDFType class PandasUDFParityTests(PandasUDFTestsMixin, ReusedConnectTestCase): @@ -53,24 +50,10 @@ class PandasUDFParityTests(PandasUDFTestsMixin, ReusedConnectTestCase): def test_pandas_udf_basic(self): super().test_pandas_udf_basic() - def test_stopiteration_in_udf(self): - # The vanilla PySpark throws PythonException instead. 
- def foo(x): - raise StopIteration() - - exc_message = "Caught StopIteration thrown from user's code; failing the task" - df = self.spark.range(0, 100) - - self.assertRaisesRegex( - SparkConnectGrpcException, exc_message, df.withColumn("v", udf(foo)("id")).collect - ) - - # pandas scalar udf - self.assertRaisesRegex( - SparkConnectGrpcException, - exc_message, - df.withColumn("v", pandas_udf(foo, "double", PandasUDFType.SCALAR)("id")).collect, - ) + # TODO(SPARK-42340): implement GroupedData.applyInPandas + @unittest.skip("Fails in Spark Connect, should enable.") + def test_stopiteration_in_grouped_map(self): + super().test_stopiteration_in_grouped_map() if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/connect/test_parity_udf.py b/python/pyspark/sql/tests/connect/test_parity_udf.py index f74d21f0d86..8d4bb69bf16 100644 --- a/python/pyspark/sql/tests/connect/test_parity_udf.py +++ b/python/pyspark/sql/tests/connect/test_parity_udf.py @@ -27,9 +27,6 @@ if should_test_connect: # test_udf_with_partial_function from pyspark.sql.tests.test_udf import BaseUDFTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase -from pyspark.errors.exceptions import SparkConnectAnalysisException -from pyspark.sql.connect.functions import udf -from pyspark.sql.types import BooleanType class UDFParityTests(BaseUDFTestsMixin, ReusedConnectTestCase): @@ -182,26 +179,6 @@ class UDFParityTests(BaseUDFTestsMixin, ReusedConnectTestCase): def test_udf_in_subquery(self): super().test_udf_in_subquery() - def test_udf_not_supported_in_join_condition(self): - # The vanilla PySpark throws AnalysisException instead. - # test python udf is not supported in join type except inner join. 
- left = self.spark.createDataFrame([(1, 1, 1), (2, 2, 2)], ["a", "a1", "a2"]) - right = self.spark.createDataFrame([(1, 1, 1), (1, 3, 1)], ["b", "b1", "b2"]) - f = udf(lambda a, b: a == b, BooleanType()) - - def runWithJoinType(join_type, type_string): - with self.assertRaisesRegex( - SparkConnectAnalysisException, - """Python UDF in the ON clause of a %s JOIN.""" % type_string, - ): - left.join(right, [f("a", "b"), left.a1 == right.b1], join_type).collect() - - runWithJoinType("full", "FULL OUTER") - runWithJoinType("left", "LEFT OUTER") - runWithJoinType("right", "RIGHT OUTER") - runWithJoinType("leftanti", "LEFT ANTI") - runWithJoinType("leftsemi", "LEFT SEMI") - if __name__ == "__main__": import unittest diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf.py b/python/pyspark/sql/tests/pandas/test_pandas_udf.py index 768317ab60d..1b3b4555d7f 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf.py @@ -171,9 +171,6 @@ class PandasUDFTestsMixin: def foo(x): raise StopIteration() - def foofoo(x, y): - raise StopIteration() - exc_message = "Caught StopIteration thrown from user's code; failing the task" df = self.spark.range(0, 100) @@ -189,6 +186,16 @@ class PandasUDFTestsMixin: df.withColumn("v", pandas_udf(foo, "double", PandasUDFType.SCALAR)("id")).collect, ) + def test_stopiteration_in_grouped_map(self): + def foo(x): + raise StopIteration() + + def foofoo(x, y): + raise StopIteration() + + exc_message = "Caught StopIteration thrown from user's code; failing the task" + df = self.spark.range(0, 100) + # pandas grouped map self.assertRaisesRegex( PythonException, @@ -204,6 +211,13 @@ class PandasUDFTestsMixin: .collect, ) + def test_stopiteration_in_grouped_agg(self): + def foo(x): + raise StopIteration() + + exc_message = "Caught StopIteration thrown from user's code; failing the task" + df = self.spark.range(0, 100) + # pandas grouped agg self.assertRaisesRegex( PythonException, diff 
--git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 5470f79ff22..9f02ae848bf 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -254,7 +254,7 @@ class StreamingTests(ReusedSQLTestCase): self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") finally: sq.stop() - self.assertTrue(type(sq.exception()) is StreamingQueryException) + self.assertIsInstance(sq.exception(), StreamingQueryException) self._assert_exception_tree_contains_msg(sq.exception(), "ZeroDivisionError") def _assert_exception_tree_contains_msg(self, exception, msg): diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 033878470e1..1d52602a96f 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -46,8 +46,6 @@ from pyspark.sql.types import ( from pyspark.errors import ( AnalysisException, IllegalArgumentException, - SparkConnectException, - SparkConnectAnalysisException, PySparkTypeError, ) from pyspark.testing.sqlutils import ( @@ -948,8 +946,7 @@ class DataFrameTestsMixin: self.assertRaises(TypeError, lambda: self.spark.range(1).sample(seed="abc")) self.assertRaises( - (IllegalArgumentException, SparkConnectException), - lambda: self.spark.range(1).sample(-1.0).count(), + IllegalArgumentException, lambda: self.spark.range(1).sample(-1.0).count() ) def test_toDF_with_schema_string(self): @@ -1041,17 +1038,17 @@ class DataFrameTestsMixin: self.assertFalse(spark.catalog.isCached("tab1")) self.assertFalse(spark.catalog.isCached("tab2")) self.assertRaisesRegex( - Exception, + AnalysisException, "does_not_exist", lambda: spark.catalog.isCached("does_not_exist"), ) self.assertRaisesRegex( - Exception, + AnalysisException, "does_not_exist", lambda: spark.catalog.cacheTable("does_not_exist"), ) self.assertRaisesRegex( - Exception, + AnalysisException, 
"does_not_exist", lambda: spark.catalog.uncacheTable("does_not_exist"), ) @@ -1595,17 +1592,13 @@ class DataFrameTestsMixin: # incompatible field nullability schema4 = StructType([StructField("j", LongType(), False)]) self.assertRaisesRegex( - (AnalysisException, SparkConnectAnalysisException), - "NULLABLE_COLUMN_OR_FIELD", - lambda: df.to(schema4).count(), + AnalysisException, "NULLABLE_COLUMN_OR_FIELD", lambda: df.to(schema4).count() ) # field cannot upcast schema5 = StructType([StructField("i", LongType())]) self.assertRaisesRegex( - (AnalysisException, SparkConnectAnalysisException), - "INVALID_COLUMN_OR_FIELD_DATA_TYPE", - lambda: df.to(schema5).count(), + AnalysisException, "INVALID_COLUMN_OR_FIELD_DATA_TYPE", lambda: df.to(schema5).count() ) def test_repartition(self): diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 05492347755..d8343b4fb47 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -25,7 +25,7 @@ import math import unittest from py4j.protocol import Py4JJavaError -from pyspark.errors import PySparkTypeError, PySparkValueError, SparkConnectException +from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.sql import Row, Window, types from pyspark.sql.functions import ( udf, @@ -1056,6 +1056,9 @@ class FunctionsTestsMixin: self.assertEqual(date(2017, 1, 22), parse_result["to_date(dateCol)"]) def test_assert_true(self): + self.check_assert_true(Py4JJavaError) + + def check_assert_true(self, tpe): from pyspark.sql.functions import assert_true df = self.spark.range(3) @@ -1065,10 +1068,10 @@ class FunctionsTestsMixin: [Row(val=None), Row(val=None), Row(val=None)], ) - with self.assertRaisesRegex((Py4JJavaError, SparkConnectException), "too big"): + with self.assertRaisesRegex(tpe, "too big"): df.select(assert_true(df.id < 2, "too big")).toDF("val").collect() - with self.assertRaisesRegex((Py4JJavaError, 
SparkConnectException), "2000000"): + with self.assertRaisesRegex(tpe, "2000000"): df.select(assert_true(df.id < 2, df.id * 1e6)).toDF("val").collect() with self.assertRaises(PySparkTypeError) as pe: @@ -1081,14 +1084,17 @@ class FunctionsTestsMixin: ) def test_raise_error(self): + self.check_raise_error(Py4JJavaError) + + def check_raise_error(self, tpe): from pyspark.sql.functions import raise_error df = self.spark.createDataFrame([Row(id="foobar")]) - with self.assertRaisesRegex((Py4JJavaError, SparkConnectException), "foobar"): + with self.assertRaisesRegex(tpe, "foobar"): df.select(raise_error(df.id)).collect() - with self.assertRaisesRegex((Py4JJavaError, SparkConnectException), "barfoo"): + with self.assertRaisesRegex(tpe, "barfoo"): df.select(raise_error("barfoo")).collect() with self.assertRaises(PySparkTypeError) as pe: diff --git a/python/setup.py b/python/setup.py index af5c5f9384c..ead1139f8f8 100755 --- a/python/setup.py +++ b/python/setup.py @@ -266,6 +266,7 @@ try: "pyspark.licenses", "pyspark.resource", "pyspark.errors", + "pyspark.errors.exceptions", "pyspark.examples.src.main.python", ], include_package_data=True, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org