This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fbb4bcad088a [SPARK-46226][PYTHON] Migrate all remaining 
`RuntimeError` into PySpark error framework
fbb4bcad088a is described below

commit fbb4bcad088a012a8f05d664aa6e473bb2419d4f
Author: Haejoon Lee <[email protected]>
AuthorDate: Wed Dec 6 10:50:06 2023 +0900

    [SPARK-46226][PYTHON] Migrate all remaining `RuntimeError` into PySpark 
error framework
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate all remaining `RuntimeError`s raised from
    `pyspark/sql/*` to the PySpark error framework, replacing each with a
    `PySparkRuntimeError` that is assigned a dedicated error class. In total,
    30 `RuntimeError`s are migrated.
    
    ### Why are the changes needed?
    
    To improve the error handling in PySpark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, but the user-facing error messages will be improved.
    
    ### How was this patch tested?
    
    The existing CI should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44143 from itholic/session_custom_error.
    
    Authored-by: Haejoon Lee <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/errors/error_classes.py        |  80 ++++++++++++++++++
 python/pyspark/sql/connect/client/reattach.py |   8 +-
 python/pyspark/sql/connect/client/retries.py  |   6 +-
 python/pyspark/sql/pandas/serializers.py      |   9 +-
 python/pyspark/sql/pandas/utils.py            |   7 +-
 python/pyspark/sql/session.py                 | 114 +++++++++++++-------------
 python/pyspark/sql/streaming/state.py         |  29 ++++---
 python/pyspark/sql/tests/test_types.py        |  55 +++++++++++--
 python/pyspark/sql/types.py                   |  16 +++-
 python/pyspark/sql/udf.py                     |  11 ++-
 python/pyspark/sql/utils.py                   |  11 ++-
 python/pyspark/tests/test_profiler.py         |  15 ++--
 12 files changed, 262 insertions(+), 99 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index e0ca8a938ec2..7dd5cd927052 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -29,6 +29,11 @@ ERROR_CLASSES_JSON = """
       "Argument `<arg_name>` is required when <condition>."
     ]
   },
+  "ARROW_LEGACY_IPC_FORMAT": {
+    "message": [
+      "Arrow legacy IPC format is not supported in PySpark, please unset 
ARROW_PRE_0_15_IPC_FORMAT."
+    ]
+  },
   "ATTRIBUTE_NOT_CALLABLE" : {
     "message" : [
       "Attribute `<attr_name>` in provided object `<obj_name>` is not 
callable."
@@ -79,6 +84,16 @@ ERROR_CLASSES_JSON = """
       "Argument `<arg_name>` cannot be None."
     ]
   },
+  "CANNOT_CONFIGURE_SPARK_CONNECT": {
+    "message": [
+      "Spark Connect server cannot be configured with Spark master; however, 
found URL for Spark master [<url>]."
+    ]
+  },
+  "CANNOT_CONFIGURE_SPARK_MASTER": {
+    "message": [
+      "Spark master cannot be configured with Spark Connect server; however, 
found URL for Spark Connect [<url>]."
+    ]
+  },
   "CANNOT_CONVERT_COLUMN_INTO_BOOL": {
     "message": [
       "Cannot convert column into bool: please use '&' for 'and', '|' for 
'or', '~' for 'not' when building DataFrame boolean expressions."
@@ -149,11 +164,26 @@ ERROR_CLASSES_JSON = """
       "returnType can not be specified when `<arg_name>` is a user-defined 
function, but got <return_type>."
     ]
   },
+  "CANNOT_WITHOUT": {
+    "message": [
+      "Cannot <condition1> without <condition2>."
+    ]
+  },
   "COLUMN_IN_LIST": {
     "message": [
       "`<func_name>` does not allow a Column in a list."
     ]
   },
+  "CONNECT_URL_ALREADY_DEFINED" : {
+    "message" : [
+      "Only one Spark Connect client URL can be set; however, got a different 
URL [<new_url>] from the existing [<existing_url>]."
+    ]
+  },
+  "CONNECT_URL_NOT_SET" : {
+    "message" : [
+      "Cannot create a Spark Connect session because the Spark Connect remote 
URL has not been set. Please define the remote URL by setting either the 
'spark.remote' option or the 'SPARK_REMOTE' environment variable."
+    ]
+  },
   "CONTEXT_ONLY_VALID_ON_DRIVER" : {
     "message" : [
       "It appears that you are attempting to reference SparkContext from a 
broadcast variable, action, or transformation. SparkContext can only be used on 
the driver, not in code that it run on workers. For more information, see 
SPARK-5063."
@@ -231,6 +261,11 @@ ERROR_CLASSES_JSON = """
       "Duplicated field names in Arrow Struct are not allowed, got 
<field_names>"
     ]
   },
