This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git
commit a035e4e26b3ea2328440f1d928738cead2fc6c3a
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jan 26 11:17:45 2026 +0900

    Melt SupportsAdmissionControl into stream reader
---
 python/pyspark/sql/datasource.py                   | 56 +++++++++++++++++-----
 python/pyspark/sql/datasource_internal.py          |  3 +-
 python/pyspark/sql/streaming/datasource.py         | 46 ------------------
 .../streaming/python_streaming_source_runner.py    | 28 +++++------
 .../streaming/PythonStreamingDataSourceSuite.scala | 19 ++++----
 5 files changed, 66 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index f1908180a3ba..ed8c20e862b4 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -714,26 +714,56 @@ class DataSourceStreamReader(ABC):
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
-        """
-        Returns the most recent offset available.
+    from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit
 
-        Returns
-        -------
-        dict
-            A dict or recursive dict whose key and value are primitive types, which includes
-            Integer, String and Boolean.
-
-        Examples
-        --------
-        >>> def latestOffset(self):
-        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+    # FIXME: Will this work without abstractmethod marker?
+    # @abstractmethod
+    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
+        """
+        FIXME: docstring needed
+
+        /**
+         * Returns the most recent offset available given a read limit. The start offset can be used
+         * to figure out how much new data should be read given the limit. Users should implement this
+         * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
+         * <p>
+         * When this method is called on a `Source`, the source can return `null` if there is no
+         * data to process. In addition, for the very first micro-batch, the `startOffset` will be
+         * null as well.
+         * <p>
+         * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
+         * for the very first micro-batch. The source can return `null` if there is no data to process.
+         */
         """
         raise PySparkNotImplementedError(
             errorClass="NOT_IMPLEMENTED",
             messageParameters={"feature": "latestOffset"},
         )
 
+    def getDefaultReadLimit(self) -> ReadLimit:
+        """
+        FIXME: docstring needed
+
+        /**
+         * Returns the read limits potentially passed to the data source through options when creating
+         * the data source.
+         */
+        """
+        return ReadAllAvailable()
+
+    def reportLatestOffset(self) -> Optional[dict]:
+        """
+        FIXME: docstring needed
+
+        /**
+         * Returns the most recent offset available.
+         * <p>
+         * The source can return `null`, if there is no data to process or the source does not support
+         * to this method.
+         */
+        """
+        return None
+
     def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
         """
         Returns a list of InputPartition given the start and end offsets. Each InputPartition
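[Editor's note] For orientation, a user-defined reader written against the new method shape might look roughly like the minimal sketch below. Nothing in it is taken from this commit: the counter source, the fallback batch size of 100, and the `max_rows` attribute on ReadMaxRows are assumptions made for illustration (the attribute name is guessed by analogy with ReadMaxBytes.max_bytes).

from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.streaming.datasource import ReadLimit, ReadMaxRows


class CounterStreamReader(DataSourceStreamReader):
    """Illustrative counter-based reader targeting the new signatures (sketch only)."""

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
        # Honor a row cap when one is given; otherwise advance by a fixed batch.
        # `max_rows` is an assumed attribute name, not confirmed by the diff.
        if isinstance(readLimit, ReadMaxRows):
            step = readLimit.max_rows
        else:
            step = 100
        return {"offset": start["offset"] + step}

    def partitions(self, start: dict, end: dict):
        return [InputPartition({"start": start["offset"], "end": end["offset"]})]

    def read(self, partition):
        for i in range(partition.value["start"], partition.value["end"]):
            yield (i,)

In this sketch getDefaultReadLimit and reportLatestOffset are simply inherited from the new base-class defaults (ReadAllAvailable() and None respectively).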
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 79d7246bea6c..63505f354569 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -30,7 +30,6 @@ from pyspark.sql.datasource import (
 from pyspark.sql.streaming.datasource import (
     ReadAllAvailable,
     ReadLimit,
-    SupportsAdmissionControl,
     ReadMaxBytes,
     ReadMaxRows,
     ReadMinRows,
@@ -65,7 +64,7 @@ class PrefetchedCacheEntry:
         self.iterator = iterator
 
 
-class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionControl):
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
     """
     A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern,
     so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
diff --git a/python/pyspark/sql/streaming/datasource.py b/python/pyspark/sql/streaming/datasource.py
index 7d01049c7c20..b4ad7decd4ed 100644
--- a/python/pyspark/sql/streaming/datasource.py
+++ b/python/pyspark/sql/streaming/datasource.py
@@ -117,52 +117,6 @@ class ReadMaxBytes(ReadLimit):
         return {"max_bytes": self.max_bytes}
 
 
-class SupportsAdmissionControl(ABC):
-    def getDefaultReadLimit(self) -> ReadLimit:
-        """
-        FIXME: docstring needed
-
-        /**
-         * Returns the read limits potentially passed to the data source through options when creating
-         * the data source.
-         */
-        """
-        return ReadAllAvailable()
-
-    @abstractmethod
-    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
-        """
-        FIXME: docstring needed
-
-        /**
-         * Returns the most recent offset available given a read limit. The start offset can be used
-         * to figure out how much new data should be read given the limit. Users should implement this
-         * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
-         * <p>
-         * When this method is called on a `Source`, the source can return `null` if there is no
-         * data to process. In addition, for the very first micro-batch, the `startOffset` will be
-         * null as well.
-         * <p>
-         * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
-         * for the very first micro-batch. The source can return `null` if there is no data to process.
-         */
-        """
-        pass
-
-    def reportLatestOffset(self) -> Optional[dict]:
-        """
-        FIXME: docstring needed
-
-        /**
-         * Returns the most recent offset available.
-         * <p>
-         * The source can return `null`, if there is no data to process or the source does not support
-         * to this method.
-         */
-        """
-        return None
-
-
 class SupportsTriggerAvailableNow(ABC):
     @abstractmethod
     def prepareForTriggerAvailableNow(self) -> None:
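[Editor's note] With the SupportsAdmissionControl mixin removed above, a reader that wants a non-default admission limit now overrides getDefaultReadLimit directly on DataSourceStreamReader. A rough sketch follows; the class name, the options dict, the "maxRowsPerBatch" option name, and the ReadMaxRows(row_count) constructor shape are all assumptions, not taken from this commit.

from pyspark.sql.datasource import DataSourceStreamReader
from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit, ReadMaxRows


class CappedStreamReader(DataSourceStreamReader):
    def __init__(self, options: dict):
        # "maxRowsPerBatch" is an invented option name for this sketch.
        self.cap = int(options.get("maxRowsPerBatch", 0))

    def getDefaultReadLimit(self) -> ReadLimit:
        # Fall back to the base-class behaviour (read everything) when no cap is set.
        return ReadMaxRows(self.cap) if self.cap > 0 else ReadAllAvailable()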
diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index c6391bceffd3..c881d4367f8d 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -34,7 +34,6 @@ from pyspark.sql.datasource import (
     DataSourceStreamReader,
 )
 from pyspark.sql.streaming.datasource import (
-    SupportsAdmissionControl,
     SupportsTriggerAvailableNow,
 )
 from pyspark.sql.datasource_internal import (
@@ -145,10 +144,18 @@ def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We consider the method of `read` in simple_reader to already have admission control
         # into it.
+        support_flags |= SUPPORTS_ADMISSION_CONTROL
         if isinstance(reader.simple_reader, SupportsTriggerAvailableNow):
             support_flags |= SUPPORTS_TRIGGER_AVAILABLE_NOW
     else:
-        if isinstance(reader, SupportsAdmissionControl):
+        import inspect
+        sig = inspect.signature(reader.latestOffset)
+        if len(sig.parameters) == 0:
+            # old signature of latestOffset()
+            pass
+        else:
+            # we don't check the number/type of parameters here strictly - we leave the python to
+            # raise error when calling the method if the types do not match.
             support_flags |= SUPPORTS_ADMISSION_CONTROL
             if isinstance(reader, SupportsTriggerAvailableNow):
                 support_flags |= SUPPORTS_TRIGGER_AVAILABLE_NOW
@@ -186,11 +193,7 @@ def latest_offset_admission_control_func(
 
 
 def get_default_read_limit_func(reader: DataSourceStreamReader, outfile: IO) -> None:
-    if isinstance(reader, SupportsAdmissionControl):
-        limit = reader.getDefaultReadLimit()
-    else:
-        limit = ReadAllAvailable()
-
+    limit = reader.getDefaultReadLimit()
     write_with_length(json.dumps(limit.dump()).encode("utf-8"), outfile)
 
 
@@ -199,14 +202,11 @@ def report_latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> No
         # We do not consider providing latest offset on simple stream reader.
         write_int(0, outfile)
     else:
-        if isinstance(reader, SupportsAdmissionControl):
-            offset = reader.reportLatestOffset()
-            if offset is None:
-                write_int(0, outfile)
-            else:
-                write_with_length(json.dumps(offset).encode("utf-8"), outfile)
-        else:
+        offset = reader.reportLatestOffset()
+        if offset is None:
             write_int(0, outfile)
+        else:
+            write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
 def main(infile: IO, outfile: IO) -> None:
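[Editor's note] The arity probe in check_support_func above can be seen in isolation in the small sketch below; the two class names are illustrative only. inspect.signature on a bound method excludes self, so an old-style latestOffset reports zero parameters while the new admission-control signature reports two.

import inspect


class OldStyleReader:
    def latestOffset(self):
        return {"offset": 0}


class NewStyleReader:
    def latestOffset(self, start, readLimit):
        return {"offset": start["offset"] + 1}


for reader in (OldStyleReader(), NewStyleReader()):
    n_params = len(inspect.signature(reader.latestOffset).parameters)
    # Zero parameters -> legacy signature; anything else is treated as the new
    # admission-control signature, mirroring the runner's check.
    print(type(reader).__name__, "admission control:", n_params > 0)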
{"partition-1": 0} @@ -732,13 +728,11 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { | ReadAllAvailable, | ReadLimit, | ReadMaxRows, - | SupportsAdmissionControl, | SupportsTriggerAvailableNow |) | |class TestDataStreamReader( | DataSourceStreamReader, - | SupportsAdmissionControl, | SupportsTriggerAvailableNow |): | def initialOffset(self): @@ -967,13 +961,14 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { pythonDs.setShortName("ErrorDataSource") def testMicroBatchStreamError(action: String, msg: String)( - func: PythonMicroBatchStream => Unit): Unit = { + func: PythonMicroBatchStreamWithAdmissionControl => Unit): Unit = { val options = CaseInsensitiveStringMap.empty() val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner( pythonDs, errorDataSourceName, inputSchema, options) runner.init() - val stream = new PythonMicroBatchStream( + // New default for python stream reader is with Admission Control + val stream = new PythonMicroBatchStreamWithAdmissionControl( pythonDs, errorDataSourceName, inputSchema, @@ -1002,12 +997,14 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { stream.initialOffset() } + val offset = PythonStreamingSourceOffset("{\"offset\": \"2\"}") testMicroBatchStreamError("latestOffset", "[NOT_IMPLEMENTED] latestOffset is not implemented") { stream => - stream.latestOffset() + val readLimit = PythonStreamingSourceReadLimit( + PythonStreamingSourceRunner.READ_ALL_AVAILABLE_JSON) + stream.latestOffset(offset, readLimit) } - val offset = PythonStreamingSourceOffset("{\"offset\": \"2\"}") testMicroBatchStreamError("planPartitions", "[NOT_IMPLEMENTED] partitions is not implemented") { stream => stream.planInputPartitions(offset, offset) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
