This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ad454f46d85  [SPARK-42992][PYTHON] Introduce PySparkRuntimeError
ad454f46d85 is described below

commit ad454f46d85e17dc59991ecd6c3326a8c6de8ea3
Author: itholic <haejoon....@databricks.com>
AuthorDate: Wed Apr 26 13:51:52 2023 +0800

    [SPARK-42992][PYTHON] Introduce PySparkRuntimeError

    ### What changes were proposed in this pull request?

    This PR proposes to introduce a new error class, `PySparkRuntimeError`, and applies it to the existing `RuntimeError` usages under `python/pyspark/*.py`.

    ### Why are the changes needed?

    To cover the built-in `RuntimeError` with the PySpark error framework.

    ### Does this PR introduce _any_ user-facing change?

    No, this is an internal improvement to the error framework.

    ### How was this patch tested?

    The existing CI should pass.

    Closes #40617 from itholic/pyspark_runtime_error.

    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/accumulators.py | 15 ++-
 python/pyspark/broadcast.py | 23 +++-
 python/pyspark/conf.py | 6 +-
 python/pyspark/context.py | 40 +++---
 python/pyspark/errors/__init__.py | 2 +
 python/pyspark/errors/error_classes.py | 137 +++++++++++++++++++++
 python/pyspark/errors/exceptions/base.py | 6 +
 python/pyspark/errors/utils.py | 4 +-
 python/pyspark/java_gateway.py | 18 ++-
 python/pyspark/profiler.py | 6 +-
 python/pyspark/rdd.py | 33 +++--
 .../sql/tests/pandas/test_pandas_grouped_map.py | 2 +-
 python/pyspark/taskcontext.py | 30 +++--
 python/pyspark/tests/test_context.py | 2 +-
 python/pyspark/tests/test_profiler.py | 27 ++--
 python/pyspark/util.py | 13 +-
 python/pyspark/worker.py | 75 ++++++-----
 17 files changed, 346 insertions(+), 93 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index ce4bb561814..dc8520a844d 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -23,6 +23,7 @@ import threading from typing import Callable, Dict, Generic, Tuple, Type, TYPE_CHECKING, TypeVar, Union from pyspark.serializers import read_int, CPickleSerializer +from pyspark.errors import PySparkRuntimeError if TYPE_CHECKING: from pyspark._typing import SupportsIAdd # noqa: F401 @@ -140,14 +141,24 @@ class Accumulator(Generic[T]): def value(self) -> T: """Get the accumulator's value; only usable in driver program""" if self._deserialized: - raise RuntimeError("Accumulator.value cannot be accessed inside tasks") + raise PySparkRuntimeError( + error_class="VALUE_NOT_ACCESSIBLE", + message_parameters={ + "value": "Accumulator.value", + }, + ) return self._value @value.setter def value(self, value: T) -> None: """Sets the accumulator's value; only usable in driver program""" if self._deserialized: - raise RuntimeError("Accumulator.value cannot be accessed inside tasks") + raise PySparkRuntimeError( + error_class="VALUE_NOT_ACCESSIBLE", + message_parameters={ + "value": "Accumulator.value", + }, + ) self._value = value def add(self, term: T) -> None: diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index b1a9b790af5..a72bf1e059b 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -40,6 +40,7 @@ from typing.io import BinaryIO # type: ignore[import] from pyspark.java_gateway import local_connect_and_auth from pyspark.serializers import ChunkedStream, pickle_protocol from pyspark.util import print_exec +from pyspark.errors import PySparkRuntimeError if TYPE_CHECKING: from pyspark import SparkContext @@ -58,7 +59,12 @@ def _from_id(bid: int) -> "Broadcast[Any]": from