+  "ERROR_OCCURRED_WHILE_CALLING" : {
+    "message" : [
+      "An error occurred while calling <func_name>: <error_msg>."
+    ]
+  },
   "HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN" : {
     "message" : [
       "Function `<func_name>` should return Column, got <return_type>."
@@ -272,6 +307,11 @@ ERROR_CLASSES_JSON = """
       "Invalid URL for Spark Connect: <detail>"
     ]
   },
+  "INVALID_INTERVAL_CASTING": {
+    "message": [
+      "Interval <start_field> to <end_field> is invalid."
+    ]
+  },
   "INVALID_ITEM_FOR_CONTAINER": {
     "message": [
       "All items in `<arg_name>` should be in <allowed_types>, got 
<item_type>."
@@ -657,6 +697,11 @@ ERROR_CLASSES_JSON = """
       "Argument `<arg_name>` should be a WindowSpec, got <arg_type>."
     ]
   },
+  "NO_ACTIVE_EXCEPTION" : {
+    "message" : [
+      "No active exception."
+    ]
+  },
   "NO_ACTIVE_OR_DEFAULT_SESSION" : {
     "message" : [
       "No active or default Spark session found. Please create a new Spark 
session before running the code."
@@ -682,6 +727,16 @@ ERROR_CLASSES_JSON = """
       "Only a single trigger is allowed."
     ]
   },
+  "ONLY_SUPPORTED_WITH_SPARK_CONNECT" : {
+    "message" : [
+      "<feature> is only supported with Spark Connect; however, the current 
Spark session does not use Spark Connect."
+    ]
+  },
+  "PACKAGE_NOT_INSTALLED" : {
+    "message" : [
+      "<package_name> >= <minimum_version> must be installed; however, it was 
not found."
+    ]
+  },
   "PIPE_FUNCTION_EXITED" : {
     "message" : [
       "Pipe function `<func_name>` exited with error code <error_code>."
@@ -723,6 +778,16 @@ ERROR_CLASSES_JSON = """
       "transformation. For more information, see SPARK-5063."
     ]
   },
