This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git

commit a84ebc869b9a88c87ba6ba1e0e51e960b2fc5d26
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jan 19 16:07:43 2026 +0900

    add e2e test for simple stream data reader & availableNow
---
 .../sql/tests/test_python_streaming_datasource.py | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index a5ae69de1d96..1bb0c0895be9 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -26,6 +26,7 @@ from pyspark.sql.datasource import (
     DataSourceStreamWriter,
     DataSourceStreamArrowWriter,
     SimpleDataSourceStreamReader,
+    SupportsTriggerAvailableNow,
     WriterCommitMessage,
 )
 from pyspark.sql.streaming import StreamingQueryException
@@ -251,6 +252,50 @@ class BasePythonStreamingDataSourceTestsMixin:
         q.awaitTermination()
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader_trigger_available_now(self):
+        class SimpleStreamReader(SimpleDataSourceStreamReader, SupportsTriggerAvailableNow):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                start_idx = start["offset"]
+                end_offset = min(start_idx + 2, self.desired_end_offset["offset"])
+                it = iter([(i,) for i in range(start_idx, end_offset)])
+                return (it, {"offset": end_offset})
+
+            def commit(self, end):
+                pass
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                start_idx = start["offset"]
+                end_idx = end["offset"]
+                return iter([(i,) for i in range(start_idx, end_idx)])
+
+            def prepareForTriggerAvailableNow(self) -> None:
+                self.desired_end_offset = {"offset": 10}
+
+        class SimpleDataSource(DataSource):
+            def schema(self):
+                return "id INT"
+
+            def simpleStreamReader(self, schema):
+                return SimpleStreamReader()
+
+        self.spark.dataSource.register(SimpleDataSource)
+        df = self.spark.readStream.format("SimpleDataSource").load()
+
+        def check_batch(df, batch_id):
+            # the last offset for the data is 9, since the desired end offset is 10
+            # a batch is not triggered when there is no data, so each batch contains either one or two rows
+            if batch_id * 2 + 1 > 9:
+                assertDataFrameEqual(df, [Row(batch_id * 2)])
+            else:
+                assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])
+
+        q = df.writeStream.foreachBatch(check_batch).trigger(availableNow=True).start()
+        q.awaitTermination(timeout=30)
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
         output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