pyspark.broadcast import _broadcastRegistry if bid not in _broadcastRegistry: - raise RuntimeError("Broadcast variable '%s' not loaded!" % bid) + raise PySparkRuntimeError( + error_class="BROADCAST_VARIABLE_NOT_LOADED", + message_parameters={ + "variable": str(bid), + }, + ) return _broadcastRegistry[bid] @@ -293,7 +299,10 @@ class Broadcast(Generic[T]): >>> b.unpersist() """ if self._jbroadcast is None: - raise RuntimeError("Broadcast can only be unpersisted in driver") + raise PySparkRuntimeError( + error_class="INVALID_BROADCAST_OPERATION", + message_parameters={"operation": "unpersisted"}, + ) self._jbroadcast.unpersist(blocking) def destroy(self, blocking: bool = False) -> None: @@ -320,13 +329,19 @@ class Broadcast(Generic[T]): >>> b.destroy() """ if self._jbroadcast is None: - raise RuntimeError("Broadcast can only be destroyed in driver") + raise PySparkRuntimeError( + error_class="INVALID_BROADCAST_OPERATION", + message_parameters={"operation": "destroyed"}, + ) self._jbroadcast.destroy(blocking) os.unlink(self._path) def __reduce__(self) -> Tuple[Callable[[int], "Broadcast[T]"], Tuple[int]]: if self._jbroadcast is None: - raise RuntimeError("Broadcast can only be serialized in driver") + raise PySparkRuntimeError( + error_class="INVALID_BROADCAST_OPERATION", + message_parameters={"operation": "serialized"}, + ) assert self._pickle_registry is not None self._pickle_registry.add(self) return _from_id, (self._jbroadcast.id(),) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index 1ddc8f5ddaa..dd203e2d26b 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -21,6 +21,7 @@ import sys from typing import Dict, List, Optional, Tuple, cast, overload from py4j.java_gateway import JVMView, JavaObject +from pyspark.errors import PySparkRuntimeError class SparkConf: @@ -182,7 +183,10 @@ class SparkConf: ) -> "SparkConf": """Set an environment variable to be passed to executors.""" if (key is not None and pairs is not None) or (key is None and pairs is None): - raise RuntimeError("Either pass one key-value pair or a list of pairs") + raise PySparkRuntimeError( + error_class="KEY_VALUE_PAIR_REQUIRED", + message_parameters={}, + ) elif key is not None: self.set("spark.executorEnv.{}".format(key), cast(str, value)) elif pairs is not None: diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c3f18127121..7b89a13b115 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -68,6 +68,7 @@ from pyspark.taskcontext import TaskContext from pyspark.traceback_utils import CallSite, first_spark_call from pyspark.status import StatusTracker from pyspark.profiler import ProfilerCollector, BasicProfiler, UDFBasicProfiler, MemoryProfiler +from pyspark.errors import PySparkRuntimeError from py4j.java_gateway import is_instance_of, JavaGateway, JavaObject, JVMView if TYPE_CHECKING: @@ -180,8 +181,9 @@ class SparkContext: memory_profiler_cls: Type[MemoryProfiler] = MemoryProfiler, ): if "SPARK_REMOTE" in os.environ and "SPARK_LOCAL_REMOTE" not in os.environ: - raise RuntimeError( - "Remote client cannot create a SparkContext. Create SparkSession instead." 
+ raise PySparkRuntimeError( + error_class="CONTEXT_UNAVAILABLE_FOR_REMOTE_CLIENT", + message_parameters={}, ) if conf is None or conf.get("spark.executor.allowSparkContext", "false").lower() != "true": @@ -266,9 +268,15 @@ class SparkContext: # Check that we have at least the required parameters if not self._conf.contains("spark.master"): - raise RuntimeError("A master URL must be set in your configuration") + raise PySparkRuntimeError( + error_class="MASTER_URL_NOT_SET", + message_parameters={}, + ) if not self._conf.contains("spark.app.name"): - raise RuntimeError("An application name must be set in your configuration") + raise PySparkRuntimeError( + error_class="APPLICATION_NAME_NOT_SET", + message_parameters={}, + ) # Read back our properties from the conf in case we loaded some of them from # the classpath or an external config file @@ -459,10 +467,9 @@ class SparkContext: def __getnewargs__(self) -> NoReturn: # This method is called when attempting to pickle SparkContext, which is always an error: - raise RuntimeError( - "It appears that you are attempting to reference SparkContext from a broadcast " - "variable, action, or transformation. SparkContext can only be used on the driver, " - "not in code that it run on workers. For more information, see SPARK-5063." + raise PySparkRuntimeError( + error_class="CONTEXT_ONLY_VALID_ON_DRIVER", + message_parameters={}, ) def __enter__(self) -> "SparkContext": @@ -2331,9 +2338,9 @@ class SparkContext: if self.profiler_collector is not None: self.profiler_collector.show_profiles() else: - raise RuntimeError( - "'spark.python.profile' or 'spark.python.profile.memory' configuration" - " must be set to 'true' to enable Python profile." + raise PySparkRuntimeError( + error_class="INCORRECT_CONF_FOR_PROFILE", + message_parameters={}, ) def dump_profiles(self, path: str) -> None: @@ -2348,9 +2355,9 @@ class SparkContext: if self.profiler_collector is not None: self.profiler_collector.dump_profiles(path) else: - raise RuntimeError( - "'spark.python.profile' or 'spark.python.profile.memory' configuration" - " must be set to 'true' to enable Python profile." + raise PySparkRuntimeError( + error_class="INCORRECT_CONF_FOR_PROFILE", + message_parameters={}, ) def getConf(self) -> SparkConf: @@ -2387,7 +2394,10 @@ class SparkContext: Throws an exception if a SparkContext is about to be created in executors. 
""" if TaskContext.get() is not None: - raise RuntimeError("SparkContext should only be created and accessed on the driver.") + raise PySparkRuntimeError( + error_class="CONTEXT_ONLY_VALID_ON_DRIVER", + message_parameters={}, + ) def _test() -> None: diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py index 05b4f192940..1525f351ea4 100644 --- a/python/pyspark/errors/__init__.py +++ b/python/pyspark/errors/__init__.py @@ -37,6 +37,7 @@ from pyspark.errors.exceptions.base import ( # noqa: F401 PySparkTypeError, PySparkValueError, PySparkAttributeError, + PySparkRuntimeError, PySparkAssertionError, PySparkNotImplementedError, ) @@ -61,6 +62,7 @@ __all__ = [ "PySparkTypeError", "PySparkValueError", "PySparkAttributeError", + "PySparkRuntimeError", "PySparkAssertionError", "PySparkNotImplementedError", ] diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index 046132eb0ce..34efd471707 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -19,6 +19,11 @@ import json ERROR_CLASSES_JSON = """ { + "APPLICATION_NAME_NOT_SET" : { + "message" : [ + "An application name must be set in your configuration." + ] + }, "ARGUMENT_REQUIRED": { "message": [ "Argument `<arg_name>` is required when <condition>." @@ -29,6 +34,16 @@ ERROR_CLASSES_JSON = """ "Attribute `<attr_name>` in provided object `<obj_name>` is not callable." ] }, + "BROADCAST_VARIABLE_NOT_LOADED": { + "message": [ + "Broadcast variable `<variable>` not loaded." + ] + }, + "CALL_BEFORE_INITIALIZE": { + "message": [ + "Not supported to call `<func_name>` before initialize <object>." + ] + }, "CANNOT_ACCESS_TO_DUNDER": { "message": [ "Dunder(double underscore) attribute is for internal use only." @@ -59,6 +74,11 @@ ERROR_CLASSES_JSON = """ "Can not infer Array Type from an list with None as the first element." ] }, + "CANNOT_OPEN_SOCKET": { + "message": [ + "Can not open socket: <errors>." + ] + }, "CANNOT_PARSE_DATATYPE": { "message": [ "Unable to parse datatype from schema. <error>." @@ -84,6 +104,16 @@ ERROR_CLASSES_JSON = """ "`<func_name>` does not allow a Column in a list." ] }, + "CONTEXT_ONLY_VALID_ON_DRIVER" : { + "message" : [ + "It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063." + ] + }, + "CONTEXT_UNAVAILABLE_FOR_REMOTE_CLIENT" : { + "message" : [ + "Remote client cannot create a SparkContext. Create SparkSession instead." + ] + }, "DISALLOWED_TYPE_FOR_CONTAINER" : { "message" : [ "Argument `<arg_name>`(type: <arg_type>) should only contain a type in [<allowed_types>], got <return_type>" @@ -94,6 +124,17 @@ ERROR_CLASSES_JSON = """ "Function `<func_name>` should return Column, got <return_type>." ] }, + "INCORRECT_CONF_FOR_PROFILE" : { + "message" : [ + "`spark.python.profile` or `spark.python.profile.memory` configuration", + " must be set to `true` to enable Python profile." + ] + }, + "INVALID_BROADCAST_OPERATION": { + "message": [ + "Broadcast can only be <operation> in driver." + ] + }, "INVALID_CALL_ON_UNRESOLVED_OBJECT": { "message": [ "Invalid call to `<func_name>` on unresolved object." @@ -124,16 +165,41 @@ ERROR_CLASSES_JSON = """ "when() can only be applied on a Column previously generated by when() function, and cannot be applied once otherwise() is applied." 
] }, + "INVALID_WINDOW_BOUND_TYPE" : { + "message" : [ + "Invalid window bound type: <window_bound_type>." + ] + }, + "JAVA_GATEWAY_EXITED" : { + "message" : [ + "Java gateway process exited before sending its port number." + ] + }, "JVM_ATTRIBUTE_NOT_SUPPORTED" : { "message" : [ "Attribute `<attr_name>` is not supported in Spark Connect as it depends on the JVM. If you need to use this attribute, do not use Spark Connect when creating your session." ] }, + "KEY_VALUE_PAIR_REQUIRED" : { + "message" : [ + "Key-value pair or a list of pairs is required." + ] + }, "LENGTH_SHOULD_BE_THE_SAME" : { "message" : [ "<arg1> and <arg2> should be of the same length, got <arg1_length> and <arg2_length>." ] }, + "MASTER_URL_NOT_SET" : { + "message" : [ + "A master URL must be set in your configuration." + ] + }, + "MISSING_LIBRARY_FOR_PROFILER" : { + "message" : [ + "Install the 'memory_profiler' library in the cluster to enable memory profiling." + ] + }, "MISSING_VALID_PLAN" : { "message" : [ "Argument to <operator> does not contain a valid plan." @@ -269,6 +335,11 @@ ERROR_CLASSES_JSON = """ "Argument `<arg_name>` should be an int, got <arg_type>." ] }, + "NOT_IN_BARRIER_STAGE" : { + "message" : [ + "It is not in a barrier stage." + ] + }, "NOT_ITERABLE" : { "message" : [ "<objectName> is not iterable." @@ -354,6 +425,52 @@ ERROR_CLASSES_JSON = """ "Only a single trigger is allowed." ] }, + "PIPE_FUNCTION_EXITED" : { + "message" : [ + "Pipe function `<func_name>` exited with error code <error_code>." + ] + }, + "PYTHON_HASH_SEED_NOT_SET" : { + "message" : [ + "Randomness of hash of string should be disabled via PYTHONHASHSEED." + ] + }, + "PYTHON_VERSION_MISMATCH" : { + "message" : [ + "Python in worker has different version <worker_version> than that in driver <driver_version>, PySpark cannot run with different minor versions.", + "Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set." + ] + }, + "RDD_TRANSFORM_ONLY_VALID_ON_DRIVER" : { + "message" : [ + "It appears that you are attempting to broadcast an RDD or reference an RDD from an ", + "action or transformation. RDD transformations and actions can only be invoked by the ", + "driver, not inside of other transformations; for example, ", + "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values ", + "transformation and count action cannot be performed inside of the rdd1.map ", + "transformation. For more information, see SPARK-5063." + ] + }, + "RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF" : { + "message" : [ + "Column names of the returned pandas.DataFrame do not match specified schema.<missing><extra>" + ] + }, + "RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF" : { + "message" : [ + "Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>" + ] + }, + "RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF" : { + "message" : [ + "The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>." + ] + }, + "SCHEMA_MISMATCH_FOR_PANDAS_UDF" : { + "message" : [ + "Result vector from pandas_udf was not the required length: expected <expected>, got <actual>." + ] + }, "SLICE_WITH_STEP" : { "message" : [ "Slice with step is not supported." @@ -364,6 +481,21 @@ ERROR_CLASSES_JSON = """ "State is either not defined or has already been removed." 
] }, + "STOP_ITERATION_OCCURRED" : { + "message" : [ + "Caught StopIteration thrown from user's code; failing the task: <exc>" + ] + }, + "STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF" : { + "message" : [ + "pandas iterator UDF should exhaust the input iterator." + ] + }, + "UNEXPECTED_RESPONSE_FROM_SERVER" : { + "message" : [ + "Unexpected response from iterator server." + ] + }, "UNSUPPORTED_DATA_TYPE" : { "message" : [ "Unsupported DataType `<data_type>`." @@ -399,6 +531,11 @@ ERROR_CLASSES_JSON = """ "Function `<func_name>` should use only POSITIONAL or POSITIONAL OR KEYWORD arguments." ] }, + "VALUE_NOT_ACCESSIBLE": { + "message": [ + "Value `<value>` cannot be accessed inside tasks." + ] + }, "VALUE_NOT_ANY_OR_ALL" : { "message" : [ "Value for `<arg_name>` must be 'any' or 'all', got '<arg_value>'." diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py index 9b229f6376e..543ee9473e3 100644 --- a/python/pyspark/errors/exceptions/base.py +++ b/python/pyspark/errors/exceptions/base.py @@ -198,6 +198,12 @@ class PySparkAttributeError(PySparkException, AttributeError): """ +class PySparkRuntimeError(PySparkException, RuntimeError): + """ + Wrapper class for RuntimeError to support error classes. + """ + + class PySparkAssertionError(PySparkException, AssertionError): """ Wrapper class for AssertionError to support error classes. diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py index 69a72f86b9f..bbe8d613315 100644 --- a/python/pyspark/errors/utils.py +++ b/python/pyspark/errors/utils.py @@ -72,7 +72,7 @@ class ErrorClassesReader: "message" : [ "Problem <A> because of <B>." ], - "subClass" : { + "sub_class" : { "SUB_ERROR_CLASS" : { "message" : [ "Do <C> to fix the problem." @@ -104,7 +104,7 @@ class ErrorClassesReader: else: # Generate message template for sub error class if exists. 
sub_error_class = error_classes[1] - main_error_class_subclass_info_map = main_error_class_info_map["subClass"] + main_error_class_subclass_info_map = main_error_class_info_map["sub_class"] if sub_error_class in main_error_class_subclass_info_map: sub_error_class_info_map = main_error_class_subclass_info_map[sub_error_class] else: diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index aee206dd6b3..691120a1312 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -30,6 +30,7 @@ from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParam from py4j.clientserver import ClientServer, JavaParameters, PythonParameters from pyspark.find_spark_home import _find_spark_home from pyspark.serializers import read_int, write_with_length, UTF8Deserializer +from pyspark.errors import PySparkRuntimeError def launch_gateway(conf=None, popen_kwargs=None): @@ -103,7 +104,10 @@ def launch_gateway(conf=None, popen_kwargs=None): time.sleep(0.1) if not os.path.isfile(conn_info_file): - raise RuntimeError("Java gateway process exited before sending its port number") + raise PySparkRuntimeError( + error_class="JAVA_GATEWAY_EXITED", + message_parameters={}, + ) with open(conn_info_file, "rb") as info: gateway_port = read_int(info) @@ -172,7 +176,10 @@ def _do_server_auth(conn, auth_secret): reply = UTF8Deserializer().loads(conn) if reply != "ok": conn.close() - raise RuntimeError("Unexpected reply from iterator server.") + raise PySparkRuntimeError( + error_class="UNEXPECTED_RESPONSE_FROM_SERVER", + message_parameters={}, + ) def local_connect_and_auth(port, auth_secret): @@ -210,7 +217,12 @@ def local_connect_and_auth(port, auth_secret): errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg)) sock.close() sock = None - raise RuntimeError("could not open socket: %s" % errors) + raise PySparkRuntimeError( + error_class="CANNOT_OPEN_SOCKET", + message_parameters={ + "errors": str(errors), + }, + ) def ensure_callback_server_started(gw): diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index cb668b874a2..d7990d2fd1b 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -45,6 +45,7 @@ except Exception: has_memory_profiler = False from pyspark.accumulators import AccumulatorParam +from pyspark.errors import PySparkRuntimeError if TYPE_CHECKING: from pyspark.context import SparkContext @@ -413,8 +414,9 @@ class MemoryProfiler(Profiler): self._accumulator.add(codemap_dict) # type: ignore[arg-type] return ret else: - raise RuntimeError( - "Install the 'memory_profiler' library in the cluster to enable memory profiling." 
+ raise PySparkRuntimeError( + error_class="MISSING_LIBRARY_FOR_PROFILER", + message_parameters={}, ) def stats(self) -> CodeMapDict: diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 06e4facc962..13f93fbdad6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -87,6 +87,7 @@ from pyspark.shuffle import ( ) from pyspark.traceback_utils import SCCallSiteSync from pyspark.util import fail_on_stopiteration, _parse_memory +from pyspark.errors import PySparkRuntimeError if TYPE_CHECKING: @@ -167,7 +168,10 @@ def portable_hash(x: Hashable) -> int: """ if "PYTHONHASHSEED" not in os.environ: - raise RuntimeError("Randomness of hash of string should be disabled via PYTHONHASHSEED") + raise PySparkRuntimeError( + error_class="PYTHON_HASH_SEED_NOT_SET", + message_parameters={}, + ) if x is None: return 0 @@ -368,13 +372,9 @@ class RDD(Generic[T_co]): def __getnewargs__(self) -> NoReturn: # This method is called when attempting to pickle an RDD, which is always an error: - raise RuntimeError( - "It appears that you are attempting to broadcast an RDD or reference an RDD from an " - "action or transformation. RDD transformations and actions can only be invoked by the " - "driver, not inside of other transformations; for example, " - "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values " - "transformation and count action cannot be performed inside of the rdd1.map " - "transformation. For more information, see SPARK-5063." + raise PySparkRuntimeError( + error_class="RDD_TRANSFORM_ONLY_VALID_ON_DRIVER", + message_parameters={}, ) @property @@ -1699,9 +1699,12 @@ class RDD(Generic[T_co]): def check_return_code() -> Iterable[int]: pipe.wait() if checkCode and pipe.returncode: - raise RuntimeError( - "Pipe function `%s' exited " - "with error code %d" % (command, pipe.returncode) + raise PySparkRuntimeError( + error_class="PIPE_FUNCTION_EXITED", + message_parameters={ + "func_name": command, + "error_code": str(pipe.returncode), + }, ) else: for i in range(0): @@ -5215,7 +5218,13 @@ class RDD(Generic[T_co]): def toDF( self: "RDD[Any]", schema: Optional[Any] = None, sampleRatio: Optional[float] = None ) -> "DataFrame": - raise RuntimeError("""RDD.toDF was called before SparkSession was initialized.""") + raise PySparkRuntimeError( + error_class="CALL_BEFORE_INITIALIZE", + message_parameters={ + "func_name": "RDD.toDF", + "object": "SparkSession", + }, + ) def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]: diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index 0eff994ead7..0a2f81c6e5b 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -596,7 +596,7 @@ class GroupedApplyInPandasTestsMixin: with QuietTest(self.sc): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: id. 
Unexpected: iid.\n", ): grouped_df.apply(column_name_typo).collect() diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index 5e8765e2cbb..329820c3ee0 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -19,6 +19,7 @@ from typing import ClassVar, Type, Dict, List, Optional, Union, cast from pyspark.java_gateway import local_connect_and_auth from pyspark.resource import ResourceInformation from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer +from pyspark.errors import PySparkRuntimeError class TaskContext: @@ -355,7 +356,10 @@ class BarrierTaskContext(TaskContext): This API is experimental """ if not isinstance(cls._taskContext, BarrierTaskContext): - raise RuntimeError("It is not in a barrier stage") + raise PySparkRuntimeError( + error_class="NOT_IN_BARRIER_STAGE", + message_parameters={}, + ) return cls._taskContext @classmethod @@ -386,8 +390,12 @@ class BarrierTaskContext(TaskContext): or a `SparkException` after timeout. """ if self._port is None or self._secret is None: - raise RuntimeError( - "Not supported to call barrier() before initialize BarrierTaskContext." + raise PySparkRuntimeError( + error_class="CALL_BEFORE_INITIALIZE", + message_parameters={ + "func_name": "barrier", + "object": "BarrierTaskContext", + }, ) else: _load_from_socket(self._port, self._secret, BARRIER_FUNCTION) @@ -411,8 +419,12 @@ class BarrierTaskContext(TaskContext): if not isinstance(message, str): raise TypeError("Argument `message` must be of type `str`") elif self._port is None or self._secret is None: - raise RuntimeError( - "Not supported to call barrier() before initialize BarrierTaskContext." + raise PySparkRuntimeError( + error_class="CALL_BEFORE_INITIALIZE", + message_parameters={ + "func_name": "allGather", + "object": "BarrierTaskContext", + }, ) else: return _load_from_socket(self._port, self._secret, ALL_GATHER_FUNCTION, message) @@ -438,8 +450,12 @@ class BarrierTaskContext(TaskContext): '...:...' """ if self._port is None or self._secret is None: - raise RuntimeError( - "Not supported to call getTaskInfos() before initialize " + "BarrierTaskContext." 
+ raise PySparkRuntimeError( + error_class="CALL_BEFORE_INITIALIZE", + message_parameters={ + "func_name": "getTaskInfos", + "object": "BarrierTaskContext", + }, ) else: addresses = cast(Dict[str, str], self._localProperties).get("addresses", "") diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py index d819656f3b7..0a9628977af 100644 --- a/python/pyspark/tests/test_context.py +++ b/python/pyspark/tests/test_context.py @@ -287,7 +287,7 @@ class ContextTests(unittest.TestCase): with self.assertRaises(Exception) as context: sc.range(2).foreach(lambda _: SparkContext()) self.assertIn( - "SparkContext should only be created and accessed on the driver.", + "CONTEXT_ONLY_VALID_ON_DRIVER", str(context.exception), ) diff --git a/python/pyspark/tests/test_profiler.py b/python/pyspark/tests/test_profiler.py index 58a486cbbcc..018387012fc 100644 --- a/python/pyspark/tests/test_profiler.py +++ b/python/pyspark/tests/test_profiler.py @@ -25,8 +25,8 @@ from pyspark import SparkConf, SparkContext, BasicProfiler from pyspark.profiler import has_memory_profiler from pyspark.sql import SparkSession from pyspark.sql.functions import udf -from pyspark.errors import PythonException -from pyspark.testing.utils import PySparkTestCase +from pyspark.errors import PythonException, PySparkRuntimeError +from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils class ProfilerTests(PySparkTestCase): @@ -84,7 +84,7 @@ class ProfilerTests(PySparkTestCase): rdd.foreach(heavy_foo) -class ProfilerTests2(unittest.TestCase): +class ProfilerTests2(unittest.TestCase, PySparkErrorTestUtils): def test_profiler_disabled(self): sc = SparkContext( conf=SparkConf() @@ -92,15 +92,20 @@ class ProfilerTests2(unittest.TestCase): .set("spark.python.profile.memory", "false") ) try: - self.assertRaisesRegex( - RuntimeError, - "'spark.python.profile' or 'spark.python.profile.memory' configuration must be set", - lambda: sc.show_profiles(), + with self.assertRaises(PySparkRuntimeError) as pe: + sc.show_profiles() + self.check_error( + exception=pe.exception, + error_class="INCORRECT_CONF_FOR_PROFILE", + message_parameters={}, ) - self.assertRaisesRegex( - RuntimeError, - "'spark.python.profile' or 'spark.python.profile.memory' configuration must be set", - lambda: sc.dump_profiles("/tmp/abc"), + + with self.assertRaises(PySparkRuntimeError) as pe: + sc.dump_profiles("/tmp/abc") + self.check_error( + exception=pe.exception, + error_class="INCORRECT_CONF_FOR_PROFILE", + message_parameters={}, ) finally: sc.stop() diff --git a/python/pyspark/util.py b/python/pyspark/util.py index b7b972a5d35..5232c929e16 100644 --- a/python/pyspark/util.py +++ b/python/pyspark/util.py @@ -27,6 +27,8 @@ import traceback from types import TracebackType from typing import Any, Callable, Iterator, List, Optional, TextIO, Tuple +from pyspark.errors import PySparkRuntimeError + from py4j.clientserver import ClientServer __all__: List[str] = [] @@ -80,8 +82,11 @@ def fail_on_stopiteration(f: Callable) -> Callable: try: return f(*args, **kwargs) except StopIteration as exc: - raise RuntimeError( - "Caught StopIteration thrown from user's code; failing the task", exc + raise PySparkRuntimeError( + error_class="STOP_ITERATION_OCCURRED", + message_parameters={ + "exc": str(exc), + }, ) return wrapper @@ -152,7 +157,7 @@ def try_simplify_traceback(tb: TracebackType) -> Optional[TracebackType]: ... File "/.../pyspark/util.py", line ... ... - RuntimeError: ... + pyspark.errors.exceptions.base.PySparkRuntimeError: ... 
>>> "pyspark/util.py" in exc_info True @@ -168,7 +173,7 @@ def try_simplify_traceback(tb: TracebackType) -> Optional[TracebackType]: ... traceback.format_exception( ... type(e), e, try_simplify_traceback(skip_doctest_traceback(tb)))) >>> print(exc_info) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS - RuntimeError: ... + pyspark.errors.exceptions.base.PySparkRuntimeError: ... >>> "pyspark/util.py" in exc_info False diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c9d9ea866a8..bc40e5fc4ef 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -64,6 +64,7 @@ from pyspark.sql.pandas.types import to_arrow_type from pyspark.sql.types import StructType from pyspark.util import fail_on_stopiteration, try_simplify_traceback from pyspark import shuffle +from pyspark.errors import PySparkRuntimeError pickleSer = CPickleSerializer() utf8_deserializer = UTF8Deserializer() @@ -117,9 +118,12 @@ def wrap_scalar_pandas_udf(f, return_type): def verify_result_length(result, length): if len(result) != length: - raise RuntimeError( - "Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (length, len(result)) + raise PySparkRuntimeError( + error_class="SCHEMA_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "expected": str(length), + "actual": str(len(result)), + }, ) return result @@ -173,16 +177,21 @@ def verify_pandas_result(result, return_type, assign_cols_by_name): extra = sorted(list(column_names.difference(field_names))) extra = f" Unexpected: {', '.join(extra)}." if extra else "" - raise RuntimeError( - "Column names of the returned pandas.DataFrame do not match specified schema." - "{}{}".format(missing, extra) + raise PySparkRuntimeError( + error_class="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "missing": missing, + "extra": extra, + }, ) # otherwise the number of columns of result have to match the return type elif len(result.columns) != len(return_type): - raise RuntimeError( - "Number of columns of the returned pandas.DataFrame " - "doesn't match specified schema. " - "Expected: {} Actual: {}".format(len(return_type), len(result.columns)) + raise PySparkRuntimeError( + error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "expected": str(len(return_type)), + "actual": str(len(result.columns)), + }, ) @@ -278,10 +287,12 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type): len(result.columns) == len(return_type) or (len(result.columns) == 0 and result.empty) ): - raise RuntimeError( - "Number of columns of the element (pandas.DataFrame) in return iterator " - "doesn't match specified schema. 
" - "Expected: {} Actual: {}".format(len(return_type), len(result.columns)) + raise PySparkRuntimeError( + error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "expected": str(len(return_type)), + "actual": str(len(result.columns)), + }, ) return result @@ -330,7 +341,12 @@ def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index): elif window_bound_type == "unbounded": return wrap_unbounded_window_agg_pandas_udf(f, return_type) else: - raise RuntimeError("Invalid window bound type: {} ".format(window_bound_type)) + raise PySparkRuntimeError( + error_class="INVALID_WINDOW_BOUND_TYPE", + message_parameters={ + "window_bound_type": window_bound_type, + }, + ) def wrap_unbounded_window_agg_pandas_udf(f, return_type): @@ -554,13 +570,18 @@ def read_udfs(pickleSer, infile, eval_type): except StopIteration: pass else: - raise RuntimeError("pandas iterator UDF should exhaust the input " "iterator.") + raise PySparkRuntimeError( + error_class="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF", + message_parameters={}, + ) if num_output_rows != num_input_rows: - raise RuntimeError( - "The length of output in Scalar iterator pandas UDF should be " - "the same with the input's; however, the length of output was %d and the " - "length of input was %d." % (num_output_rows, num_input_rows) + raise PySparkRuntimeError( + error_class="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF", + message_parameters={ + "output_length": str(num_output_rows), + "input_length": str(num_input_rows), + }, ) # profiling is not supported for UDF @@ -700,14 +721,12 @@ def main(infile, outfile): version = utf8_deserializer.loads(infile) if version != "%d.%d" % sys.version_info[:2]: - raise RuntimeError( - ( - "Python in worker has different version %s than that in " - + "driver %s, PySpark cannot run with different minor versions. " - + "Please check environment variables PYSPARK_PYTHON and " - + "PYSPARK_DRIVER_PYTHON are correctly set." - ) - % ("%d.%d" % sys.version_info[:2], version) + raise PySparkRuntimeError( + error_class="PYTHON_VERSION_MISMATCH", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "driver_version": str(version), + }, ) # read inputs only for a barrier task --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org