aviramst opened a new issue, #37331:
URL: https://github.com/apache/airflow/issues/37331

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==10.9.0
   
   ### Apache Airflow version
   
   2.7.2
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
When S3ToGCSOperator runs in deferrable mode, it creates the data transfer jobs successfully, but the deferred trigger then fails to fetch the job status: instead of using the credentials from the provided `gcp_conn_id`, it falls back to GCP Application Default Credentials, which are not available on MWAA.
   
   ```
   [2024-02-11, 12:48:21 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:21 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:23 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:23 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:24 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:24 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:26 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:26 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
   [2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
   [2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 530 files
   [2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:330}} INFO - Overall submitted 10 jobs to transfer 9530 files
   [2024-02-11, 12:48:27 UTC] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=transfer_files, task_id=s3_to_gcs_example, execution_date=20240211T124244, start_date=20240211T124816
   [2024-02-11, 12:48:27 UTC] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
   [2024-02-11, 12:48:28 UTC] {{cloud_storage_transfer_service.py:64}} INFO - Attempting to request jobs statuses
   [2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 22] Invalid argument
   [2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 22] Invalid argument
   [2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 22] Invalid argument
   [2024-02-11, 12:48:28 UTC] {{_default.py:338}} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
   [2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
   [2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
   [2024-02-11, 12:48:31 UTC] {{taskinstance.py:1359}} INFO - Resuming after deferral
   [2024-02-11, 12:48:31 UTC] {{taskinstance.py:1382}} INFO - Executing <Task(S3ToGCSOperator): s3_to_gcs_example> on 2024-02-11 12:42:44+00:00
   [2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:57}} INFO - Started process 24431 to run task
   [2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'transfer_files', 's3_to_gcs_example', 'manual__2024-02-11T12:42:44+00:00', '--job-id', '170059', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpr6aa0mal']
   [2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:85}} INFO - Job 170059: Subtask s3_to_gcs_example
   
   [2024-02-11, 12:48:32 UTC] {{baseoperator.py:1600}} ERROR - Trigger failed:
   Traceback (most recent call last):

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 526, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 598, in run_trigger
       async for event in trigger.run():

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py", line 67, in run
       jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 520, in get_jobs
       client = self.get_conn()
                ^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 510, in get_conn
       self._client = StorageTransferServiceAsyncClient()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/async_client.py", line 225, in __init__
       self._client = StorageTransferServiceClient(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/client.py", line 441, in __init__
       self._transport = Transport(
                         ^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/grpc_asyncio.py", line 198, in __init__
       super().__init__(

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/base.py", line 99, in __init__
       credentials, _ = google.auth.default(
                        ^^^^^^^^^^^^^^^^^^^^

     File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/auth/_default.py", line 691, in default
       raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)

   google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
   ```
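
   The traceback shows the async hook instantiating `StorageTransferServiceAsyncClient()` with no arguments, so google-auth falls back to Application Default Credentials instead of the `gcp_conn_id` credentials. A minimal sketch (not provider code; it only assumes the `google-cloud-storage-transfer` package is installed and no ADC is configured) that reproduces the same fallback outside Airflow:

   ```
   # Sketch only: mirrors hooks/cloud_storage_transfer_service.py:510 from the
   # traceback above. Without an explicit `credentials` argument the client
   # resolves credentials via google.auth.default(), which fails on a non-GCP worker.
   from google.auth.exceptions import DefaultCredentialsError
   from google.cloud.storage_transfer_v1 import StorageTransferServiceAsyncClient

   try:
       client = StorageTransferServiceAsyncClient()  # no gcp_conn_id credentials are passed
   except DefaultCredentialsError as err:
       print(f"ADC fallback failed: {err}")
   ```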
   
   ### What you think should happen instead
   
S3ToGCSOperator (and the trigger it defers to) should create the Storage Transfer Service client with the credentials from the provided `gcp_conn_id`, not with Application Default Credentials.
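
   For illustration, a rough sketch of the direction I would expect (the class and helper names here are hypothetical, not the actual provider code): the hook resolves credentials from `gcp_conn_id` and hands them to the client explicitly, so `google.auth.default()` is never consulted on the triggerer.

   ```
   # Illustrative sketch only -- not the provider's API. The point is that the
   # async client receives the connection's credentials explicitly instead of
   # relying on Application Default Credentials.
   from google.auth.credentials import Credentials
   from google.cloud.storage_transfer_v1 import StorageTransferServiceAsyncClient

   class AsyncTransferHookSketch:
       def __init__(self, credentials: Credentials):
           # In the real hook these would be resolved from the Airflow connection (gcp_conn_id).
           self._credentials = credentials
           self._client = None

       def get_conn(self) -> StorageTransferServiceAsyncClient:
           if self._client is None:
               self._client = StorageTransferServiceAsyncClient(credentials=self._credentials)
           return self._client
   ```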
   
   ### How to reproduce
   
   ```
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
   
   default_args = {
       'owner': 'myself',
       'depends_on_past': False,
       'start_date': datetime(2024, 1, 1),
       'retries': 1,
   }
   
   my_dag = DAG(
       's3_to_gcs_transfer',
       default_args=default_args,
       description='Transfer files from S3 to GCS',
       schedule_interval=None, 
       catchup=False,
   )
   
   s3_to_gcs_op = S3ToGCSOperator(
       task_id="s3_to_gcs_example",
       bucket="my-s3-bucket",
       prefix="my-prefix",
       apply_gcs_prefix=True,
       gcp_conn_id="my-gcp-conn-id",
       aws_conn_id="my-aws-conn-id",
       dest_gcs="gs://my-gcs-bucket/",
       replace=False,
       deferrable=True,
       dag=my_dag,
   )
   
   s3_to_gcs_op
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

