This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git

commit a035e4e26b3ea2328440f1d928738cead2fc6c3a
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jan 26 11:17:45 2026 +0900

    Melt SupportsAdmissionControl into stream reader
---
 python/pyspark/sql/datasource.py                   | 56 +++++++++++++++++-----
 python/pyspark/sql/datasource_internal.py          |  3 +-
 python/pyspark/sql/streaming/datasource.py         | 46 ------------------
 .../streaming/python_streaming_source_runner.py    | 28 +++++------
 .../streaming/PythonStreamingDataSourceSuite.scala | 19 ++++----
 5 files changed, 66 insertions(+), 86 deletions(-)
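
For context, a minimal sketch (not part of this commit) of what a stream reader could look like once latestOffset() takes the read limit directly, assuming the module layout shown in this diff; the RangeStreamReader name, its fixed 10-row batch size, and the integer data are purely illustrative:

    from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
    from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit

    class RangeStreamReader(DataSourceStreamReader):
        """Hypothetical reader that emits increasing integers, 10 rows per micro-batch."""

        def initialOffset(self) -> dict:
            return {"index": 0}

        def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
            # The read limit now arrives as an argument; this sketch ignores its
            # type and simply advances by a fixed batch size.
            begin = start["index"] if start is not None else 0
            return {"index": begin + 10}

        def getDefaultReadLimit(self) -> ReadLimit:
            # Same default the base class now provides.
            return ReadAllAvailable()

        def partitions(self, start: dict, end: dict):
            return [InputPartition((start["index"], end["index"]))]

        def read(self, partition):
            begin, end = partition.value
            return iter((i,) for i in range(begin, end))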

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index f1908180a3ba..ed8c20e862b4 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -714,26 +714,56 @@ class DataSourceStreamReader(ABC):
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
-        """
-        Returns the most recent offset available.
+    from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit
 
-        Returns
-        -------
-        dict
-            A dict or recursive dict whose key and value are primitive types, which includes
-            Integer, String and Boolean.
-
-        Examples
-        --------
-        >>> def latestOffset(self):
-        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+    # FIXME: Will this work without abstractmethod marker?
+    # @abstractmethod
+    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
+        """
+        Returns the most recent offset available given a read limit. The start offset can be
+        used to figure out how much new data should be read given the limit.
+
+        For the very first micro-batch, ``start`` is the offset returned by ``initialOffset``.
+        The reader can return ``None`` if there is no data to process.
+
+        This replaces the former zero-argument ``latestOffset()``; the read limit is passed in
+        directly instead of being negotiated through a separate ``SupportsAdmissionControl`` mixin.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
         """
         raise PySparkNotImplementedError(
             errorClass="NOT_IMPLEMENTED",
             messageParameters={"feature": "latestOffset"},
         )
 
+    def getDefaultReadLimit(self) -> ReadLimit:
+        """
+        Returns the read limit for this reader, potentially derived from options passed to the
+        data source when it was created. The default is to read all available data.
+
+        Returns
+        -------
+        :class:`ReadLimit`
+        """
+        return ReadAllAvailable()
+
+    def reportLatestOffset(self) -> Optional[dict]:
+        """
+        Returns the most recent offset available.
+
+        The reader can return ``None`` if there is no data to process or it does not
+        support this method.
+
+        Returns
+        -------
+        dict or None
+        """
+        return None
+
     def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
         """
         Returns a list of InputPartition given the start and end offsets. Each 
InputPartition
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 79d7246bea6c..63505f354569 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -30,7 +30,6 @@ from pyspark.sql.datasource import (
 from pyspark.sql.streaming.datasource import (
     ReadAllAvailable,
     ReadLimit,
-    SupportsAdmissionControl,
     ReadMaxBytes,
     ReadMaxRows,
     ReadMinRows,
@@ -65,7 +64,7 @@ class PrefetchedCacheEntry:
         self.iterator = iterator
 
 
-class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionControl):
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
     """
     A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern,
     so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
diff --git a/python/pyspark/sql/streaming/datasource.py b/python/pyspark/sql/streaming/datasource.py
index 7d01049c7c20..b4ad7decd4ed 100644
--- a/python/pyspark/sql/streaming/datasource.py
+++ b/python/pyspark/sql/streaming/datasource.py
@@ -117,52 +117,6 @@ class ReadMaxBytes(ReadLimit):
         return {"max_bytes": self.max_bytes}
 
 
-class SupportsAdmissionControl(ABC):
-    def getDefaultReadLimit(self) -> ReadLimit:
-        """
-          FIXME: docstring needed
-
-        /**
-         * Returns the read limits potentially passed to the data source through options when creating
-         * the data source.
-         */
-        """
-        return ReadAllAvailable()
-
-    @abstractmethod
-    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
-        """
-          FIXME: docstring needed
-
-        /**
-         * Returns the most recent offset available given a read limit. The start offset can be used
-         * to figure out how much new data should be read given the limit. Users should implement this
-         * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
-         * <p>
-         * When this method is called on a `Source`, the source can return `null` if there is no
-         * data to process. In addition, for the very first micro-batch, the `startOffset` will be
-         * null as well.
-         * <p>
-         * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
-         * for the very first micro-batch. The source can return `null` if there is no data to process.
-         */
-        """
-        pass
-
-    def reportLatestOffset(self) -> Optional[dict]:
-        """
-          FIXME: docstring needed
-
-        /**
-         * Returns the most recent offset available.
-         * <p>
-         * The source can return `null`, if there is no data to process or the source does not support
-         * to this method.
-         */
-        """
-        return None
-
-
 class SupportsTriggerAvailableNow(ABC):
     @abstractmethod
     def prepareForTriggerAvailableNow(self) -> None:
diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index c6391bceffd3..c881d4367f8d 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -34,7 +34,6 @@ from pyspark.sql.datasource import (
     DataSourceStreamReader,
 )
 from pyspark.sql.streaming.datasource import (
-    SupportsAdmissionControl,
     SupportsTriggerAvailableNow,
 )
 from pyspark.sql.datasource_internal import (
@@ -145,10 +144,18 @@ def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We consider the method of `read` in simple_reader to already have admission control
         # into it.
+        support_flags |= SUPPORTS_ADMISSION_CONTROL
         if isinstance(reader.simple_reader, SupportsTriggerAvailableNow):
             support_flags |= SUPPORTS_TRIGGER_AVAILABLE_NOW
     else:
-        if isinstance(reader, SupportsAdmissionControl):
+        import inspect
+        sig = inspect.signature(reader.latestOffset)
+        if len(sig.parameters) == 0:
+            # old signature of latestOffset()
+            pass
+        else:
+            # We don't check the number/type of parameters here strictly; we leave it to
+            # Python to raise an error when the method is called if the types do not match.
             support_flags |= SUPPORTS_ADMISSION_CONTROL
         if isinstance(reader, SupportsTriggerAvailableNow):
             support_flags |= SUPPORTS_TRIGGER_AVAILABLE_NOW
@@ -186,11 +193,7 @@ def latest_offset_admission_control_func(
 
 
 def get_default_read_limit_func(reader: DataSourceStreamReader, outfile: IO) -> None:
-    if isinstance(reader, SupportsAdmissionControl):
-        limit = reader.getDefaultReadLimit()
-    else:
-        limit = ReadAllAvailable()
-
+    limit = reader.getDefaultReadLimit()
     write_with_length(json.dumps(limit.dump()).encode("utf-8"), outfile)
 
 
@@ -199,14 +202,11 @@ def report_latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> No
         # We do not consider providing latest offset on simple stream reader.
         write_int(0, outfile)
     else:
-        if isinstance(reader, SupportsAdmissionControl):
-            offset = reader.reportLatestOffset()
-            if offset is None:
-                write_int(0, outfile)
-            else:
-                write_with_length(json.dumps(offset).encode("utf-8"), outfile)
-        else:
+        offset = reader.reportLatestOffset()
+        if offset is None:
             write_int(0, outfile)
+        else:
+            write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
 def main(infile: IO, outfile: IO) -> None:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
index eb5f7475636f..5aef7a6f0a10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
@@ -18,14 +18,12 @@ package org.apache.spark.sql.execution.python.streaming
 
 import java.io.File
 import java.util.concurrent.CountDownLatch
-
 import scala.concurrent.duration._
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
 import org.apache.spark.sql.connector.read.streaming.ReadLimit
-import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonMicroBatchStreamWithAdmissionControl, PythonStreamingSourceOffset}
+import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonMicroBatchStreamWithAdmissionControl, PythonStreamingSourceOffset, PythonStreamingSourceReadLimit}
 import org.apache.spark.sql.execution.python.PythonDataSourceSuiteBase
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
 import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, OffsetSeqLog}
@@ -685,12 +683,10 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
        |    ReadAllAvailable,
        |    ReadLimit,
        |    ReadMaxRows,
-       |    SupportsAdmissionControl,
        |)
        |
        |class TestDataStreamReader(
        |    DataSourceStreamReader,
-       |    SupportsAdmissionControl
        |):
        |    def initialOffset(self):
        |        return {"partition-1": 0}
@@ -732,13 +728,11 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
        |    ReadAllAvailable,
        |    ReadLimit,
        |    ReadMaxRows,
-       |    SupportsAdmissionControl,
        |    SupportsTriggerAvailableNow
        |)
        |
        |class TestDataStreamReader(
        |    DataSourceStreamReader,
-       |    SupportsAdmissionControl,
        |    SupportsTriggerAvailableNow
        |):
        |    def initialOffset(self):
@@ -967,13 +961,14 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
     pythonDs.setShortName("ErrorDataSource")
 
     def testMicroBatchStreamError(action: String, msg: String)(
-        func: PythonMicroBatchStream => Unit): Unit = {
+        func: PythonMicroBatchStreamWithAdmissionControl => Unit): Unit = {
       val options = CaseInsensitiveStringMap.empty()
       val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner(
         pythonDs, errorDataSourceName, inputSchema, options)
       runner.init()
 
-      val stream = new PythonMicroBatchStream(
+      // Admission control is now the default for the Python stream reader.
+      val stream = new PythonMicroBatchStreamWithAdmissionControl(
         pythonDs,
         errorDataSourceName,
         inputSchema,
@@ -1002,12 +997,14 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
         stream.initialOffset()
     }
 
+    val offset = PythonStreamingSourceOffset("{\"offset\": \"2\"}")
     testMicroBatchStreamError("latestOffset", "[NOT_IMPLEMENTED] latestOffset is not implemented") {
       stream =>
-        stream.latestOffset()
+        val readLimit = PythonStreamingSourceReadLimit(
+          PythonStreamingSourceRunner.READ_ALL_AVAILABLE_JSON)
+        stream.latestOffset(offset, readLimit)
     }
 
-    val offset = PythonStreamingSourceOffset("{\"offset\": \"2\"}")
     testMicroBatchStreamError("planPartitions", "[NOT_IMPLEMENTED] partitions is not implemented") {
       stream =>
         stream.planInputPartitions(offset, offset)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
