bentorb opened a new pull request, #59042:
URL: https://github.com/apache/airflow/pull/59042
## Add S3CopyPrefixOperator for copying objects by prefix
### Description
This PR introduces a new `S3CopyPrefixOperator` that enables copying all
S3 objects under a specified prefix from a source bucket to a destination
bucket. This operator fills a gap in the current S3 operators by providing
prefix-based bulk copy functionality.
### What does this operator do?
• Copies all objects matching a specified prefix from source to destination
S3 bucket
• Supports cross-bucket copies (source and destination may be different buckets)
• Handles large datasets through pagination
• Provides configurable error handling (continue on failure or stop on first
error)
• Integrates with OpenLineage for data lineage tracking
• Supports Airflow templating for dynamic parameter values
### Why is this needed?
Currently, Airflow's S3 operators only allow copying individual objects. For use
cases involving copying entire "directory" structures or large numbers of
objects sharing a common prefix, users must implement custom solutions or
chain multiple operator instances, as in the sketch below.
This operator provides a native, efficient solution for prefix-based bulk
operations.
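For illustration, the per-object workaround looks roughly like this (the keys
and task IDs are hypothetical; `S3CopyObjectOperator` and its parameters are
the existing provider API):
```python
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

# One task per object: the key list must be known (or computed) up front.
copy_tasks = [
    S3CopyObjectOperator(
        task_id=f'copy_file_{i}',
        source_bucket_name='source-bucket',
        source_bucket_key=key,
        dest_bucket_name='dest-bucket',
        dest_bucket_key=key.replace('data/2023/', 'archive/data/2023/', 1),
    )
    for i, key in enumerate(['data/2023/a.csv', 'data/2023/b.csv'])
]
```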
### Key Features
• **Pagination Support**: Automatically handles large object lists using
S3's pagination
• **Error Handling**: Configurable `continue_on_failure` parameter for
resilient operations (pagination and error handling are both sketched after
this list)
• **Template Fields**: All key parameters support Jinja templating
• **OpenLineage Integration**: Automatic data lineage tracking for copied
objects
• **Standard Exception Handling**: Uses `RuntimeError`, following the new
Airflow guidelines
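A minimal sketch of how pagination and `continue_on_failure` could fit
together on top of the existing `S3Hook` (an illustration of the approach,
not the PR's actual implementation; the `copy_prefix` helper and its
signature are assumptions):
```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def copy_prefix(source_bucket, prefix, dest_bucket, dest_prefix, continue_on_failure=False):
    """Copy every object under `prefix` from source_bucket to dest_bucket."""
    hook = S3Hook(aws_conn_id='aws_default')
    # boto3's paginator transparently walks all pages of list_objects_v2.
    paginator = hook.get_conn().get_paginator('list_objects_v2')
    failed_keys = []
    for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            source_key = obj['Key']
            # Rebase the key from the source prefix onto the destination prefix.
            dest_key = dest_prefix + source_key[len(prefix):]
            try:
                hook.copy_object(
                    source_bucket_key=source_key,
                    dest_bucket_key=dest_key,
                    source_bucket_name=source_bucket,
                    dest_bucket_name=dest_bucket,
                )
            except Exception:
                if not continue_on_failure:
                    raise
                failed_keys.append(source_key)
    return failed_keys
```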
### Implementation Details
• **Base Class**: Based on `S3CopyObjectOperator` for consistency
• **Dependencies**: Uses existing S3Hook and AWS connection infrastructure
• **Documentation**: Updated `providers/amazon/docs/operators/s3/s3.rst`
with operator documentation
• **Error Handling**: Follows new Airflow guidelines using standard Python
exceptions
### Testing
Includes **14 new unit tests** covering (an illustrative test sketch follows this list):
• Basic functionality and successful copying
• Error scenarios and exception handling
• Pagination configuration
• Continue on failure behavior
• OpenLineage integration
• Template field functionality
• **System test integration** in
`tests/system/providers/amazon/aws/example_s3.py`
• **All tests pass** in Breeze testing environment
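A hedged sketch of the style of unit test described above, assuming the
operator lives in `airflow.providers.amazon.aws.operators.s3` and lists keys
via `S3Hook.list_keys`; the test name and mocking target are illustrative,
not the PR's actual tests:
```python
from unittest import mock

from airflow.providers.amazon.aws.operators.s3 import S3CopyPrefixOperator


# Mocking target assumes the operator module imports S3Hook by this name.
@mock.patch('airflow.providers.amazon.aws.operators.s3.S3Hook')
def test_copy_prefix_copies_each_listed_object(mock_hook_cls):
    mock_hook = mock_hook_cls.return_value
    # Assumed listing call; the real operator may paginate differently.
    mock_hook.list_keys.return_value = ['data/2023/a.csv', 'data/2023/b.csv']

    op = S3CopyPrefixOperator(
        task_id='copy_data_files',
        source_bucket_name='source-bucket',
        source_bucket_key='data/2023/',
        dest_bucket_name='dest-bucket',
        dest_bucket_key='archive/data/2023/',
    )
    op.execute(context={})

    assert mock_hook.copy_object.call_count == 2
```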
### Usage Example
```python
# Import path assumes the operator ships in the amazon provider's s3 module,
# alongside S3CopyObjectOperator.
from airflow.providers.amazon.aws.operators.s3 import S3CopyPrefixOperator

copy_prefix = S3CopyPrefixOperator(
    task_id='copy_data_files',
    source_bucket_name='source-bucket',
    source_bucket_key='data/2023/',
    dest_bucket_name='dest-bucket',
    dest_bucket_key='archive/data/2023/',
    continue_on_failure=True,
    aws_conn_id='aws_default',
)
```
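Because the key parameters are template fields, the prefixes can also be
rendered per run; an illustrative variation using Airflow's built-in
`ds_nodash` macro (the task ID and paths are hypothetical):
```python
copy_daily = S3CopyPrefixOperator(
    task_id='copy_daily_partition',
    source_bucket_name='source-bucket',
    source_bucket_key='data/{{ ds_nodash }}/',
    dest_bucket_name='dest-bucket',
    dest_bucket_key='archive/data/{{ ds_nodash }}/',
)
```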
### Checklist
• [x] Tests included (14 comprehensive unit tests)
• [x] Documentation updated
• [x] Code follows project coding standards
• [x] All static code checks pass
• [x] Apache license headers added
• [x] PR is focused on single feature
• [x] Local tests pass
• [x] No unrelated changes included