This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fbb4bcad088a [SPARK-46226][PYTHON] Migrate all remaining `RuntimeError` into PySpark error framework
fbb4bcad088a is described below
commit fbb4bcad088a012a8f05d664aa6e473bb2419d4f
Author: Haejoon Lee <[email protected]>
AuthorDate: Wed Dec 6 10:50:06 2023 +0900
[SPARK-46226][PYTHON] Migrate all remaining `RuntimeError` into PySpark error framework
### What changes were proposed in this pull request?
This PR proposes to migrate all remaining `RuntimeError`s raised from `pyspark/sql/*` to the PySpark error framework by replacing them with `PySparkRuntimeError` and assigning dedicated error classes. In total, 30 `RuntimeError`s are migrated.
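The migration follows one mechanical pattern. A minimal sketch mirroring the `get_active_spark_context` change in `python/pyspark/sql/utils.py` (error class and parameters are taken from this diff):

```python
from pyspark.errors import PySparkRuntimeError

# Before: a plain RuntimeError with a hard-coded message string.
# raise RuntimeError("SparkContext or SparkSession should be created first.")

# After: a PySparkRuntimeError bound to a dedicated error class registered in
# python/pyspark/errors/error_classes.py; the message is rendered from the
# class template plus the (here empty) message parameters.
raise PySparkRuntimeError(
    error_class="SESSION_OR_CONTEXT_NOT_EXISTS",
    message_parameters={},
)
```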
### Why are the changes needed?
To improve the error handling in PySpark.
### Does this PR introduce _any_ user-facing change?
No API changes, but the user-facing error messages will be improved.
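Because the new exceptions carry a structured error class, callers can branch on it instead of matching message text. A hedged sketch (it assumes `spark` is a regular, non-Connect `SparkSession`, and uses the `getErrorClass()` / `getMessageParameters()` accessors of the error framework):

```python
from pyspark.errors import PySparkRuntimeError

try:
    _ = spark.client  # raises on a non-Connect session per this change
except PySparkRuntimeError as e:
    # Migrated errors expose a stable error class and its parameters.
    assert e.getErrorClass() == "ONLY_SUPPORTED_WITH_SPARK_CONNECT"
    print(e.getMessageParameters())  # {'feature': 'SparkSession.client'}
```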
### How was this patch tested?
The existing CI should pass.
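For reference, the migrated call sites can be asserted on the way the updated tests do. A condensed, self-contained sketch that checks the same `INVALID_INTERVAL_CASTING` case as `test_types.py`, but uses the public accessors directly instead of the suite's `check_error` helper:

```python
import unittest

from pyspark.errors import PySparkRuntimeError
from pyspark.sql.types import DayTimeIntervalType


class InvalidIntervalCastingTest(unittest.TestCase):
    def test_invalid_interval(self) -> None:
        # DayTimeIntervalType(123): endField defaults to startField, and 123
        # is not a valid field, so the constructor raises the migrated error.
        with self.assertRaises(PySparkRuntimeError) as pe:
            DayTimeIntervalType(123)
        self.assertEqual(pe.exception.getErrorClass(), "INVALID_INTERVAL_CASTING")
        self.assertEqual(
            pe.exception.getMessageParameters(),
            {"start_field": "123", "end_field": "123"},
        )


if __name__ == "__main__":
    unittest.main()
```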
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44143 from itholic/session_custom_error.
Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/errors/error_classes.py | 80 ++++++++++++++++++
python/pyspark/sql/connect/client/reattach.py | 8 +-
python/pyspark/sql/connect/client/retries.py | 6 +-
python/pyspark/sql/pandas/serializers.py | 9 +-
python/pyspark/sql/pandas/utils.py | 7 +-
python/pyspark/sql/session.py | 114 +++++++++++++-------------
python/pyspark/sql/streaming/state.py | 29 ++++---
python/pyspark/sql/tests/test_types.py | 55 +++++++++++--
python/pyspark/sql/types.py | 16 +++-
python/pyspark/sql/udf.py | 11 ++-
python/pyspark/sql/utils.py | 11 ++-
python/pyspark/tests/test_profiler.py | 15 ++--
12 files changed, 262 insertions(+), 99 deletions(-)
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index e0ca8a938ec2..7dd5cd927052 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -29,6 +29,11 @@ ERROR_CLASSES_JSON = """
"Argument `<arg_name>` is required when <condition>."
]
},
+ "ARROW_LEGACY_IPC_FORMAT": {
+ "message": [
+ "Arrow legacy IPC format is not supported in PySpark, please unset
ARROW_PRE_0_15_IPC_FORMAT."
+ ]
+ },
"ATTRIBUTE_NOT_CALLABLE" : {
"message" : [
"Attribute `<attr_name>` in provided object `<obj_name>` is not
callable."
@@ -79,6 +84,16 @@ ERROR_CLASSES_JSON = """
"Argument `<arg_name>` cannot be None."
]
},
+ "CANNOT_CONFIGURE_SPARK_CONNECT": {
+ "message": [
+ "Spark Connect server cannot be configured with Spark master; however,
found URL for Spark master [<url>]."
+ ]
+ },
+ "CANNOT_CONFIGURE_SPARK_MASTER": {
+ "message": [
+ "Spark master cannot be configured with Spark Connect server; however,
found URL for Spark Connect [<url>]."
+ ]
+ },
"CANNOT_CONVERT_COLUMN_INTO_BOOL": {
"message": [
"Cannot convert column into bool: please use '&' for 'and', '|' for
'or', '~' for 'not' when building DataFrame boolean expressions."
@@ -149,11 +164,26 @@ ERROR_CLASSES_JSON = """
"returnType can not be specified when `<arg_name>` is a user-defined
function, but got <return_type>."
]
},
+ "CANNOT_WITHOUT": {
+ "message": [
+ "Cannot <condition1> without <condition2>."
+ ]
+ },
"COLUMN_IN_LIST": {
"message": [
"`<func_name>` does not allow a Column in a list."
]
},
+ "CONNECT_URL_ALREADY_DEFINED" : {
+ "message" : [
+ "Only one Spark Connect client URL can be set; however, got a different
URL [<new_url>] from the existing [<existing_url>]."
+ ]
+ },
+ "CONNECT_URL_NOT_SET" : {
+ "message" : [
+ "Cannot create a Spark Connect session because the Spark Connect remote
URL has not been set. Please define the remote URL by setting either the
'spark.remote' option or the 'SPARK_REMOTE' environment variable."
+ ]
+ },
"CONTEXT_ONLY_VALID_ON_DRIVER" : {
"message" : [
"It appears that you are attempting to reference SparkContext from a
broadcast variable, action, or transformation. SparkContext can only be used on
the driver, not in code that it run on workers. For more information, see
SPARK-5063."
@@ -231,6 +261,11 @@ ERROR_CLASSES_JSON = """
"Duplicated field names in Arrow Struct are not allowed, got
<field_names>"
]
},
+ "ERROR_OCCURRED_WHILE_CALLING" : {
+ "message" : [
+ "An error occurred while calling <func_name>: <error_msg>."
+ ]
+ },
"HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN" : {
"message" : [
"Function `<func_name>` should return Column, got <return_type>."
@@ -272,6 +307,11 @@ ERROR_CLASSES_JSON = """
"Invalid URL for Spark Connect: <detail>"
]
},
+ "INVALID_INTERVAL_CASTING": {
+ "message": [
+ "Interval <start_field> to <end_field> is invalid."
+ ]
+ },
"INVALID_ITEM_FOR_CONTAINER": {
"message": [
"All items in `<arg_name>` should be in <allowed_types>, got
<item_type>."
@@ -657,6 +697,11 @@ ERROR_CLASSES_JSON = """
"Argument `<arg_name>` should be a WindowSpec, got <arg_type>."
]
},
+ "NO_ACTIVE_EXCEPTION" : {
+ "message" : [
+ "No active exception."
+ ]
+ },
"NO_ACTIVE_OR_DEFAULT_SESSION" : {
"message" : [
"No active or default Spark session found. Please create a new Spark
session before running the code."
@@ -682,6 +727,16 @@ ERROR_CLASSES_JSON = """
"Only a single trigger is allowed."
]
},
+ "ONLY_SUPPORTED_WITH_SPARK_CONNECT" : {
+ "message" : [
+ "<feature> is only supported with Spark Connect; however, the current
Spark session does not use Spark Connect."
+ ]
+ },
+ "PACKAGE_NOT_INSTALLED" : {
+ "message" : [
+ "<package_name> >= <minimum_version> must be installed; however, it was
not found."
+ ]
+ },
"PIPE_FUNCTION_EXITED" : {
"message" : [
"Pipe function `<func_name>` exited with error code <error_code>."
@@ -723,6 +778,16 @@ ERROR_CLASSES_JSON = """
"transformation. For more information, see SPARK-5063."
]
},
+ "READ_ONLY" : {
+ "message" : [
+ "<object> is read-only."
+ ]
+ },
+ "RESPONSE_ALREADY_RECEIVED" : {
+ "message" : [
+ "OPERATION_NOT_FOUND on the server but responses were already received
from it."
+ ]
+ },
"RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF" : {
"message" : [
"Column names of the returned pyarrow.Table do not match specified
schema.<missing><extra>"
@@ -773,6 +838,11 @@ ERROR_CLASSES_JSON = """
"There should not be an existing Spark Session or Spark Context."
]
},
+ "SESSION_OR_CONTEXT_NOT_EXISTS" : {
+ "message" : [
+ "SparkContext or SparkSession should be created first.."
+ ]
+ },
"SHOULD_NOT_DATAFRAME": {
"message": [
"Argument `<arg_name>` should not be a DataFrame."
@@ -803,6 +873,11 @@ ERROR_CLASSES_JSON = """
"Cannot serialize the function `<name>`. If you accessed the Spark
session, or a DataFrame defined outside of the function, or any object that
contains a Spark session, please be aware that they are not allowed in Spark
Connect. For `foreachBatch`, please access the Spark session using
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch`
function. For `StreamingQueryListener`, please access the Spark session using
`self.spark`. For details please check out th [...]
]
},
+ "TEST_CLASS_NOT_COMPILED" : {
+ "message" : [
+ "<test_class_path> doesn't exist. Spark sql test classes are not
compiled."
+ ]
+ },
"TOO_MANY_VALUES" : {
"message" : [
"Expected <expected> values for `<item>`, got <actual>."
@@ -928,6 +1003,11 @@ ERROR_CLASSES_JSON = """
"Unsupported Literal '<literal>'."
]
},
+ "UNSUPPORTED_LOCAL_CONNECTION_STRING" : {
+ "message" : [
+ "Creating new SparkSessions with `local` connection string is not
supported."
+ ]
+ },
"UNSUPPORTED_NUMPY_ARRAY_SCALAR" : {
"message" : [
"The type of array scalar '<dtype>' is not supported."
diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index 9fa0f2541337..4468582ca80e 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -32,6 +32,7 @@ from grpc_status import rpc_status
import pyspark.sql.connect.proto as pb2
import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib
+from pyspark.errors import PySparkRuntimeError
class ExecutePlanResponseReattachableIterator(Generator):
@@ -255,10 +256,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
status = rpc_status.from_call(cast(grpc.Call, e))
if status is not None and "INVALID_HANDLE.OPERATION_NOT_FOUND" in status.message:
if self._last_returned_response_id is not None:
- raise RuntimeError(
- "OPERATION_NOT_FOUND on the server but "
- "responses were already received from it.",
- e,
+ raise PySparkRuntimeError(
+ error_class="RESPONSE_ALREADY_RECEIVED",
+ message_parameters={},
)
# Try a new ExecutePlan, and throw upstream for retry.
self._iterator = iter(
diff --git a/python/pyspark/sql/connect/client/retries.py b/python/pyspark/sql/connect/client/retries.py
index 26aa6893dfae..88fc3fe1ffd6 100644
--- a/python/pyspark/sql/connect/client/retries.py
+++ b/python/pyspark/sql/connect/client/retries.py
@@ -22,6 +22,7 @@ import typing
from typing import Optional, Callable, Generator, List, Type
from types import TracebackType
from pyspark.sql.connect.client.logging import logger
+from pyspark.errors import PySparkRuntimeError
"""
This module contains retry system. The system is designed to be
@@ -201,7 +202,10 @@ class Retrying:
def _last_exception(self) -> BaseException:
if self._exception is None:
- raise RuntimeError("No active exception")
+ raise PySparkRuntimeError(
+ error_class="NO_ACTIVE_EXCEPTION",
+ message_parameters={},
+ )
return self._exception
def _wait(self) -> None:
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 6c5bd826a023..8b2b583ddaab 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -74,9 +74,12 @@ class ArrowCollectSerializer(Serializer):
num = read_int(stream)
if num == -1:
error_msg = UTF8Deserializer().loads(stream)
- raise RuntimeError(
- "An error occurred while calling "
- "ArrowCollectSerializer.load_stream: {}".format(error_msg)
+ raise PySparkRuntimeError(
+ error_class="ERROR_OCCURRED_WHILE_CALLING",
+ message_parameters={
+ "func_name": "ArrowCollectSerializer.load_stream",
+ "error_msg": error_msg,
+ },
)
batch_order = []
for i in range(num):
diff --git a/python/pyspark/sql/pandas/utils.py b/python/pyspark/sql/pandas/utils.py
index b62be2081028..db60f77c391c 100644
--- a/python/pyspark/sql/pandas/utils.py
+++ b/python/pyspark/sql/pandas/utils.py
@@ -16,6 +16,7 @@
#
from pyspark.loose_version import LooseVersion
+from pyspark.errors import PySparkRuntimeError
def require_minimum_pandas_version() -> None:
@@ -66,7 +67,7 @@ def require_minimum_pyarrow_version() -> None:
"your version was %s." % (minimum_pyarrow_version,
pyarrow.__version__)
)
if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
- raise RuntimeError(
- "Arrow legacy IPC format is not supported in PySpark, "
- "please unset ARROW_PRE_0_15_IPC_FORMAT"
+ raise PySparkRuntimeError(
+ error_class="ARROW_LEGACY_IPC_FORMAT",
+ message_parameters={},
)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b4fad7ad29da..7f4589557cd2 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -309,18 +309,20 @@ class SparkSession(SparkConversionMixin):
if "spark.master" in self._options and (
"spark.remote" in self._options or "SPARK_REMOTE" in os.environ
):
- raise RuntimeError(
- "Spark master cannot be configured with Spark Connect
server; "
- "however, found URL for Spark Connect [%s]"
- % self._options.get("spark.remote",
os.environ.get("SPARK_REMOTE"))
+ raise PySparkRuntimeError(
+ error_class="CANNOT_CONFIGURE_SPARK_MASTER",
+ message_parameters={
+ "url": self._options.get("spark.remote",
os.environ.get("SPARK_REMOTE"))
+ },
)
if "spark.remote" in self._options and (
"spark.master" in self._options or "MASTER" in os.environ
):
- raise RuntimeError(
- "Spark Connect server cannot be configured with Spark
master; "
- "however, found URL for Spark master [%s]"
- % self._options.get("spark.master",
os.environ.get("MASTER"))
+ raise PySparkRuntimeError(
+ error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
+ message_parameters={
+ "url": self._options.get("spark.master",
os.environ.get("MASTER"))
+ },
)
if "spark.remote" in self._options:
@@ -328,10 +330,12 @@ class SparkSession(SparkConversionMixin):
if ("SPARK_REMOTE" in os.environ and
os.environ["SPARK_REMOTE"] != remote) and (
"SPARK_LOCAL_REMOTE" in os.environ and not
remote.startswith("local")
):
- raise RuntimeError(
- "Only one Spark Connect client URL can be set;
however, got a "
- "different URL [%s] from the existing [%s]"
- % (os.environ["SPARK_REMOTE"], remote)
+ raise PySparkRuntimeError(
+ error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
+ message_parameters={
+ "new_url": os.environ["SPARK_REMOTE"],
+ "existing_url": remote,
+ },
)
def master(self, master: str) -> "SparkSession.Builder":
@@ -482,11 +486,9 @@ class SparkSession(SparkConversionMixin):
url = opts.get("spark.remote",
os.environ.get("SPARK_REMOTE"))
if url is None:
- raise RuntimeError(
- "Cannot create a Spark Connect session
because the "
- "Spark Connect remote URL has not been
set. Please define "
- "the remote URL by setting either the
'spark.remote' option "
- "or the 'SPARK_REMOTE' environment
variable."
+ raise PySparkRuntimeError(
+ error_class="CONNECT_URL_NOT_SET",
+ message_parameters={},
)
if url.startswith("local"):
@@ -503,9 +505,9 @@ class SparkSession(SparkConversionMixin):
opts["spark.remote"] = url
return RemoteSparkSession.builder.config(map=opts).getOrCreate()
else:
- raise RuntimeError(
- "Cannot start a remote Spark session because
there "
- "is a regular Spark session already running."
+ raise PySparkRuntimeError(
+ error_class="SESSION_ALREADY_EXIST",
+ message_parameters={},
)
session = SparkSession._instantiatedSession
@@ -548,9 +550,9 @@ class SparkSession(SparkConversionMixin):
url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
if url.startswith("local"):
- raise RuntimeError(
- "Creating new SparkSessions with `local` "
- "connection string is not supported."
+ raise PySparkRuntimeError(
+ error_class="UNSUPPORTED_LOCAL_CONNECTION_STRING",
+ message_parameters={},
)
# Mark this Spark Session as Spark Connect. This prevents that local PySpark is
@@ -559,9 +561,9 @@ class SparkSession(SparkConversionMixin):
opts["spark.remote"] = url
return RemoteSparkSession.builder.config(map=opts).create()
else:
- raise RuntimeError(
- "SparkSession.builder.create() can only be used with Spark
Connect; "
- "however, spark.remote was not found."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature":
"SparkSession.builder.create"},
)
# TODO(SPARK-38912): Replace @classproperty with @classmethod + @property once support for
@@ -1916,9 +1918,9 @@ class SparkSession(SparkConversionMixin):
This is an API dedicated to Spark Connect client only. With regular Spark Session, it throws
an exception.
"""
- raise RuntimeError(
- "SparkSession.client is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.client"},
)
def addArtifacts(
@@ -1949,9 +1951,9 @@ class SparkSession(SparkConversionMixin):
This is an API dedicated to Spark Connect client only. With regular Spark Session, it throws
an exception.
"""
- raise RuntimeError(
- "SparkSession.addArtifact(s) is only supported with Spark Connect;
"
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.addArtifact(s)"},
)
addArtifact = addArtifacts
@@ -1979,9 +1981,9 @@ class SparkSession(SparkConversionMixin):
Also, this is an API dedicated to Spark Connect client only. With regular
Spark Session, it throws an exception.
"""
- raise RuntimeError(
- "SparkSession.copyFromLocalToFs is only supported with Spark
Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.copyFromLocalToFs"},
)
def interruptAll(self) -> List[str]:
@@ -1999,9 +2001,9 @@ class SparkSession(SparkConversionMixin):
-----
There is still a possibility of operation finishing just as it is interrupted.
"""
- raise RuntimeError(
- "SparkSession.interruptAll is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.interruptAll"},
)
def interruptTag(self, tag: str) -> List[str]:
@@ -2019,9 +2021,9 @@ class SparkSession(SparkConversionMixin):
-----
There is still a possibility of operation finishing just as it is interrupted.
"""
- raise RuntimeError(
- "SparkSession.interruptTag is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.interruptTag"},
)
def interruptOperation(self, op_id: str) -> List[str]:
@@ -2039,9 +2041,9 @@ class SparkSession(SparkConversionMixin):
-----
There is still a possibility of operation finishing just as it is interrupted.
"""
- raise RuntimeError(
- "SparkSession.interruptOperation is only supported with Spark
Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.interruptOperation"},
)
def addTag(self, tag: str) -> None:
@@ -2063,9 +2065,9 @@ class SparkSession(SparkConversionMixin):
tag : str
The tag to be added. Cannot contain ',' (comma) character or be an empty string.
"""
- raise RuntimeError(
- "SparkSession.addTag is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.addTag"},
)
def removeTag(self, tag: str) -> None:
@@ -2080,9 +2082,9 @@ class SparkSession(SparkConversionMixin):
tag : list of str
The tag to be removed. Cannot contain ',' (comma) character or be an empty string.
"""
- raise RuntimeError(
- "SparkSession.removeTag is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.removeTag"},
)
def getTags(self) -> Set[str]:
@@ -2097,9 +2099,9 @@ class SparkSession(SparkConversionMixin):
set of str
Set of tags of interrupted operations.
"""
- raise RuntimeError(
- "SparkSession.getTags is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.getTags"},
)
def clearTags(self) -> None:
@@ -2108,9 +2110,9 @@ class SparkSession(SparkConversionMixin):
.. versionadded:: 3.5.0
"""
- raise RuntimeError(
- "SparkSession.clearTags is only supported with Spark Connect; "
- "however, the current Spark session does not use Spark Connect."
+ raise PySparkRuntimeError(
+ error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+ message_parameters={"feature": "SparkSession.clearTags"},
)
diff --git a/python/pyspark/sql/streaming/state.py b/python/pyspark/sql/streaming/state.py
index 8bf01b3ebd98..09592558f853 100644
--- a/python/pyspark/sql/streaming/state.py
+++ b/python/pyspark/sql/streaming/state.py
@@ -20,7 +20,7 @@ from typing import Tuple, Optional
from pyspark.sql.types import DateType, Row, StructType
from pyspark.sql.utils import has_numpy
-from pyspark.errors import PySparkTypeError, PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError, PySparkRuntimeError
__all__ = ["GroupState", "GroupStateTimeout"]
@@ -185,9 +185,12 @@ class GroupState:
)
if self._timeout_conf != GroupStateTimeout.ProcessingTimeTimeout:
- raise RuntimeError(
- "Cannot set timeout duration without enabling processing time
timeout in "
- "applyInPandasWithState"
+ raise PySparkRuntimeError(
+ error_class="CANNOT_WITHOUT",
+ message_parameters={
+ "condition1": "set timeout duration",
+ "condition2": "enabling processing time timeout in
applyInPandasWithState",
+ },
)
if durationMs <= 0:
@@ -208,9 +211,12 @@ class GroupState:
Event time timeout must be enabled.
"""
if self._timeout_conf != GroupStateTimeout.EventTimeTimeout:
- raise RuntimeError(
- "Cannot set timeout duration without enabling processing time
timeout in "
- "applyInPandasWithState"
+ raise PySparkRuntimeError(
+ error_class="CANNOT_WITHOUT",
+ message_parameters={
+ "condition1": "set timeout duration",
+ "condition2": "enabling processing time timeout in
applyInPandasWithState",
+ },
)
if isinstance(timestampMs, datetime.datetime):
@@ -245,9 +251,12 @@ class GroupState:
In a streaming query, this can be called only when watermark is set.
"""
if not self._watermark_present:
- raise RuntimeError(
- "Cannot get event time watermark timestamp without setting
watermark before "
- "applyInPandasWithState"
+ raise PySparkRuntimeError(
+ error_class="CANNOT_WITHOUT",
+ message_parameters={
+ "condition1": "get event time watermark timestamp",
+ "condition2": "setting watermark before
applyInPandasWithState",
+ },
)
return self._event_time_watermark_ms
diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py
index a07309d1dff9..06064e58c794 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -26,7 +26,12 @@ import unittest
from pyspark.sql import Row
from pyspark.sql import functions as F
-from pyspark.errors import AnalysisException, PySparkTypeError, PySparkValueError
+from pyspark.errors import (
+ AnalysisException,
+ PySparkTypeError,
+ PySparkValueError,
+ PySparkRuntimeError,
+)
from pyspark.sql.types import (
DataType,
ByteType,
@@ -1200,15 +1205,33 @@ class TypesTestsMixin:
"interval hour to second",
)
- with self.assertRaisesRegex(RuntimeError, "interval None to 3 is
invalid"):
+ with self.assertRaises(PySparkRuntimeError) as pe:
DayTimeIntervalType(endField=DayTimeIntervalType.SECOND)
- with self.assertRaisesRegex(RuntimeError, "interval 123 to 123 is
invalid"):
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "None", "end_field": "3"},
+ )
+
+ with self.assertRaises(PySparkRuntimeError) as pe:
DayTimeIntervalType(123)
- with self.assertRaisesRegex(RuntimeError, "interval 0 to 321 is
invalid"):
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "123", "end_field": "123"},
+ )
+
+ with self.assertRaises(PySparkRuntimeError) as pe:
DayTimeIntervalType(DayTimeIntervalType.DAY, 321)
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "0", "end_field": "321"},
+ )
+
def test_daytime_interval_type(self):
# SPARK-37277: Support DayTimeIntervalType in createDataFrame
timedetlas = [
@@ -1269,15 +1292,33 @@ class TypesTestsMixin:
"interval year to month",
)
- with self.assertRaisesRegex(RuntimeError, "interval None to 3 is
invalid"):
+ with self.assertRaises(PySparkRuntimeError) as pe:
YearMonthIntervalType(endField=3)
- with self.assertRaisesRegex(RuntimeError, "interval 123 to 123 is
invalid"):
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "None", "end_field": "3"},
+ )
+
+ with self.assertRaises(PySparkRuntimeError) as pe:
YearMonthIntervalType(123)
- with self.assertRaisesRegex(RuntimeError, "interval 0 to 321 is
invalid"):
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "123", "end_field": "123"},
+ )
+
+ with self.assertRaises(PySparkRuntimeError) as pe:
YearMonthIntervalType(YearMonthIntervalType.YEAR, 321)
+ self.check_error(
+ exception=pe.exception,
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": "0", "end_field": "321"},
+ )
+
def test_yearmonth_interval_type(self):
schema1 = self.spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").schema
self.assertEqual(schema1.fields[0].dataType, YearMonthIntervalType(0, 1))
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index d3eed77b3838..fa192296348f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -55,6 +55,7 @@ from pyspark.errors import (
PySparkTypeError,
PySparkValueError,
PySparkIndexError,
+ PySparkRuntimeError,
PySparkAttributeError,
PySparkKeyError,
)
@@ -476,7 +477,10 @@ class DayTimeIntervalType(AnsiIntervalType):
fields = DayTimeIntervalType._fields
if startField not in fields.keys() or endField not in fields.keys():
- raise RuntimeError("interval %s to %s is invalid" % (startField,
endField))
+ raise PySparkRuntimeError(
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": str(startField),
"end_field": str(endField)},
+ )
self.startField = cast(int, startField)
self.endField = cast(int, endField)
@@ -531,7 +535,10 @@ class YearMonthIntervalType(AnsiIntervalType):
fields = YearMonthIntervalType._fields
if startField not in fields.keys() or endField not in fields.keys():
- raise RuntimeError("interval %s to %s is invalid" % (startField,
endField))
+ raise PySparkRuntimeError(
+ error_class="INVALID_INTERVAL_CASTING",
+ message_parameters={"start_field": str(startField),
"end_field": str(endField)},
+ )
self.startField = cast(int, startField)
self.endField = cast(int, endField)
@@ -2594,7 +2601,10 @@ class Row(tuple):
def __setattr__(self, key: Any, value: Any) -> None:
if key != "__fields__":
- raise RuntimeError("Row is read-only")
+ raise PySparkRuntimeError(
+ error_class="READ_ONLY",
+ message_parameters={"object": "Row"},
+ )
self.__dict__[key] = value
def __reduce__(
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 351bcea3f389..b9a00d432671 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -40,7 +40,7 @@ from pyspark.sql.types import (
from pyspark.sql.utils import get_active_spark_context
from pyspark.sql.pandas.types import to_arrow_type
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
-from pyspark.errors import PySparkTypeError, PySparkNotImplementedError
+from pyspark.errors import PySparkTypeError, PySparkNotImplementedError, PySparkRuntimeError
if TYPE_CHECKING:
from pyspark.sql._typing import DataTypeOrString, ColumnOrName, UserDefinedFunctionLike
@@ -416,9 +416,12 @@ class UserDefinedFunction:
if profiler_enabled and memory_profiler_enabled:
# When both profilers are enabled, they interfere with each other,
# that makes the result profile misleading.
- raise RuntimeError(
- "'spark.python.profile' and 'spark.python.profile.memory'
configuration"
- " cannot be enabled together."
+ raise PySparkRuntimeError(
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": "'spark.python.profile' and "
+ "'spark.python.profile.memory' configuration"
+ },
)
elif profiler_enabled:
f = self.func
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9f817341724a..f517696c76c7 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -39,6 +39,7 @@ from pyspark.errors import ( # noqa: F401
UnknownException,
SparkUpgradeException,
PySparkNotImplementedError,
+ PySparkRuntimeError,
)
from pyspark.errors.exceptions.captured import CapturedException # noqa: F401
from pyspark.find_spark_home import _find_spark_home
@@ -90,8 +91,9 @@ def require_test_compiled() -> None:
paths = glob.glob(test_class_path)
if len(paths) == 0:
- raise RuntimeError(
- "%s doesn't exist. Spark sql test classes are not compiled." %
test_class_path
+ raise PySparkRuntimeError(
+ error_class="TEST_CLASS_NOT_COMPILED",
+ message_parameters={"test_class_path": test_class_path},
)
@@ -274,7 +276,10 @@ def get_active_spark_context() -> SparkContext:
otherwise, returns the active SparkContext."""
sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
- raise RuntimeError("SparkContext or SparkSession should be created
first.")
+ raise PySparkRuntimeError(
+ error_class="SESSION_OR_CONTEXT_NOT_EXISTS",
+ message_parameters={},
+ )
return sc
diff --git a/python/pyspark/tests/test_profiler.py b/python/pyspark/tests/test_profiler.py
index 018387012fc9..b7797ead2adb 100644
--- a/python/pyspark/tests/test_profiler.py
+++ b/python/pyspark/tests/test_profiler.py
@@ -123,11 +123,16 @@ class ProfilerTests2(unittest.TestCase, PySparkErrorTestUtils):
return v + 1
try:
- self.assertRaisesRegex(
- RuntimeError,
- "'spark.python.profile' and 'spark.python.profile.memory'
configuration"
- " cannot be enabled together",
- lambda: spark.range(10).select(plus_one("id")).collect(),
+ with self.assertRaises(PySparkRuntimeError) as pe:
+ spark.range(10).select(plus_one("id")).collect()
+
+ self.check_error(
+ exception=pe.exception,
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": "'spark.python.profile' and "
+ "'spark.python.profile.memory' configuration"
+ },
)
finally:
sc.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]