[ 
https://issues.apache.org/jira/browse/SPARK-55450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-55450:
-----------------------------------
    Labels: pull-request-available  (was: )

> Document how to use Admission Control and Trigger.AvailableNow in PySpark 
> custom streaming data sources   
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55450
>                 URL: https://issues.apache.org/jira/browse/SPARK-55450
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jitesh Soni
>            Priority: Minor
>              Labels: pull-request-available
>
> Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
> Admission Control and {{Trigger.AvailableNow}} for custom streaming data 
> sources, bringing feature parity with Scala implementations.
> This ticket tracks the creation of user-facing documentation with practical 
> examples showing how to implement these features in custom Python data 
> sources.
> h2. Key Features to Document
>  * *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and 
> {{ReadLimit}} parameters.
>  * *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred 
> data consumption limits.
>  * *Optional {{reportLatestOffset()}}* – Enables tracking available data 
> without consumption.
>  * *ReadLimit framework* – Built-in implementations for controlling data 
> volume.
>  * *{{Trigger.AvailableNow}} support* – Via the 
> {{SupportsTriggerAvailableNow}} mixin interface.
> h2. Usage Examples
> h3. Example 1: Basic Streaming Reader with Admission Control
>  
> {code:python}
> from pyspark.sql.datasource import (
>     DataSourceStreamReader,
>     InputPartition,
>     ReadLimit,
> )
>
>
> class MyStreamReader(DataSourceStreamReader):
>     """Custom streaming reader with admission control support."""
>
>     def initialOffset(self):
>         """Return the initial offset for the stream."""
>         return {"offset": 0}
>
>     def latestOffset(self, start, limit):
>         """
>         Get the latest offset, respecting the read limit.
>
>         Args:
>             start: The starting offset.
>             limit: ReadLimit object controlling data volume.
>         """
>         current_offset = start.get("offset", 0)
>
>         # Check available data
>         available_records = self._count_available_records()
>
>         # Apply the limit if one is specified
>         if isinstance(limit, ReadLimit):
>             if hasattr(limit, "maxRows"):
>                 records_to_read = min(available_records, limit.maxRows())
>             elif hasattr(limit, "maxFiles"):
>                 records_to_read = min(
>                     available_records,
>                     limit.maxFiles() * self.records_per_file,
>                 )
>             else:
>                 records_to_read = available_records
>         else:
>             records_to_read = available_records
>
>         return {"offset": current_offset + records_to_read}
>
>     def getDefaultReadLimit(self):
>         """Optional: specify the default read limit for this source."""
>         from pyspark.sql.datasource import ReadMaxRows
>
>         # Read at most 1000 rows per batch
>         return ReadMaxRows(1000)
>
>     def read(self, start, end):
>         """Read data between the start and end offsets."""
>         start_offset = start.get("offset", 0)
>         end_offset = end.get("offset", 0)
>
>         # Return InputPartition instances covering the data range
>         return [MyInputPartition(start_offset, end_offset)]
> {code}
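To make the control flow concrete, here is a toy simulation (plain Python with stand-in classes, not the real PySpark types or Spark's actual microbatch engine) of how an engine might drive such a reader, repeatedly calling {{latestOffset(start, limit)}} until no new data is admitted:

```python
# Toy simulation of a microbatch loop with admission control.
# ToyReadMaxRows, ToyReader, and run_batches are hypothetical
# stand-ins for illustration only.

class ToyReadMaxRows:
    """Stand-in for ReadMaxRows: caps rows admitted per batch."""

    def __init__(self, n):
        self._n = n

    def maxRows(self):
        return self._n


class ToyReader:
    """Stand-in reader exposing the admission-control protocol."""

    def __init__(self, total_available):
        self.total_available = total_available

    def initialOffset(self):
        return {"offset": 0}

    def getDefaultReadLimit(self):
        return ToyReadMaxRows(1000)

    def latestOffset(self, start, limit):
        current = start["offset"]
        available = self.total_available - current
        # Admit at most limit.maxRows() records this batch.
        admitted = min(available, limit.maxRows())
        return {"offset": current + admitted}


def run_batches(reader):
    """Drive the reader the way a microbatch engine would."""
    limit = reader.getDefaultReadLimit()
    start = reader.initialOffset()
    batches = []
    while True:
        end = reader.latestOffset(start, limit)
        if end["offset"] == start["offset"]:
            break  # no new data admitted; stop
        batches.append((start["offset"], end["offset"]))
        start = end
    return batches


print(run_batches(ToyReader(total_available=2500)))
# 2500 rows under a 1000-row cap -> [(0, 1000), (1000, 2000), (2000, 2500)]
```

With 2500 rows available and a 1000-row cap, the loop yields three batch ranges, the last one partial, which is the behavior admission control is meant to guarantee.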
> h3. Example 2: Using {{Trigger.AvailableNow}}
>  
> {code:python}
> from pyspark.sql.datasource import (
>     DataSourceStreamReader,
>     SupportsTriggerAvailableNow,
> )
>
>
> class SnapshotStreamReader(DataSourceStreamReader, SupportsTriggerAvailableNow):
>     """Streaming reader supporting Trigger.AvailableNow for snapshot processing."""
>
>     def prepareForTriggerAvailableNow(self):
>         """
>         Prepare the source for snapshot-based processing.
>         Called once when Trigger.AvailableNow is used.
>         """
>         # Capture a snapshot of the currently available data
>         self.snapshot_offset = self._capture_current_state()
>         print(f"Snapshot captured at offset: {self.snapshot_offset}")
>
>     def latestOffset(self, start, limit):
>         """
>         When using Trigger.AvailableNow, this should return the snapshot offset.
>         """
>         if hasattr(self, "snapshot_offset"):
>             # Return the snapshot boundary
>             return {"offset": self.snapshot_offset}
>         else:
>             # Normal streaming mode
>             return {"offset": self._get_current_offset()}
>
>     # ... other required methods ...
>
>
> # Using the source with Trigger.AvailableNow
> df = (
>     spark.readStream
>     .format("mySnapshotSource")
>     .load()
> )
>
> query = (
>     df.writeStream
>     .trigger(availableNow=True)
>     .format("console")
>     .start()
> )
>
> query.awaitTermination()
> {code}
> h3. Example 3: Using {{reportLatestOffset()}} for Monitoring
>  
> {code:python}
> from pyspark.sql.datasource import DataSourceStreamReader
>
>
> class MonitoredStreamReader(DataSourceStreamReader):
>     """Reader that reports available data without consuming it."""
>
>     def reportLatestOffset(self):
>         """
>         Optional: report the latest available offset without side effects.
>         Used for monitoring and metrics.
>         """
>         return {"offset": self._peek_latest_offset()}
>
>     def latestOffset(self, start, limit):
>         """Actual offset method, which may have side effects."""
>         latest = self._fetch_and_mark_offset(start, limit)
>         return {"offset": latest}
> {code}
> h3. Example 4: {{ReadLimit}} Types
>  
> {code:python}
> from pyspark.sql.datasource import ReadLimit
>
> # Available ReadLimit implementations:
> #   1. ReadAllAvailable()  - read all available data
> #   2. ReadMinRows(n)      - read at least n rows
> #   3. ReadMaxRows(n)      - read at most n rows
> #   4. ReadMaxFiles(n)     - read at most n files
> #   5. ReadMaxBytes(n)     - read at most n bytes
>
> # Read limits can also be configured through stream options
> query = (
>     spark.readStream
>     .format("mySource")
>     .option("maxFilesPerTrigger", "100")
>     .option("maxBytesPerTrigger", "10mb")
>     .load()
>     .writeStream
>     .format("console")
>     .start()
> )
> {code}
> h2. Backward Compatibility Note
> The implementation automatically detects old-style {{latestOffset()}} methods 
> (without parameters) using Python introspection, ensuring existing 
> implementations continue to work without modification.
>  
> {code:python}
> # Old style – still supported
> def latestOffset(self):
>     return {"offset": 100}
>
>
> # New style – recommended
> def latestOffset(self, start, limit):
>     return {"offset": 100}
> {code}
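The ticket does not spell out the detection mechanism, but a minimal sketch of how such introspection could work is shown below, using only the stdlib {{inspect}} module. The helper {{uses_new_style}} and the two reader classes are hypothetical illustrations, not Spark's actual code:

```python
import inspect

# Hypothetical sketch: count the positional parameters of a reader's
# latestOffset method to decide which call style it implements.
# Not Spark's actual implementation.

def uses_new_style(reader):
    """Return True if reader.latestOffset accepts (start, limit)."""
    sig = inspect.signature(reader.latestOffset)
    # On a bound method, 'self' is already excluded from the signature,
    # so a new-style method reports two parameters.
    return len(sig.parameters) >= 2


class OldReader:
    def latestOffset(self):
        return {"offset": 100}


class NewReader:
    def latestOffset(self, start, limit):
        return {"offset": 100}


print(uses_new_style(OldReader()))  # False -> call latestOffset()
print(uses_new_style(NewReader()))  # True  -> call latestOffset(start, limit)
```

A caller can then branch on the result and invoke the method with the argument list the implementation expects, which is what makes the old style keep working unmodified.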
> h2. Reference
>  * PR: [https://github.com/apache/spark/pull/54085]
>  * JIRA: SPARK-55304
> h2. Acceptance Criteria
>  * Code examples tested and validated.
>  * Examples added to PySpark examples directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
