This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ff2b9c2ddc0 [SPARK-43353][PYTHON] Migrate remaining session errors into error class
ff2b9c2ddc0 is described below
commit ff2b9c2ddc07c02f6ba4c68a2fd66243919acfb6
Author: itholic <[email protected]>
AuthorDate: Tue May 30 13:49:45 2023 +0900
[SPARK-43353][PYTHON] Migrate remaining session errors into error class
### What changes were proposed in this pull request?
This PR proposes to migrate the remaining Spark session errors into error classes.
### Why are the changes needed?
To leverage the PySpark error framework.
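For context, the migration pattern is mechanical: each plain built-in exception is replaced by the corresponding PySpark error type, which carries a stable `error_class` identifier and templated `message_parameters` instead of a free-form string. A minimal before/after sketch of the pattern (following the shape of the diff below, standalone rather than the exact session code):

    # Before: a built-in exception with an ad-hoc message string.
    raise ValueError("can not infer schema from empty dataset")

    # After: a PySpark error identified by a stable error class.
    from pyspark.errors import PySparkValueError

    raise PySparkValueError(
        error_class="CANNOT_INFER_EMPTY_SCHEMA",
        message_parameters={},
    )

Error messages live centrally in `python/pyspark/errors/error_classes.py`, so wording stays consistent and callers can match on the error class rather than parse message text.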
### Does this PR introduce _any_ user-facing change?
No API changes.
### How was this patch tested?
The existing CI should pass.
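Beyond CI, a quick manual check of the new behavior is possible (an untested sketch, assuming a local Spark build with this patch applied; `getErrorClass()` is the accessor defined on `PySparkException` subclasses):

    from pyspark.errors import PySparkValueError
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    try:
        # Schema inference over empty data should now raise a classed error.
        spark.createDataFrame([])
    except PySparkValueError as e:
        assert e.getErrorClass() == "CANNOT_INFER_EMPTY_SCHEMA"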
Closes #41031 from itholic/error_session.
Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/errors/error_classes.py | 20 ++++++++++++++
python/pyspark/sql/session.py | 48 +++++++++++++++++++++++++++-------
2 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 817b8ce60db..2d82d03eb6d 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -89,6 +89,11 @@ ERROR_CLASSES_JSON = """
"Cannot convert <from_type> into <to_type>."
]
},
+ "CANNOT_DETERMINE_TYPE": {
+ "message": [
+ "Some of types cannot be determined after inferring."
+ ]
+ },
"CANNOT_GET_BATCH_ID": {
"message": [
"Could not get batch id from <obj_name>."
@@ -470,6 +475,11 @@ ERROR_CLASSES_JSON = """
"Argument `<arg_name>` should be a list[str], got <arg_type>."
]
},
+ "NOT_LIST_OR_NONE_OR_STRUCT" : {
+ "message" : [
+ "Argument `<arg_name>` should be a list, None or StructType, got
<arg_type>."
+ ]
+ },
"NOT_LIST_OR_STR_OR_TUPLE" : {
"message" : [
"Argument `<arg_name>` should be a list, str or tuple, got <arg_type>."
@@ -576,6 +586,11 @@ ERROR_CLASSES_JSON = """
"Result vector from pandas_udf was not the required length: expected
<expected>, got <actual>."
]
},
+ "SESSION_ALREADY_EXIST" : {
+ "message" : [
+ "Cannot start a remote Spark session because there is a regular Spark
session already running."
+ ]
+ },
"SESSION_NOT_SAME" : {
"message" : [
"Both Datasets must belong to the same SparkSession."
@@ -586,6 +601,11 @@ ERROR_CLASSES_JSON = """
"There should not be an existing Spark Session or Spark Context."
]
},
+ "SHOULD_NOT_DATAFRAME": {
+ "message": [
+ "Argument `<arg_name>` should not be a DataFrame."
+ ]
+ },
"SLICE_WITH_STEP" : {
"message" : [
"Slice with step is not supported."
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index df970a0bf37..e96dc9cee3f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -64,6 +64,7 @@ from pyspark.sql.types import (
)
from pyspark.errors.exceptions.captured import install_exception_handler
from pyspark.sql.utils import is_timestamp_ntz_preferred, to_str
+from pyspark.errors import PySparkValueError, PySparkTypeError
if TYPE_CHECKING:
from pyspark.sql._typing import AtomicValue, RowLike, OptionalPrimitiveType
@@ -873,7 +874,10 @@ class SparkSession(SparkConversionMixin):
:class:`pyspark.sql.types.StructType`
"""
if not data:
- raise ValueError("can not infer schema from empty dataset")
+ raise PySparkValueError(
+ error_class="CANNOT_INFER_EMPTY_SCHEMA",
+ message_parameters={},
+ )
infer_dict_as_struct = self._jconf.inferDictAsStruct()
infer_array_from_first_element = self._jconf.legacyInferArrayTypeFromFirstElement()
prefer_timestamp_ntz = is_timestamp_ntz_preferred()
@@ -891,7 +895,10 @@ class SparkSession(SparkConversionMixin):
),
)
if _has_nulltype(schema):
- raise ValueError("Some of types cannot be determined after
inferring")
+ raise PySparkValueError(
+ error_class="CANNOT_DETERMINE_TYPE",
+ message_parameters={},
+ )
return schema
def _inferSchema(
@@ -917,7 +924,10 @@ class SparkSession(SparkConversionMixin):
"""
first = rdd.first()
if isinstance(first, Sized) and len(first) == 0:
- raise ValueError("The first row in RDD is empty, can not infer
schema")
+ raise PySparkValueError(
+ error_class="CANNOT_INFER_EMPTY_SCHEMA",
+ message_parameters={},
+ )
infer_dict_as_struct = self._jconf.inferDictAsStruct()
infer_array_from_first_element = self._jconf.legacyInferArrayTypeFromFirstElement()
@@ -944,9 +954,9 @@ class SparkSession(SparkConversionMixin):
if not _has_nulltype(schema):
break
else:
- raise ValueError(
- "Some of types cannot be determined by the "
- "first 100 rows, please try again with sampling"
+ raise PySparkValueError(
+ error_class="CANNOT_DETERMINE_TYPE",
+ message_parameters={},
)
else:
if samplingRatio < 0.99:
@@ -985,7 +995,13 @@ class SparkSession(SparkConversionMixin):
tupled_rdd = rdd
else:
- raise TypeError("schema should be StructType or list or None, but
got: %s" % schema)
+ raise PySparkTypeError(
+ error_class="NOT_LIST_OR_NONE_OR_STRUCT",
+ message_parameters={
+ "arg_name": "schema",
+ "arg_type": type(schema).__name__,
+ },
+ )
# convert python objects to sql data
internal_rdd = tupled_rdd.map(struct.toInternal)
@@ -1016,7 +1032,13 @@ class SparkSession(SparkConversionMixin):
tupled_data = data
else:
- raise TypeError("schema should be StructType or list or None, but
got: %s" % schema)
+ raise PySparkTypeError(
+ error_class="NOT_LIST_OR_NONE_OR_STRUCT",
+ message_parameters={
+ "arg_name": "schema",
+ "arg_type": type(schema).__name__,
+ },
+ )
# convert python objects to sql data
internal_data = [struct.toInternal(row) for row in tupled_data]
@@ -1265,7 +1287,10 @@ class SparkSession(SparkConversionMixin):
assert self._jvm is not None
self._jvm.SparkSession.setActiveSession(self._jsparkSession)
if isinstance(data, DataFrame):
- raise TypeError("data is already a DataFrame")
+ raise PySparkTypeError(
+ error_class="SHOULD_NOT_DATAFRAME",
+ message_parameters={"arg_name": "data"},
+ )
if isinstance(schema, str):
schema = cast(Union[AtomicType, StructType, str], _parse_datatype_string(schema))
@@ -1294,7 +1319,10 @@ class SparkSession(SparkConversionMixin):
require_minimum_pandas_version()
if data.ndim not in [1, 2]:
- raise ValueError("NumPy array input should be of 1 or 2
dimensions.")
+ raise PySparkValueError(
+ error_class="INVALID_NDARRAY_DIMENSION",
+ message_parameters={"dimensions": "1 or 2"},
+ )
if data.ndim == 1 or data.shape[1] == 1:
column_names = ["value"]