+  "READ_ONLY" : {
+    "message" : [
+      "<object> is read-only."
+    ]
+  },
+  "RESPONSE_ALREADY_RECEIVED" : {
+    "message" : [
+      "OPERATION_NOT_FOUND on the server but responses were already received 
from it."
+    ]
+  },
   "RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF" : {
     "message" : [
       "Column names of the returned pyarrow.Table do not match specified 
schema.<missing><extra>"
@@ -773,6 +838,11 @@ ERROR_CLASSES_JSON = """
       "There should not be an existing Spark Session or Spark Context."
     ]
   },
+  "SESSION_OR_CONTEXT_NOT_EXISTS" : {
+    "message" : [
+      "SparkContext or SparkSession should be created first.."
+    ]
+  },
   "SHOULD_NOT_DATAFRAME": {
     "message": [
       "Argument `<arg_name>` should not be a DataFrame."
@@ -803,6 +873,11 @@ ERROR_CLASSES_JSON = """
       "Cannot serialize the function `<name>`. If you accessed the Spark 
session, or a DataFrame defined outside of the function, or any object that 
contains a Spark session, please be aware that they are not allowed in Spark 
Connect. For `foreachBatch`, please access the Spark session using 
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch` 
function. For `StreamingQueryListener`, please access the Spark session using 
`self.spark`. For details please check out th [...]
     ]
   },
+  "TEST_CLASS_NOT_COMPILED" : {
+    "message" : [
+      "<test_class_path> doesn't exist. Spark sql test classes are not 
compiled."
+    ]
+  },
   "TOO_MANY_VALUES" : {
     "message" : [
       "Expected <expected> values for `<item>`, got <actual>."
@@ -928,6 +1003,11 @@ ERROR_CLASSES_JSON = """
       "Unsupported Literal '<literal>'."
     ]
   },
+  "UNSUPPORTED_LOCAL_CONNECTION_STRING" : {
+    "message" : [
+      "Creating new SparkSessions with `local` connection string is not 
supported."
+    ]
+  },
   "UNSUPPORTED_NUMPY_ARRAY_SCALAR" : {
     "message" : [
       "The type of array scalar '<dtype>' is not supported."
diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index 9fa0f2541337..4468582ca80e 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -32,6 +32,7 @@ from grpc_status import rpc_status
 
 import pyspark.sql.connect.proto as pb2
 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib
+from pyspark.errors import PySparkRuntimeError
 
 
 class ExecutePlanResponseReattachableIterator(Generator):
@@ -255,10 +256,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
             status = rpc_status.from_call(cast(grpc.Call, e))
             if status is not None and "INVALID_HANDLE.OPERATION_NOT_FOUND" in 
status.message:
                 if self._last_returned_response_id is not None:
-                    raise RuntimeError(
-                        "OPERATION_NOT_FOUND on the server but "
-                        "responses were already received from it.",
-                        e,
+                    raise PySparkRuntimeError(
+                        error_class="RESPONSE_ALREADY_RECEIVED",
+                        message_parameters={},
                     )
                 # Try a new ExecutePlan, and throw upstream for retry.
                 self._iterator = iter(
diff --git a/python/pyspark/sql/connect/client/retries.py 
b/python/pyspark/sql/connect/client/retries.py
index 26aa6893dfae..88fc3fe1ffd6 100644
--- a/python/pyspark/sql/connect/client/retries.py
+++ b/python/pyspark/sql/connect/client/retries.py
@@ -22,6 +22,7 @@ import typing
 from typing import Optional, Callable, Generator, List, Type
 from types import TracebackType
 from pyspark.sql.connect.client.logging import logger
+from pyspark.errors import PySparkRuntimeError
 
 """
 This module contains retry system. The system is designed to be
@@ -201,7 +202,10 @@ class Retrying:
 
     def _last_exception(self) -> BaseException:
         if self._exception is None:
-            raise RuntimeError("No active exception")
+            raise PySparkRuntimeError(
+                error_class="NO_ACTIVE_EXCEPTION",
+                message_parameters={},
+            )
         return self._exception
 
     def _wait(self) -> None:
diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 6c5bd826a023..8b2b583ddaab 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -74,9 +74,12 @@ class ArrowCollectSerializer(Serializer):
         num = read_int(stream)
         if num == -1:
             error_msg = UTF8Deserializer().loads(stream)
-            raise RuntimeError(
-                "An error occurred while calling "
-                "ArrowCollectSerializer.load_stream: {}".format(error_msg)
+            raise PySparkRuntimeError(
+                error_class="ERROR_OCCURRED_WHILE_CALLING",
+                message_parameters={
+                    "func_name": "ArrowCollectSerializer.load_stream",
+                    "error_msg": error_msg,
+                },
             )
         batch_order = []
         for i in range(num):
diff --git a/python/pyspark/sql/pandas/utils.py 
b/python/pyspark/sql/pandas/utils.py
index b62be2081028..db60f77c391c 100644
--- a/python/pyspark/sql/pandas/utils.py
+++ b/python/pyspark/sql/pandas/utils.py
@@ -16,6 +16,7 @@
 #
 
 from pyspark.loose_version import LooseVersion
+from pyspark.errors import PySparkRuntimeError
 
 
 def require_minimum_pandas_version() -> None:
@@ -66,7 +67,7 @@ def require_minimum_pyarrow_version() -> None:
             "your version was %s." % (minimum_pyarrow_version, 
pyarrow.__version__)
         )
     if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
-        raise RuntimeError(
-            "Arrow legacy IPC format is not supported in PySpark, "
-            "please unset ARROW_PRE_0_15_IPC_FORMAT"
+        raise PySparkRuntimeError(
+            error_class="ARROW_LEGACY_IPC_FORMAT",
+            message_parameters={},
         )
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b4fad7ad29da..7f4589557cd2 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -309,18 +309,20 @@ class SparkSession(SparkConversionMixin):
             if "spark.master" in self._options and (
                 "spark.remote" in self._options or "SPARK_REMOTE" in os.environ
             ):
-                raise RuntimeError(
-                    "Spark master cannot be configured with Spark Connect 
server; "
-                    "however, found URL for Spark Connect [%s]"
-                    % self._options.get("spark.remote", 
os.environ.get("SPARK_REMOTE"))
+                raise PySparkRuntimeError(
+                    error_class="CANNOT_CONFIGURE_SPARK_MASTER",
+                    message_parameters={
+                        "url": self._options.get("spark.remote", 
os.environ.get("SPARK_REMOTE"))
+                    },
                 )
             if "spark.remote" in self._options and (
                 "spark.master" in self._options or "MASTER" in os.environ
             ):
-                raise RuntimeError(
-                    "Spark Connect server cannot be configured with Spark 
master; "
-                    "however, found URL for Spark master [%s]"
-                    % self._options.get("spark.master", 
os.environ.get("MASTER"))
+                raise PySparkRuntimeError(
+                    error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
+                    message_parameters={
+                        "url": self._options.get("spark.master", 
os.environ.get("MASTER"))
+                    },
                 )
 
             if "spark.remote" in self._options:
@@ -328,10 +330,12 @@ class SparkSession(SparkConversionMixin):
                 if ("SPARK_REMOTE" in os.environ and 
os.environ["SPARK_REMOTE"] != remote) and (
                     "SPARK_LOCAL_REMOTE" in os.environ and not 
remote.startswith("local")
                 ):
-                    raise RuntimeError(
-                        "Only one Spark Connect client URL can be set; 
however, got a "
-                        "different URL [%s] from the existing [%s]"
-                        % (os.environ["SPARK_REMOTE"], remote)
+                    raise PySparkRuntimeError(
+                        error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
+                        message_parameters={
+                            "new_url": os.environ["SPARK_REMOTE"],
+                            "existing_url": remote,
+                        },
                     )
 
         def master(self, master: str) -> "SparkSession.Builder":
@@ -482,11 +486,9 @@ class SparkSession(SparkConversionMixin):
                             url = opts.get("spark.remote", 
os.environ.get("SPARK_REMOTE"))
 
                             if url is None:
-                                raise RuntimeError(
-                                    "Cannot create a Spark Connect session 
because the "
-                                    "Spark Connect remote URL has not been 
set. Please define "
-                                    "the remote URL by setting either the 
'spark.remote' option "
-                                    "or the 'SPARK_REMOTE' environment 
variable."
+                                raise PySparkRuntimeError(
+                                    error_class="CONNECT_URL_NOT_SET",
+                                    message_parameters={},
                                 )
 
                             if url.startswith("local"):
@@ -503,9 +505,9 @@ class SparkSession(SparkConversionMixin):
                             opts["spark.remote"] = url
                             return 
RemoteSparkSession.builder.config(map=opts).getOrCreate()
                         else:
-                            raise RuntimeError(
-                                "Cannot start a remote Spark session because 
there "
-                                "is a regular Spark session already running."
+                            raise PySparkRuntimeError(
+                                error_class="SESSION_ALREADY_EXIST",
+                                message_parameters={},
                             )
 
                 session = SparkSession._instantiatedSession
@@ -548,9 +550,9 @@ class SparkSession(SparkConversionMixin):
 
                 url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
                 if url.startswith("local"):
-                    raise RuntimeError(
-                        "Creating new SparkSessions with `local` "
-                        "connection string is not supported."
+                    raise PySparkRuntimeError(
+                        error_class="UNSUPPORTED_LOCAL_CONNECTION_STRING",
+                        message_parameters={},
                     )
 
                 # Mark this Spark Session as Spark Connect. This prevents that 
local PySpark is
@@ -559,9 +561,9 @@ class SparkSession(SparkConversionMixin):
                 opts["spark.remote"] = url
                 return RemoteSparkSession.builder.config(map=opts).create()
             else:
-                raise RuntimeError(
-                    "SparkSession.builder.create() can only be used with Spark 
Connect; "
-                    "however, spark.remote was not found."
+                raise PySparkRuntimeError(
+                    error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+                    message_parameters={"feature": 
"SparkSession.builder.create"},
                 )
 
     # TODO(SPARK-38912): Replace @classproperty with @classmethod + @property 
once support for
@@ -1916,9 +1918,9 @@ class SparkSession(SparkConversionMixin):
         This is an API dedicated to Spark Connect client only. With regular 
Spark Session, it throws
         an exception.
         """
-        raise RuntimeError(
-            "SparkSession.client is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.client"},
         )
 
     def addArtifacts(
@@ -1949,9 +1951,9 @@ class SparkSession(SparkConversionMixin):
         This is an API dedicated to Spark Connect client only. With regular 
Spark Session, it throws
         an exception.
         """
-        raise RuntimeError(
-            "SparkSession.addArtifact(s) is only supported with Spark Connect; 
"
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.addArtifact(s)"},
         )
 
     addArtifact = addArtifacts
@@ -1979,9 +1981,9 @@ class SparkSession(SparkConversionMixin):
         Also, this is an API dedicated to Spark Connect client only. With 
regular
         Spark Session, it throws an exception.
         """
-        raise RuntimeError(
-            "SparkSession.copyFromLocalToFs is only supported with Spark 
Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.copyFromLocalToFs"},
         )
 
     def interruptAll(self) -> List[str]:
@@ -1999,9 +2001,9 @@ class SparkSession(SparkConversionMixin):
         -----
         There is still a possibility of operation finishing just as it is 
interrupted.
         """
-        raise RuntimeError(
-            "SparkSession.interruptAll is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.interruptAll"},
         )
 
     def interruptTag(self, tag: str) -> List[str]:
@@ -2019,9 +2021,9 @@ class SparkSession(SparkConversionMixin):
         -----
         There is still a possibility of operation finishing just as it is 
interrupted.
         """
-        raise RuntimeError(
-            "SparkSession.interruptTag is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.interruptTag"},
         )
 
     def interruptOperation(self, op_id: str) -> List[str]:
@@ -2039,9 +2041,9 @@ class SparkSession(SparkConversionMixin):
         -----
         There is still a possibility of operation finishing just as it is 
interrupted.
         """
-        raise RuntimeError(
-            "SparkSession.interruptOperation is only supported with Spark 
Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.interruptOperation"},
         )
 
     def addTag(self, tag: str) -> None:
@@ -2063,9 +2065,9 @@ class SparkSession(SparkConversionMixin):
         tag : str
             The tag to be added. Cannot contain ',' (comma) character or be an 
empty string.
         """
-        raise RuntimeError(
-            "SparkSession.addTag is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.addTag"},
         )
 
     def removeTag(self, tag: str) -> None:
@@ -2080,9 +2082,9 @@ class SparkSession(SparkConversionMixin):
         tag : list of str
             The tag to be removed. Cannot contain ',' (comma) character or be 
an empty string.
         """
-        raise RuntimeError(
-            "SparkSession.removeTag is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.removeTag"},
         )
 
     def getTags(self) -> Set[str]:
@@ -2097,9 +2099,9 @@ class SparkSession(SparkConversionMixin):
         set of str
             Set of tags of interrupted operations.
         """
-        raise RuntimeError(
-            "SparkSession.getTags is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.getTags"},
         )
 
     def clearTags(self) -> None:
@@ -2108,9 +2110,9 @@ class SparkSession(SparkConversionMixin):
 
         .. versionadded:: 3.5.0
         """
-        raise RuntimeError(
-            "SparkSession.clearTags is only supported with Spark Connect; "
-            "however, the current Spark session does not use Spark Connect."
+        raise PySparkRuntimeError(
+            error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
+            message_parameters={"feature": "SparkSession.clearTags"},
         )
 
 
diff --git a/python/pyspark/sql/streaming/state.py 
b/python/pyspark/sql/streaming/state.py
index 8bf01b3ebd98..09592558f853 100644
--- a/python/pyspark/sql/streaming/state.py
+++ b/python/pyspark/sql/streaming/state.py
@@ -20,7 +20,7 @@ from typing import Tuple, Optional
 
 from pyspark.sql.types import DateType, Row, StructType
 from pyspark.sql.utils import has_numpy
-from pyspark.errors import PySparkTypeError, PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
 
 __all__ = ["GroupState", "GroupStateTimeout"]
 
@@ -185,9 +185,12 @@ class GroupState:
             )
 
         if self._timeout_conf != GroupStateTimeout.ProcessingTimeTimeout:
-            raise RuntimeError(
-                "Cannot set timeout duration without enabling processing time 
timeout in "
-                "applyInPandasWithState"
+            raise PySparkRuntimeError(
+                error_class="CANNOT_WITHOUT",
+                message_parameters={
+                    "condition1": "set timeout duration",
+                    "condition2": "enabling processing time timeout in 
applyInPandasWithState",
+                },
             )
 
         if durationMs <= 0:
@@ -208,9 +211,12 @@ class GroupState:
         Event time timeout must be enabled.
         """
         if self._timeout_conf != GroupStateTimeout.EventTimeTimeout:
-            raise RuntimeError(
-                "Cannot set timeout duration without enabling processing time 
timeout in "
-                "applyInPandasWithState"
+            raise PySparkRuntimeError(
+                error_class="CANNOT_WITHOUT",
+                message_parameters={
+                    "condition1": "set timeout duration",
+                    "condition2": "enabling processing time timeout in 
applyInPandasWithState",
+                },
             )
 
         if isinstance(timestampMs, datetime.datetime):
@@ -245,9 +251,12 @@ class GroupState:
         In a streaming query, this can be called only when watermark is set.
         """
         if not self._watermark_present:
-            raise RuntimeError(
-                "Cannot get event time watermark timestamp without setting 
watermark before "
-                "applyInPandasWithState"
+            raise PySparkRuntimeError(
+                error_class="CANNOT_WITHOUT",
+                message_parameters={
+                    "condition1": "get event time watermark timestamp",
+                    "condition2": "setting watermark before 
applyInPandasWithState",
+                },
             )
         return self._event_time_watermark_ms
 
diff --git a/python/pyspark/sql/tests/test_types.py 
b/python/pyspark/sql/tests/test_types.py
index a07309d1dff9..06064e58c794 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -26,7 +26,12 @@ import unittest
 
 from pyspark.sql import Row
 from pyspark.sql import functions as F
-from pyspark.errors import AnalysisException, PySparkTypeError, 
PySparkValueError
+from pyspark.errors import (
+    AnalysisException,
+    PySparkTypeError,
+    PySparkValueError,
+    PySparkRuntimeError,
+)
 from pyspark.sql.types import (
     DataType,
     ByteType,
@@ -1200,15 +1205,33 @@ class TypesTestsMixin:
             "interval hour to second",
         )
 
-        with self.assertRaisesRegex(RuntimeError, "interval None to 3 is 
invalid"):
+        with self.assertRaises(PySparkRuntimeError) as pe:
             DayTimeIntervalType(endField=DayTimeIntervalType.SECOND)
 
-        with self.assertRaisesRegex(RuntimeError, "interval 123 to 123 is 
invalid"):
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "None", "end_field": "3"},
+        )
+
+        with self.assertRaises(PySparkRuntimeError) as pe:
             DayTimeIntervalType(123)
 
-        with self.assertRaisesRegex(RuntimeError, "interval 0 to 321 is 
invalid"):
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "123", "end_field": "123"},
+        )
+
+        with self.assertRaises(PySparkRuntimeError) as pe:
             DayTimeIntervalType(DayTimeIntervalType.DAY, 321)
 
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "0", "end_field": "321"},
+        )
+
     def test_daytime_interval_type(self):
         # SPARK-37277: Support DayTimeIntervalType in createDataFrame
         timedetlas = [
@@ -1269,15 +1292,33 @@ class TypesTestsMixin:
             "interval year to month",
         )
 
-        with self.assertRaisesRegex(RuntimeError, "interval None to 3 is 
invalid"):
+        with self.assertRaises(PySparkRuntimeError) as pe:
             YearMonthIntervalType(endField=3)
 
-        with self.assertRaisesRegex(RuntimeError, "interval 123 to 123 is 
invalid"):
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "None", "end_field": "3"},
+        )
+
+        with self.assertRaises(PySparkRuntimeError) as pe:
             YearMonthIntervalType(123)
 
-        with self.assertRaisesRegex(RuntimeError, "interval 0 to 321 is 
invalid"):
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "123", "end_field": "123"},
+        )
+
+        with self.assertRaises(PySparkRuntimeError) as pe:
             YearMonthIntervalType(YearMonthIntervalType.YEAR, 321)
 
+        self.check_error(
+            exception=pe.exception,
+            error_class="INVALID_INTERVAL_CASTING",
+            message_parameters={"start_field": "0", "end_field": "321"},
+        )
+
     def test_yearmonth_interval_type(self):
         schema1 = self.spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS 
interval").schema
         self.assertEqual(schema1.fields[0].dataType, YearMonthIntervalType(0, 
1))
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index d3eed77b3838..fa192296348f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -55,6 +55,7 @@ from pyspark.errors import (
     PySparkTypeError,
     PySparkValueError,
     PySparkIndexError,
+    PySparkRuntimeError,
     PySparkAttributeError,
     PySparkKeyError,
 )
@@ -476,7 +477,10 @@ class DayTimeIntervalType(AnsiIntervalType):
 
         fields = DayTimeIntervalType._fields
         if startField not in fields.keys() or endField not in fields.keys():
-            raise RuntimeError("interval %s to %s is invalid" % (startField, 
endField))
+            raise PySparkRuntimeError(
+                error_class="INVALID_INTERVAL_CASTING",
+                message_parameters={"start_field": str(startField), 
"end_field": str(endField)},
+            )
         self.startField = cast(int, startField)
         self.endField = cast(int, endField)
 
@@ -531,7 +535,10 @@ class YearMonthIntervalType(AnsiIntervalType):
 
         fields = YearMonthIntervalType._fields
         if startField not in fields.keys() or endField not in fields.keys():
-            raise RuntimeError("interval %s to %s is invalid" % (startField, 
endField))
+            raise PySparkRuntimeError(
+                error_class="INVALID_INTERVAL_CASTING",
+                message_parameters={"start_field": str(startField), 
"end_field": str(endField)},
+            )
         self.startField = cast(int, startField)
         self.endField = cast(int, endField)
 
@@ -2594,7 +2601,10 @@ class Row(tuple):
 
     def __setattr__(self, key: Any, value: Any) -> None:
         if key != "__fields__":
-            raise RuntimeError("Row is read-only")
+            raise PySparkRuntimeError(
+                error_class="READ_ONLY",
+                message_parameters={"object": "Row"},
+            )
         self.__dict__[key] = value
 
     def __reduce__(
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 351bcea3f389..b9a00d432671 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -40,7 +40,7 @@ from pyspark.sql.types import (
 from pyspark.sql.utils import get_active_spark_context
 from pyspark.sql.pandas.types import to_arrow_type
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
-from pyspark.errors import PySparkTypeError, PySparkNotImplementedError
+from pyspark.errors import PySparkTypeError, PySparkNotImplementedError, PySparkRuntimeError
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import DataTypeOrString, ColumnOrName, UserDefinedFunctionLike
@@ -416,9 +416,12 @@ class UserDefinedFunction:
             if profiler_enabled and memory_profiler_enabled:
                 # When both profilers are enabled, they interfere with each other,
                 # that makes the result profile misleading.
-                raise RuntimeError(
-                    "'spark.python.profile' and 'spark.python.profile.memory' configuration"
-                    " cannot be enabled together."
+                raise PySparkRuntimeError(
+                    error_class="CANNOT_SET_TOGETHER",
+                    message_parameters={
+                        "arg_list": "'spark.python.profile' and "
+                        "'spark.python.profile.memory' configuration"
+                    },
                 )
             elif profiler_enabled:
                 f = self.func
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9f817341724a..f517696c76c7 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -39,6 +39,7 @@ from pyspark.errors import (  # noqa: F401
     UnknownException,
     SparkUpgradeException,
     PySparkNotImplementedError,
+    PySparkRuntimeError,
 )
 from pyspark.errors.exceptions.captured import CapturedException  # noqa: F401
 from pyspark.find_spark_home import _find_spark_home
@@ -90,8 +91,9 @@ def require_test_compiled() -> None:
     paths = glob.glob(test_class_path)
 
     if len(paths) == 0:
-        raise RuntimeError(
-            "%s doesn't exist. Spark sql test classes are not compiled." % test_class_path
+        raise PySparkRuntimeError(
+            error_class="TEST_CLASS_NOT_COMPILED",
+            message_parameters={"test_class_path": test_class_path},
         )
 
 
@@ -274,7 +276,10 @@ def get_active_spark_context() -> SparkContext:
     otherwise, returns the active SparkContext."""
     sc = SparkContext._active_spark_context
     if sc is None or sc._jvm is None:
-        raise RuntimeError("SparkContext or SparkSession should be created first.")
+        raise PySparkRuntimeError(
+            error_class="SESSION_OR_CONTEXT_NOT_EXISTS",
+            message_parameters={},
+        )
     return sc
 
 
diff --git a/python/pyspark/tests/test_profiler.py b/python/pyspark/tests/test_profiler.py
index 018387012fc9..b7797ead2adb 100644
--- a/python/pyspark/tests/test_profiler.py
+++ b/python/pyspark/tests/test_profiler.py
@@ -123,11 +123,16 @@ class ProfilerTests2(unittest.TestCase, PySparkErrorTestUtils):
             return v + 1
 
         try:
-            self.assertRaisesRegex(
-                RuntimeError,
-                "'spark.python.profile' and 'spark.python.profile.memory' configuration"
-                " cannot be enabled together",
-                lambda: spark.range(10).select(plus_one("id")).collect(),
+            with self.assertRaises(PySparkRuntimeError) as pe:
+                spark.range(10).select(plus_one("id")).collect()
+
+            self.check_error(
+                exception=pe.exception,
+                error_class="CANNOT_SET_TOGETHER",
+                message_parameters={
+                    "arg_list": "'spark.python.profile' and "
+                    "'spark.python.profile.memory' configuration"
+                },
             )
         finally:
             sc.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to