[
https://issues.apache.org/jira/browse/SPARK-55450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jitesh Soni updated SPARK-55450:
--------------------------------
Description:
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports
Admission Control and {{Trigger.AvailableNow}} for custom streaming data
sources, bringing feature parity with Scala implementations.
This ticket tracks the creation of user-facing documentation with practical
examples showing how to implement these features in custom Python data sources.
h2. Key Features to Document
* *Updated {{latestOffset()}} signature* – Now accepts a {{start}} offset and a {{ReadLimit}} parameter.
* *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred
data consumption limits.
* *Optional {{reportLatestOffset()}}* – Enables tracking available data
without consumption.
* *ReadLimit framework* – Built-in implementations for controlling data volume.
* *{{Trigger.AvailableNow}} support* – Via the {{SupportsTriggerAvailableNow}} mixin interface (a compact signature sketch follows this list).
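At a glance, the new and optional hooks look roughly as follows (a non-normative sketch based on the feature list above and SPARK-55304; confirm the exact signatures against PR #54085):
{{class DataSourceStreamReader:
    # Updated: now receives the start offset and a ReadLimit.
    def latestOffset(self, start, limit): ...

    # Optional: preferred read limit for this source (e.g. ReadMaxRows(1000)).
    def getDefaultReadLimit(self): ...

    # Optional: latest available offset, for monitoring only (no side effects).
    def reportLatestOffset(self): ...

class SupportsTriggerAvailableNow:
    # Called once at query start when Trigger.AvailableNow is used.
    def prepareForTriggerAvailableNow(self): ...}}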
h2. Usage Examples
h3. Example 1: Basic Streaming Reader with Admission Control
{{from pyspark.sql.datasource import (
    DataSourceStreamReader,
    InputPartition,
    ReadLimit,
)

class MyInputPartition(InputPartition):
    """Partition describing one offset range to be read."""
    def __init__(self, start, end):
        self.start = start
        self.end = end

class MyStreamReader(DataSourceStreamReader):
    """
    Custom streaming reader with admission control support.
    """

    records_per_file = 1000  # illustrative: approximate records per file

    def initialOffset(self):
        """Return the initial offset for the stream."""
        return \{"offset": 0}

    def latestOffset(self, start, limit):
        """
        Return the latest offset to read up to, respecting the read limit.

        Args:
            start: The starting offset (a dict, as returned by initialOffset()).
            limit: ReadLimit object controlling data volume.
        """
        current_offset = start.get("offset", 0)
        # Check how much data is available (source-specific helper).
        available_records = self._count_available_records()

        # Apply the limit if one was specified.
        records_to_read = available_records
        if isinstance(limit, ReadLimit):
            if hasattr(limit, "maxRows"):
                records_to_read = min(available_records, limit.maxRows())
            elif hasattr(limit, "maxFiles"):
                records_to_read = min(
                    available_records,
                    limit.maxFiles() * self.records_per_file,
                )
        return \{"offset": current_offset + records_to_read}

    def getDefaultReadLimit(self):
        """
        Optional: specify the default read limit for this source.
        """
        from pyspark.sql.datasource import ReadMaxRows
        # Read at most 1000 rows per micro-batch.
        return ReadMaxRows(1000)

    def partitions(self, start, end):
        """Plan input partitions covering the data between start and end offsets."""
        start_offset = start.get("offset", 0)
        end_offset = end.get("offset", 0)
        # Return InputPartition instances for the offset range.
        return [MyInputPartition(start_offset, end_offset)] }}
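To actually run Example 1, the reader has to be exposed through a {{DataSource}} subclass and registered with the session. A minimal sketch, where the wrapper class, its short name {{my_stream_source}}, and the schema are illustrative rather than part of the ticket:
{{from pyspark.sql.datasource import DataSource

class MyStreamDataSource(DataSource):
    """Wrapper that exposes MyStreamReader as a named streaming source."""

    @classmethod
    def name(cls):
        return "my_stream_source"

    def schema(self):
        return "offset INT"

    def streamReader(self, schema):
        return MyStreamReader()

# Register the source and start a stream that uses it.
spark.dataSource.register(MyStreamDataSource)
df = spark.readStream.format("my_stream_source").load()}}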
h3. Example 2: Using {{Trigger.AvailableNow}}
{{from pyspark.sql.datasource import (
    DataSourceStreamReader,
    SupportsTriggerAvailableNow,
)

class SnapshotStreamReader(DataSourceStreamReader, SupportsTriggerAvailableNow):
    """
    Streaming reader supporting Trigger.AvailableNow for snapshot processing.
    """

    def __init__(self):
        self.snapshot_offset = None

    def prepareForTriggerAvailableNow(self):
        """
        Prepare the source for snapshot-based processing.
        Called once, at query start, when Trigger.AvailableNow is used.
        """
        # Capture a snapshot of the data available right now.
        self.snapshot_offset = self._capture_current_state()
        print(f"Snapshot captured at offset: \{self.snapshot_offset}")

    def latestOffset(self, start, limit):
        """
        Under Trigger.AvailableNow, return the snapshot offset so the query
        stops once all data available at start-up has been processed.
        """
        if self.snapshot_offset is not None:
            # Return the snapshot boundary.
            return \{"offset": self.snapshot_offset}
        else:
            # Normal streaming mode.
            return \{"offset": self._get_current_offset()}

    # ... other required methods (initialOffset, partitions, read, ...) ...

# Using the source with Trigger.AvailableNow
df = (
    spark.readStream
    .format("mySnapshotSource")
    .load()
)
query = (
    df.writeStream
    .trigger(availableNow=True)
    .format("console")
    .start()
)
query.awaitTermination() }}
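With {{availableNow=True}}, the query processes everything that was available when it started (the captured snapshot), possibly across several micro-batches bounded by the read limit, and then stops instead of running indefinitely.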
h3. Example 3: Using {{reportLatestOffset()}} for Monitoring
{{from pyspark.sql.datasource import DataSourceStreamReader

class MonitoredStreamReader(DataSourceStreamReader):
    """
    Reader that reports available data without consuming it.
    """

    def reportLatestOffset(self):
        """
        Optional: report the latest available offset without side effects.
        Used for monitoring and metrics only; it does not advance the stream.
        """
        return \{"offset": self._peek_latest_offset()}

    def latestOffset(self, start, limit):
        """
        The offset method actually used for planning; it may have side effects.
        """
        latest = self._fetch_and_mark_offset(start, limit)
        return \{"offset": latest} }}
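The reported value surfaces through the streaming query's progress metrics. A quick way to inspect it from the driver (the exact fields in the progress payload depend on the Spark version, so treat the field names as an assumption):
{{# After the query has processed a few batches, inspect its latest progress.
progress = query.lastProgress
if progress is not None:
    for source in progress["sources"]:
        print(source.get("description"), source.get("latestOffset"))}}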
h3. Example 4: {{ReadLimit}} Types
{{from pyspark.sql.datasource import ReadLimit

# Available ReadLimit implementations:
# 1. ReadAllAvailable() - Read all available data.
# 2. ReadMinRows(n)     - Read at least n rows.
# 3. ReadMaxRows(n)     - Read at most n rows.
# 4. ReadMaxFiles(n)    - Read at most n files.
# 5. ReadMaxBytes(n)    - Read at most n bytes.

# Read limits are typically driven by stream options that the custom source
# translates into a ReadLimit; the option names below are defined by the
# source itself, not by Spark.
query = (
    spark.readStream
    .format("mySource")
    .option("maxFilesPerTrigger", "100")
    .option("maxBytesPerTrigger", "10mb")
    .load()
    .writeStream
    .format("console")
    .start()
) }}
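One common pattern is for the source to translate such options into its default read limit. A rough sketch, assuming the options passed via {{.option(...)}} are forwarded to the reader by its enclosing {{DataSource}} (the option name {{maxRowsPerTrigger}} and the {{ReadAllAvailable}} fallback are illustrative):
{{from pyspark.sql.datasource import DataSourceStreamReader, ReadAllAvailable, ReadMaxRows

class OptionDrivenStreamReader(DataSourceStreamReader):
    def __init__(self, options):
        # 'options' is assumed to be the dict of stream options forwarded by
        # the enclosing DataSource when it constructs the reader.
        self.max_rows = options.get("maxRowsPerTrigger")

    def getDefaultReadLimit(self):
        if self.max_rows is not None:
            # Cap each micro-batch at the user-supplied number of rows.
            return ReadMaxRows(int(self.max_rows))
        # Otherwise, fall back to reading everything that is available.
        return ReadAllAvailable()}}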
h2. Backward Compatibility Note
The implementation automatically detects old-style {{latestOffset()}} methods
(without parameters) using Python introspection, ensuring existing
implementations continue to work without modification.
{{# Old style – still supported
def latestOffset(self):
    return \{"offset": 100}

# New style – recommended
def latestOffset(self, start, limit):
    return \{"offset": 100} }}
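The detection itself is implemented in PR #54085; the sketch below only illustrates the general mechanism and is not the actual code:
{{import inspect

def _supports_admission_control(reader):
    # New-style readers define latestOffset(self, start, limit); old-style
    # readers take no extra parameters. 'reader.latestOffset' is a bound
    # method, so 'self' is already excluded from the signature.
    return len(inspect.signature(reader.latestOffset).parameters) >= 2

def _call_latest_offset(reader, start, limit):
    if _supports_admission_control(reader):
        return reader.latestOffset(start, limit)
    return reader.latestOffset()}}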
h2. Reference
* PR: [https://github.com/apache/spark/pull/54085]
* JIRA: SPARK-55304
h2. Acceptance Criteria
* Code examples tested and validated.
* Examples added to PySpark examples directory.
> Document how to use Admission Control and Trigger.AvailableNow in PySpark
> custom streaming data sources
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55450
> URL: https://issues.apache.org/jira/browse/SPARK-55450
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jitesh Soni
> Priority: Minor