[
https://issues.apache.org/jira/browse/SPARK-54305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jitesh Soni updated SPARK-54305:
--------------------------------
Description:
h2. Problem
Python streaming data sources cannot control microbatch sizes because
DataSourceStreamReader.latestOffset() has no parameters to receive configured
limits. This forces Python sources to either process all available data
(unpredictable resource usage) or artificially limit offsets (risking data
loss).
Scala sources can implement SupportsAdmissionControl to properly control batch
sizes, but Python sources lack this capability.
h2. Solution
Extend the Python DataSource API to support admission control by:
1. Enhanced Python API: Update DataSourceStreamReader.latestOffset() to accept
optional start_offset and read_limit parameters (see the sketch after this list)
2. Scala Bridge: Modify PythonMicroBatchStream to implement
SupportsAdmissionControl
3. Serialization: Add ReadLimit serialization in PythonStreamingSourceRunner
4. Python Worker: Enhance python_streaming_source_runner.py to deserialize and
pass parameters
5. Monitoring: Add optional reportLatestOffset() method for observability
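As a rough illustration of item 1, here is a minimal sketch of what an enhanced
reader could look like. The start_offset and read_limit parameter names follow
the proposal above; the block-based offset format, the dict-shaped read limit,
and the _newest_block_on_source() helper are illustrative assumptions, not the
implementation in the PR.
{code:python}
from typing import Optional

from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class BlockStreamReader(DataSourceStreamReader):
    """Illustrative reader whose offsets count "blocks" on the source."""

    def initialOffset(self) -> dict:
        return {"block": 0}

    def latestOffset(
        self, start_offset: Optional[dict] = None, read_limit: Optional[dict] = None
    ) -> dict:
        newest = self._newest_block_on_source()  # hypothetical helper
        if start_offset is not None and read_limit is not None:
            # With admission control, the batch is capped *before* Spark
            # checkpoints the end offset.
            capped = start_offset["block"] + read_limit.get("maxBlocks", newest)
            return {"block": min(newest, capped)}
        # Both parameters are optional, so readers written against the
        # current API keep working unchanged.
        return {"block": newest}

    def partitions(self, start: dict, end: dict):
        # The planned range now matches exactly what was checkpointed.
        # read(), commit(), etc. omitted for brevity.
        return [InputPartition((start["block"], end["block"]))]
{code}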
h2. Benefits
- Predictable performance with controlled batch sizes
- Rate limiting and backpressure support
- Feature parity with Scala DataSource capabilities
- Full backward compatibility (all parameters optional)
- Production-ready streaming sources
h2. Architectural Limitations in PySpark's DataSource API for Microbatch Size Control
The core problem is an architectural flaw in PySpark's DataSource API: its
[latestOffset()|https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/datasource.html?]
method is *"blind."* When Spark asks for the latest available data, this
method has no context about the stream's current position or the configured
batch size (maxBlocksPerTrigger). It can only report the absolute latest data
on the source.
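For reference, this is roughly the shape of the current contract (a minimal
sketch, assuming a block-counting source; _newest_block_on_source() is a
hypothetical helper):
{code:python}
from pyspark.sql.datasource import DataSourceStreamReader


class CurrentReader(DataSourceStreamReader):
    def latestOffset(self) -> dict:
        # No start offset and no read limit are passed in, so the only thing
        # this method can do is report the newest data available on the source.
        return {"block": self._newest_block_on_source()}  # hypothetical helper
{code}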
The Scala API, however, can be *"context-aware."* A MicroBatchStream source
that also implements SupportsAdmissionControl receives a
latestOffset(startOffset, limit) overload with the batch's start offset and the
configured ReadLimit, giving it the information it needs to cap the batch size.
h3. *Why This Matters: The Checkpointing Trap*
This difference becomes a major problem due to Spark's rigid processing order:
# Spark calls latestOffset() to get the end boundary for the batch.
# It *immediately saves this end offset to the checkpoint log*. This is the
point of no return.
# _Then_, Spark calls partitions(start, end), where you must plan the work
for the _entire_ range from start to end.
This flow creates a trap. If you try to process only a subset of the data in
partitions() (e.g., start + limit blocks), you create a mismatch with the
checkpoint. Spark will use the _original_ large end offset to start the next
batch, causing permanent data loss.
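A minimal sketch of that broken workaround, reusing the illustrative
block-based offsets from above (max_blocks and the offset format are
assumptions for illustration):
{code:python}
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class CappedInPartitionsReader(DataSourceStreamReader):
    """Anti-pattern: capping the range in partitions() instead of latestOffset()."""

    max_blocks = 100  # illustrative per-batch cap

    def partitions(self, start: dict, end: dict):
        # Tempting but unsafe: shrink the batch after Spark has already
        # checkpointed `end` as the batch boundary.
        capped_end = min(end["block"], start["block"] + self.max_blocks)
        # The next batch resumes from `end`, not `capped_end`, so the blocks
        # in (capped_end, end] are never read again: permanent data loss.
        return [InputPartition((start["block"], capped_end))]
{code}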
Since data loss is not an option, you are left with no choice: you *must*
process the entire range from the start offset to whatever end offset was
returned by the blind latestOffset() method.
*The Result*: You cannot control the size of your micro-batches in Python.
Your stream is forced to process potentially massive, unpredictable chunks of
data at a time, making it unscalable and extremely fragile during failures and
backfills.
The official Spark-Kafka connector, written in Scala, correctly uses the
startOffset and limit parameters to calculate a properly capped end offset
_before_ Spark checkpoints it. This functionality is critically missing from
the Python API.
h2. Pull Request
PR: [https://github.com/apache/spark/pull/53001]
was:
h2. Problem
Python streaming data sources cannot control microbatch sizes because
DataSourceStreamReader.latestOffset() has no parameters to receive configured
limits. This forces Python sources to either process all available data
(unpredictable resource usage) or artificially limit offsets (risking data
loss).
Scala sources can implement SupportsAdmissionControl to properly control batch
sizes, but Python sources lack this capability.
h2. Solution
Extend the Python DataSource API to support admission control by:
1. Enhanced Python API: Update DataSourceStreamReader.latestOffset() to accept
optional start_offset and read_limit parameters
2. Scala Bridge: Modify PythonMicroBatchStream to implement
SupportsAdmissionControl
3. Serialization: Add ReadLimit serialization in PythonStreamingSourceRunner
4. Python Worker: Enhance python_streaming_source_runner.py to deserialize and
pass parameters
5. Monitoring: Add optional reportLatestOffset() method for observability
h2. Benefits
- Predictable performance with controlled batch sizes
- Rate limiting and backpressure support
- Feature parity with Scala DataSource capabilities
- Full backward compatibility (all parameters optional)
- Production-ready streaming sources
h2. Pull Request
PR: [https://github.com/apache/spark/pull/53001]
> Add admission control support to Python DataSource streaming API
> -----------------------------------------------------------------
>
> Key: SPARK-54305
> URL: https://issues.apache.org/jira/browse/SPARK-54305
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL, Structured Streaming
> Affects Versions: 4.1.0, 4.0.0, 4.0.1, 4.2.0
> Reporter: Jitesh Soni
> Priority: Major
> Fix For: 4.1.0, 4.0.0, 4.2.0
>
> Original Estimate: 16h
> Remaining Estimate: 16h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]