This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 16411188c7b [SPARK-43315][CONNECT][PYTHON][SS] Migrate remaining 
errors from DataFrame(Reader|Writer) into error class
16411188c7b is described below

commit 16411188c7ba6cb19c46a2bd512b2485a4c03e2c
Author: itholic <[email protected]>
AuthorDate: Sat May 6 09:47:58 2023 +0900

    [SPARK-43315][CONNECT][PYTHON][SS] Migrate remaining errors from 
DataFrame(Reader|Writer) into error class
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate all remaining errors from DataFrame(Reader|Writer) into error classes.
    
    ### Why are the changes needed?
    
    To improve PySpark error messages by raising them through the error class framework.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, only error improvement.
    
    ### How was this patch tested?
    
    The existing CI should pass.
    
    Closes #40986 from itholic/error_readwriter.
    
    Authored-by: itholic <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/errors/error_classes.py             |  5 +++++
 python/pyspark/sql/connect/readwriter.py           | 23 +++++++++++++++-------
 python/pyspark/sql/readwriter.py                   | 16 ++++++++++++---
 python/pyspark/sql/streaming/readwriter.py         | 21 +++++++++++++++-----
 .../sql/tests/streaming/test_streaming_foreach.py  |  2 +-
 5 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index 27f047d0e36..6af8d5bc6ff 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -84,6 +84,11 @@ ERROR_CLASSES_JSON = """
       "Cannot convert <from_type> into <to_type>."
     ]
   },
+  "CANNOT_GET_BATCH_ID": {
+    "message": [
+      "Could not get batch id from <obj_name>."
+    ]
+  },
   "CANNOT_INFER_ARRAY_TYPE": {
     "message": [
       "Can not infer Array Type from an list with None as the first element."
diff --git a/python/pyspark/sql/connect/readwriter.py 
b/python/pyspark/sql/connect/readwriter.py
index f26ddb1da27..cfcbcede348 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -30,7 +30,7 @@ from pyspark.sql.readwriter import (
     DataFrameReader as PySparkDataFrameReader,
     DataFrameWriterV2 as PySparkDataFrameWriterV2,
 )
-from pyspark.errors import PySparkAttributeError, PySparkTypeError
+from pyspark.errors import PySparkAttributeError, PySparkTypeError, 
PySparkValueError
 
 if TYPE_CHECKING:
     from pyspark.sql.connect.dataframe import DataFrame
@@ -497,17 +497,21 @@ class DataFrameWriter(OptionUtils):
         self, numBuckets: int, col: Union[str, TupleOrListOfString], *cols: 
Optional[str]
     ) -> "DataFrameWriter":
         if not isinstance(numBuckets, int):
-            raise PySparkTypeError(
-                error_class="NOT_INT",
+            raise PySparkValueError(
+                error_class="CANNOT_SET_TOGETHER",
                 message_parameters={
-                    "arg_name": "numBuckets",
-                    "arg_type": type(numBuckets).__name__,
+                    "arg_list": f"`col` of type {type(col).__name__} and 
`cols`",
                 },
             )
 
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError("col is a {0} but cols are not 
empty".format(type(col)))
+                raise PySparkValueError(
+                    error_class="CANNOT_SET_TOGETHER",
+                    message_parameters={
+                        "arg_list": f"`col` of type {type(col).__name__} and `cols`",
+                    },
+                )
 
             col, cols = col[0], col[1:]  # type: ignore[assignment]
 
@@ -548,7 +552,12 @@ class DataFrameWriter(OptionUtils):
     ) -> "DataFrameWriter":
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError("col is a {0} but cols are not 
empty".format(type(col)))
+                raise PySparkValueError(
+                    error_class="CANNOT_SET_TOGETHER",
+                    message_parameters={
+                        "arg_list": f"`col` of type {type(col).__name__} and 
`cols`",
+                    },
+                )
 
             col, cols = col[0], col[1:]  # type: ignore[assignment]
 
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2c42dae42f2..cfac8fdbc68 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -24,7 +24,7 @@ from pyspark.sql.column import _to_seq, _to_java_column, 
Column
 from pyspark.sql.types import StructType
 from pyspark.sql import utils
 from pyspark.sql.utils import to_str
-from pyspark.errors import PySparkTypeError
+from pyspark.errors import PySparkTypeError, PySparkValueError
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import OptionalPrimitiveType, ColumnOrName
@@ -1281,7 +1281,12 @@ class DataFrameWriter(OptionUtils):
 
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError("col is a {0} but cols are not 
empty".format(type(col)))
+                raise PySparkValueError(
+                    error_class="CANNOT_SET_TOGETHER",
+                    message_parameters={
+                        "arg_list": f"`col` of type {type(col).__name__} and 
`cols`",
+                    },
+                )
 
             col, cols = col[0], col[1:]  # type: ignore[assignment]
 
@@ -1358,7 +1363,12 @@ class DataFrameWriter(OptionUtils):
         """
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError("col is a {0} but cols are not 
empty".format(type(col)))
+                raise PySparkValueError(
+                    error_class="CANNOT_SET_TOGETHER",
+                    message_parameters={
+                        "arg_list": f"`col` of type {type(col).__name__} and 
`cols`",
+                    },
+                )
 
             col, cols = col[0], col[1:]  # type: ignore[assignment]
 
diff --git a/python/pyspark/sql/streaming/readwriter.py 
b/python/pyspark/sql/streaming/readwriter.py
index f805d0cc152..51595a6babd 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -26,7 +26,12 @@ from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.streaming.query import StreamingQuery
 from pyspark.sql.types import Row, StructType
 from pyspark.sql.utils import ForeachBatchFunction
-from pyspark.errors import PySparkTypeError, PySparkValueError
+from pyspark.errors import (
+    PySparkTypeError,
+    PySparkValueError,
+    PySparkAttributeError,
+    PySparkRuntimeError,
+)
 
 if TYPE_CHECKING:
     from pyspark.sql.session import SparkSession
@@ -1251,10 +1256,13 @@ class DataStreamWriter:
             # 'close(error)' methods.
 
             if not hasattr(f, "process"):
-                raise AttributeError("Provided object does not have a 
'process' method")
+                raise PySparkAttributeError(
+                    error_class="ATTRIBUTE_NOT_CALLABLE",
+                    message_parameters={"attr_name": "process", "obj_name": 
"f"},
+                )
 
             if not callable(getattr(f, "process")):
-                raise PySparkTypeError(
+                raise PySparkAttributeError(
                     error_class="ATTRIBUTE_NOT_CALLABLE",
                     message_parameters={"attr_name": "process", "obj_name": 
"f"},
                 )
@@ -1262,7 +1270,7 @@ class DataStreamWriter:
             def doesMethodExist(method_name: str) -> bool:
                 exists = hasattr(f, method_name)
                 if exists and not callable(getattr(f, method_name)):
-                    raise PySparkTypeError(
+                    raise PySparkAttributeError(
                         error_class="ATTRIBUTE_NOT_CALLABLE",
                         message_parameters={"attr_name": method_name, 
"obj_name": "f"},
                     )
@@ -1278,7 +1286,10 @@ class DataStreamWriter:
                 if epoch_id:
                     int_epoch_id = int(epoch_id)
                 else:
-                    raise RuntimeError("Could not get batch id from 
TaskContext")
+                    raise PySparkRuntimeError(
+                        error_class="CANNOT_GET_BATCH_ID",
+                        message_parameters={"obj_name": "TaskContext"},
+                    )
 
                 # Check if the data should be processed
                 should_process = True
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach.py 
b/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
index 9ad3fee0972..104f8d3ed13 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach.py
@@ -243,7 +243,7 @@ class StreamingTestsForeach(ReusedSQLTestCase):
             def open(self, partition):
                 pass
 
-        tester.assert_invalid_writer(WriterWithoutProcess(), "does not have a 
'process'")
+        tester.assert_invalid_writer(WriterWithoutProcess(), 
"ATTRIBUTE_NOT_CALLABLE")
 
         class WriterWithNonCallableProcess:
             process = True


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to