This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ad454f46d85 [SPARK-42992][PYTHON] Introduce PySparkRuntimeError
ad454f46d85 is described below

commit ad454f46d85e17dc59991ecd6c3326a8c6de8ea3
Author: itholic <haejoon....@databricks.com>
AuthorDate: Wed Apr 26 13:51:52 2023 +0800

    [SPARK-42992][PYTHON] Introduce PySparkRuntimeError
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce a new error class, `PySparkRuntimeError`, and applies it to the existing `RuntimeError` usages under `python/pyspark/*.py`.
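    
    For illustration, a minimal sketch of the conversion pattern this PR applies, using the
    error class and parameters from the `accumulators.py` hunk below:
    
    ```python
    from pyspark.errors import PySparkRuntimeError
    
    # Before: a bare built-in exception with a hard-coded message.
    # raise RuntimeError("Accumulator.value cannot be accessed inside tasks")
    
    # After: the same condition raised through the PySpark error framework,
    # where the message is looked up by error class and parameterized.
    raise PySparkRuntimeError(
        error_class="VALUE_NOT_ACCESSIBLE",
        message_parameters={"value": "Accumulator.value"},
    )
    ```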
    
    ### Why are the changes needed?
    
    To cover the built-in `RuntimeError` with the PySpark error framework.
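    
    `PySparkRuntimeError` subclasses both `PySparkException` and the built-in `RuntimeError`
    (see the `errors/exceptions/base.py` hunk below), so existing `except RuntimeError`
    handlers should keep working; a minimal sketch:
    
    ```python
    from pyspark.errors import PySparkRuntimeError
    
    try:
        raise PySparkRuntimeError(
            error_class="MASTER_URL_NOT_SET",
            message_parameters={},
        )
    except RuntimeError as e:
        # Still caught here, since PySparkRuntimeError inherits from RuntimeError.
        print(type(e).__name__, str(e))
    ```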
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is an internal improvement to the error framework.
    
    ### How was this patch tested?
    
    The existing CI should pass.
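    
    Error classes can also be asserted directly in tests; a sketch mirroring the
    `test_profiler.py` hunk below (assuming `PySparkErrorTestUtils.check_error` from
    `pyspark.testing.utils`):
    
    ```python
    import unittest
    
    from pyspark import SparkConf, SparkContext
    from pyspark.errors import PySparkRuntimeError
    from pyspark.testing.utils import PySparkErrorTestUtils
    
    
    class ProfilerDisabledTest(unittest.TestCase, PySparkErrorTestUtils):
        def test_profiler_disabled(self):
            sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
            try:
                with self.assertRaises(PySparkRuntimeError) as pe:
                    sc.show_profiles()
                # Assert on the error class rather than the message text.
                self.check_error(
                    exception=pe.exception,
                    error_class="INCORRECT_CONF_FOR_PROFILE",
                    message_parameters={},
                )
            finally:
                sc.stop()
    ```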
    
    Closes #40617 from itholic/pyspark_runtime_error.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/accumulators.py                     |  15 ++-
 python/pyspark/broadcast.py                        |  23 +++-
 python/pyspark/conf.py                             |   6 +-
 python/pyspark/context.py                          |  40 +++---
 python/pyspark/errors/__init__.py                  |   2 +
 python/pyspark/errors/error_classes.py             | 137 +++++++++++++++++++++
 python/pyspark/errors/exceptions/base.py           |   6 +
 python/pyspark/errors/utils.py                     |   4 +-
 python/pyspark/java_gateway.py                     |  18 ++-
 python/pyspark/profiler.py                         |   6 +-
 python/pyspark/rdd.py                              |  33 +++--
 .../sql/tests/pandas/test_pandas_grouped_map.py    |   2 +-
 python/pyspark/taskcontext.py                      |  30 +++--
 python/pyspark/tests/test_context.py               |   2 +-
 python/pyspark/tests/test_profiler.py              |  27 ++--
 python/pyspark/util.py                             |  13 +-
 python/pyspark/worker.py                           |  75 ++++++-----
 17 files changed, 346 insertions(+), 93 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ce4bb561814..dc8520a844d 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -23,6 +23,7 @@ import threading
 from typing import Callable, Dict, Generic, Tuple, Type, TYPE_CHECKING, TypeVar, Union
 
 from pyspark.serializers import read_int, CPickleSerializer
+from pyspark.errors import PySparkRuntimeError
 
 if TYPE_CHECKING:
     from pyspark._typing import SupportsIAdd  # noqa: F401
@@ -140,14 +141,24 @@ class Accumulator(Generic[T]):
     def value(self) -> T:
         """Get the accumulator's value; only usable in driver program"""
         if self._deserialized:
-            raise RuntimeError("Accumulator.value cannot be accessed inside 
tasks")
+            raise PySparkRuntimeError(
+                error_class="VALUE_NOT_ACCESSIBLE",
+                message_parameters={
+                    "value": "Accumulator.value",
+                },
+            )
         return self._value
 
     @value.setter
     def value(self, value: T) -> None:
         """Sets the accumulator's value; only usable in driver program"""
         if self._deserialized:
-            raise RuntimeError("Accumulator.value cannot be accessed inside 
tasks")
+            raise PySparkRuntimeError(
+                error_class="VALUE_NOT_ACCESSIBLE",
+                message_parameters={
+                    "value": "Accumulator.value",
+                },
+            )
         self._value = value
 
     def add(self, term: T) -> None:
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index b1a9b790af5..a72bf1e059b 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -40,6 +40,7 @@ from typing.io import BinaryIO  # type: ignore[import]
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.serializers import ChunkedStream, pickle_protocol
 from pyspark.util import print_exec
+from pyspark.errors import PySparkRuntimeError
 
 if TYPE_CHECKING:
     from pyspark import SparkContext
@@ -58,7 +59,12 @@ def _from_id(bid: int) -> "Broadcast[Any]":
     from pyspark.broadcast import _broadcastRegistry
 
     if bid not in _broadcastRegistry:
-        raise RuntimeError("Broadcast variable '%s' not loaded!" % bid)
+        raise PySparkRuntimeError(
+            error_class="BROADCAST_VARIABLE_NOT_LOADED",
+            message_parameters={
+                "variable": str(bid),
+            },
+        )
     return _broadcastRegistry[bid]
 
 
@@ -293,7 +299,10 @@ class Broadcast(Generic[T]):
         >>> b.unpersist()
         """
         if self._jbroadcast is None:
-            raise RuntimeError("Broadcast can only be unpersisted in driver")
+            raise PySparkRuntimeError(
+                error_class="INVALID_BROADCAST_OPERATION",
+                message_parameters={"operation": "unpersisted"},
+            )
         self._jbroadcast.unpersist(blocking)
 
     def destroy(self, blocking: bool = False) -> None:
@@ -320,13 +329,19 @@ class Broadcast(Generic[T]):
         >>> b.destroy()
         """
         if self._jbroadcast is None:
-            raise RuntimeError("Broadcast can only be destroyed in driver")
+            raise PySparkRuntimeError(
+                error_class="INVALID_BROADCAST_OPERATION",
+                message_parameters={"operation": "destroyed"},
+            )
         self._jbroadcast.destroy(blocking)
         os.unlink(self._path)
 
     def __reduce__(self) -> Tuple[Callable[[int], "Broadcast[T]"], Tuple[int]]:
         if self._jbroadcast is None:
-            raise RuntimeError("Broadcast can only be serialized in driver")
+            raise PySparkRuntimeError(
+                error_class="INVALID_BROADCAST_OPERATION",
+                message_parameters={"operation": "serialized"},
+            )
         assert self._pickle_registry is not None
         self._pickle_registry.add(self)
         return _from_id, (self._jbroadcast.id(),)
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 1ddc8f5ddaa..dd203e2d26b 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -21,6 +21,7 @@ import sys
 from typing import Dict, List, Optional, Tuple, cast, overload
 
 from py4j.java_gateway import JVMView, JavaObject
+from pyspark.errors import PySparkRuntimeError
 
 
 class SparkConf:
@@ -182,7 +183,10 @@ class SparkConf:
     ) -> "SparkConf":
         """Set an environment variable to be passed to executors."""
         if (key is not None and pairs is not None) or (key is None and pairs is None):
-            raise RuntimeError("Either pass one key-value pair or a list of pairs")
+            raise PySparkRuntimeError(
+                error_class="KEY_VALUE_PAIR_REQUIRED",
+                message_parameters={},
+            )
         elif key is not None:
             self.set("spark.executorEnv.{}".format(key), cast(str, value))
         elif pairs is not None:
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c3f18127121..7b89a13b115 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -68,6 +68,7 @@ from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler, UDFBasicProfiler, MemoryProfiler
+from pyspark.errors import PySparkRuntimeError
 from py4j.java_gateway import is_instance_of, JavaGateway, JavaObject, JVMView
 
 if TYPE_CHECKING:
@@ -180,8 +181,9 @@ class SparkContext:
         memory_profiler_cls: Type[MemoryProfiler] = MemoryProfiler,
     ):
         if "SPARK_REMOTE" in os.environ and "SPARK_LOCAL_REMOTE" not in 
os.environ:
-            raise RuntimeError(
-                "Remote client cannot create a SparkContext. Create 
SparkSession instead."
+            raise PySparkRuntimeError(
+                error_class="CONTEXT_UNAVAILABLE_FOR_REMOTE_CLIENT",
+                message_parameters={},
             )
 
         if conf is None or conf.get("spark.executor.allowSparkContext", "false").lower() != "true":
@@ -266,9 +268,15 @@ class SparkContext:
 
         # Check that we have at least the required parameters
         if not self._conf.contains("spark.master"):
-            raise RuntimeError("A master URL must be set in your 
configuration")
+            raise PySparkRuntimeError(
+                error_class="MASTER_URL_NOT_SET",
+                message_parameters={},
+            )
         if not self._conf.contains("spark.app.name"):
-            raise RuntimeError("An application name must be set in your 
configuration")
+            raise PySparkRuntimeError(
+                error_class="APPLICATION_NAME_NOT_SET",
+                message_parameters={},
+            )
 
         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
@@ -459,10 +467,9 @@ class SparkContext:
 
     def __getnewargs__(self) -> NoReturn:
         # This method is called when attempting to pickle SparkContext, which is always an error:
-        raise RuntimeError(
-            "It appears that you are attempting to reference SparkContext from a broadcast "
-            "variable, action, or transformation. SparkContext can only be used on the driver, "
-            "not in code that it run on workers. For more information, see SPARK-5063."
+        raise PySparkRuntimeError(
+            error_class="CONTEXT_ONLY_VALID_ON_DRIVER",
+            message_parameters={},
         )
 
     def __enter__(self) -> "SparkContext":
@@ -2331,9 +2338,9 @@ class SparkContext:
         if self.profiler_collector is not None:
             self.profiler_collector.show_profiles()
         else:
-            raise RuntimeError(
-                "'spark.python.profile' or 'spark.python.profile.memory' 
configuration"
-                " must be set to 'true' to enable Python profile."
+            raise PySparkRuntimeError(
+                error_class="INCORRECT_CONF_FOR_PROFILE",
+                message_parameters={},
             )
 
     def dump_profiles(self, path: str) -> None:
@@ -2348,9 +2355,9 @@ class SparkContext:
         if self.profiler_collector is not None:
             self.profiler_collector.dump_profiles(path)
         else:
-            raise RuntimeError(
-                "'spark.python.profile' or 'spark.python.profile.memory' 
configuration"
-                " must be set to 'true' to enable Python profile."
+            raise PySparkRuntimeError(
+                error_class="INCORRECT_CONF_FOR_PROFILE",
+                message_parameters={},
             )
 
     def getConf(self) -> SparkConf:
@@ -2387,7 +2394,10 @@ class SparkContext:
         Throws an exception if a SparkContext is about to be created in executors.
         """
         if TaskContext.get() is not None:
-            raise RuntimeError("SparkContext should only be created and 
accessed on the driver.")
+            raise PySparkRuntimeError(
+                error_class="CONTEXT_ONLY_VALID_ON_DRIVER",
+                message_parameters={},
+            )
 
 
 def _test() -> None:
diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py
index 05b4f192940..1525f351ea4 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -37,6 +37,7 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
     PySparkTypeError,
     PySparkValueError,
     PySparkAttributeError,
+    PySparkRuntimeError,
     PySparkAssertionError,
     PySparkNotImplementedError,
 )
@@ -61,6 +62,7 @@ __all__ = [
     "PySparkTypeError",
     "PySparkValueError",
     "PySparkAttributeError",
+    "PySparkRuntimeError",
     "PySparkAssertionError",
     "PySparkNotImplementedError",
 ]
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 046132eb0ce..34efd471707 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -19,6 +19,11 @@ import json
 
 ERROR_CLASSES_JSON = """
 {
+  "APPLICATION_NAME_NOT_SET" : {
+    "message" : [
+      "An application name must be set in your configuration."
+    ]
+  },
   "ARGUMENT_REQUIRED": {
     "message": [
       "Argument `<arg_name>` is required when <condition>."
@@ -29,6 +34,16 @@ ERROR_CLASSES_JSON = """
       "Attribute `<attr_name>` in provided object `<obj_name>` is not 
callable."
     ]
   },
+  "BROADCAST_VARIABLE_NOT_LOADED": {
+    "message": [
+      "Broadcast variable `<variable>` not loaded."
+    ]
+  },
+  "CALL_BEFORE_INITIALIZE": {
+    "message": [
+      "Not supported to call `<func_name>` before initialize <object>."
+    ]
+  },
   "CANNOT_ACCESS_TO_DUNDER": {
     "message": [
       "Dunder(double underscore) attribute is for internal use only."
@@ -59,6 +74,11 @@ ERROR_CLASSES_JSON = """
       "Can not infer Array Type from an list with None as the first element."
     ]
   },
+  "CANNOT_OPEN_SOCKET": {
+    "message": [
+      "Can not open socket: <errors>."
+    ]
+  },
   "CANNOT_PARSE_DATATYPE": {
     "message": [
       "Unable to parse datatype from schema. <error>."
@@ -84,6 +104,16 @@ ERROR_CLASSES_JSON = """
       "`<func_name>` does not allow a Column in a list."
     ]
   },
+  "CONTEXT_ONLY_VALID_ON_DRIVER" : {
+    "message" : [
+      "It appears that you are attempting to reference SparkContext from a 
broadcast variable, action, or transformation. SparkContext can only be used on 
the driver, not in code that it run on workers. For more information, see 
SPARK-5063."
+    ]
+  },
+  "CONTEXT_UNAVAILABLE_FOR_REMOTE_CLIENT" : {
+    "message" : [
+      "Remote client cannot create a SparkContext. Create SparkSession 
instead."
+    ]
+  },
   "DISALLOWED_TYPE_FOR_CONTAINER" : {
     "message" : [
       "Argument `<arg_name>`(type: <arg_type>) should only contain a type in 
[<allowed_types>], got <return_type>"
@@ -94,6 +124,17 @@ ERROR_CLASSES_JSON = """
       "Function `<func_name>` should return Column, got <return_type>."
     ]
   },
+  "INCORRECT_CONF_FOR_PROFILE" : {
+    "message" : [
+      "`spark.python.profile` or `spark.python.profile.memory` configuration",
+      " must be set to `true` to enable Python profile."
+    ]
+  },
+  "INVALID_BROADCAST_OPERATION": {
+    "message": [
+      "Broadcast can only be <operation> in driver."
+    ]
+  },
   "INVALID_CALL_ON_UNRESOLVED_OBJECT": {
     "message": [
       "Invalid call to `<func_name>` on unresolved object."
@@ -124,16 +165,41 @@ ERROR_CLASSES_JSON = """
       "when() can only be applied on a Column previously generated by when() 
function, and cannot be applied once otherwise() is applied."
     ]
   },
+  "INVALID_WINDOW_BOUND_TYPE" : {
+    "message" : [
+      "Invalid window bound type: <window_bound_type>."
+    ]
+  },
+  "JAVA_GATEWAY_EXITED" : {
+    "message" : [
+      "Java gateway process exited before sending its port number."
+    ]
+  },
   "JVM_ATTRIBUTE_NOT_SUPPORTED" : {
     "message" : [
       "Attribute `<attr_name>` is not supported in Spark Connect as it depends 
on the JVM. If you need to use this attribute, do not use Spark Connect when 
creating your session."
     ]
   },
+  "KEY_VALUE_PAIR_REQUIRED" : {
+    "message" : [
+      "Key-value pair or a list of pairs is required."
+    ]
+  },
   "LENGTH_SHOULD_BE_THE_SAME" : {
     "message" : [
       "<arg1> and <arg2> should be of the same length, got <arg1_length> and 
<arg2_length>."
     ]
   },
+  "MASTER_URL_NOT_SET" : {
+    "message" : [
+      "A master URL must be set in your configuration."
+    ]
+  },
+  "MISSING_LIBRARY_FOR_PROFILER" : {
+    "message" : [
+      "Install the 'memory_profiler' library in the cluster to enable memory 
profiling."
+    ]
+  },
   "MISSING_VALID_PLAN" : {
     "message" : [
       "Argument to <operator> does not contain a valid plan."
@@ -269,6 +335,11 @@ ERROR_CLASSES_JSON = """
       "Argument `<arg_name>` should be an int, got <arg_type>."
     ]
   },
+  "NOT_IN_BARRIER_STAGE" : {
+    "message" : [
+      "It is not in a barrier stage."
+    ]
+  },
   "NOT_ITERABLE" : {
     "message" : [
       "<objectName> is not iterable."
@@ -354,6 +425,52 @@ ERROR_CLASSES_JSON = """
       "Only a single trigger is allowed."
     ]
   },
+  "PIPE_FUNCTION_EXITED" : {
+    "message" : [
+      "Pipe function `<func_name>` exited with error code <error_code>."
+    ]
+  },
+  "PYTHON_HASH_SEED_NOT_SET" : {
+    "message" : [
+      "Randomness of hash of string should be disabled via PYTHONHASHSEED."
+    ]
+  },
+  "PYTHON_VERSION_MISMATCH" : {
+    "message" : [
+      "Python in worker has different version <worker_version> than that in 
driver <driver_version>, PySpark cannot run with different minor versions.",
+      "Please check environment variables PYSPARK_PYTHON and 
PYSPARK_DRIVER_PYTHON are correctly set."
+    ]
+  },
+  "RDD_TRANSFORM_ONLY_VALID_ON_DRIVER" : {
+    "message" : [
+      "It appears that you are attempting to broadcast an RDD or reference an 
RDD from an ",
+      "action or transformation. RDD transformations and actions can only be 
invoked by the ",
+      "driver, not inside of other transformations; for example, ",
+      "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the 
values ",
+      "transformation and count action cannot be performed inside of the 
rdd1.map ",
+      "transformation. For more information, see SPARK-5063."
+    ]
+  },
+  "RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF" : {
+    "message" : [
+      "Column names of the returned pandas.DataFrame do not match specified 
schema.<missing><extra>"
+    ]
+  },
+  "RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF" : {
+    "message" : [
+      "Number of columns of the returned pandas.DataFrame doesn't match 
specified schema. Expected: <expected> Actual: <actual>"
+    ]
+  },
+  "RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF" : {
+    "message" : [
+      "The length of output in Scalar iterator pandas UDF should be the same 
with the input's; however, the length of output was <output_length> and the 
length of input was <input_length>."
+    ]
+  },
+  "SCHEMA_MISMATCH_FOR_PANDAS_UDF" : {
+    "message" : [
+      "Result vector from pandas_udf was not the required length: expected 
<expected>, got <actual>."
+    ]
+  },
   "SLICE_WITH_STEP" : {
     "message" : [
       "Slice with step is not supported."
@@ -364,6 +481,21 @@ ERROR_CLASSES_JSON = """
       "State is either not defined or has already been removed."
     ]
   },
+  "STOP_ITERATION_OCCURRED" : {
+    "message" : [
+      "Caught StopIteration thrown from user's code; failing the task: <exc>"
+    ]
+  },
+  "STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF" : {
+    "message" : [
+      "pandas iterator UDF should exhaust the input iterator."
+    ]
+  },
+  "UNEXPECTED_RESPONSE_FROM_SERVER" : {
+    "message" : [
+      "Unexpected response from iterator server."
+    ]
+  },
   "UNSUPPORTED_DATA_TYPE" : {
     "message" : [
       "Unsupported DataType `<data_type>`."
@@ -399,6 +531,11 @@ ERROR_CLASSES_JSON = """
       "Function `<func_name>` should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments."
     ]
   },
+  "VALUE_NOT_ACCESSIBLE": {
+    "message": [
+      "Value `<value>` cannot be accessed inside tasks."
+    ]
+  },
   "VALUE_NOT_ANY_OR_ALL" : {
     "message" : [
       "Value for `<arg_name>` must be 'any' or 'all', got '<arg_value>'."
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index 9b229f6376e..543ee9473e3 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -198,6 +198,12 @@ class PySparkAttributeError(PySparkException, AttributeError):
     """
 
 
+class PySparkRuntimeError(PySparkException, RuntimeError):
+    """
+    Wrapper class for RuntimeError to support error classes.
+    """
+
+
 class PySparkAssertionError(PySparkException, AssertionError):
     """
     Wrapper class for AssertionError to support error classes.
diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py
index 69a72f86b9f..bbe8d613315 100644
--- a/python/pyspark/errors/utils.py
+++ b/python/pyspark/errors/utils.py
@@ -72,7 +72,7 @@ class ErrorClassesReader:
               "message" : [
                 "Problem <A> because of <B>."
               ],
-              "subClass" : {
+              "sub_class" : {
                 "SUB_ERROR_CLASS" : {
                   "message" : [
                     "Do <C> to fix the problem."
@@ -104,7 +104,7 @@ class ErrorClassesReader:
         else:
             # Generate message template for sub error class if exists.
             sub_error_class = error_classes[1]
-            main_error_class_subclass_info_map = main_error_class_info_map["subClass"]
+            main_error_class_subclass_info_map = main_error_class_info_map["sub_class"]
             if sub_error_class in main_error_class_subclass_info_map:
                 sub_error_class_info_map = main_error_class_subclass_info_map[sub_error_class]
             else:
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index aee206dd6b3..691120a1312 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -30,6 +30,7 @@ from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParam
 from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
 from pyspark.find_spark_home import _find_spark_home
 from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
+from pyspark.errors import PySparkRuntimeError
 
 
 def launch_gateway(conf=None, popen_kwargs=None):
@@ -103,7 +104,10 @@ def launch_gateway(conf=None, popen_kwargs=None):
                 time.sleep(0.1)
 
             if not os.path.isfile(conn_info_file):
-                raise RuntimeError("Java gateway process exited before sending 
its port number")
+                raise PySparkRuntimeError(
+                    error_class="JAVA_GATEWAY_EXITED",
+                    message_parameters={},
+                )
 
             with open(conn_info_file, "rb") as info:
                 gateway_port = read_int(info)
@@ -172,7 +176,10 @@ def _do_server_auth(conn, auth_secret):
     reply = UTF8Deserializer().loads(conn)
     if reply != "ok":
         conn.close()
-        raise RuntimeError("Unexpected reply from iterator server.")
+        raise PySparkRuntimeError(
+            error_class="UNEXPECTED_RESPONSE_FROM_SERVER",
+            message_parameters={},
+        )
 
 
 def local_connect_and_auth(port, auth_secret):
@@ -210,7 +217,12 @@ def local_connect_and_auth(port, auth_secret):
             errors.append("tried to connect to %s, but an error occurred: %s" 
% (sa, emsg))
             sock.close()
             sock = None
-    raise RuntimeError("could not open socket: %s" % errors)
+    raise PySparkRuntimeError(
+        error_class="CANNOT_OPEN_SOCKET",
+        message_parameters={
+            "errors": str(errors),
+        },
+    )
 
 
 def ensure_callback_server_started(gw):
diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py
index cb668b874a2..d7990d2fd1b 100644
--- a/python/pyspark/profiler.py
+++ b/python/pyspark/profiler.py
@@ -45,6 +45,7 @@ except Exception:
     has_memory_profiler = False
 
 from pyspark.accumulators import AccumulatorParam
+from pyspark.errors import PySparkRuntimeError
 
 if TYPE_CHECKING:
     from pyspark.context import SparkContext
@@ -413,8 +414,9 @@ class MemoryProfiler(Profiler):
             self._accumulator.add(codemap_dict)  # type: ignore[arg-type]
             return ret
         else:
-            raise RuntimeError(
-                "Install the 'memory_profiler' library in the cluster to 
enable memory profiling."
+            raise PySparkRuntimeError(
+                error_class="MISSING_LIBRARY_FOR_PROFILER",
+                message_parameters={},
             )
 
     def stats(self) -> CodeMapDict:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 06e4facc962..13f93fbdad6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -87,6 +87,7 @@ from pyspark.shuffle import (
 )
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.util import fail_on_stopiteration, _parse_memory
+from pyspark.errors import PySparkRuntimeError
 
 
 if TYPE_CHECKING:
@@ -167,7 +168,10 @@ def portable_hash(x: Hashable) -> int:
     """
 
     if "PYTHONHASHSEED" not in os.environ:
-        raise RuntimeError("Randomness of hash of string should be disabled 
via PYTHONHASHSEED")
+        raise PySparkRuntimeError(
+            error_class="PYTHON_HASH_SEED_NOT_SET",
+            message_parameters={},
+        )
 
     if x is None:
         return 0
@@ -368,13 +372,9 @@ class RDD(Generic[T_co]):
 
     def __getnewargs__(self) -> NoReturn:
         # This method is called when attempting to pickle an RDD, which is always an error:
-        raise RuntimeError(
-            "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
-            "action or transformation. RDD transformations and actions can only be invoked by the "
-            "driver, not inside of other transformations; for example, "
-            "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values "
-            "transformation and count action cannot be performed inside of the rdd1.map "
-            "transformation. For more information, see SPARK-5063."
+        raise PySparkRuntimeError(
+            error_class="RDD_TRANSFORM_ONLY_VALID_ON_DRIVER",
+            message_parameters={},
         )
 
     @property
@@ -1699,9 +1699,12 @@ class RDD(Generic[T_co]):
             def check_return_code() -> Iterable[int]:
                 pipe.wait()
                 if checkCode and pipe.returncode:
-                    raise RuntimeError(
-                        "Pipe function `%s' exited "
-                        "with error code %d" % (command, pipe.returncode)
+                    raise PySparkRuntimeError(
+                        error_class="PIPE_FUNCTION_EXITED",
+                        message_parameters={
+                            "func_name": command,
+                            "error_code": str(pipe.returncode),
+                        },
                     )
                 else:
                     for i in range(0):
@@ -5215,7 +5218,13 @@ class RDD(Generic[T_co]):
     def toDF(
         self: "RDD[Any]", schema: Optional[Any] = None, sampleRatio: 
Optional[float] = None
     ) -> "DataFrame":
-        raise RuntimeError("""RDD.toDF was called before SparkSession was 
initialized.""")
+        raise PySparkRuntimeError(
+            error_class="CALL_BEFORE_INITIALIZE",
+            message_parameters={
+                "func_name": "RDD.toDF",
+                "object": "SparkSession",
+            },
+        )
 
 
 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 0eff994ead7..0a2f81c6e5b 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -596,7 +596,7 @@ class GroupedApplyInPandasTestsMixin:
             with QuietTest(self.sc):
                 with self.assertRaisesRegex(
                     PythonException,
-                    "RuntimeError: Column names of the returned 
pandas.DataFrame do not match "
+                    "Column names of the returned pandas.DataFrame do not 
match "
                     "specified schema. Missing: id. Unexpected: iid.\n",
                 ):
                     grouped_df.apply(column_name_typo).collect()
diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index 5e8765e2cbb..329820c3ee0 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -19,6 +19,7 @@ from typing import ClassVar, Type, Dict, List, Optional, Union, cast
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.resource import ResourceInformation
 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
+from pyspark.errors import PySparkRuntimeError
 
 
 class TaskContext:
@@ -355,7 +356,10 @@ class BarrierTaskContext(TaskContext):
         This API is experimental
         """
         if not isinstance(cls._taskContext, BarrierTaskContext):
-            raise RuntimeError("It is not in a barrier stage")
+            raise PySparkRuntimeError(
+                error_class="NOT_IN_BARRIER_STAGE",
+                message_parameters={},
+            )
         return cls._taskContext
 
     @classmethod
@@ -386,8 +390,12 @@ class BarrierTaskContext(TaskContext):
         or a `SparkException` after timeout.
         """
         if self._port is None or self._secret is None:
-            raise RuntimeError(
-                "Not supported to call barrier() before initialize 
BarrierTaskContext."
+            raise PySparkRuntimeError(
+                error_class="CALL_BEFORE_INITIALIZE",
+                message_parameters={
+                    "func_name": "barrier",
+                    "object": "BarrierTaskContext",
+                },
             )
         else:
             _load_from_socket(self._port, self._secret, BARRIER_FUNCTION)
@@ -411,8 +419,12 @@ class BarrierTaskContext(TaskContext):
         if not isinstance(message, str):
             raise TypeError("Argument `message` must be of type `str`")
         elif self._port is None or self._secret is None:
-            raise RuntimeError(
-                "Not supported to call barrier() before initialize 
BarrierTaskContext."
+            raise PySparkRuntimeError(
+                error_class="CALL_BEFORE_INITIALIZE",
+                message_parameters={
+                    "func_name": "allGather",
+                    "object": "BarrierTaskContext",
+                },
             )
         else:
             return _load_from_socket(self._port, self._secret, ALL_GATHER_FUNCTION, message)
@@ -438,8 +450,12 @@ class BarrierTaskContext(TaskContext):
         '...:...'
         """
         if self._port is None or self._secret is None:
-            raise RuntimeError(
-                "Not supported to call getTaskInfos() before initialize " + 
"BarrierTaskContext."
+            raise PySparkRuntimeError(
+                error_class="CALL_BEFORE_INITIALIZE",
+                message_parameters={
+                    "func_name": "getTaskInfos",
+                    "object": "BarrierTaskContext",
+                },
             )
         else:
             addresses = cast(Dict[str, str], self._localProperties).get("addresses", "")
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index d819656f3b7..0a9628977af 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -287,7 +287,7 @@ class ContextTests(unittest.TestCase):
             with self.assertRaises(Exception) as context:
                 sc.range(2).foreach(lambda _: SparkContext())
             self.assertIn(
-                "SparkContext should only be created and accessed on the 
driver.",
+                "CONTEXT_ONLY_VALID_ON_DRIVER",
                 str(context.exception),
             )
 
diff --git a/python/pyspark/tests/test_profiler.py b/python/pyspark/tests/test_profiler.py
index 58a486cbbcc..018387012fc 100644
--- a/python/pyspark/tests/test_profiler.py
+++ b/python/pyspark/tests/test_profiler.py
@@ -25,8 +25,8 @@ from pyspark import SparkConf, SparkContext, BasicProfiler
 from pyspark.profiler import has_memory_profiler
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import udf
-from pyspark.errors import PythonException
-from pyspark.testing.utils import PySparkTestCase
+from pyspark.errors import PythonException, PySparkRuntimeError
+from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils
 
 
 class ProfilerTests(PySparkTestCase):
@@ -84,7 +84,7 @@ class ProfilerTests(PySparkTestCase):
         rdd.foreach(heavy_foo)
 
 
-class ProfilerTests2(unittest.TestCase):
+class ProfilerTests2(unittest.TestCase, PySparkErrorTestUtils):
     def test_profiler_disabled(self):
         sc = SparkContext(
             conf=SparkConf()
@@ -92,15 +92,20 @@ class ProfilerTests2(unittest.TestCase):
             .set("spark.python.profile.memory", "false")
         )
         try:
-            self.assertRaisesRegex(
-                RuntimeError,
-                "'spark.python.profile' or 'spark.python.profile.memory' 
configuration must be set",
-                lambda: sc.show_profiles(),
+            with self.assertRaises(PySparkRuntimeError) as pe:
+                sc.show_profiles()
+            self.check_error(
+                exception=pe.exception,
+                error_class="INCORRECT_CONF_FOR_PROFILE",
+                message_parameters={},
             )
-            self.assertRaisesRegex(
-                RuntimeError,
-                "'spark.python.profile' or 'spark.python.profile.memory' 
configuration must be set",
-                lambda: sc.dump_profiles("/tmp/abc"),
+
+            with self.assertRaises(PySparkRuntimeError) as pe:
+                sc.dump_profiles("/tmp/abc")
+            self.check_error(
+                exception=pe.exception,
+                error_class="INCORRECT_CONF_FOR_PROFILE",
+                message_parameters={},
             )
         finally:
             sc.stop()
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index b7b972a5d35..5232c929e16 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -27,6 +27,8 @@ import traceback
 from types import TracebackType
 from typing import Any, Callable, Iterator, List, Optional, TextIO, Tuple
 
+from pyspark.errors import PySparkRuntimeError
+
 from py4j.clientserver import ClientServer
 
 __all__: List[str] = []
@@ -80,8 +82,11 @@ def fail_on_stopiteration(f: Callable) -> Callable:
         try:
             return f(*args, **kwargs)
         except StopIteration as exc:
-            raise RuntimeError(
-                "Caught StopIteration thrown from user's code; failing the 
task", exc
+            raise PySparkRuntimeError(
+                error_class="STOP_ITERATION_OCCURRED",
+                message_parameters={
+                    "exc": str(exc),
+                },
             )
 
     return wrapper
@@ -152,7 +157,7 @@ def try_simplify_traceback(tb: TracebackType) -> Optional[TracebackType]:
         ...
       File "/.../pyspark/util.py", line ...
         ...
-    RuntimeError: ...
+    pyspark.errors.exceptions.base.PySparkRuntimeError: ...
     >>> "pyspark/util.py" in exc_info
     True
 
@@ -168,7 +173,7 @@ def try_simplify_traceback(tb: TracebackType) -> Optional[TracebackType]:
     ...         traceback.format_exception(
     ...             type(e), e, try_simplify_traceback(skip_doctest_traceback(tb))))
     >>> print(exc_info)  # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS
-    RuntimeError: ...
+    pyspark.errors.exceptions.base.PySparkRuntimeError: ...
     >>> "pyspark/util.py" in exc_info
     False
 
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c9d9ea866a8..bc40e5fc4ef 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -64,6 +64,7 @@ from pyspark.sql.pandas.types import to_arrow_type
 from pyspark.sql.types import StructType
 from pyspark.util import fail_on_stopiteration, try_simplify_traceback
 from pyspark import shuffle
+from pyspark.errors import PySparkRuntimeError
 
 pickleSer = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
@@ -117,9 +118,12 @@ def wrap_scalar_pandas_udf(f, return_type):
 
     def verify_result_length(result, length):
         if len(result) != length:
-            raise RuntimeError(
-                "Result vector from pandas_udf was not the required length: "
-                "expected %d, got %d" % (length, len(result))
+            raise PySparkRuntimeError(
+                error_class="SCHEMA_MISMATCH_FOR_PANDAS_UDF",
+                message_parameters={
+                    "expected": str(length),
+                    "actual": str(len(result)),
+                },
             )
         return result
 
@@ -173,16 +177,21 @@ def verify_pandas_result(result, return_type, assign_cols_by_name):
             extra = sorted(list(column_names.difference(field_names)))
             extra = f" Unexpected: {', '.join(extra)}." if extra else ""
 
-            raise RuntimeError(
-                "Column names of the returned pandas.DataFrame do not match 
specified schema."
-                "{}{}".format(missing, extra)
+            raise PySparkRuntimeError(
+                error_class="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF",
+                message_parameters={
+                    "missing": missing,
+                    "extra": extra,
+                },
             )
         # otherwise the number of columns of result have to match the return type
         elif len(result.columns) != len(return_type):
-            raise RuntimeError(
-                "Number of columns of the returned pandas.DataFrame "
-                "doesn't match specified schema. "
-                "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns))
+            raise PySparkRuntimeError(
+                error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
+                message_parameters={
+                    "expected": str(len(return_type)),
+                    "actual": str(len(result.columns)),
+                },
             )
 
 
@@ -278,10 +287,12 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type):
                 len(result.columns) == len(return_type)
                 or (len(result.columns) == 0 and result.empty)
             ):
-                raise RuntimeError(
-                    "Number of columns of the element (pandas.DataFrame) in 
return iterator "
-                    "doesn't match specified schema. "
-                    "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns))
+                raise PySparkRuntimeError(
+                    error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
+                    message_parameters={
+                        "expected": str(len(return_type)),
+                        "actual": str(len(result.columns)),
+                    },
                 )
 
             return result
@@ -330,7 +341,12 @@ def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
     elif window_bound_type == "unbounded":
         return wrap_unbounded_window_agg_pandas_udf(f, return_type)
     else:
-        raise RuntimeError("Invalid window bound type: {} 
".format(window_bound_type))
+        raise PySparkRuntimeError(
+            error_class="INVALID_WINDOW_BOUND_TYPE",
+            message_parameters={
+                "window_bound_type": window_bound_type,
+            },
+        )
 
 
 def wrap_unbounded_window_agg_pandas_udf(f, return_type):
@@ -554,13 +570,18 @@ def read_udfs(pickleSer, infile, eval_type):
                 except StopIteration:
                     pass
                 else:
-                    raise RuntimeError("pandas iterator UDF should exhaust the 
input " "iterator.")
+                    raise PySparkRuntimeError(
+                        
error_class="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF",
+                        message_parameters={},
+                    )
 
                 if num_output_rows != num_input_rows:
-                    raise RuntimeError(
-                        "The length of output in Scalar iterator pandas UDF 
should be "
-                        "the same with the input's; however, the length of 
output was %d and the "
-                        "length of input was %d." % (num_output_rows, 
num_input_rows)
+                    raise PySparkRuntimeError(
+                        error_class="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF",
+                        message_parameters={
+                            "output_length": str(num_output_rows),
+                            "input_length": str(num_input_rows),
+                        },
                     )
 
         # profiling is not supported for UDF
@@ -700,14 +721,12 @@ def main(infile, outfile):
 
         version = utf8_deserializer.loads(infile)
         if version != "%d.%d" % sys.version_info[:2]:
-            raise RuntimeError(
-                (
-                    "Python in worker has different version %s than that in "
-                    + "driver %s, PySpark cannot run with different minor 
versions. "
-                    + "Please check environment variables PYSPARK_PYTHON and "
-                    + "PYSPARK_DRIVER_PYTHON are correctly set."
-                )
-                % ("%d.%d" % sys.version_info[:2], version)
+            raise PySparkRuntimeError(
+                error_class="PYTHON_VERSION_MISMATCH",
+                message_parameters={
+                    "worker_version": str(sys.version_info[:2]),
+                    "driver_version": str(version),
+                },
             )
 
         # read inputs only for a barrier task

