Jitesh Soni created SPARK-55450:
-----------------------------------
Summary: Document how to use Admission Control and
Trigger.AvailableNow in PySpark custom streaming data sources
Key: SPARK-55450
URL: https://issues.apache.org/jira/browse/SPARK-55450
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 4.2.0
Reporter: Jitesh Soni
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports
Admission Control and Trigger.AvailableNow for custom streaming data sources,
bringing feature parity with Scala implementations.
This ticket tracks the creation of user-facing
documentation with practical examples showing how to implement these features
in custom Python data sources.
## Key Features to Document
1. **Updated `latestOffset()` signature** - Now accepts `start` offset and
   `ReadLimit` parameters
2. **Optional `getDefaultReadLimit()`** - Allows sources to specify
   preferred data consumption limits
3. **Optional `reportLatestOffset()`** - Enables tracking available data
   without consumption
4. **ReadLimit framework** - Built-in implementations for controlling data
   volume
5. **Trigger.AvailableNow support** - Via the `SupportsTriggerAvailableNow`
   mixin interface
## Usage Examples
### Example 1: Basic Streaming Reader with Admission Control
```python
from pyspark.sql.datasource import (
    DataSourceStreamReader,
    InputPartition,
    ReadLimit
)

class MyStreamReader(DataSourceStreamReader):
    """
    Custom streaming reader with admission control support.
    """

    def initialOffset(self):
        """Return the initial offset for the stream."""
        return {"offset": 0}

    def latestOffset(self, start, limit):
        """
        Get the latest offset, respecting the read limit.

        Args:
            start: The starting offset.
            limit: ReadLimit object controlling data volume.
        """
        current_offset = start.get("offset", 0)

        # Check available data
        available_records = self._count_available_records()

        # Apply the limit if one is specified
        if isinstance(limit, ReadLimit):
            if hasattr(limit, 'maxRows'):
                records_to_read = min(available_records, limit.maxRows())
            elif hasattr(limit, 'maxFiles'):
                records_to_read = min(available_records,
                                      limit.maxFiles() * self.records_per_file)
            else:
                records_to_read = available_records
        else:
            records_to_read = available_records

        return {"offset": current_offset + records_to_read}

    def getDefaultReadLimit(self):
        """
        Optional: specify the default read limit for this source.
        """
        from pyspark.sql.datasource import ReadMaxRows
        return ReadMaxRows(1000)  # Read at most 1000 rows per batch

    def read(self, start, end):
        """Read data between the start and end offsets."""
        start_offset = start.get("offset", 0)
        end_offset = end.get("offset", 0)
        # Return InputPartition instances covering the data range
        return [MyInputPartition(start_offset, end_offset)]
```
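The `MyInputPartition` class used above is a placeholder, not an API class. A minimal sketch of how it could be defined, assuming a simple offset-range partition and following the documented pattern of subclassing `InputPartition`:

```python
from pyspark.sql.datasource import InputPartition

class MyInputPartition(InputPartition):
    """Hypothetical partition describing a contiguous offset range."""

    def __init__(self, start_offset, end_offset):
        # Boundaries the executor-side read path would use
        self.start_offset = start_offset
        self.end_offset = end_offset
```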
### Example 2: Using Trigger.AvailableNow
```python
from pyspark.sql.datasource import (
    DataSourceStreamReader,
    SupportsTriggerAvailableNow
)

class SnapshotStreamReader(DataSourceStreamReader,
                           SupportsTriggerAvailableNow):
    """
    Streaming reader supporting Trigger.AvailableNow for snapshot processing.
    """

    def prepareForTriggerAvailableNow(self):
        """
        Prepare the source for snapshot-based processing.
        Called once when Trigger.AvailableNow is used.
        """
        # Capture a snapshot of the currently available data
        self.snapshot_offset = self._capture_current_state()
        print(f"Snapshot captured at offset: {self.snapshot_offset}")

    def latestOffset(self, start, limit):
        """
        When Trigger.AvailableNow is used, return the snapshot offset.
        """
        if hasattr(self, 'snapshot_offset'):
            # Return the snapshot boundary
            return {"offset": self.snapshot_offset}
        else:
            # Normal streaming mode
            return {"offset": self._get_current_offset()}

    # ... other required methods ...

# Using the source with Trigger.AvailableNow
df = spark.readStream \
    .format("mySnapshotSource") \
    .load()

query = df.writeStream \
    .trigger(availableNow=True) \
    .format("console") \
    .start()

query.awaitTermination()
```
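The `.format("mySnapshotSource")` call above assumes the reader is exposed through a registered `DataSource` class, following the existing Python Data Source API. A minimal sketch of that wiring (the wrapper class name and schema below are illustrative, not taken from the PR):

```python
from pyspark.sql.datasource import DataSource

class MySnapshotSource(DataSource):
    """Hypothetical DataSource wrapper exposing SnapshotStreamReader."""

    @classmethod
    def name(cls):
        # Short name used in .format(...)
        return "mySnapshotSource"

    def schema(self):
        # Illustrative schema for this example
        return "offset LONG, value STRING"

    def streamReader(self, schema):
        return SnapshotStreamReader()

# Register the source so .format("mySnapshotSource") resolves to it
spark.dataSource.register(MySnapshotSource)
```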
### Example 3: Using reportLatestOffset for Monitoring
```python
class MonitoredStreamReader(DataSourceStreamReader):
    """
    Reader that reports available data without consuming it.
    """

    def reportLatestOffset(self):
        """
        Optional: report the latest available offset without side effects.
        Used for monitoring and metrics.
        """
        return {"offset": self._peek_latest_offset()}

    def latestOffset(self, start, limit):
        """
        Actual offset method, which may have side effects.
        """
        latest = self._fetch_and_mark_offset(start, limit)
        return {"offset": latest}
```
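Assuming the reported value is surfaced the same way as for built-in sources, it appears as the per-source `latestOffset` field in streaming query progress and can be inspected without touching the source:

```python
# Compare consumed offsets with the reported latest offset
# (assumes standard StreamingQueryProgress layout).
progress = query.lastProgress
if progress is not None:
    for source in progress["sources"]:
        print(source["startOffset"], source["endOffset"], source["latestOffset"])
```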
### Example 4: ReadLimit Types
```python
from pyspark.sql.datasource import ReadLimit

# Available ReadLimit implementations:
# 1. ReadAllAvailable  - Read all available data
# 2. ReadMinRows(n)    - Read at least n rows
# 3. ReadMaxRows(n)    - Read at most n rows
# 4. ReadMaxFiles(n)   - Read at most n files
# 5. ReadMaxBytes(n)   - Read at most n bytes

# Configure read limits through stream options
query = spark.readStream \
    .format("mySource") \
    .option("maxFilesPerTrigger", "100") \
    .option("maxBytesPerTrigger", "10mb") \
    .load() \
    .writeStream \
    .format("console") \
    .start()
```
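For a custom Python source, options like the ones above are not interpreted automatically; the source has to honor them itself. One plausible pattern (class and option handling here are illustrative, and it assumes `ReadMaxFiles` / `ReadAllAvailable` are importable from `pyspark.sql.datasource` like `ReadMaxRows` in Example 1) is to turn an option into the default read limit:

```python
from pyspark.sql.datasource import (
    DataSourceStreamReader,
    ReadAllAvailable,
    ReadMaxFiles,
)

class OptionAwareStreamReader(DataSourceStreamReader):
    """Hypothetical reader deriving its default limit from source options."""

    def __init__(self, options):
        # options: the dict of options the user set on the stream
        self.max_files = options.get("maxFilesPerTrigger")

    def getDefaultReadLimit(self):
        if self.max_files is not None:
            return ReadMaxFiles(int(self.max_files))
        return ReadAllAvailable()

    # ... initialOffset / latestOffset / read omitted for brevity ...
```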
## Backward Compatibility Note

The implementation automatically detects old-style `latestOffset()` methods
(without parameters) using Python introspection, ensuring existing
implementations continue to work without modification.
```python
# Old style - still supported
def latestOffset(self):
    return {"offset": 100}

# New style - recommended
def latestOffset(self, start, limit):
    return {"offset": 100}
```
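A rough sketch of how such introspection-based dispatch can work (not necessarily the actual implementation in the PR) is to count the declared parameters of the user's method:

```python
import inspect

def supports_admission_control(reader):
    """Return True if latestOffset accepts (start, limit) in addition to self."""
    params = inspect.signature(reader.latestOffset).parameters
    # On a bound method 'self' is not included, so two or more
    # remaining parameters indicates the new-style signature.
    return len(params) >= 2

def call_latest_offset(reader, start, limit):
    # Illustrative dispatch between the old and new signatures
    if supports_admission_control(reader):
        return reader.latestOffset(start, limit)
    return reader.latestOffset()
```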
## References
- PR: https://github.com/apache/spark/pull/54085
- JIRA: SPARK-55304
## Acceptance Criteria
- User guide documentation added to Spark documentation
- API documentation updated with parameter descriptions
- Code examples tested and validated
- Migration guide for existing implementations
- Examples added to PySpark examples directory
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
