aviramst opened a new issue, #37331:
URL: https://github.com/apache/airflow/issues/37331
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-google==10.9.0
### Apache Airflow version
2.7.2
### Operating System
Linux
### Deployment
Amazon (AWS) MWAA
### Deployment details
_No response_
### What happened
When S3ToGCSOperator runs in deferrable mode, it creates the data transfer jobs successfully, but the trigger then fails to fetch the job status because it looks for GCP Application Default Credentials instead of using the provided `gcp_conn_id`:
```
[2024-02-11, 12:48:21 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:21 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:23 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:23 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:24 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:24 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:26 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:26 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 530 files
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:330}} INFO - Overall submitted 10 jobs to transfer 9530 files
[2024-02-11, 12:48:27 UTC] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=transfer_files, task_id=s3_to_gcs_example, execution_date=20240211T124244, start_date=20240211T124816
[2024-02-11, 12:48:27 UTC] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
[2024-02-11, 12:48:28 UTC] {{cloud_storage_transfer_service.py:64}} INFO - Attempting to request jobs statuses
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_default.py:338}} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1359}} INFO - Resuming after deferral
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1382}} INFO - Executing <Task(S3ToGCSOperator): s3_to_gcs_example> on 2024-02-11 12:42:44+00:00
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:57}} INFO - Started process 24431 to run task
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'transfer_files', 's3_to_gcs_example', 'manual__2024-02-11T12:42:44+00:00', '--job-id', '170059', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpr6aa0mal']
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:85}} INFO - Job 170059: Subtask s3_to_gcs_example
[2024-02-11, 12:48:32 UTC] {{baseoperator.py:1600}} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 526, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 598, in run_trigger
    async for event in trigger.run():
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py", line 67, in run
    jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 520, in get_jobs
    client = self.get_conn()
             ^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 510, in get_conn
    self._client = StorageTransferServiceAsyncClient()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/async_client.py", line 225, in __init__
    self._client = StorageTransferServiceClient(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/client.py", line 441, in __init__
    self._transport = Transport(
                      ^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/grpc_asyncio.py", line 198, in __init__
    super().__init__(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/base.py", line 99, in __init__
    credentials, _ = google.auth.default(
                     ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/auth/_default.py", line 691, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
```
### What you think should happen instead
I think S3ToGCSOperator's trigger should create the Storage Transfer Service client with the credentials resolved from the provided `gcp_conn_id`, rather than falling back to Application Default Credentials.
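The traceback above shows `get_conn` in the async hook instantiating `StorageTransferServiceAsyncClient()` with no arguments, which is what sends the client library to `google.auth.default()`. Below is a minimal sketch of the kind of change I mean; the `get_credentials()` helper is hypothetical shorthand for however the hook resolves credentials from `gcp_conn_id`, so the actual plumbing in the provider may differ:
```
from google.cloud.storage_transfer_v1 import StorageTransferServiceAsyncClient


def get_conn(self) -> StorageTransferServiceAsyncClient:
    """Sketch: build the async client from the connection's credentials."""
    if not self._client:
        # Hypothetical: resolve credentials from gcp_conn_id rather than
        # letting the client library fall back to google.auth.default() (ADC).
        self._client = StorageTransferServiceAsyncClient(
            credentials=self.get_credentials(),
        )
    return self._client
```
The key point is only that the client receives explicit credentials; any mechanism that derives them from the configured connection would do.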
### How to reproduce
```
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

default_args = {
    'owner': 'myself',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

my_dag = DAG(
    's3_to_gcs_transfer',
    default_args=default_args,
    description='Transfer files from S3 to GCS',
    schedule_interval=None,
    catchup=False,
)

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="my-prefix",
    apply_gcs_prefix=True,
    gcp_conn_id="my-gcp-conn-id",
    aws_conn_id="my-aws-conn-id",
    dest_gcs="gs://my-gcs-bucket/",
    replace=False,
    deferrable=True,
    dag=my_dag,
)
```
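In the meantime, a possible workaround (an assumption based on the logs above, not a verified fix) is to set `deferrable=False`, since the synchronous code path did authenticate via `gcp_conn_id` when it created the jobs:
```
# Workaround sketch: keep status polling in the synchronous hook, which
# honors gcp_conn_id, instead of deferring to the trigger's ADC-only client.
s3_to_gcs_sync_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example_sync",
    bucket="my-s3-bucket",
    prefix="my-prefix",
    apply_gcs_prefix=True,
    gcp_conn_id="my-gcp-conn-id",
    aws_conn_id="my-aws-conn-id",
    dest_gcs="gs://my-gcs-bucket/",
    replace=False,
    deferrable=False,
    dag=my_dag,
)
```
This gives up the resource savings of deferral, so it is only a stopgap until the trigger passes credentials through.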
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)