shahar1 commented on PR #61611:
URL: https://github.com/apache/airflow/pull/61611#issuecomment-3865681760
I created a compact Dag for testing E2E:
<details>
<summary>Click here to see its code</summary>
```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from airflow.models.dag import DAG
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    AWS_S3_DATA_SOURCE,
    BUCKET_NAME,
    DESCRIPTION,
    GCS_DATA_SINK,
    PROJECT_ID,
    SCHEDULE,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    START_TIME_OF_DAY,
    STATUS,
    TRANSFER_JOB,
    TRANSFER_JOB_FIELD_MASK,
    TRANSFER_SPEC,
    GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
    CloudDataTransferServiceDeleteJobOperator,
    CloudDataTransferServiceGetOperationOperator,
    CloudDataTransferServiceListOperationsOperator,
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
)

# Configuration
GCP_PROJECT_ID = "my-gcp-project"
S3_SOURCE_BUCKET = "test-s3-bucket"
GCS_DEST_BUCKET = "test-gcs-bucket"
AWS_CONN_ID = "aws_default"

# Transfer job body: S3 to GCS transfer
s3_to_gcs_transfer_body = {
    DESCRIPTION: "Example S3 to GCS transfer",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2025, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2025, 12, 31).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
    },
}

# Update body for the transfer job.
# The patch request wraps the TransferJob under the TRANSFER_JOB key.
# When the inner TransferJob contains a transferSpec with awsS3DataSource,
# the operator now correctly injects AWS credentials (fix for #22021).
update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "Updated S3 to GCS transfer",
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
            GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
        },
    },
    TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}

with DAG(
    "example_gcp_storage_transfer",
    description="Example DAG for Google Cloud Storage Transfer Service",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "gcp", "transfer"],
) as dag:
    create_transfer_job = CloudDataTransferServiceCreateJobOperator(
        task_id="create_transfer_job",
        body=s3_to_gcs_transfer_body,
        aws_conn_id=AWS_CONN_ID,
    )
    update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
        task_id="update_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        body=update_body,
        aws_conn_id=AWS_CONN_ID,
    )
    run_transfer_job = CloudDataTransferServiceRunJobOperator(
        task_id="run_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )
    list_operations = CloudDataTransferServiceListOperationsOperator(
        task_id="list_operations",
        request_filter={"project_id": GCP_PROJECT_ID},
    )
    get_operation = CloudDataTransferServiceGetOperationOperator(
        task_id="get_operation",
        operation_name="{{ task_instance.xcom_pull('list_operations')[0]['name'] }}",
    )
    delete_transfer_job = CloudDataTransferServiceDeleteJobOperator(
        task_id="delete_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )

    create_transfer_job >> update_transfer_job >> run_transfer_job >> [
        list_operations,
        # delete_transfer_job,
    ]
    list_operations >> get_operation
```
</details>
<img width="2559" height="893" alt="image"
src="https://github.com/user-attachments/assets/54a29fe9-22bf-4e54-99f6-2b128aec7da2"
/>
- 1st run: E2E after applying the change, to verify that it works
- 2nd run: E2E without the deletion step, to check that the transferred files are actually present
- 3rd run: from `main`, expected to fail on the update step
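For reviewers less familiar with the bug: the failing update step comes down to AWS-credential injection on the TransferJob nested under the patch body. A rough sketch of the idea in plain dicts, with a hypothetical helper name (this is illustrative only, not the provider's actual implementation):

```python
# Illustrative sketch: inject AWS credentials into a transfer-job patch body
# whose nested TransferJob carries an awsS3DataSource. The helper name and
# credential values are placeholders, not the provider's real code.
def inject_aws_credentials(update_body: dict, access_key_id: str, secret: str) -> dict:
    transfer_job = update_body.get("transferJob", {})
    source = transfer_job.get("transferSpec", {}).get("awsS3DataSource")
    if source is not None and "awsAccessKey" not in source:
        # The Storage Transfer API expects credentials inside awsS3DataSource.
        source["awsAccessKey"] = {
            "accessKeyId": access_key_id,
            "secretAccessKey": secret,
        }
    return update_body


body = {
    "projectId": "my-gcp-project",
    "transferJob": {
        "description": "Updated S3 to GCS transfer",
        "transferSpec": {
            "awsS3DataSource": {"bucketName": "test-s3-bucket"},
            "gcsDataSink": {"bucketName": "test-gcs-bucket"},
        },
    },
    "updateTransferJobFieldMask": "description,transferSpec",
}

patched = inject_aws_credentials(body, "AKIAEXAMPLE", "dummy-secret")
```

Before the fix, the update operator only looked for `awsS3DataSource` at the top level of the body, so the nested spec above went out without credentials and the patch call failed.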
---
Based on the compact Dag, I updated the existing system test (I assume it's not
part of [Google's automated
runs](https://storage.googleapis.com/providers-dashboard-html/dashboard.html))
and ran it manually:
<img width="920" height="769" alt="image"
src="https://github.com/user-attachments/assets/31bd8b39-b571-4425-844b-c7b1c11e3dbf"
/>
Also updated docs to reflect the changes.
CC: @VladaZakharova @MaksYermak - FYI, if you don't have any objections, this
will be merged by Tuesday for the upcoming release.