Jitesh Soni created SPARK-55450:
-----------------------------------

             Summary: Document how to use Admission Control and 
Trigger.AvailableNow in PySpark custom streaming data sources   
                 Key: SPARK-55450
                 URL: https://issues.apache.org/jira/browse/SPARK-55450
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 4.2.0
            Reporter: Jitesh Soni


Following the implementation in SPARK-55304 (PR #54085), PySpark now supports 
Admission Control and Trigger.AvailableNow for custom streaming data sources, 
bringing feature parity with Scala implementations.                             
                                                    

                                                                                
                                                                                
                                                                                
                      This ticket tracks the creation of user-facing 
documentation with practical examples showing how to implement these features 
in custom Python data sources.                                                  
                                                                                
   

                                                                                
                                                                                
                                                                                
                                                  

  ## Key Features to Document                                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  1. ***Updated `latestOffset()` signature*** - Now accepts `start` offset and 
`ReadLimit` parameters                                                          
                                                                                
                                                     

  2. ***Optional `getDefaultReadLimit()`*** - Allows sources to specify 
preferred data consumption limits                                               
                                                                                
                                                            

  3. ***Optional `reportLatestOffset()`*** - Enables tracking available data 
without consumption                                                             
                                                                                
                                                       

  4. ***ReadLimit framework*** - Built-in implementations for controlling data 
volume                                                                          
                                                                                
                                                     

  5. ***Trigger.AvailableNow support*** - Via `SupportsTriggerAvailableNow` 
mixin interface                                                                 
                                                                                
                                                        

                                                                                
                                                                                
                                                                                
                                                  

  ## Usage Examples                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  ### Example 1: Basic Streaming Reader with Admission Control                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  ```python                                                                     
                                                                                
                                                                                
                                                  

  from pyspark.sql.datasource import (                                          
                                                                                
                                                                                
                                                  

      DataSourceStreamReader,                                                   
                                                                                
                                                                                
                                                  

      InputPartition,                                                           
                                                                                
                                                                                
                                                  

      ReadLimit                                                                 
                                                                                
                                                                                
                                                  

  )                                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  class MyStreamReader(DataSourceStreamReader):                                 
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

      Custom streaming reader with admission control support                    
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def initialOffset(self):                                                  
                                                                                
                                                                                
                                                  

          """Return the initial offset for the stream"""                        
                                                                                
                                                                                
                                                  

          return \{"offset": 0}                                                 
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Get the latest offset respecting the read limit.                      
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          Args:                                                                 
                                                                                
                                                                                
                                                  

              start: The starting offset                                        
                                                                                
                                                                                
                                                  

              limit: ReadLimit object controlling data volume                   
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          current_offset = start.get("offset", 0)                               
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          # Check available data                                                
                                                                                
                                                                                
                                                  

          available_records = self._count_available_records()                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          # Apply limit if specified                                            
                                                                                
                                                                                
                                                  

          if isinstance(limit, ReadLimit):                                      
                                                                                
                                                                                
                                                  

              if hasattr(limit, 'maxRows'):                                     
                                                                                
                                                                                
                                                  

                  records_to_read = min(available_records, limit.maxRows())     
                                                                                
                                                                                
                                                  

              elif hasattr(limit, 'maxFiles'):                                  
                                                                                
                                                                                
                                                  

                  records_to_read = min(available_records, limit.maxFiles() * 
self.records_per_file)                                                          
                                                                                
                                                    

              else:                                                             
                                                                                
                                                                                
                                                  

                  records_to_read = available_records                           
                                                                                
                                                                                
                                                  

          else:                                                                 
                                                                                
                                                                                
                                                  

              records_to_read = available_records                               
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

          return \{"offset": current_offset + records_to_read}                  
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def getDefaultReadLimit(self):                                            
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Optional: Specify default read limit for this source                  
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          from pyspark.sql.datasource import ReadMaxRows                        
                                                                                
                                                                                
                                                  

          return ReadMaxRows(1000)  # Read at most 1000 rows per batch          
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def read(self, start, end):                                               
                                                                                
                                                                                
                                                  

          """Read data between start and end offsets"""                         
                                                                                
                                                                                
                                                  

          start_offset = start.get("offset", 0)                                 
                                                                                
                                                                                
                                                  

          end_offset = end.get("offset", 0)                                     
                                                                                
                                                                                
                                                  

          # Return InputPartition instances for the data range                  
                                                                                
                                                                                
                                                  

          return [MyInputPartition(start_offset, end_offset)]                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 2: Using Trigger.AvailableNow*                                       
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  from pyspark.sql.datasource import (                                          
                                                                                
                                                                                
                                                  

      DataSourceStreamReader,                                                   
                                                                                
                                                                                
                                                  

      SupportsTriggerAvailableNow                                               
                                                                                
                                                                                
                                                  

  )                                                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  class SnapshotStreamReader(DataSourceStreamReader, 
SupportsTriggerAvailableNow):                                                   
                                                                                
                                                                             

      """                                                                       
                                                                                
                                                                                
                                                  

      Streaming reader supporting Trigger.AvailableNow for snapshot processing  
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def prepareForTriggerAvailableNow(self):                                  
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Prepare source for snapshot-based trigger.                            
                                                                                
                                                                                
                                                  

          Called once when Trigger.AvailableNow is used.                        
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          # Capture snapshot of available data                                  
                                                                                
                                                                                
                                                  

          self.snapshot_offset = self._capture_current_state()                  
                                                                                
                                                                                
                                                  

          print(f"Snapshot captured at offset: \{self.snapshot_offset}")        
                                                                                
                                                                                
                                                   

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          When using Trigger.AvailableNow, this should return the snapshot 
offset                                                                          
                                                                                
                                                       

          """                                                                   
                                                                                
                                                                                
                                                  

          if hasattr(self, 'snapshot_offset'):                                  
                                                                                
                                                                                
                                                  

              # Return snapshot boundary                                        
                                                                                
                                                                                
                                                  

              return {"offset": self.snapshot_offset}                           
                                                                                
                                                                                
                                                  

          else:                                                                 
                                                                                
                                                                                
                                                  

              # Normal streaming mode                                           
                                                                                
                                                                                
                                                  

              return {"offset": self._get_current_offset()}                     
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      # ... other required methods ...                                          
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Using the source with Trigger.AvailableNow                                  
                                                                                
                                                                                
                                                  

  df = spark.readStream \                                                       
                                                                                
                                                                                
                                                  

      .format("mySnapshotSource") \                                             
                                                                                
                                                                                
                                                  

      .load()                                                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  query = df.writeStream \                                                      
                                                                                
                                                                                
                                                  

      .trigger(availableNow=True) \                                             
                                                                                
                                                                                
                                                  

      .format("console") \                                                      
                                                                                
                                                                                
                                                  

      .start()                                                                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  query.awaitTermination()                                                      
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 3: Using reportLatestOffset for Monitoring*                          
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  class MonitoredStreamReader(DataSourceStreamReader):                          
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

      Reader that reports available data without consuming it                   
                                                                                
                                                                                
                                                  

      """                                                                       
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def reportLatestOffset(self):                                             
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Optional: Report latest available offset without side effects.        
                                                                                
                                                                                
                                                  

          Used for monitoring and metrics.                                      
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          return {"offset": self._peek_latest_offset()}                         
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

      def latestOffset(self, start, limit):                                     
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          Actual offset method that may have side effects                       
                                                                                
                                                                                
                                                  

          """                                                                   
                                                                                
                                                                                
                                                  

          latest = self._fetch_and_mark_offset(start, limit)                    
                                                                                
                                                                                
                                                  

          return {"offset": latest}                                             
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Example 4: ReadLimit Types*                                                  
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  from pyspark.sql.streaming import ReadLimit                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Available ReadLimit implementations:                                        
                                                                                
                                                                                
                                                  

  # 1. ReadAllAvailable - Read all available data                               
                                                                                
                                                                                
                                                  

  # 2. ReadMinRows(n) - Read at least n rows                                    
                                                                                
                                                                                
                                                  

  # 3. ReadMaxRows(n) - Read at most n rows                                     
                                                                                
                                                                                
                                                  

  # 4. ReadMaxFiles(n) - Read at most n files                                   
                                                                                
                                                                                
                                                  

  # 5. ReadMaxBytes(n) - Read at most n bytes                                   
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # Configure read limits in stream options                                     
                                                                                
                                                                                
                                                  

  query = spark.readStream \                                                    
                                                                                
                                                                                
                                                  

      .format("mySource") \                                                     
                                                                                
                                                                                
                                                  

      .option("maxFilesPerTrigger", "100") \                                    
                                                                                
                                                                                
                                                  

      .option("maxBytesPerTrigger", "10mb") \                                   
                                                                                
                                                                                
                                                  

      .load() \                                                                 
                                                                                
                                                                                
                                                  

      .writeStream \                                                            
                                                                                
                                                                                
                                                  

      .format("console") \                                                      
                                                                                
                                                                                
                                                  

      .start()                                                                  
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Backward Compatibility Note*                                                 
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  The implementation automatically detects old-style latestOffset() methods 
(without parameters) using Python introspection, ensuring existing 
implementations continue to work without modification.                          
                                                                   

                                                                                
                                                                                
                                                                                
                                                  

  # Old style - still supported                                                 
                                                                                
                                                                                
                                                  

  def latestOffset(self):                                                       
                                                                                
                                                                                
                                                  

      return {"offset": 100}                                                    
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  # New style - recommended                                                     
                                                                                
                                                                                
                                                  

  def latestOffset(self, start, limit):                                         
                                                                                
                                                                                
                                                  

      return {"offset": 100}                                                    
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Reference*                                                                   
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  - PR: https://github.com/apache/spark/pull/54085                              
                                                                                
                                                                                
                                                  

  - JIRA: SPARK-55304                                                           
                                                                                
                                                                                
                                                  

                                                                                
                                                                                
                                                                                
                                                  

  *Acceptance Criteria*                                                         
                                                                                
                                                                                
                                                    

                                                                                
                                                                                
                                                                                
                                                  

  - User guide documentation added to Spark documentation                       
                                                                                
                                                                                
                                                  

  - API documentation updated with parameter descriptions                       
                                                                                
                                                                                
                                                  

  - Code examples tested and validated                                          
                                                                                
                                                                                
                                                  

  - Migration guide for existing implementations                                
                                                                                
                                                                                
                                                  

  - Examples added to PySpark examples directory       



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to