[ 
https://issues.apache.org/jira/browse/SPARK-55450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jitesh Soni updated SPARK-55450:
--------------------------------
    Description: 
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
Admission Control and {{Trigger.AvailableNow}} for custom streaming data 
sources, bringing feature parity with Scala implementations.

This ticket tracks the creation of user-facing documentation with practical 
examples showing how to implement these features in custom Python data sources.
h2. Key Features to Document
 # *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and 
{{ReadLimit}} parameters.

 # *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred 
data consumption limits.

 # *Optional {{reportLatestOffset()}}* – Enables tracking available data 
without consumption.

 # *ReadLimit framework* – Built-in implementations for controlling data volume.

 # *{{Trigger.AvailableNow}} support* – Via the {{SupportsTriggerAvailableNow}} 
mixin interface.

h2. Usage Examples
h3. Example 1: Basic Streaming Reader with Admission Control

 

{{from pyspark.sql.datasource import (  
    DataSourceStreamReader,  
    InputPartition,  
    ReadLimit,  
)  
  
  
class MyStreamReader(DataSourceStreamReader):  
    """  
    Custom streaming reader with admission control support.  
    """  
  
    def initialOffset(self):  
        """Return the initial offset for the stream."""  
        return \{"offset": 0}  
  
    def latestOffset(self, start, limit):  
        """  
        Get the latest offset respecting the read limit.  
  
        Args:  
            start: The starting offset.  
            limit: ReadLimit object controlling data volume.  
        """  
        current_offset = start.get("offset", 0)  
  
        # Check available data  
        available_records = self._count_available_records()  
  
        # Apply limit if specified  
        if isinstance(limit, ReadLimit):  
            if hasattr(limit, "maxRows"):  
                records_to_read = min(available_records, limit.maxRows())  
            elif hasattr(limit, "maxFiles"):  
                records_to_read = min(  
                    available_records,  
                    limit.maxFiles() * self.records_per_file,  
                )  
            else:  
                records_to_read = available_records  
        else:  
            records_to_read = available_records  
  
        return \{"offset": current_offset + records_to_read}  
  
    def getDefaultReadLimit(self):  
        """  
        Optional: Specify default read limit for this source.  
        """  
        from pyspark.sql.datasource import ReadMaxRows  
  
        # Read at most 1000 rows per batch  
        return ReadMaxRows(1000)  
  
    def read(self, start, end):  
        """Read data between start and end offsets."""  
        start_offset = start.get("offset", 0)  
        end_offset = end.get("offset", 0)  
  
        # Return InputPartition instances for the data range  
        return [MyInputPartition(start_offset, end_offset)]  }}
h3. Example 2: Using `Trigger.AvailableNow`

 

{{from pyspark.sql.datasource import (  
    DataSourceStreamReader,  
    SupportsTriggerAvailableNow,  
)  
  
  
class SnapshotStreamReader(DataSourceStreamReader, 
SupportsTriggerAvailableNow):  
    """  
    Streaming reader supporting Trigger.AvailableNow for snapshot processing.  
    """  
  
    def prepareForTriggerAvailableNow(self):  
        """  
        Prepare source for snapshot-based trigger.  
  
        Called once when Trigger.AvailableNow is used.  
        """  
        # Capture snapshot of available data  
        self.snapshot_offset = self._capture_current_state()  
        print(f"Snapshot captured at offset: \{self.snapshot_offset}")  
  
    def latestOffset(self, start, limit):  
        """  
        When using Trigger.AvailableNow, this should return the snapshot 
offset.  
        """  
        if hasattr(self, "snapshot_offset"):  
            # Return snapshot boundary  
            return \{"offset": self.snapshot_offset}  
        else:  
            # Normal streaming mode  
            return \{"offset": self._get_current_offset()}  
  
    # ... other required methods ...  
  
  
# Using the source with Trigger.AvailableNow  
df = (  
    spark.readStream  
        .format("mySnapshotSource")  
        .load()  
)  
  
query = (  
    df.writeStream  
        .trigger(availableNow=True)  
        .format("console")  
        .start()  
)  
  
query.awaitTermination()  }}
h3. Example 3: Using `reportLatestOffset` for Monitoring

 

{{from pyspark.sql.datasource import DataSourceStreamReader  
  
  
class MonitoredStreamReader(DataSourceStreamReader):  
    """  
    Reader that reports available data without consuming it.  
    """  
  
    def reportLatestOffset(self):  
        """  
        Optional: Report latest available offset without side effects.  
        Used for monitoring and metrics.  
        """  
        return \{"offset": self._peek_latest_offset()}  
  
    def latestOffset(self, start, limit):  
        """  
        Actual offset method that may have side effects.  
        """  
        latest = self._fetch_and_mark_offset(start, limit)  
        return \{"offset": latest}  }}
h3. Example 4: `ReadLimit` Types

 

{{from pyspark.sql.streaming import ReadLimit  
  
# Available ReadLimit implementations:  
# 1. ReadAllAvailable()        - Read all available data.  
# 2. ReadMinRows(n)            - Read at least n rows.  
# 3. ReadMaxRows(n)            - Read at most n rows.  
# 4. ReadMaxFiles(n)           - Read at most n files.  
# 5. ReadMaxBytes(n)           - Read at most n bytes.  
  
# Configure read limits in stream options  
query = (  
    spark.readStream  
        .format("mySource")  
        .option("maxFilesPerTrigger", "100")  
        .option("maxBytesPerTrigger", "10mb")  
        .load()  
        .writeStream  
        .format("console")  
        .start()  
)  }}
h2. Backward Compatibility Note

The implementation automatically detects old-style {{latestOffset()}} methods 
(without parameters) using Python introspection, ensuring existing 
implementations continue to work without modification.

 

{{# Old style – still supported  
def latestOffset(self):  
    return \{"offset": 100}  
  
# New style – recommended  
def latestOffset(self, start, limit):  
    return \{"offset": 100}  }}
h2. Reference
 * PR: https://github.com/apache/spark/pull/54085

 * JIRA: SPARK-55304

h2. Acceptance Criteria
 * User guide documentation added to Spark documentation.

 * API documentation updated with parameter descriptions.

 * Code examples tested and validated.

 * Migration guide for existing implementations.

 * Examples added to PySpark examples directory.

  was:
Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
Admission Control and Trigger.AvailableNow for custom streaming data sources, 
bringing feature parity with Scala implementations.                             
                                                    

                                                                                
                                                                                
                                                                                
                      This ticket tracks the creation of user-facing 
documentation with practical examples showing how to implement these features 
in custom Python data sources.                                                  
                                                                                
   

                                                                                
                                                                                
                                                                                
                                                  

  ## Key Features to Document                                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  1. ***Updated `latestOffset()` signature*** - Now accepts `start` offset and 
`ReadLimit` parameters                                                          
                                                                                
                                                     

  2. ***Optional `getDefaultReadLimit()`*** - Allows sources to specify 
preferred data consumption limits                                               
                                                                                
                                                            

  3. ***Optional `reportLatestOffset()`*** - Enables tracking available data 
without consumption                                                             
                                                                                
                                                       

  4. ***ReadLimit framework*** - Built-in implementations for controlling data 
volume                                                                          
                                                                                
                                                     

  5. ***Trigger.AvailableNow support*** - Via `SupportsTriggerAvailableNow` 
mixin interface                                                                 
                                                                                
                                                        

                                                                                
                                                                                
                                                                                
                                                  

  ## Usage Examples                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  ### Example 1: Basic Streaming Reader with Admission Control                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  ```python                                                                     
                                                                                
                                                                                
                                                  

  from pyspark.sql.datasource import (                                          
                                                                                
                                                                                
                                                  

      DataSourceStreamReader,                                                   
                                                                                
                                                                                
                                                  

      InputPartition,                                                           
                                                                                
                                                                                
                                                  

      ReadLimit                                                                 
                                                                                
                                                                                
                                                  

  )                                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  class MyStreamReader(DataSourceStreamReader):                                 
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

      Custom streaming reader with admission control support                    
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def initialOffset(self):                                                  
                                                                                
                                                                                
                                                  

          """Return the initial offset for the stream"""                        
                                                                                
                                                                                
                                                  

          return \{"offset": 0}                                                 
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Get the latest offset respecting the read limit.                      
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          Args:                                                                 
                                                                                
                                                                                
                                                  

              start: The starting offset                                        
                                                                                
                                                                                
                                                  

              limit: ReadLimit object controlling data volume                   
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          current_offset = start.get("offset", 0)                               
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          # Check available data                                                
                                                                                
                                                                                
                                                  

          available_records = self._count_available_records()                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          # Apply limit if specified                                            
                                                                                
                                                                                
                                                  

          if isinstance(limit, ReadLimit):                                      
                                                                                
                                                                                
                                                  

              if hasattr(limit, 'maxRows'):                                     
                                                                                
                                                                                
                                                  

                  records_to_read = min(available_records, limit.maxRows())     
                                                                                
                                                                                
                                                  

              elif hasattr(limit, 'maxFiles'):                                  
                                                                                
                                                                                
                                                  

                  records_to_read = min(available_records, limit.maxFiles() * 
self.records_per_file)                                                          
                                                                                
                                                    

              else:                                                             
                                                                                
                                                                                
                                                  

                  records_to_read = available_records                           
                                                                                
                                                                                
                                                  

          else:                                                                 
                                                                                
                                                                                
                                                  

              records_to_read = available_records                               
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          return \{"offset": current_offset + records_to_read}                  
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def getDefaultReadLimit(self):                                            
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Optional: Specify default read limit for this source                  
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          from pyspark.sql.datasource import ReadMaxRows                        
                                                                                
                                                                                
                                                  

          return ReadMaxRows(1000)  # Read at most 1000 rows per batch          
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def read(self, start, end):                                               
                                                                                
                                                                                
                                                  

          """Read data between start and end offsets"""                         
                                                                                
                                                                                
                                                  

          start_offset = start.get("offset", 0)                                 
                                                                                
                                                                                
                                                  

          end_offset = end.get("offset", 0)                                     
                                                                                
                                                                                
                                                  

          # Return InputPartition instances for the data range                  
                                                                                
                                                                                
                                                  

          return [MyInputPartition(start_offset, end_offset)]                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 2: Using Trigger.AvailableNow*                                       
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  from pyspark.sql.datasource import (                                          
                                                                                
                                                                                
                                                  

      DataSourceStreamReader,                                                   
                                                                                
                                                                                
                                                  

      SupportsTriggerAvailableNow                                               
                                                                                
                                                                                
                                                  

  )                                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  class SnapshotStreamReader(DataSourceStreamReader, 
SupportsTriggerAvailableNow):                                                   
                                                                                
                                                                             

      """                                                                       
                                                                                
                                                                                
                                                  

      Streaming reader supporting Trigger.AvailableNow for snapshot processing  
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def prepareForTriggerAvailableNow(self):                                  
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Prepare source for snapshot-based trigger.                            
                                                                                
                                                                                
                                                  

          Called once when Trigger.AvailableNow is used.                        
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          # Capture snapshot of available data                                  
                                                                                
                                                                                
                                                  

          self.snapshot_offset = self._capture_current_state()                  
                                                                                
                                                                                
                                                  

          print(f"Snapshot captured at offset: \{self.snapshot_offset}")        
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          When using Trigger.AvailableNow, this should return the snapshot 
offset                                                                          
                                                                                
                                                       

          """                                                                   
                                                                                
                                                                                
                                                  

          if hasattr(self, 'snapshot_offset'):                                  
                                                                                
                                                                                
                                                  

              # Return snapshot boundary                                        
                                                                                
                                                                                
                                                  

              return {"offset": self.snapshot_offset}                           
                                                                                
                                                                                
                                                  

          else:                                                                 
                                                                                
                                                                                
                                                  

              # Normal streaming mode                                           
                                                                                
                                                                                
                                                  

              return {"offset": self._get_current_offset()}                     
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      # ... other required methods ...                                          
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Using the source with Trigger.AvailableNow                                  
                                                                                
                                                                                
                                                  

  df = spark.readStream \                                                       
                                                                                
                                                                                
                                                  

      .format("mySnapshotSource") \                                             
                                                                                
                                                                                
                                                  

      .load()                                                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  query = df.writeStream \                                                      
                                                                                
                                                                                
                                                  

      .trigger(availableNow=True) \                                             
                                                                                
                                                                                
                                                  

      .format("console") \                                                      
                                                                                
                                                                                
                                                  

      .start()                                                                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  query.awaitTermination()                                                      
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 3: Using reportLatestOffset for Monitoring*                          
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  class MonitoredStreamReader(DataSourceStreamReader):                          
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

      Reader that reports available data without consuming it                   
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def reportLatestOffset(self):                                             
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Optional: Report latest available offset without side effects.        
                                                                                
                                                                                
                                                  

          Used for monitoring and metrics.                                      
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          return {"offset": self._peek_latest_offset()}                         
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Actual offset method that may have side effects                       
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          latest = self._fetch_and_mark_offset(start, limit)                    
                                                                                
                                                                                
                                                  

          return {"offset": latest}                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 4: ReadLimit Types*                                                  
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  from pyspark.sql.streaming import ReadLimit                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Available ReadLimit implementations:                                        
                                                                                
                                                                                
                                                  

  # 1. ReadAllAvailable - Read all available data                               
                                                                                
                                                                                
                                                  

  # 2. ReadMinRows(n) - Read at least n rows                                    
                                                                                
                                                                                
                                                  

  # 3. ReadMaxRows(n) - Read at most n rows                                     
                                                                                
                                                                                
                                                  

  # 4. ReadMaxFiles(n) - Read at most n files                                   
                                                                                
                                                                                
                                                  

  # 5. ReadMaxBytes(n) - Read at most n bytes                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Configure read limits in stream options                                     
                                                                                
                                                                                
                                                  

  query = spark.readStream \                                                    
                                                                                
                                                                                
                                                  

      .format("mySource") \                                                     
                                                                                
                                                                                
                                                  

      .option("maxFilesPerTrigger", "100") \                                    
                                                                                
                                                                                
                                                  

      .option("maxBytesPerTrigger", "10mb") \                                   
                                                                                
                                                                                
                                                  

      .load() \                                                                 
                                                                                
                                                                                
                                                  

      .writeStream \                                                            
                                                                                
                                                                                
                                                  

      .format("console") \                                                      
                                                                                
                                                                                
                                                  

      .start()                                                                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Backward Compatibility Note*                                                 
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  The implementation automatically detects old-style latestOffset() methods 
(without parameters) using Python introspection, ensuring existing 
implementations continue to work without modification.                          
                                                                   

                                                                                
                                                                                
                                                                                
                                                  

  # Old style - still supported                                                 
                                                                                
                                                                                
                                                  

  def latestOffset(self):                                                       
                                                                                
                                                                                
                                                  

      return {"offset": 100}                                                    
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # New style - recommended                                                     
                                                                                
                                                                                
                                                  

  def latestOffset(self, start, limit):                                         
                                                                                
                                                                                
                                                  

      return {"offset": 100}                                                    
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Reference*                                                                   
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  - PR: https://github.com/apache/spark/pull/54085                              
                                                                                
                                                                                
                                                  

  - JIRA: SPARK-55304                                                           
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Acceptance Criteria*                                                         
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  - User guide documentation added to Spark documentation                       
                                                                                
                                                                                
                                                  

  - API documentation updated with parameter descriptions                       
                                                                                
                                                                                
                                                  

  - Code examples tested and validated                                          
                                                                                
                                                                                
                                                  

  - Migration guide for existing implementations                                
                                                                                
                                                                                
                                                  

  - Examples added to PySpark examples directory       


> Document how to use Admission Control and Trigger.AvailableNow in PySpark 
> custom streaming data sources   
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55450
>                 URL: https://issues.apache.org/jira/browse/SPARK-55450
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jitesh Soni
>            Priority: Minor
>
> Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
> Admission Control and {{Trigger.AvailableNow}} for custom streaming data 
> sources, bringing feature parity with Scala implementations.
> This ticket tracks the creation of user-facing documentation with practical 
> examples showing how to implement these features in custom Python data 
> sources.
> h2. Key Features to Document
>  # *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and 
> {{ReadLimit}} parameters.
>  # *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred 
> data consumption limits.
>  # *Optional {{reportLatestOffset()}}* – Enables tracking available data 
> without consumption.
>  # *ReadLimit framework* – Built-in implementations for controlling data 
> volume.
>  # *{{Trigger.AvailableNow}} support* – Via the 
> {{SupportsTriggerAvailableNow}} mixin interface.
> h2. Usage Examples
> h3. Example 1: Basic Streaming Reader with Admission Control
>  
> {{from pyspark.sql.datasource import (  
>     DataSourceStreamReader,  
>     InputPartition,  
>     ReadLimit,  
> )  
>   
>   
> class MyStreamReader(DataSourceStreamReader):  
>     """  
>     Custom streaming reader with admission control support.  
>     """  
>   
>     def initialOffset(self):  
>         """Return the initial offset for the stream."""  
>         return \{"offset": 0}  
>   
>     def latestOffset(self, start, limit):  
>         """  
>         Get the latest offset respecting the read limit.  
>   
>         Args:  
>             start: The starting offset.  
>             limit: ReadLimit object controlling data volume.  
>         """  
>         current_offset = start.get("offset", 0)  
>   
>         # Check available data  
>         available_records = self._count_available_records()  
>   
>         # Apply limit if specified  
>         if isinstance(limit, ReadLimit):  
>             if hasattr(limit, "maxRows"):  
>                 records_to_read = min(available_records, limit.maxRows())  
>             elif hasattr(limit, "maxFiles"):  
>                 records_to_read = min(  
>                     available_records,  
>                     limit.maxFiles() * self.records_per_file,  
>                 )  
>             else:  
>                 records_to_read = available_records  
>         else:  
>             records_to_read = available_records  
>   
>         return \{"offset": current_offset + records_to_read}  
>   
>     def getDefaultReadLimit(self):  
>         """  
>         Optional: Specify default read limit for this source.  
>         """  
>         from pyspark.sql.datasource import ReadMaxRows  
>   
>         # Read at most 1000 rows per batch  
>         return ReadMaxRows(1000)  
>   
>     def read(self, start, end):  
>         """Read data between start and end offsets."""  
>         start_offset = start.get("offset", 0)  
>         end_offset = end.get("offset", 0)  
>   
>         # Return InputPartition instances for the data range  
>         return [MyInputPartition(start_offset, end_offset)]  }}
> h3. Example 2: Using `Trigger.AvailableNow`
>  
> {{from pyspark.sql.datasource import (  
>     DataSourceStreamReader,  
>     SupportsTriggerAvailableNow,  
> )  
>   
>   
> class SnapshotStreamReader(DataSourceStreamReader, 
> SupportsTriggerAvailableNow):  
>     """  
>     Streaming reader supporting Trigger.AvailableNow for snapshot processing. 
>  
>     """  
>   
>     def prepareForTriggerAvailableNow(self):  
>         """  
>         Prepare source for snapshot-based trigger.  
>   
>         Called once when Trigger.AvailableNow is used.  
>         """  
>         # Capture snapshot of available data  
>         self.snapshot_offset = self._capture_current_state()  
>         print(f"Snapshot captured at offset: \{self.snapshot_offset}")  
>   
>     def latestOffset(self, start, limit):  
>         """  
>         When using Trigger.AvailableNow, this should return the snapshot 
> offset.  
>         """  
>         if hasattr(self, "snapshot_offset"):  
>             # Return snapshot boundary  
>             return \{"offset": self.snapshot_offset}  
>         else:  
>             # Normal streaming mode  
>             return \{"offset": self._get_current_offset()}  
>   
>     # ... other required methods ...  
>   
>   
> # Using the source with Trigger.AvailableNow  
> df = (  
>     spark.readStream  
>         .format("mySnapshotSource")  
>         .load()  
> )  
>   
> query = (  
>     df.writeStream  
>         .trigger(availableNow=True)  
>         .format("console")  
>         .start()  
> )  
>   
> query.awaitTermination()  }}
> h3. Example 3: Using `reportLatestOffset` for Monitoring
>  
> {{from pyspark.sql.datasource import DataSourceStreamReader  
>   
>   
> class MonitoredStreamReader(DataSourceStreamReader):  
>     """  
>     Reader that reports available data without consuming it.  
>     """  
>   
>     def reportLatestOffset(self):  
>         """  
>         Optional: Report latest available offset without side effects.  
>         Used for monitoring and metrics.  
>         """  
>         return \{"offset": self._peek_latest_offset()}  
>   
>     def latestOffset(self, start, limit):  
>         """  
>         Actual offset method that may have side effects.  
>         """  
>         latest = self._fetch_and_mark_offset(start, limit)  
>         return \{"offset": latest}  }}
> h3. Example 4: `ReadLimit` Types
>  
> {{from pyspark.sql.streaming import ReadLimit  
>   
> # Available ReadLimit implementations:  
> # 1. ReadAllAvailable()        - Read all available data.  
> # 2. ReadMinRows(n)            - Read at least n rows.  
> # 3. ReadMaxRows(n)            - Read at most n rows.  
> # 4. ReadMaxFiles(n)           - Read at most n files.  
> # 5. ReadMaxBytes(n)           - Read at most n bytes.  
>   
> # Configure read limits in stream options  
> query = (  
>     spark.readStream  
>         .format("mySource")  
>         .option("maxFilesPerTrigger", "100")  
>         .option("maxBytesPerTrigger", "10mb")  
>         .load()  
>         .writeStream  
>         .format("console")  
>         .start()  
> )  }}
> h2. Backward Compatibility Note
> The implementation automatically detects old-style {{latestOffset()}} methods 
> (without parameters) using Python introspection, ensuring existing 
> implementations continue to work without modification.
>  
> {{# Old style – still supported  
> def latestOffset(self):  
>     return \{"offset": 100}  
>   
> # New style – recommended  
> def latestOffset(self, start, limit):  
>     return \{"offset": 100}  }}
> h2. Reference
>  * PR: https://github.com/apache/spark/pull/54085
>  * JIRA: SPARK-55304
> h2. Acceptance Criteria
>  * User guide documentation added to Spark documentation.
>  * API documentation updated with parameter descriptions.
>  * Code examples tested and validated.
>  * Migration guide for existing implementations.
>  * Examples added to PySpark examples directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to