This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e9a141bfa582 [SPARK-55416][SS][PYTHON][4.1] Streaming Python Data Source memory leak when end-offset is not updated
e9a141bfa582 is described below

commit e9a141bfa582c9f51e38cd2e8324cdad820409f1
Author: vinodkc <[email protected]>
AuthorDate: Thu Feb 19 12:13:46 2026 +0900

    [SPARK-55416][SS][PYTHON][4.1] Streaming Python Data Source memory leak when end-offset is not updated
    
    ### What changes were proposed in this pull request?
    
    Backport https://github.com/apache/spark/pull/54237 to branch-4.1
    In `_SimpleStreamReaderWrapper.latestOffset()`, validate that a custom
    `SimpleDataSourceStreamReader.read()` implementation does not return a
    non-empty batch with end == start. If it does, raise a PySparkException
    with error class `SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE` before
    appending to the cache. Empty batches with end == start remain allowed.
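
    The check described above can be sketched as a simplified standalone
    version (illustrative only, not the exact patched code; offsets are
    compared via their JSON form, matching how the wrapper serializes them):

```python
import json


def validate_batch(start: dict, end: dict, it):
    """Raise if a non-empty batch comes back with end == start.

    An empty iterator with end == start passes through, since that is the
    legitimate "no data to read" signal.
    """
    if json.dumps(end) != json.dumps(start):
        return  # offset advanced: nothing to validate
    try:
        next(it)  # peek: does the batch actually contain data?
    except StopIteration:
        return  # empty batch with end == start is allowed
    raise ValueError(
        f"end offset {json.dumps(end)} did not advance past "
        f"start offset {json.dumps(start)} for a non-empty batch"
    )


# Allowed: no data, offset unchanged
validate_batch({"offset": 0}, {"offset": 0}, iter([]))

# Rejected: data returned but offset unchanged
try:
    validate_batch({"offset": 0}, {"offset": 0}, iter([(1,)]))
except ValueError:
    pass  # expected
```

    Peeking with next() consumes one element, which is why the real patch
    only peeks on the end == start path, where a non-empty batch is an error
    anyway.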
    
    ### Why are the changes needed?
    
    When a user implements read(start) incorrectly and returns:

    - The same offset for both ends: end == start (e.g. both {"offset": 0}).
    - A non-empty iterator: e.g. 2 rows.

    If a reader returns end == start together with data (e.g. return
    (it, {"offset": start_idx})), the wrapper keeps appending to its prefetch
    cache on every trigger, while commit(end) never trims entries (the first
    matching index is always 0). The cache grows without bound, and driver
    (non-JVM) memory increases until OOM. Validating and raising before
    appending stops this and fails fast with a clear error.

    Empty batches with end == start remain allowed; they let the Python data
    source indicate that there is no data to read.
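
    The buggy versus correct offset contract can be reproduced outside Spark;
    the reader functions below are illustrative, not part of the patch:

```python
def buggy_read(start: dict):
    """Returns 2 rows but reuses `start` as the end offset (the bug)."""
    start_idx = start["offset"]
    rows = iter([(i,) for i in range(start_idx, start_idx + 2)])
    return rows, start  # end == start despite non-empty data


def correct_read(start: dict):
    """Returns 2 rows and advances the end offset past them."""
    start_idx = start["offset"]
    rows = iter([(i,) for i in range(start_idx, start_idx + 2)])
    return rows, {"offset": start_idx + 2}  # position after last record


# With the buggy reader, every trigger "reads" the same batch again:
offset = {"offset": 0}
for _ in range(3):
    _, offset = buggy_read(offset)
assert offset == {"offset": 0}  # never advances -> cache entries pile up

# With the correct reader, the offset moves forward on each trigger:
offset = {"offset": 0}
for _ in range(3):
    _, offset = correct_read(offset)
assert offset == {"offset": 6}
```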
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Implementations that return end == start with a non-empty iterator
    now raise a PySparkException instead of causing unbounded memory growth.
    Empty batches with end == start behave as before.
    
    ### How was this patch tested?
    
    Added a unit test in `test_python_streaming_datasource.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54321 from vinodkc/br_SPARK-55416_4.1.
    
    Authored-by: vinodkc <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../source/tutorial/sql/python_data_source.rst     |  9 +++-
 python/pyspark/errors/error-conditions.json        |  5 ++
 python/pyspark/sql/datasource_internal.py          | 28 ++++++++++-
 .../sql/tests/test_python_streaming_datasource.py  | 56 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index 78ffeda0db1c..1cc1811000ca 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -305,7 +305,14 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
 
         def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
             """
-            Takes start offset as an input, return an iterator of tuples and the start offset of next read.
+            Takes start offset as an input, return an iterator of tuples and
+            the end offset (start offset for the next read). The end offset must
+            advance past the start offset when returning data; otherwise Spark
+            raises a validation exception.
+            For example, returning 2 records from start_idx 0 means end should
+            be {"offset": 2} (i.e. start + 2).
+            When there is no data to read, you may return the same offset as end and
+            start, but you must provide an empty iterator.
             """
             start_idx = start["offset"]
             it = iter([(i,) for i in range(start_idx, start_idx + 2)])
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 295b372cade5..326671c0d5ad 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1119,6 +1119,11 @@
       "SparkContext or SparkSession should be created first."
     ]
   },
+  "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+    "message": [
+      "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset: <end_offset> did not advance past the start offset: <start_offset>. The end offset must represent the position after the last record returned."
+    ]
+  },
   "SLICE_WITH_STEP": {
     "message": [
       "Slice with step is not supported."
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 6df0be4192ec..9467bfdf73bc 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -28,7 +28,7 @@ from pyspark.sql.datasource import (
     SimpleDataSourceStreamReader,
 )
 from pyspark.sql.types import StructType
-from pyspark.errors import PySparkNotImplementedError
+from pyspark.errors import PySparkException, PySparkNotImplementedError
 
 
 def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
@@ -88,12 +88,36 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
             self.initial_offset = self.simple_reader.initialOffset()
         return self.initial_offset
 
+    def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+        """
+        Validates that read() did not return a non-empty batch with end equal to start,
+        which would cause the same batch to be processed repeatedly. When end != start,
+        appends the result to the cache; when end == start with empty iterator, does not
+        cache (avoids unbounded cache growth).
+        """
+        start_str = json.dumps(start)
+        end_str = json.dumps(end)
+        if end_str != start_str:
+            self.cache.append(PrefetchedCacheEntry(start, end, it))
+            return
+        try:
+            next(it)
+        except StopIteration:
+            return
+        raise PySparkException(
+            errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+            messageParameters={
+                "start_offset": start_str,
+                "end_offset": end_str,
+            },
+        )
+
     def latestOffset(self) -> dict:
         # when query start for the first time, use initial offset as the start offset.
         if self.current_offset is None:
             self.current_offset = self.initialOffset()
         (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+        self.add_result_to_cache(self.current_offset, end, iter)
         self.current_offset = end
         return end
 
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index 9879231540f1..ecf28677689b 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -34,6 +34,7 @@ from pyspark.testing.sqlutils import (
     have_pyarrow,
     pyarrow_requirement_message,
 )
+from pyspark.errors import PySparkException
 from pyspark.testing import assertDataFrameEqual
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
@@ -251,6 +252,61 @@ class BasePythonStreamingDataSourceTestsMixin:
         q.awaitTermination()
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader_offset_did_not_advance_raises(self):
+        """Returning end == start with non-empty data raises
+        SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+        from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+        class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                # Bug: return same offset as end despite returning data
+                start_idx = start["offset"]
+                it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+                return (it, start)
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                return iter([])
+
+            def commit(self, end: dict):
+                pass
+
+        reader = BuggySimpleStreamReader()
+        wrapper = _SimpleStreamReaderWrapper(reader)
+        with self.assertRaises(PySparkException) as cm:
+            wrapper.latestOffset()
+        self.assertEqual(
+            cm.exception.getCondition(),
+            "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+        )
+
+    def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self):
+        """read() with end == start and empty iterator: no exception, no cache entry."""
+        from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+        class EmptyBatchReader(SimpleDataSourceStreamReader):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                # Valid: same offset as end but empty iterator (no data)
+                return (iter([]), start)
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                return iter([])
+
+            def commit(self, end: dict):
+                pass
+
+        reader = EmptyBatchReader()
+        wrapper = _SimpleStreamReaderWrapper(reader)
+        end = wrapper.latestOffset()
+        start = {"offset": 0}
+        self.assertEqual(end, start)
+        self.assertEqual(len(wrapper.cache), 0)
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
         output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
