This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c1359567f21 [SPARK-43262][CONNECT][SS][PYTHON] Migrate Spark Connect
Structured Streaming errors into error class
c1359567f21 is described below
commit c1359567f217b78f59a0a754b6448f942a132855
Author: itholic <[email protected]>
AuthorDate: Tue Apr 25 09:37:21 2023 +0800
[SPARK-43262][CONNECT][SS][PYTHON] Migrate Spark Connect Structured
Streaming errors into error class
### What changes were proposed in this pull request?
This PR proposes to migrate built-in `TypeError` and `ValueError` from
Spark Connect Structured Streaming into the PySpark error framework.
### Why are the changes needed?
To leverage the PySpark error framework for Spark Connect Structured
Streaming.
### Does this PR introduce _any_ user-facing change?
No API changes, only error improvements.
### How was this patch tested?
The existing CI should pass.
Closes #40928 from itholic/error_streaming_connect.
Authored-by: itholic <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/streaming/query.py | 7 ++-
python/pyspark/sql/connect/streaming/readwriter.py | 70 ++++++++++++++++------
2 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/python/pyspark/sql/connect/streaming/query.py
b/python/pyspark/sql/connect/streaming/query.py
index a2b2e81357e..eb196971985 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -19,7 +19,7 @@ import json
import sys
from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
-from pyspark.errors import StreamingQueryException
+from pyspark.errors import StreamingQueryException, PySparkValueError
import pyspark.sql.connect.proto as pb2
from pyspark.sql.streaming.query import (
StreamingQuery as PySparkStreamingQuery,
@@ -73,7 +73,10 @@ class StreamingQuery:
cmd = pb2.StreamingQueryCommand()
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout <= 0:
- raise ValueError("timeout must be a positive integer or float.
Got %s" % timeout)
+ raise PySparkValueError(
+ error_class="VALUE_NOT_POSITIVE",
+ message_parameters={"arg_name": "timeout", "arg_value":
type(timeout).__name__},
+ )
cmd.await_termination.timeout_ms = int(timeout * 1000)
terminated =
self._execute_streaming_query_cmd(cmd).await_termination.terminated
return terminated
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py
b/python/pyspark/sql/connect/streaming/readwriter.py
index 0775c1ab4a3..df336a932cb 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -30,6 +30,7 @@ from pyspark.sql.streaming.readwriter import (
DataStreamWriter as PySparkDataStreamWriter,
)
from pyspark.sql.types import Row, StructType
+from pyspark.errors import PySparkTypeError, PySparkValueError
if TYPE_CHECKING:
from pyspark.sql.connect.session import SparkSession
@@ -64,7 +65,10 @@ class DataStreamReader(OptionUtils):
elif isinstance(schema, str):
self._schema = schema
else:
- raise TypeError("schema should be StructType or string")
+ raise PySparkTypeError(
+ error_class="NOT_STR_OR_STRUCT",
+ message_parameters={"arg_name": "schema", "arg_type":
type(schema).__name__},
+ )
return self
schema.__doc__ = PySparkDataStreamReader.schema.__doc__
@@ -95,9 +99,9 @@ class DataStreamReader(OptionUtils):
self.schema(schema)
self.options(**options)
if path is not None and (type(path) != str or len(path.strip()) == 0):
- raise ValueError(
- "If the path is provided for stream, it needs to be a "
- + "non-empty string. List of paths are not supported."
+ raise PySparkValueError(
+ error_class="VALUE_NOT_NON_EMPTY_STR",
+ message_parameters={"arg_name": "path", "arg_value":
str(path)},
)
plan = DataSource(
@@ -163,7 +167,10 @@ class DataStreamReader(OptionUtils):
if isinstance(path, str):
return self.load(path=path, format="json")
else:
- raise TypeError("path can be only a single string")
+ raise PySparkTypeError(
+ error_class="NOT_STR",
+ message_parameters={"arg_name": "path", "arg_type":
type(path).__name__},
+ )
json.__doc__ = PySparkDataStreamReader.json.__doc__
@@ -182,7 +189,10 @@ class DataStreamReader(OptionUtils):
if isinstance(path, str):
return self.load(path=path, format="orc")
else:
- raise TypeError("path can be only a single string")
+ raise PySparkTypeError(
+ error_class="NOT_STR",
+ message_parameters={"arg_name": "path", "arg_type":
type(path).__name__},
+ )
orc.__doc__ = PySparkDataStreamReader.orc.__doc__
@@ -212,7 +222,10 @@ class DataStreamReader(OptionUtils):
if isinstance(path, str):
return self.load(path=path, format="parquet")
else:
- raise TypeError("path can be only a single string")
+ raise PySparkTypeError(
+ error_class="NOT_STR",
+ message_parameters={"arg_name": "path", "arg_type":
type(path).__name__},
+ )
parquet.__doc__ = PySparkDataStreamReader.parquet.__doc__
@@ -233,7 +246,10 @@ class DataStreamReader(OptionUtils):
if isinstance(path, str):
return self.load(path=path, format="text")
else:
- raise TypeError("path can be only a single string")
+ raise PySparkTypeError(
+ error_class="NOT_STR",
+ message_parameters={"arg_name": "path", "arg_type":
type(path).__name__},
+ )
text.__doc__ = PySparkDataStreamReader.text.__doc__
@@ -306,7 +322,10 @@ class DataStreamReader(OptionUtils):
if isinstance(path, str):
return self.load(path=path, format="csv")
else:
- raise TypeError("path can be only a single string")
+ raise PySparkTypeError(
+ error_class="NOT_STR",
+ message_parameters={"arg_name": "path", "arg_type":
type(path).__name__},
+ )
csv.__doc__ = PySparkDataStreamReader.csv.__doc__
@@ -402,32 +421,49 @@ class DataStreamWriter:
params = [processingTime, once, continuous, availableNow]
if params.count(None) == 4:
- raise ValueError("No trigger provided")
+ raise PySparkValueError(
+ error_class="ONLY_ALLOW_SINGLE_TRIGGER",
+ message_parameters={},
+ )
elif params.count(None) < 3:
- raise ValueError("Multiple triggers not allowed.")
+ raise PySparkValueError(
+ error_class="ONLY_ALLOW_SINGLE_TRIGGER",
+ message_parameters={},
+ )
if processingTime is not None:
if type(processingTime) != str or len(processingTime.strip()) == 0:
- raise ValueError(
- "Value for processingTime must be a non empty string. Got:
%s" % processingTime
+ raise PySparkValueError(
+ error_class="VALUE_NOT_NON_EMPTY_STR",
+ message_parameters={
+ "arg_name": "processingTime",
+ "arg_value": str(processingTime),
+ },
)
self._write_proto.processing_time_interval = processingTime.strip()
elif once is not None:
if once is not True:
- raise ValueError("Value for once must be True. Got: %s" % once)
+ raise PySparkValueError(
+ error_class="VALUE_NOT_TRUE",
+ message_parameters={"arg_name": "once", "arg_value":
str(once)},
+ )
self._write_proto.once = True
elif continuous is not None:
if type(continuous) != str or len(continuous.strip()) == 0:
- raise ValueError(
- "Value for continuous must be a non empty string. Got: %s"
% continuous
+ raise PySparkValueError(
+ error_class="VALUE_NOT_NON_EMPTY_STR",
+ message_parameters={"arg_name": "continuous", "arg_value":
str(continuous)},
)
self._write_proto.continuous_checkpoint_interval =
continuous.strip()
else:
if availableNow is not True:
- raise ValueError("Value for availableNow must be True. Got:
%s" % availableNow)
+ raise PySparkValueError(
+ error_class="VALUE_NOT_TRUE",
+ message_parameters={"arg_name": "availableNow",
"arg_value": str(availableNow)},
+ )
self._write_proto.available_now = True
return self
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]