This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 16411188c7b [SPARK-43315][CONNECT][PYTHON][SS] Migrate remaining errors from DataFrame(Reader|Writer) into error class
16411188c7b is described below
commit 16411188c7ba6cb19c46a2bd512b2485a4c03e2c
Author: itholic <[email protected]>
AuthorDate: Sat May 6 09:47:58 2023 +0900
[SPARK-43315][CONNECT][PYTHON][SS] Migrate remaining errors from DataFrame(Reader|Writer) into error class
### What changes were proposed in this pull request?
This PR proposes to migrate all remaining errors from DataFrame(Reader|Writer) into error classes, as sketched below.
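A minimal before/after sketch of the pattern (the exact shape varies per call site):

    # before: ad-hoc built-in exception with a formatted string
    raise ValueError("col is a {0} but cols are not empty".format(type(col)))

    # after: PySpark error class with named message parameters
    raise PySparkValueError(
        error_class="CANNOT_SET_TOGETHER",
        message_parameters={"arg_list": f"`col` of type {type(col).__name__} and `cols`"},
    )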
### Why are the changes needed?
To improve PySpark error messages.
### Does this PR introduce _any_ user-facing change?
No API changes; only the error messages are improved.
### How was this patch tested?
The existing CI should pass.
Closes #40986 from itholic/error_readwriter.
Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/errors/error_classes.py | 5 +++++
python/pyspark/sql/connect/readwriter.py | 23 +++++++++++++++-------
python/pyspark/sql/readwriter.py | 16 ++++++++++++---
python/pyspark/sql/streaming/readwriter.py | 21 +++++++++++++++-----
.../sql/tests/streaming/test_streaming_foreach.py | 2 +-
5 files changed, 51 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 27f047d0e36..6af8d5bc6ff 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -84,6 +84,11 @@ ERROR_CLASSES_JSON = """
"Cannot convert <from_type> into <to_type>."
]
},
+ "CANNOT_GET_BATCH_ID": {
+ "message": [
+ "Could not get batch id from <obj_name>."
+ ]
+ },
"CANNOT_INFER_ARRAY_TYPE": {
"message": [
"Can not infer Array Type from an list with None as the first element."
diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py
index f26ddb1da27..cfcbcede348 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -30,7 +30,7 @@ from pyspark.sql.readwriter import (
DataFrameReader as PySparkDataFrameReader,
DataFrameWriterV2 as PySparkDataFrameWriterV2,
)
-from pyspark.errors import PySparkAttributeError, PySparkTypeError
+from pyspark.errors import PySparkAttributeError, PySparkTypeError, PySparkValueError
if TYPE_CHECKING:
from pyspark.sql.connect.dataframe import DataFrame
@@ -497,17 +497,21 @@ class DataFrameWriter(OptionUtils):
self, numBuckets: int, col: Union[str, TupleOrListOfString], *cols: Optional[str]
) -> "DataFrameWriter":
if not isinstance(numBuckets, int):
raise PySparkTypeError(
error_class="NOT_INT",
message_parameters={
"arg_name": "numBuckets",
"arg_type": type(numBuckets).__name__,
},
)
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise PySparkValueError(
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": f"`col` of type {type(col).__name__} and `cols`",
+ },
+ )
col, cols = col[0], col[1:] # type: ignore[assignment]
@@ -548,7 +552,12 @@ class DataFrameWriter(OptionUtils):
) -> "DataFrameWriter":
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise PySparkValueError(
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": f"`col` of type {type(col).__name__} and
`cols`",
+ },
+ )
col, cols = col[0], col[1:] # type: ignore[assignment]
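For illustration only (df stands for any DataFrame; this call is not part of the patch), the new error can be triggered like this:

    # `col` is a list and `cols` is non-empty, so the two are set together
    df.write.bucketBy(2, ["x", "y"], "z")
    # expected: PySparkValueError with error class CANNOT_SET_TOGETHER,
    # naming `col` of type list and `cols`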
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2c42dae42f2..cfac8fdbc68 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -24,7 +24,7 @@ from pyspark.sql.column import _to_seq, _to_java_column, Column
from pyspark.sql.types import StructType
from pyspark.sql import utils
from pyspark.sql.utils import to_str
-from pyspark.errors import PySparkTypeError
+from pyspark.errors import PySparkTypeError, PySparkValueError
if TYPE_CHECKING:
from pyspark.sql._typing import OptionalPrimitiveType, ColumnOrName
@@ -1281,7 +1281,12 @@ class DataFrameWriter(OptionUtils):
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise PySparkValueError(
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": f"`col` of type {type(col).__name__} and
`cols`",
+ },
+ )
col, cols = col[0], col[1:] # type: ignore[assignment]
@@ -1358,7 +1363,12 @@ class DataFrameWriter(OptionUtils):
"""
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise PySparkValueError(
+ error_class="CANNOT_SET_TOGETHER",
+ message_parameters={
+ "arg_list": f"`col` of type {type(col).__name__} and
`cols`",
+ },
+ )
col, cols = col[0], col[1:] # type: ignore[assignment]
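A practical payoff of classified errors is programmatic matching instead of string parsing; a sketch assuming getErrorClass() is available on PySpark exceptions (the misuse below is hypothetical):

    from pyspark.errors import PySparkValueError

    try:
        df.write.bucketBy(4, ("a", "b"), "c").saveAsTable("t")
    except PySparkValueError as e:
        if e.getErrorClass() == "CANNOT_SET_TOGETHER":
            pass  # react to the conflicting col/cols arguments programmatically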
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index f805d0cc152..51595a6babd 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -26,7 +26,12 @@ from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.streaming.query import StreamingQuery
from pyspark.sql.types import Row, StructType
from pyspark.sql.utils import ForeachBatchFunction
-from pyspark.errors import PySparkTypeError, PySparkValueError
+from pyspark.errors import (
+ PySparkTypeError,
+ PySparkValueError,
+ PySparkAttributeError,
+ PySparkRuntimeError,
+)
if TYPE_CHECKING:
from pyspark.sql.session import SparkSession
@@ -1251,10 +1256,13 @@ class DataStreamWriter:
# 'close(error)' methods.
if not hasattr(f, "process"):
- raise AttributeError("Provided object does not have a 'process' method")
+ raise PySparkAttributeError(
+ error_class="ATTRIBUTE_NOT_CALLABLE",
+ message_parameters={"attr_name": "process", "obj_name": "f"},
+ )
if not callable(getattr(f, "process")):
- raise PySparkTypeError(
+ raise PySparkAttributeError(
error_class="ATTRIBUTE_NOT_CALLABLE",
message_parameters={"attr_name": "process", "obj_name": "f"},
)
@@ -1262,7 +1270,7 @@ class DataStreamWriter:
def doesMethodExist(method_name: str) -> bool:
exists = hasattr(f, method_name)
if exists and not callable(getattr(f, method_name)):
- raise PySparkTypeError(
+ raise PySparkAttributeError(
error_class="ATTRIBUTE_NOT_CALLABLE",
message_parameters={"attr_name": method_name, "obj_name": "f"},
)
@@ -1278,7 +1286,10 @@ class DataStreamWriter:
if epoch_id:
int_epoch_id = int(epoch_id)
else:
- raise RuntimeError("Could not get batch id from TaskContext")
+ raise PySparkRuntimeError(
+ error_class="CANNOT_GET_BATCH_ID",
+ message_parameters={"obj_name": "TaskContext"},
+ )
# Check if the data should be processed
should_process = True
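After this change, a writer whose process attribute is missing or not callable fails with the same classified error. A hypothetical trigger, reusing the writer shape from the test below:

    class WriterWithoutProcess:
        def open(self, partition):
            pass

    # raises PySparkAttributeError with error class ATTRIBUTE_NOT_CALLABLE
    df.writeStream.foreach(WriterWithoutProcess())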
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach.py b/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
index 9ad3fee0972..104f8d3ed13 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
@@ -243,7 +243,7 @@ class StreamingTestsForeach(ReusedSQLTestCase):
def open(self, partition):
pass
- tester.assert_invalid_writer(WriterWithoutProcess(), "does not have a 'process'")
+ tester.assert_invalid_writer(WriterWithoutProcess(), "ATTRIBUTE_NOT_CALLABLE")
class WriterWithNonCallableProcess:
process = True
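assert_invalid_writer itself is defined outside this hunk; a minimal sketch of the shape such a helper presumably has (hypothetical, not the actual body) is to exercise the writer and assert that the expected text occurs in the raised error, so "ATTRIBUTE_NOT_CALLABLE" now pins the error class rather than free-form wording:

    def assert_invalid_writer(self, writer, expected_msg):
        with self.assertRaises(Exception) as ctx:
            self.df.writeStream.foreach(writer).start()  # hypothetical shape
        self.assertIn(expected_msg, str(ctx.exception))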
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]