SameerMesiah97 opened a new pull request, #62196:
URL: https://github.com/apache/airflow/pull/62196

   **Description**
   
   This change adds optional parallel download and upload support to 
`GCSTimeSpanFileTransformOperator.execute` using `ThreadPoolExecutor`.
   
   When `max_download_workers > 1` or `max_upload_workers > 1`, source objects 
are downloaded and transformed files are uploaded concurrently using a bounded 
thread pool with `as_completed`.
   
   Per-object operations are encapsulated in inner helper functions within 
`execute`. A `storage.Client` (via `get_conn()`) is initialized once per phase 
and reused across submitted futures, with bucket/blob resolution performed 
inside each worker.
   
   `max_download_workers` and `max_upload_workers` must be ≥ 1 (otherwise a 
`ValueError` is raised). Error propagation semantics are unchanged: if 
`*_continue_on_fail=False`, the first `GoogleCloudError` is raised; otherwise 
failures are logged and execution continues. Logging now includes worker counts 
for both phases.
   
   
   **Rationale**
   
   The operator previously executed downloads and uploads sequentially despite 
an existing `#TODO` indicating that parallelism should be introduced. This 
implementation addresses that gap to mitigate cumulative latency from high 
request counts against GCS, which can significantly impact end-to-end runtime 
in object-heavy workloads.
   
   Parallel execution is implemented using `storage.Client` instances obtained 
via `get_conn()` rather than invoking hook methods directly inside worker 
threads. This is deliberate, as **`GCSHook` maintains internal state and is not 
designed for concurrent mutation across threads**. By resolving bucket and blob 
objects within each worker using a shared client, **we avoid shared mutable 
hook state while enabling safe concurrency**.
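   A minimal sketch of this client-sharing pattern, with stub classes standing in for the real `google.cloud.storage` objects (the operator obtains its client via `get_conn()`; `download_all` and the stubs here are purely illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class _StubBlob:
    def __init__(self, name):
        self.name = name
    def download_as_bytes(self):
        return self.name.encode()

class _StubBucket:
    def blob(self, name):
        return _StubBlob(name)

class _StubClient:
    """Stand-in for google.cloud.storage.Client. The real client is
    safe to share across threads, unlike GCSHook's mutable state."""
    def bucket(self, name):
        return _StubBucket()

def download_all(client, bucket_name, object_names, max_workers):
    def _download(object_name):
        # Bucket/blob handles are resolved inside the worker; only the
        # thread-safe client object is shared between threads.
        blob = client.bucket(bucket_name).blob(object_name)
        return object_name, blob.download_as_bytes()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_download, name) for name in object_names]
        return dict(f.result() for f in as_completed(futures))

data = download_all(_StubClient(), "my-bucket", ["x.txt", "y.txt"], max_workers=2)
```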
   
   **Documentation**
   
   The operator docstring has been updated to describe the new parallel 
execution functionality, including the behavior of the download and upload 
phases and the semantics of the new parameters `max_download_workers` and 
`max_upload_workers`.
   
   **Notes**
   
   * Directory placeholder objects (keys ending in `/`) are filtered out prior 
to download.
   * Introduced `_setup_gcs_client_chain` test helper to mock the 
`storage.Client → bucket → blob` chain for `test_execute` and the new parallel 
execution tests.
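   The client-chain mocking can be sketched like this; the actual signature and return values of `_setup_gcs_client_chain` in the PR may differ, and this version exists only to show the `storage.Client → bucket → blob` wiring:

```python
from unittest import mock

def _setup_gcs_client_chain(mock_client):
    """Wire up mock_client.bucket(...).blob(...) so tests can assert on
    download/upload calls without touching real GCS (illustrative)."""
    mock_bucket = mock.MagicMock(name="bucket")
    mock_blob = mock.MagicMock(name="blob")
    mock_client.bucket.return_value = mock_bucket
    mock_bucket.blob.return_value = mock_blob
    mock_blob.download_as_bytes.return_value = b"payload"
    return mock_bucket, mock_blob

# Usage: any bucket/blob name resolves through the mocked chain.
client = mock.MagicMock(name="storage.Client")
bucket, blob = _setup_gcs_client_chain(client)
assert client.bucket("b").blob("o").download_as_bytes() == b"payload"
```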
   
   **Tests**
   
   * Added unit tests verifying that parallel downloads and parallel uploads 
are executed when `max_download_workers > 1` and `max_upload_workers > 1`, 
respectively.
   * Added tests ensuring that a `GoogleCloudError` is raised when 
`download_continue_on_fail=False` or `upload_continue_on_fail=False`, 
confirming correct error propagation behavior.
   * Refactored `test_execute` to mock `storage.Client` directly and introduced 
`_setup_gcs_client_chain` to centralize client-chain mocking.
   * Extended the existing system test DAG (`gcs_transform_timespan`) to 
include a parallel execution case (`max_*_workers > 1`) to validate behavior 
against real GCS.
   
   **Backwards Compatibility**
   
   Existing behavior is preserved. Downloads and uploads remain sequential by 
default, and parallel execution is only triggered when `max_download_workers` 
or `max_upload_workers` is set to a value greater than `1`.
   
   **System Test Verification**
   
   The modified system test `example_gcs_transform_timespan.py` was run and 
passed successfully after these changes were implemented. Due to OpenLineage 
compatibility issues, the `check_openlineage_events` task had to be disabled, 
but the current test results are sufficient to indicate that the new 
functionality is working as intended with no regressions. A screenshot of the 
results is provided below:
   
   <img width="940" height="170" alt="image" 
src="https://github.com/user-attachments/assets/4d404693-d40e-47bc-b969-0966818febfe" />

