This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch 
WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git

commit decb75e2f8609d2cdb0015b3fb1c4b1ff410634c
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Jan 21 15:20:16 2026 +0900

    fix style
---
 python/pyspark/sql/datasource.py                   |  1 -
 python/pyspark/sql/datasource_internal.py          | 27 ++++----
 python/pyspark/sql/streaming/datasource.py         | 74 ++++++++--------------
 .../streaming/python_streaming_source_runner.py    | 39 +++++++-----
 .../sql/tests/test_python_streaming_datasource.py  | 13 ++--
 5 files changed, 71 insertions(+), 83 deletions(-)

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index 6adfd79f2631..f1908180a3ba 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -909,7 +909,6 @@ class SimpleDataSourceStreamReader(ABC):
         ...
 
 
-
 class DataSourceWriter(ABC):
     """
     A base class for data source writers. Data source writers are responsible for saving
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 51ef94f9a325..79d7246bea6c 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -19,7 +19,7 @@
 import json
 import copy
 from itertools import chain
-from typing import Iterator, List, Optional, Sequence, Tuple, Type
+from typing import Iterator, List, Optional, Sequence, Tuple, Type, Dict
 
 from pyspark.sql.datasource import (
     DataSource,
@@ -31,13 +31,13 @@ from pyspark.sql.streaming.datasource import (
     ReadAllAvailable,
     ReadLimit,
     SupportsAdmissionControl,
-    CompositeReadLimit,
     ReadMaxBytes,
     ReadMaxRows,
     ReadMinRows,
 )
 from pyspark.sql.types import StructType
 from pyspark.errors import PySparkNotImplementedError
+from pyspark.errors.exceptions.base import PySparkException
 
 
 def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
@@ -101,16 +101,19 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionContro
         # We do not consider providing different read limit on simple stream reader.
         return ReadAllAvailable()
 
-    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
+    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:  # type: ignore[override]
         if self.current_offset is None:
-            assert start != None, "start offset should not be None"
+            assert start is not None, "start offset should not be None"
             self.current_offset = start
         else:
-            assert self.current_offset == start, ("start offset does not match current offset. "
-                   f"current: {self.current_offset}, start: {start}")
+            assert self.current_offset == start, (
+                "start offset does not match current offset. "
+                f"current: {self.current_offset}, start: {start}"
+            )
 
-        assert isinstance(readLimit, ReadAllAvailable), ("simple stream reader does not "
-                                                         "support read limit")
+        assert isinstance(readLimit, ReadAllAvailable), (
+            "simple stream reader does not " "support read limit"
+        )
 
         (iter, end) = self.simple_reader.read(self.current_offset)
         self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
@@ -169,23 +172,23 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionContro
 class ReadLimitRegistry:
     def __init__(self) -> None:
         self._registry: Dict[str, Type[ReadLimit]] = {}
+        # Register built-in ReadLimit types
         self.__register(ReadAllAvailable.type_name(), ReadAllAvailable)
         self.__register(ReadMinRows.type_name(), ReadMinRows)
         self.__register(ReadMaxRows.type_name(), ReadMaxRows)
         self.__register(ReadMaxBytes.type_name(), ReadMaxBytes)
-        self.__register(CompositeReadLimit.type_name(), CompositeReadLimit)
 
     def __register(self, type_name: str, read_limit_type: Type["ReadLimit"]) -> None:
         if type_name in self._registry:
-            # FIXME: error class?
-            raise Exception(f"ReadLimit type '{type_name}' is already registered.")
+            raise PySparkException(f"ReadLimit type '{type_name}' is already registered.")
 
         self._registry[type_name] = read_limit_type
 
     def get(self, type_name: str, params: dict) -> ReadLimit:
         read_limit_type = self._registry[type_name]
         if read_limit_type is None:
-            raise Exception("type_name '{}' is not registered.".format(type_name))
+            raise PySparkException("type_name '{}' is not registered.".format(type_name))
+
         params_without_type = params.copy()
         del params_without_type["type"]
         return read_limit_type.load(params_without_type)
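
For context on the registry above: each ReadLimit subclass is keyed by its type_name(), and instances are rebuilt from the dumped params after the "type" key is stripped. A minimal standalone sketch of that round-trip (illustrative names only, no pyspark imports):

    from typing import Dict, Type

    class MaxRowsLimit:
        """Illustrative stand-in for a ReadLimit subclass."""

        def __init__(self, max_rows: int) -> None:
            self.max_rows = max_rows

        @classmethod
        def type_name(cls) -> str:
            return "MaxRowsLimit"

        @classmethod
        def load(cls, params: dict) -> "MaxRowsLimit":
            return cls(params["max_rows"])

        def dump(self) -> dict:
            # The "type" key is what the registry strips before calling load().
            return {"type": self.type_name(), "max_rows": self.max_rows}

    registry: Dict[str, Type[MaxRowsLimit]] = {MaxRowsLimit.type_name(): MaxRowsLimit}

    params = MaxRowsLimit(10).dump()
    limit_cls = registry[params["type"]]
    params_without_type = {k: v for k, v in params.items() if k != "type"}
    assert limit_cls.load(params_without_type).max_rows == 10
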
diff --git a/python/pyspark/sql/streaming/datasource.py b/python/pyspark/sql/streaming/datasource.py
index 82afbb83c583..7d01049c7c20 100644
--- a/python/pyspark/sql/streaming/datasource.py
+++ b/python/pyspark/sql/streaming/datasource.py
@@ -16,7 +16,8 @@
 #
 
 from abc import ABC, abstractmethod
-from typing import List, Optional
+from typing import Optional
+
 
 class ReadLimit(ABC):
     @classmethod
@@ -116,69 +117,48 @@ class ReadMaxBytes(ReadLimit):
         return {"max_bytes": self.max_bytes}
 
 
-class CompositeReadLimit(ReadLimit):
-    def __init__(self, readLimits: List[ReadLimit]) -> None:
-        self.readLimits = readLimits
-
-    @classmethod
-    def type_name(cls) -> str:
-        return "CompositeReadLimit"
-
-    @classmethod
-    def load(cls, params: dict) -> "CompositeReadLimit":
-        read_limits = []
-        for rl_params in params["readLimits"]:
-            rl_type = rl_params["type"]
-            rl = READ_LIMIT_REGISTRY.get(rl_type, rl_params)
-            read_limits.append(rl)
-        return CompositeReadLimit(read_limits)
-
-    def _dump(self) -> dict:
-        return {"readLimits": [rl.dump() for rl in self.readLimits]}
-
-
 class SupportsAdmissionControl(ABC):
     def getDefaultReadLimit(self) -> ReadLimit:
         """
-        FIXME: docstring needed
+          FIXME: docstring needed
 
-      /**
-       * Returns the read limits potentially passed to the data source through options when creating
-       * the data source.
-       */
+        /**
+         * Returns the read limits potentially passed to the data source through options when creating
+         * the data source.
+         */
         """
         return ReadAllAvailable()
 
     @abstractmethod
     def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
         """
-        FIXME: docstring needed
+          FIXME: docstring needed
 
-      /**
-       * Returns the most recent offset available given a read limit. The start offset can be used
-       * to figure out how much new data should be read given the limit. Users should implement this
-       * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
-       * <p>
-       * When this method is called on a `Source`, the source can return `null` if there is no
-       * data to process. In addition, for the very first micro-batch, the `startOffset` will be
-       * null as well.
-       * <p>
-       * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
-       * for the very first micro-batch. The source can return `null` if there is no data to process.
-       */
+        /**
+         * Returns the most recent offset available given a read limit. The start offset can be used
+         * to figure out how much new data should be read given the limit. Users should implement this
+         * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
+         * <p>
+         * When this method is called on a `Source`, the source can return `null` if there is no
+         * data to process. In addition, for the very first micro-batch, the `startOffset` will be
+         * null as well.
+         * <p>
+         * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
+         * for the very first micro-batch. The source can return `null` if there is no data to process.
+         */
+         */
         """
         pass
 
     def reportLatestOffset(self) -> Optional[dict]:
         """
-        FIXME: docstring needed
+          FIXME: docstring needed
 
-      /**
-       * Returns the most recent offset available.
-       * <p>
-       * The source can return `null`, if there is no data to process or the source does not support
-       * to this method.
-       */
+        /**
+         * Returns the most recent offset available.
+         * <p>
+         * The source can return `null`, if there is no data to process or the source does not support
+         * to this method.
+         */
         """
         return None
 
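
To make the latestOffset contract above concrete, here is a sketch of a reader opting into admission control, assuming the module layout introduced in this WIP branch (pyspark.sql.streaming.datasource); partitions()/read()/commit() are omitted, and it mirrors the TestDataStreamReader added in the test change further down:

    from pyspark.sql.datasource import DataSourceStreamReader
    from pyspark.sql.streaming.datasource import (
        ReadAllAvailable,
        ReadLimit,
        ReadMaxRows,
        SupportsAdmissionControl,
    )

    class CountingStreamReader(DataSourceStreamReader, SupportsAdmissionControl):
        def initialOffset(self) -> dict:
            return {"offset": 0}

        def getDefaultReadLimit(self) -> ReadLimit:
            # No source-specific preference; same as the mixin's default.
            return ReadAllAvailable()

        def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:  # type: ignore[override]
            if isinstance(readLimit, ReadMaxRows):
                # Admit at most max_rows new rows for this micro-batch.
                return {"offset": start["offset"] + readLimit.max_rows}
            # ReadAllAvailable (or anything else): admit everything seen so far.
            return {"offset": start["offset"] + 10}
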
diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index 192f729a6fb9..83f955f06cd7 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -22,6 +22,7 @@ from typing import IO, Iterator, Tuple
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.errors import IllegalArgumentException, PySparkAssertionError
+from pyspark.errors.exceptions.base import PySparkException
 from pyspark.serializers import (
     read_int,
     write_int,
@@ -33,11 +34,14 @@ from pyspark.sql.datasource import (
     DataSourceStreamReader,
 )
 from pyspark.sql.streaming.datasource import (
-    ReadAllAvailable,
     SupportsAdmissionControl,
     SupportsTriggerAvailableNow,
 )
-from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper, _streamReader, ReadLimitRegistry
+from pyspark.sql.datasource_internal import (
+    _SimpleStreamReaderWrapper,
+    _streamReader,
+    ReadLimitRegistry,
+)
 from pyspark.sql.pandas.serializers import ArrowStreamSerializer
 from pyspark.sql.types import (
     _parse_datatype_json_string,
@@ -55,6 +59,8 @@ from pyspark.worker_util import (
     utf8_deserializer,
 )
 
+from pyspark.sql.streaming.datasource import ReadAllAvailable
+
 INITIAL_OFFSET_FUNC_ID = 884
 LATEST_OFFSET_FUNC_ID = 885
 PARTITIONS_FUNC_ID = 886
@@ -77,7 +83,6 @@ READ_LIMIT_REGISTRY = ReadLimitRegistry()
 
 def initial_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     offset = reader.initialOffset()
-    # raise Exception(f"Debug info for initial offset: offset: {offset}, json: {json.dumps(offset).encode('utf-8')}")
     write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
@@ -135,7 +140,7 @@ def send_batch_func(
         write_int(EMPTY_PYARROW_RECORD_BATCHES, outfile)
 
 
-def check_support_func(reader, outfile):
+def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     support_flags = 0
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We consider the method of `read` in simple_reader to already have admission control
@@ -151,44 +156,46 @@ def check_support_func(reader, outfile):
     write_int(support_flags, outfile)
 
 
-def prepare_for_trigger_available_now_func(reader, outfile):
+def prepare_for_trigger_available_now_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         if isinstance(reader.simple_reader, SupportsTriggerAvailableNow):
             reader.simple_reader.prepareForTriggerAvailableNow()
         else:
-            # FIXME: code for not supported? or should it be assertion?
-            raise Exception("prepareForTriggerAvailableNow is not supported by the "
-                            "underlying simple reader.")
+            raise PySparkException(
+                "prepareForTriggerAvailableNow is not supported by the underlying simple reader."
+            )
     else:
         if isinstance(reader, SupportsTriggerAvailableNow):
             reader.prepareForTriggerAvailableNow()
         else:
-            # FIXME: code for not supported? or should it be assertion?
-            raise Exception("prepareForTriggerAvailableNow is not supported by the "
-                            "stream reader.")
+            raise PySparkException(
+                "prepareForTriggerAvailableNow is not supported by the stream reader."
+            )
     write_int(0, outfile)
 
 
-def latest_offset_admission_control_func(reader, infile, outfile):
+def latest_offset_admission_control_func(
+    reader: DataSourceStreamReader, infile: IO, outfile: IO
+) -> None:
     start_offset_dict = json.loads(utf8_deserializer.loads(infile))
 
     limit = json.loads(utf8_deserializer.loads(infile))
     limit_obj = READ_LIMIT_REGISTRY.get(limit["type"], limit)
 
-    offset = reader.latestOffset(start_offset_dict, limit_obj)
+    offset = reader.latestOffset(start_offset_dict, limit_obj)  # type: ignore[call-arg]
     write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
-def get_default_read_limit_func(reader, outfile):
+def get_default_read_limit_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, SupportsAdmissionControl):
         limit = reader.getDefaultReadLimit()
     else:
-        limit = READ_ALL_AVAILABLE
+        limit = ReadAllAvailable()
 
     write_with_length(json.dumps(limit.dump()).encode("utf-8"), outfile)
 
 
-def report_latest_offset_func(reader, outfile):
+def report_latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We do not consider providing latest offset on simple stream reader.
         write_int(0, outfile)
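
For reference, latest_offset_admission_control_func above reads the start offset and then the serialized read limit as UTF-8 JSON and writes the resulting offset back with a length prefix. A standalone sketch of that framing, using struct-based stand-ins rather than pyspark's read_int/write_with_length/utf8_deserializer helpers (the "type" string below is illustrative):

    import json
    import struct
    from io import BytesIO

    def write_with_length(payload: bytes, outfile) -> None:
        # 4-byte big-endian length prefix, then the raw bytes.
        outfile.write(struct.pack("!i", len(payload)))
        outfile.write(payload)

    def read_with_length(infile) -> bytes:
        (length,) = struct.unpack("!i", infile.read(4))
        return infile.read(length)

    # "Driver" side: frame the start offset and the dumped read limit.
    buf = BytesIO()
    write_with_length(json.dumps({"offset": 0}).encode("utf-8"), buf)
    write_with_length(json.dumps({"type": "ReadMaxRows", "max_rows": 10}).encode("utf-8"), buf)
    buf.seek(0)

    # "Worker" side: decode the payloads in the same order the runner reads them.
    start_offset = json.loads(read_with_length(buf).decode("utf-8"))
    limit_params = json.loads(read_with_length(buf).decode("utf-8"))
    assert start_offset == {"offset": 0} and limit_params["max_rows"] == 10
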
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index b894a5fcfec6..911f7fc81007 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -148,10 +148,7 @@ class BasePythonStreamingDataSourceTestsMixin:
             SupportsAdmissionControl,
         )
 
-        class TestDataStreamReader(
-            DataSourceStreamReader,
-            SupportsAdmissionControl
-        ):
+        class TestDataStreamReader(DataSourceStreamReader, SupportsAdmissionControl):
             def initialOffset(self):
                 return {"partition-1": 0}
 
@@ -163,8 +160,9 @@ class BasePythonStreamingDataSourceTestsMixin:
                 if isinstance(readLimit, ReadAllAvailable):
                     end_offset = start_idx + 10
                 else:
-                    assert isinstance(readLimit, ReadMaxRows), ("Expected ReadMaxRows read limit but got "
-                                                                + str(type(readLimit)))
+                    assert isinstance(
+                        readLimit, ReadMaxRows
+                    ), "Expected ReadMaxRows read limit but got " + str(type(readLimit))
                     end_offset = start_idx + readLimit.max_rows
                 return {"partition-1": end_offset}
 
@@ -182,6 +180,7 @@ class BasePythonStreamingDataSourceTestsMixin:
         class TestDataSource(DataSource):
             def schema(self) -> str:
                 return "id INT"
+
             def streamReader(self, schema):
                 return TestDataStreamReader()
 
@@ -336,7 +335,7 @@ class BasePythonStreamingDataSourceTestsMixin:
             def read(self, start: dict):
                 start_idx = start["offset"]
                 end_offset = min(start_idx + 2, self.desired_end_offset["offset"])
-                it = iter([(i, ) for i in range(start_idx, end_offset)])
+                it = iter([(i,) for i in range(start_idx, end_offset)])
                 return (it, {"offset": end_offset})
 
             def commit(self, end):

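The simple reader in the last hunk shows the SimpleDataSourceStreamReader contract this change relies on: read(start) returns a tuple of (iterator of row tuples, end offset dict), which _SimpleStreamReaderWrapper prefetches and caches when latestOffset is driven through it. A minimal sketch along the same lines (readBetweenOffsets/commit omitted):

    from pyspark.sql.datasource import SimpleDataSourceStreamReader

    class RangeSimpleReader(SimpleDataSourceStreamReader):
        def initialOffset(self) -> dict:
            return {"offset": 0}

        def read(self, start: dict):
            start_idx = start["offset"]
            end_idx = start_idx + 2  # pretend two new rows are available
            rows = iter([(i,) for i in range(start_idx, end_idx)])
            return (rows, {"offset": end_idx})
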

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
