[ 
https://issues.apache.org/jira/browse/SPARK-55450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jitesh Soni updated SPARK-55450:
--------------------------------
    Description: 
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
Admission Control and {{Trigger.AvailableNow}} for custom streaming data 
sources, bringing feature parity with Scala implementations.

This ticket tracks the creation of user-facing documentation with practical 
examples showing how to implement these features in custom Python data sources.
h2. Key Features to Document
 * *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and 
{{ReadLimit}} parameters.
 * *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred 
data consumption limits.
 * *Optional {{reportLatestOffset()}}* – Enables tracking available data 
without consumption.
 * *ReadLimit framework* – Built-in implementations for controlling data volume.
 * *{{Trigger.AvailableNow}} support* – Via the {{SupportsTriggerAvailableNow}} 
mixin interface.

h2. Usage Examples
h3. Example 1: Basic Streaming Reader with Admission Control

 

{{from pyspark.sql.datasource import (
    DataSourceStreamReader,
    InputPartition,
    ReadLimit,
)


class MyStreamReader(DataSourceStreamReader):
    """
    Custom streaming reader with admission control support.
    """

    def initialOffset(self):
        """Return the initial offset for the stream."""
        return \{"offset": 0}

    def latestOffset(self, start, limit):
        """
        Get the latest offset respecting the read limit.

        Args:
            start: The starting offset.
            limit: ReadLimit object controlling data volume.
        """
        current_offset = start.get("offset", 0)

        # Check available data
        available_records = self._count_available_records()

        # Apply limit if specified
        if isinstance(limit, ReadLimit):
            if hasattr(limit, "maxRows"):
                records_to_read = min(available_records, limit.maxRows())
            elif hasattr(limit, "maxFiles"):
                records_to_read = min(
                    available_records,
                    limit.maxFiles() * self.records_per_file,
                )
            else:
                records_to_read = available_records
        else:
            records_to_read = available_records

        return \{"offset": current_offset + records_to_read}

    def getDefaultReadLimit(self):
        """
        Optional: Specify the default read limit for this source.
        """
        from pyspark.sql.datasource import ReadMaxRows

        # Read at most 1000 rows per batch
        return ReadMaxRows(1000)

    def partitions(self, start, end):
        """Plan InputPartitions covering the offset range (start, end]."""
        start_offset = start.get("offset", 0)
        end_offset = end.get("offset", 0)

        # Return InputPartition instances for the data range
        return [MyInputPartition(start_offset, end_offset)] }}
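
Example 1 plans partitions for each micro-batch but leaves the {{MyInputPartition}} class and the per-partition {{read()}} method undefined. A minimal sketch of both, assuming the partition simply carries the offset range to scan (the class name, fields, and record format are illustrative, not part of the PySpark API):

{{from pyspark.sql.datasource import InputPartition


class MyInputPartition(InputPartition):
    """Hypothetical partition that carries an offset range to read."""

    def __init__(self, start_offset, end_offset):
        self.start_offset = start_offset
        self.end_offset = end_offset


# Per-partition read method on MyStreamReader, executed on the executors:
# def read(self, partition):
#     for offset in range(partition.start_offset, partition.end_offset):
#         yield (offset,)  # each row must match the schema declared by the source }}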
h3. Example 2: Using {{Trigger.AvailableNow}}

 

{{from pyspark.sql.datasource import (
    DataSourceStreamReader,
    SupportsTriggerAvailableNow,
)


class SnapshotStreamReader(DataSourceStreamReader, SupportsTriggerAvailableNow):
    """
    Streaming reader supporting Trigger.AvailableNow for snapshot processing.
    """

    def prepareForTriggerAvailableNow(self):
        """
        Prepare source for snapshot-based trigger.

        Called once when Trigger.AvailableNow is used.
        """
        # Capture snapshot of available data
        self.snapshot_offset = self._capture_current_state()
        print(f"Snapshot captured at offset: \{self.snapshot_offset}")

    def latestOffset(self, start, limit):
        """
        When using Trigger.AvailableNow, this should return the snapshot offset.
        """
        if hasattr(self, "snapshot_offset"):
            # Return snapshot boundary
            return \{"offset": self.snapshot_offset}
        else:
            # Normal streaming mode
            return \{"offset": self._get_current_offset()}

    # ... other required methods ...


# Using the source with Trigger.AvailableNow
df = (
    spark.readStream
    .format("mySnapshotSource")
    .load()
)

query = (
    df.writeStream
    .trigger(availableNow=True)
    .format("console")
    .start()
)

query.awaitTermination() }}
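
With {{availableNow=True}}, {{prepareForTriggerAvailableNow()}} is called once at query start; the query then processes everything up to the captured snapshot offset, possibly across several micro-batches when a read limit is in effect, and stops on its own.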
h3. Example 3: Using {{reportLatestOffset}} for Monitoring

 

{{from pyspark.sql.datasource import DataSourceStreamReader


class MonitoredStreamReader(DataSourceStreamReader):
    """
    Reader that reports available data without consuming it.
    """

    def reportLatestOffset(self):
        """
        Optional: Report latest available offset without side effects.
        Used for monitoring and metrics.
        """
        return \{"offset": self._peek_latest_offset()}

    def latestOffset(self, start, limit):
        """
        Actual offset method that may have side effects.
        """
        latest = self._fetch_and_mark_offset(start, limit)
        return \{"offset": latest} }}
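
A usage sketch for monitoring: assuming the reported offset is surfaced in the source progress of the streaming query (as it is for built-in sources), it can be compared against the consumed offset to estimate backlog. The format name {{myMonitoredSource}} is illustrative:

{{# Assumes MonitoredStreamReader backs a source registered as "myMonitoredSource"
query = (
    spark.readStream
    .format("myMonitoredSource")
    .load()
    .writeStream
    .format("console")
    .start()
)

progress = query.lastProgress
if progress is not None:
    for source in progress["sources"]:
        # "latestOffset" reflects what the source reported as available,
        # while "endOffset" is what the last batch actually consumed.
        print(source["description"], source["latestOffset"], source["endOffset"]) }}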
h3. Example 4: {{ReadLimit}} Types

 

{{from pyspark.sql.streaming import ReadLimit

# Available ReadLimit implementations:
# 1. ReadAllAvailable()  - Read all available data.
# 2. ReadMinRows(n)      - Read at least n rows.
# 3. ReadMaxRows(n)      - Read at most n rows.
# 4. ReadMaxFiles(n)     - Read at most n files.
# 5. ReadMaxBytes(n)     - Read at most n bytes.

# Configure read limits in stream options
query = (
    spark.readStream
    .format("mySource")
    .option("maxFilesPerTrigger", "100")
    .option("maxBytesPerTrigger", "10mb")
    .load()
    .writeStream
    .format("console")
    .start()
) }}
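
For a custom Python source, options such as the above typically have to be translated into a {{ReadLimit}} by the reader itself. A minimal sketch, assuming the data source forwards its options to the reader and that the limit classes listed above are importable alongside {{ReadMaxRows}} from Example 1 (option names are illustrative):

{{from pyspark.sql.datasource import (
    DataSourceStreamReader,
    ReadAllAvailable,
    ReadMaxFiles,
    ReadMaxRows,
)


class OptionAwareStreamReader(DataSourceStreamReader):
    def __init__(self, options):
        # Assumed: the DataSource passes its .option(...) values to the reader
        self.options = options

    def getDefaultReadLimit(self):
        if "maxRowsPerTrigger" in self.options:
            return ReadMaxRows(int(self.options["maxRowsPerTrigger"]))
        if "maxFilesPerTrigger" in self.options:
            return ReadMaxFiles(int(self.options["maxFilesPerTrigger"]))
        return ReadAllAvailable()

    # initialOffset / latestOffset / partitions / read omitted for brevity }}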
h2. Backward Compatibility Note

The implementation automatically detects old-style {{latestOffset()}} methods 
(without parameters) using Python introspection, ensuring existing 
implementations continue to work without modification.

 

{{# Old style – still supported
def latestOffset(self):
    return \{"offset": 100}


# New style – recommended
def latestOffset(self, start, limit):
    return \{"offset": 100} }}
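
The detection mechanism itself is internal, but a purely illustrative sketch of how such signature-based dispatch can work (assuming the engine inspects the user's bound method):

{{import inspect


def call_latest_offset(reader, start, limit):
    """Illustrative only: dispatch to an old- or new-style latestOffset."""
    params = inspect.signature(reader.latestOffset).parameters
    if len(params) == 0:
        # Old style: latestOffset(self) - bound method, so no visible parameters
        return reader.latestOffset()
    # New style: latestOffset(self, start, limit)
    return reader.latestOffset(start, limit) }}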
h2. Reference

 * PR: [https://github.com/apache/spark/pull/54085]

 * JIRA: SPARK-55304

h2. Acceptance Criteria
 * Code examples tested and validated.
 * Examples added to PySpark examples directory.

  was:
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
Admission Control and {{Trigger.AvailableNow}} for custom streaming data 
sources, bringing feature parity with Scala implementations.

This ticket tracks the creation of user-facing documentation with practical 
examples showing how to implement these features in custom Python data sources.
h2. Key Features to Document
 # *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and 
{{ReadLimit}} parameters.

 # *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred 
data consumption limits.

 # *Optional {{reportLatestOffset()}}* – Enables tracking available data 
without consumption.

 # *ReadLimit framework* – Built-in implementations for controlling data volume.

 # *{{Trigger.AvailableNow}} support* – Via the {{SupportsTriggerAvailableNow}} 
mixin interface.

h2. Usage Examples
h3. Example 1: Basic Streaming Reader with Admission Control

 

{{from pyspark.sql.datasource import (  
    DataSourceStreamReader,  
    InputPartition,  
    ReadLimit,  
)  
  
  
class MyStreamReader(DataSourceStreamReader):  
    """  
    Custom streaming reader with admission control support.  
    """  
  
    def initialOffset(self):  
        """Return the initial offset for the stream."""  
        return \{"offset": 0}  
  
    def latestOffset(self, start, limit):  
        """  
        Get the latest offset respecting the read limit.  
  
        Args:  
            start: The starting offset.  
            limit: ReadLimit object controlling data volume.  
        """  
        current_offset = start.get("offset", 0)  
  
        # Check available data  
        available_records = self._count_available_records()  
  
        # Apply limit if specified  
        if isinstance(limit, ReadLimit):  
            if hasattr(limit, "maxRows"):  
                records_to_read = min(available_records, limit.maxRows())  
            elif hasattr(limit, "maxFiles"):  
                records_to_read = min(  
                    available_records,  
                    limit.maxFiles() * self.records_per_file,  
                )  
            else:  
                records_to_read = available_records  
        else:  
            records_to_read = available_records  
  
        return \{"offset": current_offset + records_to_read}  
  
    def getDefaultReadLimit(self):  
        """  
        Optional: Specify default read limit for this source.  
        """  
        from pyspark.sql.datasource import ReadMaxRows  
  
        # Read at most 1000 rows per batch  
        return ReadMaxRows(1000)  
  
    def read(self, start, end):  
        """Read data between start and end offsets."""  
        start_offset = start.get("offset", 0)  
        end_offset = end.get("offset", 0)  
  
        # Return InputPartition instances for the data range  
        return [MyInputPartition(start_offset, end_offset)]  }}
h3. Example 2: Using `Trigger.AvailableNow`

 

{{from pyspark.sql.datasource import (  
    DataSourceStreamReader,  
    SupportsTriggerAvailableNow,  
)  
  
  
class SnapshotStreamReader(DataSourceStreamReader, 
SupportsTriggerAvailableNow):  
    """  
    Streaming reader supporting Trigger.AvailableNow for snapshot processing.  
    """  
  
    def prepareForTriggerAvailableNow(self):  
        """  
        Prepare source for snapshot-based trigger.  
  
        Called once when Trigger.AvailableNow is used.  
        """  
        # Capture snapshot of available data  
        self.snapshot_offset = self._capture_current_state()  
        print(f"Snapshot captured at offset: \{self.snapshot_offset}")  
  
    def latestOffset(self, start, limit):  
        """  
        When using Trigger.AvailableNow, this should return the snapshot 
offset.  
        """  
        if hasattr(self, "snapshot_offset"):  
            # Return snapshot boundary  
            return \{"offset": self.snapshot_offset}  
        else:  
            # Normal streaming mode  
            return \{"offset": self._get_current_offset()}  
  
    # ... other required methods ...  
  
  
# Using the source with Trigger.AvailableNow  
df = (  
    spark.readStream  
        .format("mySnapshotSource")  
        .load()  
)  
  
query = (  
    df.writeStream  
        .trigger(availableNow=True)  
        .format("console")  
        .start()  
)  
  
query.awaitTermination()  }}
h3. Example 3: Using `reportLatestOffset` for Monitoring

 

{{from pyspark.sql.datasource import DataSourceStreamReader  
  
  
class MonitoredStreamReader(DataSourceStreamReader):  
    """  
    Reader that reports available data without consuming it.  
    """  
  
    def reportLatestOffset(self):  
        """  
        Optional: Report latest available offset without side effects.  
        Used for monitoring and metrics.  
        """  
        return \{"offset": self._peek_latest_offset()}  
  
    def latestOffset(self, start, limit):  
        """  
        Actual offset method that may have side effects.  
        """  
        latest = self._fetch_and_mark_offset(start, limit)  
        return \{"offset": latest}  }}
h3. Example 4: `ReadLimit` Types

 

{{from pyspark.sql.streaming import ReadLimit  
  
# Available ReadLimit implementations:  
# 1. ReadAllAvailable()        - Read all available data.  
# 2. ReadMinRows(n)            - Read at least n rows.  
# 3. ReadMaxRows(n)            - Read at most n rows.  
# 4. ReadMaxFiles(n)           - Read at most n files.  
# 5. ReadMaxBytes(n)           - Read at most n bytes.  
  
# Configure read limits in stream options  
query = (  
    spark.readStream  
        .format("mySource")  
        .option("maxFilesPerTrigger", "100")  
        .option("maxBytesPerTrigger", "10mb")  
        .load()  
        .writeStream  
        .format("console")  
        .start()  
)  }}
h2. Backward Compatibility Note

The implementation automatically detects old-style {{latestOffset()}} methods 
(without parameters) using Python introspection, ensuring existing 
implementations continue to work without modification.

 

{{# Old style – still supported  
def latestOffset(self):  
    return \{"offset": 100}  
  
# New style – recommended  
def latestOffset(self, start, limit):  
    return \{"offset": 100}  }}
h2. Reference
 * PR: https://github.com/apache/spark/pull/54085

 * JIRA: SPARK-55304

h2. Acceptance Criteria
 * User guide documentation added to Spark documentation.

 * API documentation updated with parameter descriptions.

 * Code examples tested and validated.

 * Migration guide for existing implementations.

 * Examples added to PySpark examples directory.


> Document how to use Admission Control and Trigger.AvailableNow in PySpark 
> custom streaming data sources   
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55450
>                 URL: https://issues.apache.org/jira/browse/SPARK-55450
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jitesh Soni
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
