This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch WIP-python-data-source-admission-control-trigger-availablenow-change-the-method-signature
in repository https://gitbox.apache.org/repos/asf/spark.git

commit decb75e2f8609d2cdb0015b3fb1c4b1ff410634c
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Jan 21 15:20:16 2026 +0900

    fix style
---
 python/pyspark/sql/datasource.py                   |  1 -
 python/pyspark/sql/datasource_internal.py          | 27 ++++----
 python/pyspark/sql/streaming/datasource.py         | 74 ++++++++--------
 .../streaming/python_streaming_source_runner.py    | 39 +++++++-----
 .../sql/tests/test_python_streaming_datasource.py  | 13 ++--
 5 files changed, 71 insertions(+), 83 deletions(-)

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index 6adfd79f2631..f1908180a3ba 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -909,7 +909,6 @@ class SimpleDataSourceStreamReader(ABC):
         ...
 
 
-
 class DataSourceWriter(ABC):
     """
     A base class for data source writers. Data source writers are responsible for saving
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 51ef94f9a325..79d7246bea6c 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -19,7 +19,7 @@
 import json
 import copy
 from itertools import chain
-from typing import Iterator, List, Optional, Sequence, Tuple, Type
+from typing import Iterator, List, Optional, Sequence, Tuple, Type, Dict
 
 from pyspark.sql.datasource import (
     DataSource,
@@ -31,13 +31,13 @@ from pyspark.sql.streaming.datasource import (
     ReadAllAvailable,
     ReadLimit,
     SupportsAdmissionControl,
-    CompositeReadLimit,
     ReadMaxBytes,
     ReadMaxRows,
     ReadMinRows,
 )
 from pyspark.sql.types import StructType
 from pyspark.errors import PySparkNotImplementedError
+from pyspark.errors.exceptions.base import PySparkException
 
 
 def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
@@ -101,16 +101,19 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionContro
         # We do not consider providing different read limit on simple stream reader.
         return ReadAllAvailable()
 
-    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
+    def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:  # type: ignore[override]
         if self.current_offset is None:
-            assert start != None, "start offset should not be None"
+            assert start is not None, "start offset should not be None"
             self.current_offset = start
         else:
-            assert self.current_offset == start, ("start offset does not match current offset. "
-                                                  f"current: {self.current_offset}, start: {start}")
+            assert self.current_offset == start, (
+                "start offset does not match current offset. "
+                f"current: {self.current_offset}, start: {start}"
+            )
 
-        assert isinstance(readLimit, ReadAllAvailable), ("simple stream reader does not "
-                                                         "support read limit")
+        assert isinstance(readLimit, ReadAllAvailable), (
+            "simple stream reader does not " "support read limit"
+        )
 
         (iter, end) = self.simple_reader.read(self.current_offset)
         self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
@@ -169,23 +172,23 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader, SupportsAdmissionContro
 class ReadLimitRegistry:
     def __init__(self) -> None:
         self._registry: Dict[str, Type[ReadLimit]] = {}
+        # Register built-in ReadLimit types
         self.__register(ReadAllAvailable.type_name(), ReadAllAvailable)
         self.__register(ReadMinRows.type_name(), ReadMinRows)
         self.__register(ReadMaxRows.type_name(), ReadMaxRows)
         self.__register(ReadMaxBytes.type_name(), ReadMaxBytes)
-        self.__register(CompositeReadLimit.type_name(), CompositeReadLimit)
 
     def __register(self, type_name: str, read_limit_type: Type["ReadLimit"]) -> None:
         if type_name in self._registry:
-            # FIXME: error class?
-            raise Exception(f"ReadLimit type '{type_name}' is already registered.")
+            raise PySparkException(f"ReadLimit type '{type_name}' is already registered.")
         self._registry[type_name] = read_limit_type
 
     def get(self, type_name: str, params: dict) -> ReadLimit:
         read_limit_type = self._registry[type_name]
         if read_limit_type is None:
-            raise Exception("type_name '{}' is not registered.".format(type_name))
+            raise PySparkException("type_name '{}' is not registered.".format(type_name))
+
         params_without_type = params.copy()
         del params_without_type["type"]
         return read_limit_type.load(params_without_type)
diff --git a/python/pyspark/sql/streaming/datasource.py b/python/pyspark/sql/streaming/datasource.py
index 82afbb83c583..7d01049c7c20 100644
--- a/python/pyspark/sql/streaming/datasource.py
+++ b/python/pyspark/sql/streaming/datasource.py
@@ -16,7 +16,8 @@
 #
 
 from abc import ABC, abstractmethod
-from typing import List, Optional
+from typing import Optional
+
 
 class ReadLimit(ABC):
     @classmethod
@@ -116,69 +117,48 @@ class ReadMaxBytes(ReadLimit):
         return {"max_bytes": self.max_bytes}
 
 
-class CompositeReadLimit(ReadLimit):
-    def __init__(self, readLimits: List[ReadLimit]) -> None:
-        self.readLimits = readLimits
-
-    @classmethod
-    def type_name(cls) -> str:
-        return "CompositeReadLimit"
-
-    @classmethod
-    def load(cls, params: dict) -> "CompositeReadLimit":
-        read_limits = []
-        for rl_params in params["readLimits"]:
-            rl_type = rl_params["type"]
-            rl = READ_LIMIT_REGISTRY.get(rl_type, rl_params)
-            read_limits.append(rl)
-        return CompositeReadLimit(read_limits)
-
-    def _dump(self) -> dict:
-        return {"readLimits": [rl.dump() for rl in self.readLimits]}
-
-
 class SupportsAdmissionControl(ABC):
     def getDefaultReadLimit(self) -> ReadLimit:
         """
-            FIXME: docstring needed
+        FIXME: docstring needed
 
-            /**
-             * Returns the read limits potentially passed to the data source through options when creating
-             * the data source.
-             */
+        /**
+         * Returns the read limits potentially passed to the data source through options when creating
+         * the data source.
+         */
         """
         return ReadAllAvailable()
 
     @abstractmethod
     def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
         """
-            FIXME: docstring needed
+        FIXME: docstring needed
 
-            /**
-             * Returns the most recent offset available given a read limit. The start offset can be used
-             * to figure out how much new data should be read given the limit. Users should implement this
-             * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
-             * <p>
-             * When this method is called on a `Source`, the source can return `null` if there is no
-             * data to process. In addition, for the very first micro-batch, the `startOffset` will be
-             * null as well.
-             * <p>
-             * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
-             * for the very first micro-batch. The source can return `null` if there is no data to process.
-             */
+        /**
+         * Returns the most recent offset available given a read limit. The start offset can be used
+         * to figure out how much new data should be read given the limit. Users should implement this
+         * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
+         * <p>
+         * When this method is called on a `Source`, the source can return `null` if there is no
+         * data to process. In addition, for the very first micro-batch, the `startOffset` will be
+         * null as well.
+         * <p>
+         * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
+         * for the very first micro-batch. The source can return `null` if there is no data to process.
+         */
         """
         pass
 
     def reportLatestOffset(self) -> Optional[dict]:
         """
-            FIXME: docstring needed
+        FIXME: docstring needed
 
-            /**
-             * Returns the most recent offset available.
-             * <p>
-             * The source can return `null`, if there is no data to process or the source does not support
-             * to this method.
-             */
+        /**
+         * Returns the most recent offset available.
+         * <p>
+         * The source can return `null`, if there is no data to process or the source does not support
+         * to this method.
+         */
         """
         return None
diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index 192f729a6fb9..83f955f06cd7 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -22,6 +22,7 @@ from typing import IO, Iterator, Tuple
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.errors import IllegalArgumentException, PySparkAssertionError
+from pyspark.errors.exceptions.base import PySparkException
 from pyspark.serializers import (
     read_int,
     write_int,
@@ -33,11 +34,14 @@ from pyspark.sql.datasource import (
     DataSourceStreamReader,
 )
 from pyspark.sql.streaming.datasource import (
-    ReadAllAvailable,
     SupportsAdmissionControl,
     SupportsTriggerAvailableNow,
 )
-from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper, _streamReader, ReadLimitRegistry
+from pyspark.sql.datasource_internal import (
+    _SimpleStreamReaderWrapper,
+    _streamReader,
+    ReadLimitRegistry,
+)
 from pyspark.sql.pandas.serializers import ArrowStreamSerializer
 from pyspark.sql.types import (
     _parse_datatype_json_string,
@@ -55,6 +59,8 @@ from pyspark.worker_util import (
     utf8_deserializer,
 )
 
+from pyspark.sql.streaming.datasource import ReadAllAvailable
+
 INITIAL_OFFSET_FUNC_ID = 884
 LATEST_OFFSET_FUNC_ID = 885
 PARTITIONS_FUNC_ID = 886
@@ -77,7 +83,6 @@ READ_LIMIT_REGISTRY = ReadLimitRegistry()
 
 def initial_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     offset = reader.initialOffset()
-    # raise Exception(f"Debug info for initial offset: offset: {offset}, json: {json.dumps(offset).encode('utf-8')}")
     write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
@@ -135,7 +140,7 @@ def send_batch_func(
         write_int(EMPTY_PYARROW_RECORD_BATCHES, outfile)
 
 
-def check_support_func(reader, outfile):
+def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     support_flags = 0
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We consider the method of `read` in simple_reader to already have admission control
@@ -151,44 +156,46 @@ def check_support_func(reader, outfile):
     write_int(support_flags, outfile)
 
 
-def prepare_for_trigger_available_now_func(reader, outfile):
+def prepare_for_trigger_available_now_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         if isinstance(reader.simple_reader, SupportsTriggerAvailableNow):
             reader.simple_reader.prepareForTriggerAvailableNow()
         else:
-            # FIXME: code for not supported? or should it be assertion?
-            raise Exception("prepareForTriggerAvailableNow is not supported by the "
-                            "underlying simple reader.")
+            raise PySparkException(
+                "prepareForTriggerAvailableNow is not supported by the underlying simple reader."
+            )
     else:
         if isinstance(reader, SupportsTriggerAvailableNow):
            reader.prepareForTriggerAvailableNow()
        else:
-            # FIXME: code for not supported? or should it be assertion?
-            raise Exception("prepareForTriggerAvailableNow is not supported by the "
-                            "stream reader.")
+            raise PySparkException(
+                "prepareForTriggerAvailableNow is not supported by the stream reader."
+            )
 
     write_int(0, outfile)
 
 
-def latest_offset_admission_control_func(reader, infile, outfile):
+def latest_offset_admission_control_func(
+    reader: DataSourceStreamReader, infile: IO, outfile: IO
+) -> None:
     start_offset_dict = json.loads(utf8_deserializer.loads(infile))
     limit = json.loads(utf8_deserializer.loads(infile))
     limit_obj = READ_LIMIT_REGISTRY.get(limit["type"], limit)
-    offset = reader.latestOffset(start_offset_dict, limit_obj)
+    offset = reader.latestOffset(start_offset_dict, limit_obj)  # type: ignore[call-arg]
     write_with_length(json.dumps(offset).encode("utf-8"), outfile)
 
 
-def get_default_read_limit_func(reader, outfile):
+def get_default_read_limit_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, SupportsAdmissionControl):
         limit = reader.getDefaultReadLimit()
     else:
-        limit = READ_ALL_AVAILABLE
+        limit = ReadAllAvailable()
     write_with_length(json.dumps(limit.dump()).encode("utf-8"), outfile)
 
 
-def report_latest_offset_func(reader, outfile):
+def report_latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
     if isinstance(reader, _SimpleStreamReaderWrapper):
         # We do not consider providing latest offset on simple stream reader.
         write_int(0, outfile)
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index b894a5fcfec6..911f7fc81007 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -148,10 +148,7 @@ class BasePythonStreamingDataSourceTestsMixin:
             SupportsAdmissionControl,
         )
 
-        class TestDataStreamReader(
-            DataSourceStreamReader,
-            SupportsAdmissionControl
-        ):
+        class TestDataStreamReader(DataSourceStreamReader, SupportsAdmissionControl):
             def initialOffset(self):
                 return {"partition-1": 0}
 
@@ -163,8 +160,9 @@ class BasePythonStreamingDataSourceTestsMixin:
                 if isinstance(readLimit, ReadAllAvailable):
                     end_offset = start_idx + 10
                 else:
-                    assert isinstance(readLimit, ReadMaxRows), ("Expected ReadMaxRows read limit but got "
-                                                                + str(type(readLimit)))
+                    assert isinstance(
+                        readLimit, ReadMaxRows
+                    ), "Expected ReadMaxRows read limit but got " + str(type(readLimit))
                     end_offset = start_idx + readLimit.max_rows
                 return {"partition-1": end_offset}
 
@@ -182,6 +180,7 @@ class BasePythonStreamingDataSourceTestsMixin:
         class TestDataSource(DataSource):
             def schema(self) -> str:
                 return "id INT"
+
             def streamReader(self, schema):
                 return TestDataStreamReader()
 
@@ -336,7 +335,7 @@ class BasePythonStreamingDataSourceTestsMixin:
         def read(self, start: dict):
             start_idx = start["offset"]
             end_offset = min(start_idx + 2, self.desired_end_offset["offset"])
-            it = iter([(i, ) for i in range(start_idx, end_offset)])
+            it = iter([(i,) for i in range(start_idx, end_offset)])
             return (it, {"offset": end_offset})
 
         def commit(self, end):

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
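For context on the API this WIP branch is shaping, the sketch below shows how a user-defined stream reader might opt in to admission control using the `latestOffset(start, readLimit)` signature and the `pyspark.sql.streaming.datasource` module from the diff above. It is illustrative only and not part of the commit; the `RangeStreamReader` class is hypothetical, the `ReadMaxRows(10)` constructor call is an assumption not shown in this diff, and the API may still change on this WIP branch.

    # Illustrative sketch only (not part of this commit); assumes the module layout
    # in the diff above and that ReadMaxRows(...) accepts the row cap directly.
    from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
    from pyspark.sql.streaming.datasource import (
        ReadAllAvailable,
        ReadLimit,
        ReadMaxRows,
        SupportsAdmissionControl,
    )


    class RangeStreamReader(DataSourceStreamReader, SupportsAdmissionControl):
        """Hypothetical reader that emits consecutive integers with admission control."""

        def initialOffset(self) -> dict:
            return {"offset": 0}

        def getDefaultReadLimit(self) -> ReadLimit:
            # Ask the engine to cap each micro-batch at 10 rows by default.
            return ReadMaxRows(10)  # assumed constructor signature

        def latestOffset(self, start: dict, readLimit: ReadLimit) -> dict:
            start_idx = start["offset"]
            if isinstance(readLimit, ReadMaxRows):
                end_idx = start_idx + readLimit.max_rows
            else:
                # ReadAllAvailable (or an unrecognized limit): advance by a fixed chunk.
                end_idx = start_idx + 100
            return {"offset": end_idx}

        def partitions(self, start: dict, end: dict):
            return [InputPartition((start["offset"], end["offset"]))]

        def read(self, partition):
            first, last = partition.value
            return iter([(i,) for i in range(first, last)])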
