This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git

commit a84ebc869b9a88c87ba6ba1e0e51e960b2fc5d26
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jan 19 16:07:43 2026 +0900

    add e2e test for simple stream data reader & availableNow
---
 .../sql/tests/test_python_streaming_datasource.py  | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index a5ae69de1d96..1bb0c0895be9 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -26,6 +26,7 @@ from pyspark.sql.datasource import (
     DataSourceStreamWriter,
     DataSourceStreamArrowWriter,
     SimpleDataSourceStreamReader,
+    SupportsTriggerAvailableNow,
     WriterCommitMessage,
 )
 from pyspark.sql.streaming import StreamingQueryException
@@ -251,6 +252,50 @@ class BasePythonStreamingDataSourceTestsMixin:
         q.awaitTermination()
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader_trigger_available_now(self):
+        class SimpleStreamReader(SimpleDataSourceStreamReader, SupportsTriggerAvailableNow):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                start_idx = start["offset"]
+                end_offset = min(start_idx + 2, self.desired_end_offset["offset"])
+                it = iter([(i,) for i in range(start_idx, end_offset)])
+                return (it, {"offset": end_offset})
+
+            def commit(self, end):
+                pass
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                start_idx = start["offset"]
+                end_idx = end["offset"]
+                return iter([(i,) for i in range(start_idx, end_idx)])
+
+            def prepareForTriggerAvailableNow(self) -> None:
+                self.desired_end_offset = {"offset": 10}
+
+        class SimpleDataSource(DataSource):
+            def schema(self):
+                return "id INT"
+
+            def simpleStreamReader(self, schema):
+                return SimpleStreamReader()
+
+        self.spark.dataSource.register(SimpleDataSource)
+        df = self.spark.readStream.format("SimpleDataSource").load()
+
+        def check_batch(df, batch_id):
+            # the last offset for the data is 9 since the desired end offset is 10
+            # a batch isn't triggered when there is no data, so each batch has either one or two rows
+            if batch_id * 2 + 1 > 9:
+                assertDataFrameEqual(df, [Row(batch_id * 2)])
+            else:
+                assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])
+
+        q = df.writeStream.foreachBatch(check_batch).trigger(availableNow=True).start()
+        q.awaitTermination(timeout=30)
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
         output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
