SameerMesiah97 opened a new pull request, #62196: URL: https://github.com/apache/airflow/pull/62196
**Description**

This change adds optional parallel download and upload support to `GCSTimeSpanFileTransformOperator.execute` using `ThreadPoolExecutor`. When `max_download_workers > 1` or `max_upload_workers > 1`, source objects are downloaded and transformed files are uploaded concurrently using a bounded thread pool with `as_completed`. Per-object operations are encapsulated in inner helper functions within `execute`. A `storage.Client` (obtained via `get_conn()`) is initialized once per phase and reused across submitted futures, with bucket/blob resolution performed inside each worker. `max_download_workers` and `max_upload_workers` must be ≥ 1; otherwise a `ValueError` is raised. Error propagation semantics are unchanged: if `*_continue_on_fail=False`, the first `GoogleCloudError` is raised; otherwise failures are logged and execution continues. Logging now includes worker counts for both phases.

**Rationale**

The operator previously executed downloads and uploads sequentially despite an existing `#TODO` indicating that parallelism should be introduced. This implementation addresses that gap to mitigate cumulative latency from high request counts against GCS, which can significantly impact end-to-end runtime in object-heavy workloads.

Parallel execution is implemented using `storage.Client` instances obtained via `get_conn()` rather than invoking hook methods directly inside worker threads. This is deliberate: **`GCSHook` maintains internal state and is not designed for concurrent mutation across threads**. By resolving bucket and blob objects within each worker using a shared client, **we avoid shared mutable hook state while enabling safe concurrency**.

**Documentation**

The operator docstring has been updated to describe the new parallel execution functionality, including the behavior of the download and upload phases and the semantics of the new parameters `max_download_workers` and `max_upload_workers`.
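For reviewers, the bounded-pool-plus-`as_completed` pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the function name `run_phase`, its parameters, and the generic `Exception` catch (standing in for `GoogleCloudError`) are all illustrative.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_phase(object_names, process_one, max_workers=1,
              continue_on_fail=False, log=print):
    """Run process_one(name) for each object, optionally in parallel.

    Illustrative sketch only: in the real operator the caught exception
    type is google.cloud.exceptions.GoogleCloudError, and process_one
    resolves bucket/blob objects from a shared storage.Client.
    """
    # Worker counts below 1 are rejected, mirroring the PR's validation.
    if max_workers < 1:
        raise ValueError("max_workers must be >= 1")

    # Directory placeholder objects (keys ending in "/") are filtered out.
    names = [name for name in object_names if not name.endswith("/")]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_one, name): name for name in names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()
            except Exception as exc:  # GoogleCloudError in the operator
                if not continue_on_fail:
                    raise  # first failure propagates
                log(f"Failed to process {name}: {exc}")
```

With `max_workers=1` this degenerates to effectively sequential processing, which is how the default preserves existing behavior.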
**Notes**

* Directory placeholder objects (keys ending in `/`) are filtered out prior to download.
* Introduced the `_setup_gcs_client_chain` test helper to mock the `storage.Client → bucket → blob` chain for `test_execute` and the new parallel execution tests.

**Tests**

* Added unit tests verifying that parallel downloads and parallel uploads are executed when `max_download_workers > 1` and `max_upload_workers > 1`, respectively.
* Added tests ensuring that a `GoogleCloudError` is raised when `download_continue_on_fail=False` or `upload_continue_on_fail=False`, confirming correct error propagation behavior.
* Refactored `test_execute` to mock `storage.Client` directly and introduced `_setup_gcs_client_chain` to centralize client-chain mocking.
* Extended the existing system test DAG (`gcs_transform_timespan`) to include a parallel execution case (`max_*_workers > 1`) to validate behavior against real GCS.

**Backwards Compatibility**

Existing behavior is preserved. Downloads and uploads remain sequential by default, and parallel execution is only triggered when `max_download_workers` or `max_upload_workers` is set to a value other than `1`.

**System Test Verification**

The modified system test `example_gcs_transform_timespan.py` was run and passed successfully after these changes. Due to OpenLineage compatibility issues, the `check_openlineage_events` task had to be disabled, but the current results are sufficient to indicate that the new functionality works as intended with no regressions. A screenshot of the results is provided below:

<img width="940" height="170" alt="image" src="https://github.com/user-attachments/assets/4d404693-d40e-47bc-b969-0966818febfe" />
