This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7009fb58a98e [SPARK-54660][SS] Add RTM trigger to python
7009fb58a98e is described below
commit 7009fb58a98efd4049b4ffeb18d0e50f89aa2731
Author: Jerry Peng <[email protected]>
AuthorDate: Fri Jan 9 10:10:48 2026 -0800
[SPARK-54660][SS] Add RTM trigger to python
### What changes were proposed in this pull request?
Add the RTM (real-time mode) trigger to PySpark so that PySpark queries can
run in RTM. Only stateless queries (without UDFs) are supported for now.
Support is also added for Spark Connect, since a test fails if the method
signatures do not match.
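For illustration, a minimal sketch of the new API described above (the
"rate" source, console sink, and checkpoint path are assumptions for the
example, not part of this patch):

    # Illustrative only: source/sink/paths are assumed; RTM currently
    # supports only stateless (UDF-free) queries.
    df = spark.readStream.format("rate").load()
    query = (
        df.writeStream.format("console")
        .trigger(realTime="5 seconds")  # the trigger added by this patch
        .outputMode("update")
        .option("checkpointLocation", "/tmp/rtm-chk")  # hypothetical path
        .start()
    )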
### Why are the changes needed?
To support running RTM queries in PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, this adds the RTM trigger to PySpark.
### How was this patch tested?
Added a simple test. More tests will follow in a subsequent PR.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53448 from jerrypeng/SPARK-53998-3.
Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
python/pyspark/sql/connect/proto/commands_pb2.py | 140 ++++++++++-----------
python/pyspark/sql/connect/proto/commands_pb2.pyi | 13 +-
python/pyspark/sql/connect/streaming/readwriter.py | 19 ++-
python/pyspark/sql/streaming/readwriter.py | 26 +++-
.../pyspark/sql/tests/streaming/test_streaming.py | 22 ++++
.../src/main/protobuf/spark/connect/commands.proto | 1 +
.../spark/sql/connect/DataStreamWriter.scala | 4 +-
.../sql/connect/planner/SparkConnectPlanner.scala | 2 +
8 files changed, 149 insertions(+), 78 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py
b/python/pyspark/sql/connect/proto/commands_pb2.py
index 4eccf1b71706..53c192fda6e2 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -43,7 +43,7 @@ from pyspark.sql.connect.proto import pipelines_pb2 as spark_dot_connect_dot_pip
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xfb\x0e\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOpe [...]
+ b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xfb\x0e\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOpe [...]
)
_globals = globals()
@@ -82,8 +82,8 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_options
= b"8\001"
_globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._loaded_options = None
_globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_options =
b"8\001"
- _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_start = 11757
- _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_end = 11890
+ _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_start = 11816
+ _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_end = 11949
_globals["_COMMAND"]._serialized_start = 222
_globals["_COMMAND"]._serialized_end = 2137
_globals["_SQLCOMMAND"]._serialized_start = 2140
@@ -115,85 +115,85 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_WRITEOPERATIONV2_MODE"]._serialized_start = 4639
_globals["_WRITEOPERATIONV2_MODE"]._serialized_end = 4798
_globals["_WRITESTREAMOPERATIONSTART"]._serialized_start = 4814
- _globals["_WRITESTREAMOPERATIONSTART"]._serialized_end = 5670
+ _globals["_WRITESTREAMOPERATIONSTART"]._serialized_end = 5729
_globals["_WRITESTREAMOPERATIONSTART_OPTIONSENTRY"]._serialized_start =
3372
_globals["_WRITESTREAMOPERATIONSTART_OPTIONSENTRY"]._serialized_end = 3430
- _globals["_STREAMINGFOREACHFUNCTION"]._serialized_start = 5673
- _globals["_STREAMINGFOREACHFUNCTION"]._serialized_end = 5852
- _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_start = 5855
- _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_end = 6067
- _globals["_STREAMINGQUERYINSTANCEID"]._serialized_start = 6069
- _globals["_STREAMINGQUERYINSTANCEID"]._serialized_end = 6134
- _globals["_STREAMINGQUERYCOMMAND"]._serialized_start = 6137
- _globals["_STREAMINGQUERYCOMMAND"]._serialized_end = 6769
- _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_start = 6636
- _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_end = 6680
- _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_start = 6682
- _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_end = 6758
- _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_start = 6772
- _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_end = 7913
- _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_start = 7355
- _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_end = 7525
- _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_start = 7527
- _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_end = 7599
- _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_start = 7601
- _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_end = 7640
- _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_start = 7643
- _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_end = 7840
- _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_start = 7842
- _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_end = 7898
- _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_start = 7916
- _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_end = 8745
- _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_start = 8447
- _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_end = 8526
- _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_start = 8529
- _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_end = 8734
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_start = 8748
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_end = 9824
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_start = 9356
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_end = 9483
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_start = 9485
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_end = 9600
+ _globals["_STREAMINGFOREACHFUNCTION"]._serialized_start = 5732
+ _globals["_STREAMINGFOREACHFUNCTION"]._serialized_end = 5911
+ _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_start = 5914
+ _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_end = 6126
+ _globals["_STREAMINGQUERYINSTANCEID"]._serialized_start = 6128
+ _globals["_STREAMINGQUERYINSTANCEID"]._serialized_end = 6193
+ _globals["_STREAMINGQUERYCOMMAND"]._serialized_start = 6196
+ _globals["_STREAMINGQUERYCOMMAND"]._serialized_end = 6828
+ _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_start = 6695
+ _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_end = 6739
+ _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_start = 6741
+ _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_end = 6817
+ _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_start = 6831
+ _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_end = 7972
+ _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_start = 7414
+ _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_end = 7584
+ _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_start = 7586
+ _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_end = 7658
+ _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_start = 7660
+ _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_end = 7699
+ _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_start = 7702
+ _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_end = 7899
+ _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_start = 7901
+ _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_end = 7957
+ _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_start = 7975
+ _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_end = 8804
+ _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_start = 8506
+ _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_end = 8585
+ _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_start = 8588
+ _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_end = 8793
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_start = 8807
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_end = 9883
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_start = 9415
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_end = 9542
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_start = 9544
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_end = 9659
_globals[
"_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"
- ]._serialized_start = 9602
- _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"]._serialized_end = 9661
+ ]._serialized_start = 9661
+ _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"]._serialized_end = 9720
_globals[
"_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE"
- ]._serialized_start = 9663
+ ]._serialized_start = 9722
_globals[
"_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE"
- ]._serialized_end = 9738
+ ]._serialized_end = 9797
_globals[
"_STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT"
- ]._serialized_start = 9740
+ ]._serialized_start = 9799
_globals[
"_STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT"
- ]._serialized_end = 9809
- _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_start = 9827
- _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_end = 10000
- _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_start = 10003
- _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_end = 10134
- _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_start = 10137
- _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_end = 10341
- _globals["_GETRESOURCESCOMMAND"]._serialized_start = 10343
- _globals["_GETRESOURCESCOMMAND"]._serialized_end = 10364
- _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_start = 10367
- _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_end = 10579
- _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_start =
10483
- _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_end =
10579
- _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_start = 10581
- _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_end = 10669
- _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_start = 10671
- _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_end = 10738
- _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_start = 10740
- _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_end = 10840
- _globals["_CHECKPOINTCOMMAND"]._serialized_start = 10843
- _globals["_CHECKPOINTCOMMAND"]._serialized_end = 11048
- _globals["_MERGEINTOTABLECOMMAND"]._serialized_start = 11051
- _globals["_MERGEINTOTABLECOMMAND"]._serialized_end = 11539
- _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_start = 11542
- _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_end = 11754
+ ]._serialized_end = 9868
+ _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_start = 9886
+ _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_end = 10059
+ _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_start = 10062
+ _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_end = 10193
+ _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_start = 10196
+ _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_end = 10400
+ _globals["_GETRESOURCESCOMMAND"]._serialized_start = 10402
+ _globals["_GETRESOURCESCOMMAND"]._serialized_end = 10423
+ _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_start = 10426
+ _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_end = 10638
+ _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_start =
10542
+ _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_end =
10638
+ _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_start = 10640
+ _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_end = 10728
+ _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_start = 10730
+ _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_end = 10797
+ _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_start = 10799
+ _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_end = 10899
+ _globals["_CHECKPOINTCOMMAND"]._serialized_start = 10902
+ _globals["_CHECKPOINTCOMMAND"]._serialized_end = 11107
+ _globals["_MERGEINTOTABLECOMMAND"]._serialized_start = 11110
+ _globals["_MERGEINTOTABLECOMMAND"]._serialized_end = 11598
+ _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_start = 11601
+ _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_end = 11813
_globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_start = 3372
_globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_end = 3430
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 66e0ffdb5273..48f746a34acc 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -928,6 +928,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
AVAILABLE_NOW_FIELD_NUMBER: builtins.int
ONCE_FIELD_NUMBER: builtins.int
CONTINUOUS_CHECKPOINT_INTERVAL_FIELD_NUMBER: builtins.int
+ REAL_TIME_BATCH_DURATION_FIELD_NUMBER: builtins.int
OUTPUT_MODE_FIELD_NUMBER: builtins.int
QUERY_NAME_FIELD_NUMBER: builtins.int
PATH_FIELD_NUMBER: builtins.int
@@ -954,6 +955,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
available_now: builtins.bool
once: builtins.bool
continuous_checkpoint_interval: builtins.str
+ real_time_batch_duration: builtins.str
output_mode: builtins.str
query_name: builtins.str
path: builtins.str
@@ -978,6 +980,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
available_now: builtins.bool = ...,
once: builtins.bool = ...,
continuous_checkpoint_interval: builtins.str = ...,
+ real_time_batch_duration: builtins.str = ...,
output_mode: builtins.str = ...,
query_name: builtins.str = ...,
path: builtins.str = ...,
@@ -1005,6 +1008,8 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
b"path",
"processing_time_interval",
b"processing_time_interval",
+ "real_time_batch_duration",
+ b"real_time_batch_duration",
"sink_destination",
b"sink_destination",
"table_name",
@@ -1044,6 +1049,8 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
b"processing_time_interval",
"query_name",
b"query_name",
+ "real_time_batch_duration",
+ b"real_time_batch_duration",
"sink_destination",
b"sink_destination",
"table_name",
@@ -1061,7 +1068,11 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["trigger", b"trigger"]
) -> (
typing_extensions.Literal[
- "processing_time_interval", "available_now", "once",
"continuous_checkpoint_interval"
+ "processing_time_interval",
+ "available_now",
+ "once",
+ "continuous_checkpoint_interval",
+ "real_time_batch_duration",
]
| None
): ...
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index 21c513f88c0f..b7ca7dd1cbc0 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -493,6 +493,10 @@ class DataStreamWriter:
def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
...
+ @overload
+ def trigger(self, *, realTime: str) -> "DataStreamWriter":
+ ...
+
def trigger(
self,
*,
@@ -500,15 +504,16 @@ class DataStreamWriter:
once: Optional[bool] = None,
continuous: Optional[str] = None,
availableNow: Optional[bool] = None,
+ realTime: Optional[str] = None,
) -> "DataStreamWriter":
- params = [processingTime, once, continuous, availableNow]
+ params = [processingTime, once, continuous, availableNow, realTime]
- if params.count(None) == 4:
+ if params.count(None) == 5:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
)
- elif params.count(None) < 3:
+ elif params.count(None) < 4:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
@@ -541,6 +546,14 @@ class DataStreamWriter:
)
self._write_proto.continuous_checkpoint_interval = continuous.strip()
+ elif realTime is not None:
+ if type(realTime) != str or len(realTime.strip()) == 0:
+ raise PySparkValueError(
+ errorClass="VALUE_NOT_NON_EMPTY_STR",
+ messageParameters={"arg_name": "realTime", "arg_value":
str(realTime)},
+ )
+ self._write_proto.real_time_batch_duration = realTime.strip()
+
else:
if availableNow is not True:
raise PySparkValueError(
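The counting logic above enforces that exactly one trigger is set per call.
A rough sketch of the resulting behavior (error class names are taken from
this diff; the calls themselves are illustrative):

    df.writeStream.trigger(realTime="5 seconds")   # OK: exactly one trigger set
    df.writeStream.trigger()                       # ONLY_ALLOW_SINGLE_TRIGGER (none set)
    df.writeStream.trigger(once=True, realTime="5 seconds")  # ONLY_ALLOW_SINGLE_TRIGGER (two set)
    df.writeStream.trigger(realTime="   ")         # VALUE_NOT_NON_EMPTY_STR (blank duration)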
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 8121dd609950..ffb4415eaac3 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1243,6 +1243,7 @@ class DataStreamWriter:
once: Optional[bool] = None,
continuous: Optional[str] = None,
availableNow: Optional[bool] = None,
+ realTime: Optional[str] = None,
) -> "DataStreamWriter":
"""Set the trigger for the stream query. If this is not set it will
run the query as fast
as possible, which is equivalent to setting the trigger to
``processingTime='0 seconds'``.
@@ -1268,6 +1269,10 @@ class DataStreamWriter:
availableNow : bool, optional
if set to True, set a trigger that processes all available data in multiple
batches then terminates the query. Only one trigger can be set.
+ realTime : str, optional
+ a batch duration as a string, e.g. '5 seconds', '1 minute'.
+ Set a trigger that runs a real-time mode query with batches at the specified duration. Only one trigger can be set.
Notes
-----
@@ -1291,15 +1296,20 @@ class DataStreamWriter:
>>> df.writeStream.trigger(availableNow=True)
<...streaming.readwriter.DataStreamWriter object ...>
+
+ Trigger the query for real time mode execution every 5 seconds.
+
+ >>> df.writeStream.trigger(realTime='5 seconds')
+ <...streaming.readwriter.DataStreamWriter object ...>
"""
- params = [processingTime, once, continuous, availableNow]
+ params = [processingTime, once, continuous, availableNow, realTime]
- if params.count(None) == 4:
+ if params.count(None) == 5:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
)
- elif params.count(None) < 3:
+ elif params.count(None) < 4:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
@@ -1342,6 +1352,16 @@ class DataStreamWriter:
jTrigger = getattr(
self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
).Continuous(interval)
+ elif realTime is not None:
+ if type(realTime) != str or len(realTime.strip()) == 0:
+ raise PySparkValueError(
+ errorClass="VALUE_NOT_NON_EMPTY_STR",
+ messageParameters={"arg_name": "realTime", "arg_value":
str(realTime)},
+ )
+ batch_duration = realTime.strip()
+ jTrigger = getattr(
+ self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
+ ).RealTime(batch_duration)
else:
if availableNow is not True:
raise PySparkValueError(
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index 1f4fa8f4644d..0b5884efb717 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -147,6 +147,28 @@ class StreamingTestsMixin:
except TypeError:
pass
+ def test_stream_real_time_trigger(self):
+ df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+ tmpPath = tempfile.mkdtemp()
+ shutil.rmtree(tmpPath)
+ self.assertTrue(df.isStreaming)
+ out = os.path.join(tmpPath, "out")
+ chk = os.path.join(tmpPath, "chk")
+ try:
+ q = (
+ df.writeStream.format("console")
+ .trigger(realTime="5 seconds")
+ .option("checkpointLocation", chk)
+ .outputMode("update")
+ .start(out)
+ )
+ q.processAllAvailable()
+ except Exception as e:
+ # This error is expected
+ self._assert_exception_tree_contains_msg(
+ e, "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED"
+ )
+
def test_stream_read_options(self):
schema = StructType([StructField("data", StringType(), False)])
df = (
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
index 861af17e263a..c22e76e3542f 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -232,6 +232,7 @@ message WriteStreamOperationStart {
bool available_now = 6;
bool once = 7;
string continuous_checkpoint_interval = 8;
+ string real_time_batch_duration = 100;
}
string output_mode = 9;
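The new field joins the existing `trigger` oneof, so setting it clears any
other trigger choice. A small sketch of the regenerated message's behavior
(the value is illustrative; `WhichOneof` is the standard protobuf API):

    from pyspark.sql.connect.proto import commands_pb2

    op = commands_pb2.WriteStreamOperationStart()
    op.real_time_batch_duration = "5 seconds"  # selects the 'trigger' oneof case
    assert op.WhichOneof("trigger") == "real_time_batch_duration"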
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
index a42a463e2c42..ffa11b5d7ab0 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{Command, WriteStreamOperationStart}
import org.apache.spark.sql.{Dataset => DS, ForeachWriter}
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket}
-import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, ContinuousTrigger, OneTimeTrigger, ProcessingTimeTrigger}
+import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, ContinuousTrigger, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger}
import org.apache.spark.sql.streaming
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent
@@ -70,6 +70,8 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
sinkBuilder.setOnce(true)
case ContinuousTrigger(intervalMs) =>
sinkBuilder.setContinuousCheckpointInterval(s"$intervalMs milliseconds")
+ case RealTimeTrigger(batchDurationMs) =>
+ sinkBuilder.setRealTimeBatchDuration(s"$batchDurationMs milliseconds")
}
this
}
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b2c32df4d863..f55d391eab54 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3433,6 +3433,8 @@ class SparkConnectPlanner(
writer.trigger(Trigger.Once())
case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+ case TriggerCase.REAL_TIME_BATCH_DURATION =>
+ writer.trigger(Trigger.RealTime(writeOp.getRealTimeBatchDuration))
case TriggerCase.TRIGGER_NOT_SET =>
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]