This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e9a141bfa582 [SPARK-55416][SS][PYTHON][4.1] Streaming Python Data
Source memory leak when end-offset is not updated
e9a141bfa582 is described below
commit e9a141bfa582c9f51e38cd2e8324cdad820409f1
Author: vinodkc <[email protected]>
AuthorDate: Thu Feb 19 12:13:46 2026 +0900
[SPARK-55416][SS][PYTHON][4.1] Streaming Python Data Source memory leak
when end-offset is not updated
### What changes were proposed in this pull request?
Backport https://github.com/apache/spark/pull/54237 to branch-4.1
In `_SimpleStreamReaderWrapper.latestOffset()`, validate that a custom data
source implementation based on `SimpleDataSourceStreamReader.read()` does not
return a non-empty batch with end == start. If it does, raise a PySparkException
with error class `SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE` before appending
to the cache. Empty batches with end == start remain allowed.
### Why are the changes needed?
When a user implements read(start) incorrectly and returns:
- Same offset for both: end = start (e.g. both {"offset": 0}).
- Non-empty iterator: e.g. 2 rows.
If a reader returns end == start with data (e.g. return (it, {"offset":
start_idx})), the wrapper keeps appending to its prefetch cache on every
trigger while commit(end) never trims entries (first matching index is 0). The
cache grows without bound and driver (non-JVM) memory increases until OOM.
Validating and raising error before appending stops this and fails fast with a
clear error.
Empty batches with end == start remain allowed; this lets the Python data
source represent that there is no data to read.
### Does this PR introduce _any_ user-facing change?
Yes. Implementations that return end == start with a non-empty iterator now
get PySparkException instead of unbounded memory growth. Empty batches with end
== start are unchanged.
### How was this patch tested?
Added unit test in `test_python_streaming_datasource.py`
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54321 from vinodkc/br_SPARK-55416_4.1.
Authored-by: vinodkc <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../source/tutorial/sql/python_data_source.rst | 9 +++-
python/pyspark/errors/error-conditions.json | 5 ++
python/pyspark/sql/datasource_internal.py | 28 ++++++++++-
.../sql/tests/test_python_streaming_datasource.py | 56 ++++++++++++++++++++++
4 files changed, 95 insertions(+), 3 deletions(-)
diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index 78ffeda0db1c..1cc1811000ca 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -305,7 +305,14 @@ This is the same dummy streaming reader that generate 2 rows every batch implemented
def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
"""
- Takes start offset as an input, return an iterator of tuples and the start offset of next read.
+ Takes start offset as an input, return an iterator of tuples and
+ the end offset (start offset for the next read). The end offset must
+ advance past the start offset when returning data; otherwise Spark
+ raises a validation exception.
+ For example, returning 2 records from start_idx 0 means end should
+ be {"offset": 2} (i.e. start + 2).
+ When there is no data to read, you may return the same offset as end and
+ start, but you must provide an empty iterator.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 295b372cade5..326671c0d5ad 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1119,6 +1119,11 @@
"SparkContext or SparkSession should be created first."
]
},
+ "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+ "message": [
+ "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset: <end_offset> did not advance past the start offset: <start_offset>. The end offset must represent the position after the last record returned."
+ ]
+ },
"SLICE_WITH_STEP": {
"message": [
"Slice with step is not supported."
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 6df0be4192ec..9467bfdf73bc 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -28,7 +28,7 @@ from pyspark.sql.datasource import (
SimpleDataSourceStreamReader,
)
from pyspark.sql.types import StructType
-from pyspark.errors import PySparkNotImplementedError
+from pyspark.errors import PySparkException, PySparkNotImplementedError
def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
@@ -88,12 +88,36 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
self.initial_offset = self.simple_reader.initialOffset()
return self.initial_offset
+ def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+ """
+ Validates that read() did not return a non-empty batch with end equal to start,
+ which would cause the same batch to be processed repeatedly. When end != start,
+ appends the result to the cache; when end == start with empty iterator, does not
+ cache (avoids unbounded cache growth).
+ """
+ start_str = json.dumps(start)
+ end_str = json.dumps(end)
+ if end_str != start_str:
+ self.cache.append(PrefetchedCacheEntry(start, end, it))
+ return
+ try:
+ next(it)
+ except StopIteration:
+ return
+ raise PySparkException(
+ errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ messageParameters={
+ "start_offset": start_str,
+ "end_offset": end_str,
+ },
+ )
+
def latestOffset(self) -> dict:
# when query start for the first time, use initial offset as the start offset.
if self.current_offset is None:
self.current_offset = self.initialOffset()
(iter, end) = self.simple_reader.read(self.current_offset)
- self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
+ self.add_result_to_cache(self.current_offset, end, iter)
self.current_offset = end
return end
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index 9879231540f1..ecf28677689b 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -34,6 +34,7 @@ from pyspark.testing.sqlutils import (
have_pyarrow,
pyarrow_requirement_message,
)
+from pyspark.errors import PySparkException
from pyspark.testing import assertDataFrameEqual
from pyspark.testing.sqlutils import ReusedSQLTestCase
@@ -251,6 +252,61 @@ class BasePythonStreamingDataSourceTestsMixin:
q.awaitTermination()
self.assertIsNone(q.exception(), "No exception has to be propagated.")
+ def test_simple_stream_reader_offset_did_not_advance_raises(self):
+ """Returning end == start with non-empty data raises
+ SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Bug: return same offset as end despite returning data
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+ return (it, start)
+
+ def readBetweenOffsets(self, start: dict, end: dict):
+ return iter([])
+
+ def commit(self, end: dict):
+ pass
+
+ reader = BuggySimpleStreamReader()
+ wrapper = _SimpleStreamReaderWrapper(reader)
+ with self.assertRaises(PySparkException) as cm:
+ wrapper.latestOffset()
+ self.assertEqual(
+ cm.exception.getCondition(),
+ "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ )
+
+ def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self):
+ """read() with end == start and empty iterator: no exception, no cache entry."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class EmptyBatchReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Valid: same offset as end but empty iterator (no data)
+ return (iter([]), start)
+
+ def readBetweenOffsets(self, start: dict, end: dict):
+ return iter([])
+
+ def commit(self, end: dict):
+ pass
+
+ reader = EmptyBatchReader()
+ wrapper = _SimpleStreamReaderWrapper(reader)
+ end = wrapper.latestOffset()
+ start = {"offset": 0}
+ self.assertEqual(end, start)
+ self.assertEqual(len(wrapper.cache), 0)
+
def test_stream_writer(self):
input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]