This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1359567f21 [SPARK-43262][CONNECT][SS][PYTHON] Migrate Spark Connect Structured Streaming errors into error class
c1359567f21 is described below

commit c1359567f217b78f59a0a754b6448f942a132855
Author: itholic <[email protected]>
AuthorDate: Tue Apr 25 09:37:21 2023 +0800

    [SPARK-43262][CONNECT][SS][PYTHON] Migrate Spark Connect Structured Streaming errors into error class
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate the built-in `TypeError` and `ValueError` raised by Spark Connect Structured Streaming into the PySpark error framework.
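
    As an illustration, the `readwriter.py` hunks below replace a built-in error with a parameterized error class (the pattern is taken directly from this diff):

        # Before
        raise TypeError("path can be only a single string")

        # After
        raise PySparkTypeError(
            error_class="NOT_STR",
            message_parameters={"arg_name": "path", "arg_type": type(path).__name__},
        )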
    
    ### Why are the changes needed?
    
    To leverage the PySpark error framework for Spark Connect Structured Streaming.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, only error improvements.
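
    Callers can now match on a stable error class instead of parsing message strings. A minimal sketch (assuming a Spark Connect session bound to `spark`; `getErrorClass()` is the accessor exposed by PySpark exceptions):

        from pyspark.errors import PySparkTypeError

        try:
            spark.readStream.json(["/tmp/a", "/tmp/b"])  # a list, not a single string
        except PySparkTypeError as e:
            assert e.getErrorClass() == "NOT_STR"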
    
    ### How was this patch tested?
    
    The existing CI should pass.
    
    Closes #40928 from itholic/error_streaming_connect.
    
    Authored-by: itholic <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/streaming/query.py      |  7 ++-
 python/pyspark/sql/connect/streaming/readwriter.py | 70 ++++++++++++++++------
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index a2b2e81357e..eb196971985 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -19,7 +19,7 @@ import json
 import sys
 from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
 
-from pyspark.errors import StreamingQueryException
+from pyspark.errors import StreamingQueryException, PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.sql.streaming.query import (
     StreamingQuery as PySparkStreamingQuery,
@@ -73,7 +73,10 @@ class StreamingQuery:
         cmd = pb2.StreamingQueryCommand()
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout <= 0:
-                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
+                raise PySparkValueError(
+                    error_class="VALUE_NOT_POSITIVE",
+                    message_parameters={"arg_name": "timeout", "arg_value": 
type(timeout).__name__},
+                )
             cmd.await_termination.timeout_ms = int(timeout * 1000)
             terminated = self._execute_streaming_query_cmd(cmd).await_termination.terminated
             return terminated
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index 0775c1ab4a3..df336a932cb 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -30,6 +30,7 @@ from pyspark.sql.streaming.readwriter import (
     DataStreamWriter as PySparkDataStreamWriter,
 )
 from pyspark.sql.types import Row, StructType
+from pyspark.errors import PySparkTypeError, PySparkValueError
 
 if TYPE_CHECKING:
     from pyspark.sql.connect.session import SparkSession
@@ -64,7 +65,10 @@ class DataStreamReader(OptionUtils):
         elif isinstance(schema, str):
             self._schema = schema
         else:
-            raise TypeError("schema should be StructType or string")
+            raise PySparkTypeError(
+                error_class="NOT_STR_OR_STRUCT",
+                message_parameters={"arg_name": "schema", "arg_type": 
type(schema).__name__},
+            )
         return self
 
     schema.__doc__ = PySparkDataStreamReader.schema.__doc__
@@ -95,9 +99,9 @@ class DataStreamReader(OptionUtils):
             self.schema(schema)
         self.options(**options)
         if path is not None and (type(path) != str or len(path.strip()) == 0):
-            raise ValueError(
-                "If the path is provided for stream, it needs to be a "
-                + "non-empty string. List of paths are not supported."
+            raise PySparkValueError(
+                error_class="VALUE_NOT_NON_EMPTY_STR",
+                message_parameters={"arg_name": "path", "arg_value": 
str(path)},
             )
 
         plan = DataSource(
@@ -163,7 +167,10 @@ class DataStreamReader(OptionUtils):
         if isinstance(path, str):
             return self.load(path=path, format="json")
         else:
-            raise TypeError("path can be only a single string")
+            raise PySparkTypeError(
+                error_class="NOT_STR",
+                message_parameters={"arg_name": "path", "arg_type": 
type(path).__name__},
+            )
 
     json.__doc__ = PySparkDataStreamReader.json.__doc__
 
@@ -182,7 +189,10 @@ class DataStreamReader(OptionUtils):
         if isinstance(path, str):
             return self.load(path=path, format="orc")
         else:
-            raise TypeError("path can be only a single string")
+            raise PySparkTypeError(
+                error_class="NOT_STR",
+                message_parameters={"arg_name": "path", "arg_type": 
type(path).__name__},
+            )
 
     orc.__doc__ = PySparkDataStreamReader.orc.__doc__
 
@@ -212,7 +222,10 @@ class DataStreamReader(OptionUtils):
         if isinstance(path, str):
             return self.load(path=path, format="parquet")
         else:
-            raise TypeError("path can be only a single string")
+            raise PySparkTypeError(
+                error_class="NOT_STR",
+                message_parameters={"arg_name": "path", "arg_type": 
type(path).__name__},
+            )
 
     parquet.__doc__ = PySparkDataStreamReader.parquet.__doc__
 
@@ -233,7 +246,10 @@ class DataStreamReader(OptionUtils):
         if isinstance(path, str):
             return self.load(path=path, format="text")
         else:
-            raise TypeError("path can be only a single string")
+            raise PySparkTypeError(
+                error_class="NOT_STR",
+                message_parameters={"arg_name": "path", "arg_type": 
type(path).__name__},
+            )
 
     text.__doc__ = PySparkDataStreamReader.text.__doc__
 
@@ -306,7 +322,10 @@ class DataStreamReader(OptionUtils):
         if isinstance(path, str):
             return self.load(path=path, format="csv")
         else:
-            raise TypeError("path can be only a single string")
+            raise PySparkTypeError(
+                error_class="NOT_STR",
+                message_parameters={"arg_name": "path", "arg_type": 
type(path).__name__},
+            )
 
     csv.__doc__ = PySparkDataStreamReader.csv.__doc__
 
@@ -402,32 +421,49 @@ class DataStreamWriter:
         params = [processingTime, once, continuous, availableNow]
 
         if params.count(None) == 4:
-            raise ValueError("No trigger provided")
+            raise PySparkValueError(
+                error_class="ONLY_ALLOW_SINGLE_TRIGGER",
+                message_parameters={},
+            )
         elif params.count(None) < 3:
-            raise ValueError("Multiple triggers not allowed.")
+            raise PySparkValueError(
+                error_class="ONLY_ALLOW_SINGLE_TRIGGER",
+                message_parameters={},
+            )
 
         if processingTime is not None:
             if type(processingTime) != str or len(processingTime.strip()) == 0:
-                raise ValueError(
-                    "Value for processingTime must be a non empty string. Got: 
%s" % processingTime
+                raise PySparkValueError(
+                    error_class="VALUE_NOT_NON_EMPTY_STR",
+                    message_parameters={
+                        "arg_name": "processingTime",
+                        "arg_value": str(processingTime),
+                    },
                 )
             self._write_proto.processing_time_interval = processingTime.strip()
 
         elif once is not None:
             if once is not True:
-                raise ValueError("Value for once must be True. Got: %s" % once)
+                raise PySparkValueError(
+                    error_class="VALUE_NOT_TRUE",
+                    message_parameters={"arg_name": "once", "arg_value": 
str(once)},
+                )
             self._write_proto.once = True
 
         elif continuous is not None:
             if type(continuous) != str or len(continuous.strip()) == 0:
-                raise ValueError(
-                    "Value for continuous must be a non empty string. Got: %s" 
% continuous
+                raise PySparkValueError(
+                    error_class="VALUE_NOT_NON_EMPTY_STR",
+                    message_parameters={"arg_name": "continuous", "arg_value": 
str(continuous)},
                 )
             self._write_proto.continuous_checkpoint_interval = continuous.strip()
 
         else:
             if availableNow is not True:
-                raise ValueError("Value for availableNow must be True. Got: 
%s" % availableNow)
+                raise PySparkValueError(
+                    error_class="VALUE_NOT_TRUE",
+                    message_parameters={"arg_name": "availableNow", 
"arg_value": str(availableNow)},
+                )
             self._write_proto.available_now = True
 
         return self
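
For reference, the new trigger validation surfaces to users like this (a minimal sketch, assuming `df` is a streaming DataFrame obtained via Spark Connect):

    from pyspark.errors import PySparkValueError

    try:
        df.writeStream.trigger(processingTime="5 seconds", once=True)  # two triggers set
    except PySparkValueError as e:
        assert e.getErrorClass() == "ONLY_ALLOW_SINGLE_TRIGGER"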

