This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7009fb58a98e [SPARK-54660][SS] Add RTM trigger to python
7009fb58a98e is described below

commit 7009fb58a98efd4049b4ffeb18d0e50f89aa2731
Author: Jerry Peng <[email protected]>
AuthorDate: Fri Jan 9 10:10:48 2026 -0800

    [SPARK-54660][SS] Add RTM trigger to python
    
    ### What changes were proposed in this pull request?
    
    Add the RTM (real-time mode) trigger to PySpark so that PySpark queries can
    run in RTM. Only stateless queries (without UDFs) are supported for now.
    
    Also added Spark Connect support, since a test fails if the method
    signatures do not match.
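
    A minimal usage sketch of the new API (the `rate` source and the session
    setup are illustrative, not part of the patch; which sources RTM accepts
    is still limited at this stage):

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        df = spark.readStream.format("rate").load()
        query = (
            df.writeStream.format("console")
            .trigger(realTime="5 seconds")  # new trigger kind from this PR
            .outputMode("update")
            .start()
        )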
    
    ### Why are the changes needed?
    
    To support running RTM queries in PySpark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this adds the RTM trigger to PySpark.
    
    ### How was this patch tested?
    
    Added a simple test. More tests will be added in a subsequent PR.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53448 from jerrypeng/SPARK-53998-3.
    
    Authored-by: Jerry Peng <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 python/pyspark/sql/connect/proto/commands_pb2.py   | 140 ++++++++++-----------
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  13 +-
 python/pyspark/sql/connect/streaming/readwriter.py |  19 ++-
 python/pyspark/sql/streaming/readwriter.py         |  26 +++-
 .../pyspark/sql/tests/streaming/test_streaming.py  |  22 ++++
 .../src/main/protobuf/spark/connect/commands.proto |   1 +
 .../spark/sql/connect/DataStreamWriter.scala       |   4 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |   2 +
 8 files changed, 149 insertions(+), 78 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index 4eccf1b71706..53c192fda6e2 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -43,7 +43,7 @@ from pyspark.sql.connect.proto import pipelines_pb2 as spark_dot_connect_dot_pip
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xfb\x0e\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOpe [...]
+    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xfb\x0e\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOpe [...]
 )
 
 _globals = globals()
@@ -82,8 +82,8 @@ if not _descriptor._USE_C_DESCRIPTORS:
     _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_options 
= b"8\001"
     _globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._loaded_options = None
     _globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_options = 
b"8\001"
-    _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_start = 11757
-    _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_end = 11890
+    _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_start = 11816
+    _globals["_STREAMINGQUERYEVENTTYPE"]._serialized_end = 11949
     _globals["_COMMAND"]._serialized_start = 222
     _globals["_COMMAND"]._serialized_end = 2137
     _globals["_SQLCOMMAND"]._serialized_start = 2140
@@ -115,85 +115,85 @@ if not _descriptor._USE_C_DESCRIPTORS:
     _globals["_WRITEOPERATIONV2_MODE"]._serialized_start = 4639
     _globals["_WRITEOPERATIONV2_MODE"]._serialized_end = 4798
     _globals["_WRITESTREAMOPERATIONSTART"]._serialized_start = 4814
-    _globals["_WRITESTREAMOPERATIONSTART"]._serialized_end = 5670
+    _globals["_WRITESTREAMOPERATIONSTART"]._serialized_end = 5729
     _globals["_WRITESTREAMOPERATIONSTART_OPTIONSENTRY"]._serialized_start = 
3372
     _globals["_WRITESTREAMOPERATIONSTART_OPTIONSENTRY"]._serialized_end = 3430
-    _globals["_STREAMINGFOREACHFUNCTION"]._serialized_start = 5673
-    _globals["_STREAMINGFOREACHFUNCTION"]._serialized_end = 5852
-    _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_start = 5855
-    _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_end = 6067
-    _globals["_STREAMINGQUERYINSTANCEID"]._serialized_start = 6069
-    _globals["_STREAMINGQUERYINSTANCEID"]._serialized_end = 6134
-    _globals["_STREAMINGQUERYCOMMAND"]._serialized_start = 6137
-    _globals["_STREAMINGQUERYCOMMAND"]._serialized_end = 6769
-    _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_start = 6636
-    _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_end = 6680
-    _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_start = 6682
-    _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_end = 6758
-    _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_start = 6772
-    _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_end = 7913
-    _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_start = 7355
-    _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_end = 7525
-    _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_start = 7527
-    _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_end = 7599
-    _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_start = 7601
-    _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_end = 7640
-    _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_start = 7643
-    _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_end = 7840
-    _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_start = 7842
-    _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_end = 7898
-    _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_start = 7916
-    _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_end = 8745
-    _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_start = 8447
-    _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_end = 8526
-    _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_start = 8529
-    _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_end = 8734
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_start = 8748
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_end = 9824
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_start = 9356
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_end = 9483
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_start = 9485
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_end = 9600
+    _globals["_STREAMINGFOREACHFUNCTION"]._serialized_start = 5732
+    _globals["_STREAMINGFOREACHFUNCTION"]._serialized_end = 5911
+    _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_start = 5914
+    _globals["_WRITESTREAMOPERATIONSTARTRESULT"]._serialized_end = 6126
+    _globals["_STREAMINGQUERYINSTANCEID"]._serialized_start = 6128
+    _globals["_STREAMINGQUERYINSTANCEID"]._serialized_end = 6193
+    _globals["_STREAMINGQUERYCOMMAND"]._serialized_start = 6196
+    _globals["_STREAMINGQUERYCOMMAND"]._serialized_end = 6828
+    _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_start = 6695
+    _globals["_STREAMINGQUERYCOMMAND_EXPLAINCOMMAND"]._serialized_end = 6739
+    _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_start = 6741
+    _globals["_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND"]._serialized_end = 6817
+    _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_start = 6831
+    _globals["_STREAMINGQUERYCOMMANDRESULT"]._serialized_end = 7972
+    _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_start = 7414
+    _globals["_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT"]._serialized_end = 7584
+    _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_start = 7586
+    _globals["_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT"]._serialized_end = 7658
+    _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_start = 7660
+    _globals["_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT"]._serialized_end = 7699
+    _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_start = 7702
+    _globals["_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT"]._serialized_end = 7899
+    _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_start = 7901
+    _globals["_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT"]._serialized_end = 7957
+    _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_start = 7975
+    _globals["_STREAMINGQUERYMANAGERCOMMAND"]._serialized_end = 8804
+    _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_start = 8506
+    _globals["_STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND"]._serialized_end = 8585
+    _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_start = 8588
+    _globals["_STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND"]._serialized_end = 8793
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_start = 8807
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT"]._serialized_end = 9883
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_start = 9415
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT"]._serialized_end = 9542
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_start = 9544
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE"]._serialized_end = 9659
     _globals[
         "_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"
-    ]._serialized_start = 9602
-    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"]._serialized_end = 9661
+    ]._serialized_start = 9661
+    _globals["_STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT"]._serialized_end = 9720
     _globals[
         "_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE"
-    ]._serialized_start = 9663
+    ]._serialized_start = 9722
     _globals[
         "_STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE"
-    ]._serialized_end = 9738
+    ]._serialized_end = 9797
     _globals[
         "_STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT"
-    ]._serialized_start = 9740
+    ]._serialized_start = 9799
     _globals[
         "_STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT"
-    ]._serialized_end = 9809
-    _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_start = 9827
-    _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_end = 10000
-    _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_start = 10003
-    _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_end = 10134
-    _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_start = 10137
-    _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_end = 10341
-    _globals["_GETRESOURCESCOMMAND"]._serialized_start = 10343
-    _globals["_GETRESOURCESCOMMAND"]._serialized_end = 10364
-    _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_start = 10367
-    _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_end = 10579
-    _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_start = 
10483
-    _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_end = 
10579
-    _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_start = 10581
-    _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_end = 10669
-    _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_start = 10671
-    _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_end = 10738
-    _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_start = 10740
-    _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_end = 10840
-    _globals["_CHECKPOINTCOMMAND"]._serialized_start = 10843
-    _globals["_CHECKPOINTCOMMAND"]._serialized_end = 11048
-    _globals["_MERGEINTOTABLECOMMAND"]._serialized_start = 11051
-    _globals["_MERGEINTOTABLECOMMAND"]._serialized_end = 11539
-    _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_start = 11542
-    _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_end = 11754
+    ]._serialized_end = 9868
+    _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_start = 9886
+    _globals["_STREAMINGQUERYLISTENERBUSCOMMAND"]._serialized_end = 10059
+    _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_start = 10062
+    _globals["_STREAMINGQUERYLISTENEREVENT"]._serialized_end = 10193
+    _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_start = 10196
+    _globals["_STREAMINGQUERYLISTENEREVENTSRESULT"]._serialized_end = 10400
+    _globals["_GETRESOURCESCOMMAND"]._serialized_start = 10402
+    _globals["_GETRESOURCESCOMMAND"]._serialized_end = 10423
+    _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_start = 10426
+    _globals["_GETRESOURCESCOMMANDRESULT"]._serialized_end = 10638
+    _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_start = 
10542
+    _globals["_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY"]._serialized_end = 
10638
+    _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_start = 10640
+    _globals["_CREATERESOURCEPROFILECOMMAND"]._serialized_end = 10728
+    _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_start = 10730
+    _globals["_CREATERESOURCEPROFILECOMMANDRESULT"]._serialized_end = 10797
+    _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_start = 10799
+    _globals["_REMOVECACHEDREMOTERELATIONCOMMAND"]._serialized_end = 10899
+    _globals["_CHECKPOINTCOMMAND"]._serialized_start = 10902
+    _globals["_CHECKPOINTCOMMAND"]._serialized_end = 11107
+    _globals["_MERGEINTOTABLECOMMAND"]._serialized_start = 11110
+    _globals["_MERGEINTOTABLECOMMAND"]._serialized_end = 11598
+    _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_start = 11601
+    _globals["_EXECUTEEXTERNALCOMMAND"]._serialized_end = 11813
     _globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_start = 3372
     _globals["_EXECUTEEXTERNALCOMMAND_OPTIONSENTRY"]._serialized_end = 3430
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 66e0ffdb5273..48f746a34acc 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -928,6 +928,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
     AVAILABLE_NOW_FIELD_NUMBER: builtins.int
     ONCE_FIELD_NUMBER: builtins.int
     CONTINUOUS_CHECKPOINT_INTERVAL_FIELD_NUMBER: builtins.int
+    REAL_TIME_BATCH_DURATION_FIELD_NUMBER: builtins.int
     OUTPUT_MODE_FIELD_NUMBER: builtins.int
     QUERY_NAME_FIELD_NUMBER: builtins.int
     PATH_FIELD_NUMBER: builtins.int
@@ -954,6 +955,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
     available_now: builtins.bool
     once: builtins.bool
     continuous_checkpoint_interval: builtins.str
+    real_time_batch_duration: builtins.str
     output_mode: builtins.str
     query_name: builtins.str
     path: builtins.str
@@ -978,6 +980,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
         available_now: builtins.bool = ...,
         once: builtins.bool = ...,
         continuous_checkpoint_interval: builtins.str = ...,
+        real_time_batch_duration: builtins.str = ...,
         output_mode: builtins.str = ...,
         query_name: builtins.str = ...,
         path: builtins.str = ...,
@@ -1005,6 +1008,8 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
             b"path",
             "processing_time_interval",
             b"processing_time_interval",
+            "real_time_batch_duration",
+            b"real_time_batch_duration",
             "sink_destination",
             b"sink_destination",
             "table_name",
@@ -1044,6 +1049,8 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
             b"processing_time_interval",
             "query_name",
             b"query_name",
+            "real_time_batch_duration",
+            b"real_time_batch_duration",
             "sink_destination",
             b"sink_destination",
             "table_name",
@@ -1061,7 +1068,11 @@ class WriteStreamOperationStart(google.protobuf.message.Message):
         self, oneof_group: typing_extensions.Literal["trigger", b"trigger"]
     ) -> (
         typing_extensions.Literal[
-            "processing_time_interval", "available_now", "once", 
"continuous_checkpoint_interval"
+            "processing_time_interval",
+            "available_now",
+            "once",
+            "continuous_checkpoint_interval",
+            "real_time_batch_duration",
         ]
         | None
     ): ...
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index 21c513f88c0f..b7ca7dd1cbc0 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -493,6 +493,10 @@ class DataStreamWriter:
     def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
         ...
 
+    @overload
+    def trigger(self, *, realTime: str) -> "DataStreamWriter":
+        ...
+
     def trigger(
         self,
         *,
@@ -500,15 +504,16 @@ class DataStreamWriter:
         once: Optional[bool] = None,
         continuous: Optional[str] = None,
         availableNow: Optional[bool] = None,
+        realTime: Optional[str] = None,
     ) -> "DataStreamWriter":
-        params = [processingTime, once, continuous, availableNow]
+        params = [processingTime, once, continuous, availableNow, realTime]
 
-        if params.count(None) == 4:
+        if params.count(None) == 5:
             raise PySparkValueError(
                 errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
                 messageParameters={},
             )
-        elif params.count(None) < 3:
+        elif params.count(None) < 4:
             raise PySparkValueError(
                 errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
                 messageParameters={},
@@ -541,6 +546,14 @@ class DataStreamWriter:
                 )
+            self._write_proto.continuous_checkpoint_interval = continuous.strip()
 
+        elif realTime is not None:
+            if type(realTime) != str or len(realTime.strip()) == 0:
+                raise PySparkValueError(
+                    errorClass="VALUE_NOT_NON_EMPTY_STR",
+                    messageParameters={"arg_name": "realTime", "arg_value": str(realTime)},
+                )
+            self._write_proto.real_time_batch_duration = realTime.strip()
+
         else:
             if availableNow is not True:
                 raise PySparkValueError(
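
A note on the validation above: with five keyword-only trigger options, exactly
one must be provided, so `params.count(None) == 5` means no trigger was set and
`params.count(None) < 4` means more than one was. A standalone re-creation of
that rule (illustrative, not the pyspark code itself):

    params = ["5 seconds", None, None, None, None]  # exactly one trigger set
    assert params.count(None) == 4                  # accepted
    none_set = [None, None, None, None, None]
    assert none_set.count(None) == 5                # rejected: no trigger given
    two_set = ["5 seconds", True, None, None, None]
    assert two_set.count(None) == 3                 # rejected: more than one set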
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 8121dd609950..ffb4415eaac3 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1243,6 +1243,7 @@ class DataStreamWriter:
         once: Optional[bool] = None,
         continuous: Optional[str] = None,
         availableNow: Optional[bool] = None,
+        realTime: Optional[str] = None,
     ) -> "DataStreamWriter":
         """Set the trigger for the stream query. If this is not set it will 
run the query as fast
         as possible, which is equivalent to setting the trigger to 
``processingTime='0 seconds'``.
@@ -1268,6 +1269,10 @@ class DataStreamWriter:
         availableNow : bool, optional
             if set to True, set a trigger that processes all available data in multiple
             batches then terminates the query. Only one trigger can be set.
+        realTime : str, optional
+            a batch duration as a string, e.g. '5 seconds', '1 minute'.
+            Set a trigger that runs the query in real-time mode with
+            batches at the specified duration. Only one trigger can be set.
 
         Notes
         -----
@@ -1291,15 +1296,20 @@ class DataStreamWriter:
 
         >>> df.writeStream.trigger(availableNow=True)
         <...streaming.readwriter.DataStreamWriter object ...>
+
+        Trigger the query for real-time mode execution every 5 seconds.
+
+        >>> df.writeStream.trigger(realTime='5 seconds')
+        <...streaming.readwriter.DataStreamWriter object ...>
         """
-        params = [processingTime, once, continuous, availableNow]
+        params = [processingTime, once, continuous, availableNow, realTime]
 
-        if params.count(None) == 4:
+        if params.count(None) == 5:
             raise PySparkValueError(
                 errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
                 messageParameters={},
             )
-        elif params.count(None) < 3:
+        elif params.count(None) < 4:
             raise PySparkValueError(
                 errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
                 messageParameters={},
@@ -1342,6 +1352,16 @@ class DataStreamWriter:
             jTrigger = getattr(
                 self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
             ).Continuous(interval)
+        elif realTime is not None:
+            if type(realTime) != str or len(realTime.strip()) == 0:
+                raise PySparkValueError(
+                    errorClass="VALUE_NOT_NON_EMPTY_STR",
+                    messageParameters={"arg_name": "realTime", "arg_value": str(realTime)},
+                )
+            batch_duration = realTime.strip()
+            jTrigger = getattr(
+                self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
+            ).RealTime(batch_duration)
         else:
             if availableNow is not True:
                 raise PySparkValueError(
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py 
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 1f4fa8f4644d..0b5884efb717 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -147,6 +147,28 @@ class StreamingTestsMixin:
         except TypeError:
             pass
 
+    def test_stream_real_time_trigger(self):
+        df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, "out")
+        chk = os.path.join(tmpPath, "chk")
+        try:
+            q = (
+                df.writeStream.format("console")
+                .trigger(realTime="5 seconds")
+                .option("checkpointLocation", chk)
+                .outputMode("update")
+                .start(out)
+            )
+            q.processAllAvailable()
+        except Exception as e:
+            # This error is expected
+            self._assert_exception_tree_contains_msg(
+                e, "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED"
+            )
+
     def test_stream_read_options(self):
         schema = StructType([StructField("data", StringType(), False)])
         df = (
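
For reference, a typical way to run just this suite locally uses Spark's
standard Python test runner (exact invocation may vary by checkout and Python
environment):

    python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming'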
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
index 861af17e263a..c22e76e3542f 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -232,6 +232,7 @@ message WriteStreamOperationStart {
     bool available_now = 6;
     bool once = 7;
     string continuous_checkpoint_interval = 8;
+    string real_time_batch_duration = 100;
   }
 
   string output_mode = 9;
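
The new field joins the existing `trigger` oneof, so setting it clears any other
trigger choice. A small sketch against the generated Python bindings (assumes a
PySpark build that includes this change):

    from pyspark.sql.connect.proto import commands_pb2

    op = commands_pb2.WriteStreamOperationStart()
    op.real_time_batch_duration = "5 seconds"  # new oneof member, field number 100
    assert op.WhichOneof("trigger") == "real_time_batch_duration"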
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
index a42a463e2c42..ffa11b5d7ab0 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{Command, WriteStreamOperationStart}
 import org.apache.spark.sql.{Dataset => DS, ForeachWriter}
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket}
-import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, ContinuousTrigger, OneTimeTrigger, ProcessingTimeTrigger}
+import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, ContinuousTrigger, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger}
 import org.apache.spark.sql.streaming
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent
@@ -70,6 +70,8 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
         sinkBuilder.setOnce(true)
       case ContinuousTrigger(intervalMs) =>
         sinkBuilder.setContinuousCheckpointInterval(s"$intervalMs milliseconds")
+      case RealTimeTrigger(batchDurationMs) =>
+        sinkBuilder.setRealTimeBatchDuration(s"$batchDurationMs milliseconds")
     }
     this
   }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b2c32df4d863..f55d391eab54 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3433,6 +3433,8 @@ class SparkConnectPlanner(
         writer.trigger(Trigger.Once())
       case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
         writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.REAL_TIME_BATCH_DURATION =>
+        writer.trigger(Trigger.RealTime(writeOp.getRealTimeBatchDuration))
       case TriggerCase.TRIGGER_NOT_SET =>
     }
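
Taken together, the Spark Connect round trip works like this: the Python client
validates `realTime` and sets the `real_time_batch_duration` oneof field on
WriteStreamOperationStart, and the server-side planner maps that case back to
`Trigger.RealTime(...)`. A hedged end-to-end sketch (the connection string and
`rate` source are illustrative, and RTM source support is still limited):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    df = spark.readStream.format("rate").load()
    query = (
        df.writeStream.format("console")
        .trigger(realTime="5 seconds")  # sent over the wire as real_time_batch_duration
        .outputMode("update")
        .start()
    )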
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
