bentorb opened a new pull request, #59042:
URL: https://github.com/apache/airflow/pull/59042

   ## Add S3CopyPrefixOperator for copying objects by prefix
   
   ### Description
   
   This PR introduces a new ```S3CopyPrefixOperator``` that enables copying all 
S3 objects under a specified prefix from a source bucket to a destination 
bucket. This operator fills a gap in the current S3 operators by providing 
prefix-based bulk copy functionality.
   
   ### What does this operator do?
   
   • Copies all objects matching a specified prefix from source to destination 
S3 bucket
   • Supports cross-bucket copies (source and destination may be different buckets)
   • Handles large datasets through pagination
   • Provides configurable error handling (continue on failure or stop on first 
error)
   • Integrates with OpenLineage for data lineage tracking
   • Supports Airflow templating for dynamic parameter values
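   To illustrate the behavior described above, here is a rough, self-contained sketch of what a prefix-based copy loop with pagination and a ```continue_on_failure``` option could look like. This is not the PR's actual implementation; the ```copy_objects_by_prefix``` helper and its parameters are hypothetical, and ```client``` is assumed to be a boto3-style S3 client exposing ```get_paginator``` and ```copy_object```:

```python
def copy_objects_by_prefix(
    client,
    source_bucket: str,
    source_prefix: str,
    dest_bucket: str,
    dest_prefix: str,
    continue_on_failure: bool = False,
) -> list:
    """Copy every object under source_prefix into dest_prefix.

    Hypothetical sketch, not the operator's real code. Returns the list
    of destination keys that were copied successfully.
    """
    copied = []
    # list_objects_v2 returns at most 1000 keys per page, so pagination
    # is what makes large prefixes work.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=source_bucket, Prefix=source_prefix):
        for obj in page.get("Contents", []):
            src_key = obj["Key"]
            # Rewrite the key so the object lands under the destination prefix.
            dest_key = dest_prefix + src_key[len(source_prefix):]
            try:
                client.copy_object(
                    Bucket=dest_bucket,
                    Key=dest_key,
                    CopySource={"Bucket": source_bucket, "Key": src_key},
                )
            except Exception:
                if not continue_on_failure:
                    raise
                # continue_on_failure=True: skip the failed object and move on.
            else:
                copied.append(dest_key)
    return copied
```

   The sketch shows the two behaviors called out above: pagination over arbitrarily large listings, and the stop-vs-continue choice on a per-object copy failure.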
   
   ### Why is this needed?
   
   Currently, Airflow's S3 operators only allow copying individual objects. For use 
cases involving copying entire "directory" structures or large numbers of 
objects sharing a common prefix, users must implement custom solutions or use 
multiple operator instances. 
   This operator provides a native, efficient solution for prefix-based bulk 
operations.
   
   ### Key Features
   
   • **Pagination Support**: Automatically handles large object lists using 
S3's pagination
   • **Error Handling**: Configurable continue_on_failure parameter for 
resilient operations
   • **Template Fields**: All key parameters support Jinja templating
   • **OpenLineage Integration**: Automatic data lineage tracking for copied 
objects
   • **Standard Exception Handling**: Uses RuntimeError following new Airflow 
guidelines
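   For context on the OpenLineage bullet above: OpenLineage conventionally identifies an S3 dataset by a namespace/name pair, with the namespace derived from the bucket (```s3://<bucket>```) and the name from the object key. A minimal, hypothetical sketch of assembling input/output dataset pairs for a prefix copy (the ```lineage_pairs``` helper is illustrative, not code from this PR):

```python
def lineage_pairs(source_bucket, dest_bucket, copied_keys):
    """Build OpenLineage-style (input, output) dataset identifier pairs.

    Hypothetical helper. copied_keys is a list of (source_key, dest_key)
    tuples for the objects that were copied.
    """
    pairs = []
    for src_key, dest_key in copied_keys:
        # Namespace is the bucket URI; name is the object key.
        inp = {"namespace": f"s3://{source_bucket}", "name": src_key}
        out = {"namespace": f"s3://{dest_bucket}", "name": dest_key}
        pairs.append((inp, out))
    return pairs
```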
   
   ### Implementation Details
   
   • **Base Class**: Based on S3CopyObjectOperator for consistency
   • **Dependencies**: Uses existing S3Hook and AWS connection infrastructure
   • **Documentation**: Updated ```providers/amazon/docs/operators/s3/s3.rst``` 
with operator documentation
   • **Error Handling**: Follows new Airflow guidelines using standard Python 
exceptions
   
   ### Testing
   
   Includes **14 new unit tests** covering:
     • Basic functionality and successful copying
     • Error scenarios and exception handling
     • Pagination configuration
     • Continue on failure behavior
     • OpenLineage integration
     • Template field functionality
   
   • **System test integration** in 
```tests/system/providers/amazon/aws/example_s3.py```
   • **All tests pass** in Breeze testing environment
   
   ### Usage Example
   
   ```python
   copy_prefix = S3CopyPrefixOperator(
       task_id='copy_data_files',
       source_bucket_name='source-bucket',
       source_bucket_key='data/2023/',
       dest_bucket_name='dest-bucket',
       dest_bucket_key='archive/data/2023/',
       continue_on_failure=True,
       aws_conn_id='aws_default',
   )
   ```
   
   ### Checklist
   
   • [x] Tests included (14 comprehensive unit tests)
   • [x] Documentation updated
   • [x] Code follows project coding standards
   • [x] All static code checks pass
   • [x] Apache license headers added
   • [x] PR is focused on single feature
   • [x] Local tests pass
   • [x] No unrelated changes included
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
