[ 
https://issues.apache.org/jira/browse/SPARK-54305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-54305:
----------------------------------
    Fix Version/s:     (was: 4.0.0)
                       (was: 4.1.0)
                       (was: 4.2.0)

>       Add admission control support to Python DataSource streaming API
> -----------------------------------------------------------------
>
>                 Key: SPARK-54305
>                 URL: https://issues.apache.org/jira/browse/SPARK-54305
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL, Structured Streaming
>    Affects Versions: 4.1.0, 4.0.0, 4.0.1, 4.2.0
>            Reporter: Jitesh Soni
>            Priority: Major
>   Original Estimate: 16h
>  Remaining Estimate: 16h
>
> h2. Problem
> Python streaming data sources cannot control microbatch sizes because 
> DataSourceStreamReader.latestOffset() has no parameters to receive configured 
> limits. This forces Python sources to either process all available data 
> (unpredictable resource usage) or artificially limit offsets (risking data 
> loss).
> Scala sources can implement SupportsAdmissionControl to properly control 
> batch sizes, but Python sources lack this capability.
> h2. Solution
> Extend the Python DataSource API to support admission control by the following 
> changes (a sketch of the proposed API follows the list):
> 1. Enhanced Python API: Update DataSourceStreamReader.latestOffset() to 
> accept optional start_offset and read_limit parameters
> 2. Scala Bridge: Modify PythonMicroBatchStream to implement 
> SupportsAdmissionControl
> 3. Serialization: Add ReadLimit serialization in PythonStreamingSourceRunner
> 4. Python Worker: Enhance python_streaming_source_runner.py to deserialize 
> and pass parameters
> 5. Monitoring: Add optional reportLatestOffset() method for observability
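> A minimal sketch of what the proposed, backward-compatible Python API could look 
> like. The parameter names start_offset/read_limit come from this proposal and are 
> not a released API; the counter source, the read_limit.max_rows() accessor, and the 
> offset dict shape are assumptions made only for illustration:
> {code:python}
> from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
>
> class CounterStreamReader(DataSourceStreamReader):
>     """Hypothetical monotonically increasing counter source, for illustration only."""
>
>     def initialOffset(self):
>         return {"offset": 0}
>
>     # Proposed: both parameters optional, so existing sources keep working unchanged.
>     def latestOffset(self, start_offset=None, read_limit=None):
>         newest = 1_000_000  # stand-in for whatever the external system reports
>         if start_offset is not None and read_limit is not None:
>             # Cap the batch end before Spark checkpoints it (read_limit.max_rows()
>             # is an assumed accessor for a maxRowsPerTrigger-style limit).
>             return {"offset": min(newest, start_offset["offset"] + read_limit.max_rows())}
>         return {"offset": newest}
>
>     def partitions(self, start, end):
>         # The planned range is now bounded by the admission-control cap above.
>         return [InputPartition((start["offset"], end["offset"]))]
>
>     def read(self, partition):
>         first, last = partition.value
>         return iter((i,) for i in range(first, last))
> {code}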
> h2. Benefits
>  - Predictable performance with controlled batch sizes
>  - Rate limiting and backpressure support
>  - Feature parity with Scala DataSource capabilities
>  - Full backward compatibility (all parameters optional)
>  - Production-ready streaming sources
> h2. Architectural Limitations in PySpark's DataSource API for Microbatch Size
> The core problem is an architectural flaw in PySpark's DataSource API: its 
> [latestOffset()|https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/datasource.html?] 
> method is *"blind."* When Spark asks for the latest available data, this 
> method has no context about the stream's current position or the configured 
> batch size (maxBlocksPerTrigger). It can only report the absolute latest data 
> on the source.
> The Scala MicroBatchStream API, however, is *"context-aware."* Its 
> latestOffset() method receives the startOffset for the batch and the 
> configured limit, giving it the necessary information to control the batch 
> size.
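> To make the contrast concrete, here is the current Python signature next to the 
> Scala one it needs to match (types simplified; the Scala signature is paraphrased 
> in a comment):
> {code:python}
> # Current PySpark API (simplified): no knowledge of the batch start or any limit.
> class DataSourceStreamReader:
>     def latestOffset(self) -> dict:
>         """Can only report the newest offset the source knows about."""
>         ...
>
> # Scala SupportsAdmissionControl, paraphrased for comparison:
> #   def latestOffset(startOffset: Offset, limit: ReadLimit): Offset
> {code}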
> h3. Why This Matters: The Checkpointing Trap
> This difference becomes a major problem due to Spark's rigid processing order:
>  # Spark calls latestOffset() to get the end boundary for the batch.
>  # It *immediately saves this end offset to the checkpoint log*. This is the 
> point of no return.
>  # _Then_, Spark calls partitions(start, end) where you must plan the work 
> for the _entire_ range from start to end.
> This flow creates a trap. If you try to process only a subset of the data in 
> partitions() (e.g., start + limit blocks), you create a mismatch with the 
> checkpoint. Spark will use the _original_ large end offset to start the next 
> batch, causing permanent data loss.
> Since data loss is not an option, you are left with no choice: you *must* 
> process the entire range from the start offset to whatever end offset was 
> returned by the blind latestOffset() method.
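> A deliberately simplified illustration of that ordering (not actual Spark 
> internals; the names below are invented for the sketch):
> {code:python}
> # Illustrative pseudo-driver loop for one microbatch.
> end = reader.latestOffset()              # 1. "blind": may be far ahead of `start`
> checkpoint_log.write(batch_id, end)      # 2. end offset is committed before planning
> tasks = reader.partitions(start, end)    # 3. must cover the whole (start, end] range
>
> # If partitions() quietly plans only up to start + limit, the data between
> # start + limit and `end` is never read, yet the next batch still resumes
> # from `end` because that is what step 2 recorded -> permanent data loss.
> {code}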
> *The Result*: You cannot control the size of your micro-batches in 
> Python. Your stream is forced to process potentially massive, unpredictable 
> chunks of data at a time, making it unscalable and extremely fragile during 
> failures and backfills.
> The official Spark-Kafka connector, written in Scala, correctly uses the 
> startOffset and limit parameters to calculate a properly capped end offset 
> _before_ Spark checkpoints it. This functionality is critically missing from 
> the Python API.
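> The same capping idea, sketched in Python. This only mirrors the spirit of the 
> proportional per-partition rate limiting the Scala Kafka source applies; the 
> function below is purely illustrative and not part of any Spark API:
> {code:python}
> # Distribute a total row limit across partitions in proportion to how much
> # each partition has available, so the capped end offsets never exceed `latest`.
> def cap_end_offsets(start, latest, max_rows):
>     available = {p: latest[p] - start[p] for p in start}
>     total = sum(available.values())
>     if total <= max_rows:
>         return dict(latest)
>     return {p: start[p] + int(available[p] * max_rows / total) for p in start}
>
> # Example: a 1000-row cap over two partitions with 3000 and 1000 rows available.
> print(cap_end_offsets({"p0": 0, "p1": 0}, {"p0": 3000, "p1": 1000}, 1000))
> # {'p0': 750, 'p1': 250}
> {code}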
> h2. Pull Request
> PR: [https://github.com/apache/spark/pull/53001]



