[
https://issues.apache.org/jira/browse/SPARK-54305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-54305:
-----------------------------------
Labels: pull-request-available (was: )
> Add admission control support to Python DataSource streaming API
> -----------------------------------------------------------------
>
> Key: SPARK-54305
> URL: https://issues.apache.org/jira/browse/SPARK-54305
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL, Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jitesh Soni
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 16h
> Remaining Estimate: 16h
>
> h2. Problem
> Python streaming data sources cannot control microbatch sizes because
> DataSourceStreamReader.latestOffset() has no parameters to receive configured
> limits. This forces Python sources to either process all available data
> (unpredictable resource usage) or artificially limit offsets (risking data
> loss).
> Scala sources can implement SupportsAdmissionControl to properly control
> batch sizes, but Python sources lack this capability.
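> The gap is visible in the reader contract itself. A minimal sketch of a reader
> against the current API (counter-style offsets are illustrative, and
> source_latest()/fetch() are hypothetical helpers, not part of the API):
> {code:python}
> from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
>
> class RangePartition(InputPartition):
>     def __init__(self, start: int, end: int):
>         self.start, self.end = start, end
>
> class BlindStreamReader(DataSourceStreamReader):
>     def initialOffset(self) -> dict:
>         return {"offset": 0}
>
>     def latestOffset(self) -> dict:
>         # No start offset and no read limit are passed in, so the only
>         # safe answer is the absolute latest position on the source,
>         # however far ahead that may be.
>         return {"offset": self.source_latest()}  # hypothetical helper
>
>     def partitions(self, start: dict, end: dict):
>         # Must plan the full [start, end) range: Spark has already
>         # written `end` to the offset log before this call.
>         return [RangePartition(start["offset"], end["offset"])]
>
>     def read(self, partition: RangePartition):
>         yield from self.fetch(partition.start, partition.end)  # hypothetical helper
> {code}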
> h2. Solution
> Extend the Python DataSource API to support admission control by:
> 1. Enhanced Python API: Update DataSourceStreamReader.latestOffset() to
> accept optional start_offset and read_limit parameters (see the sketch after
> this list)
> 2. Scala Bridge: Modify PythonMicroBatchStream to implement
> SupportsAdmissionControl
> 3. Serialization: Add ReadLimit serialization in PythonStreamingSourceRunner
> 4. Python Worker: Enhance python_streaming_source_runner.py to deserialize
> and pass parameters
> 5. Monitoring: Add optional reportLatestOffset() method for observability
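> Taken together, items 1 and 5 could look roughly as follows in a reader (a
> hedged sketch: the parameter names follow the description above, the
> read_limit.max_rows shape is an assumption for illustration, and the
> authoritative signature is the one in the linked PR):
> {code:python}
> class AdmissionControlledReader(DataSourceStreamReader):
>     def latestOffset(self, start_offset: dict = None, read_limit=None) -> dict:
>         latest = self.source_latest()  # hypothetical helper
>         if start_offset is None or read_limit is None:
>             # Old call shape: behave exactly as today (backward compatible).
>             return {"offset": latest}
>         # Cap the batch end *before* Spark checkpoints it; max_rows is an
>         # assumed ReadLimit attribute used here only for illustration.
>         capped = min(latest, start_offset["offset"] + read_limit.max_rows)
>         return {"offset": capped}
>
>     def reportLatestOffset(self) -> dict:
>         # Optional observability hook (item 5): report the true latest
>         # offset independent of the admission-controlled batch end.
>         return {"offset": self.source_latest()}
> {code}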
> h2. Benefits
> - Predictable performance with controlled batch sizes
> - Rate limiting and backpressure support
> - Feature parity with Scala DataSource capabilities
> - Full backward compatibility (all parameters optional)
> - Production-ready streaming sources
> h2. Architectural Limitations in PySpark's DataSource API for Microbatch Size Control
> The core problem is an architectural flaw in PySpark's DataSource API: its
> [latestOffset()|https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/datasource.html?]
> method is *"blind."* When Spark asks for the latest available data, this
> method has no context about the stream's current position or the configured
> batch size (maxBlocksPerTrigger). It can only report the absolute latest data
> on the source.
> The Scala MicroBatchStream API, however, is *"context-aware."* Its
> latestOffset() method receives the startOffset for the batch and the
> configured limit, giving it the necessary information to control the batch
> size.
> h3. Why This Matters: The Checkpointing Trap
> This difference becomes a major problem due to Spark's rigid processing order:
> # Spark calls latestOffset() to get the end boundary for the batch.
> # It *immediately saves this end offset to the checkpoint log*. This is the
> point of no return.
> # _Then_, Spark calls partitions(start, end), where you must plan the work
> for the _entire_ range from start to end.
> This flow creates a trap. If you try to process only a subset of the data in
> partitions() (e.g., start + limit blocks), you create a mismatch with the
> checkpoint. Spark will use the _original_ large end offset to start the next
> batch, causing permanent data loss.
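> To make the trap concrete, here is the broken workaround as a sketch (a
> partitions() override for the illustrative reader above; self.limit is a
> hypothetical per-batch cap):
> {code:python}
>     # DON'T DO THIS: Spark has already checkpointed `end` from
>     # latestOffset(), so planning a smaller range silently skips data.
>     def partitions(self, start: dict, end: dict):
>         capped_end = min(end["offset"], start["offset"] + self.limit)
>         # Records in [capped_end, end) are never read: the next batch
>         # resumes from the checkpointed `end`, not from `capped_end`.
>         return [RangePartition(start["offset"], capped_end)]
> {code}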
> Since data loss is not an option, you are left with no choice: you *must*
> process the entire range from the start offset to whatever end offset was
> returned by the blind latestOffset() method.
> *The Result*: You cannot control the size of your micro-batches in
> Python. Your stream is forced to process potentially massive, unpredictable
> chunks of data at a time, making it unscalable and extremely fragile during
> failures and backfills.
> The official Spark-Kafka connector, written in Scala, correctly uses the
> startOffset and limit parameters to calculate a properly capped end offset
> _before_ Spark checkpoints it. This functionality is critically missing from
> the Python API.
> h2. Pull Request
> PR: [https://github.com/apache/spark/pull/53001]