chriscugliotta opened a new issue, #29958: URL: https://github.com/apache/airflow/issues/29958
### Apache Airflow Provider(s)

google

### Versions of Apache Airflow Providers

apache-airflow-providers-google==8.10.0

### Apache Airflow version

2.3.4

### Operating System

Ubuntu 18.04.6 LTS

### Deployment

Google Cloud Composer

### Deployment details

Google Cloud Composer 2.1.2

### What happened

[`GCSToBigQueryOperator`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L58) does not respect the BigQuery project ID specified in the [`destination_project_dataset_table`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L74-L77) argument. Instead, it prioritizes the project ID defined in the [Airflow connection](https://i.imgur.com/1tTIlQF.png).

### What you think should happen instead

The project ID specified via [`destination_project_dataset_table`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L74-L77) should be respected.

**Use case:** Suppose our Composer environment and service account (SA) live in `project-A`, and we want to transfer data into foreign projects `B`, `C`, and `D`. We don't have credentials (and thus don't have Airflow connections defined) for projects `B`, `C`, and `D`. Instead, all transfers are executed by our single SA in `project-A`. (Assume this SA has cross-project IAM policies.) Thus, we want to use a _single_ SA and a _single_ [Airflow connection](https://i.imgur.com/1tTIlQF.png) (i.e. `gcp_conn_id=google_cloud_default`) to send data into 3+ destination projects. I imagine this is a fairly common setup for sending data across GCP projects.

**Root cause:** I've been studying the source code, and I believe the bug is caused by [line 309](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L309). Experimentally, I have verified that `hook.project_id` traces back to the [Airflow connection's project ID](https://i.imgur.com/1tTIlQF.png). If no destination project ID is explicitly specified, then it makes sense to _fall back_ on the connection's project. However, if the destination project is explicitly provided, the operator should surely honor it. I think this bug can be fixed by amending line 309 as follows:

```python
project=passed_in_project or hook.project_id
```

This pattern is used successfully in many other areas of the repo: [example](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/operators/gcs.py#L154).
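To make the intended precedence concrete, here is a minimal, hypothetical sketch (the helper name `resolve_destination_project` is mine, not from the codebase, and it assumes the standard `project.dataset.table` / `dataset.table` forms): a project ID embedded in `destination_project_dataset_table` wins, and the connection's project is used only when none is given.

```python
# Hypothetical helper illustrating the proposed fallback logic. It is NOT the
# operator's actual code; it only demonstrates the precedence described above.
def resolve_destination_project(destination_project_dataset_table: str, hook_project_id: str) -> str:
    parts = destination_project_dataset_table.split(".")
    # "project.dataset.table" has three parts; "dataset.table" has two.
    passed_in_project = parts[0] if len(parts) == 3 else None
    # Prefer the explicitly passed-in project; fall back to the connection's project.
    return passed_in_project or hook_project_id


# Expected behavior under this sketch:
assert resolve_destination_project("project-B.my_dataset.my_table", "project-A") == "project-B"
assert resolve_destination_project("my_dataset.my_table", "project-A") == "project-A"
```

Inside `_submit_job`, the resolved value would then be handed to `hook.insert_job(...)` instead of always using `hook.project_id`.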
### How to reproduce

Admittedly, this bug is difficult to reproduce, because it requires two GCP projects, i.e. a service account in `project-A`, and inbound GCS files and a destination BigQuery table in `project-B`. Also, you need an Airflow server with a `google_cloud_default` connection that points to `project-A` like [this](https://i.imgur.com/1tTIlQF.png).

Assuming all that exists, the bug can be reproduced via the following Airflow DAG:

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

GCS_BUCKET = 'my_bucket'
GCS_PREFIX = 'path/to/*.json'
BQ_PROJECT = 'project-B'
BQ_DATASET = 'my_dataset'
BQ_TABLE = 'my_table'
SERVICE_ACCOUNT = '[email protected]'

with DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    task = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket=GCS_BUCKET,
        source_objects=GCS_PREFIX,
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='{}.{}.{}'.format(BQ_PROJECT, BQ_DATASET, BQ_TABLE),
        impersonation_chain=SERVICE_ACCOUNT,
    )
```

Stack trace:

```
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti.run(job_id=ti.job_id, **params)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1797, in run
    self._run_raw_task(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1464, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1612, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1673, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 387, in execute
    job = self._submit_job(self.hook, job_id)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 307, in _submit_job
    return hook.insert_job(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 468, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1549, in insert_job
    job._begin()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 510, in _begin
    api_response = client._call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 782, in _call_api
    return call()
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/{project-A}/jobs?prettyPrint=false: Access Denied: Project {project-A}: User does not have bigquery.jobs.create permission in project {project-A}.
```

From the stack trace, notice the operator is (incorrectly) attempting to insert into `project-A` rather than `project-B`.

### Anything else

Perhaps out-of-scope, but the inverse direction also suffers from this same problem, i.e. [BigQueryToGcsOperator](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py#L38) and [line 192](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py#L192); a sketch of the analogous change follows below.
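Assuming that line likewise passes `hook.project_id` straight into `hook.insert_job(...)` when submitting the extract job, the analogous (hypothetical) change inside its `_submit_job` might look something like this, with the project derived from `source_project_dataset_table` instead:

```python
# Hypothetical sketch only (not the actual code at line 192): prefer a project
# embedded in source_project_dataset_table, falling back to the connection's project.
parts = self.source_project_dataset_table.split(".")
passed_in_project = parts[0] if len(parts) == 3 else None  # "project.dataset.table" vs "dataset.table"

return hook.insert_job(
    configuration=configuration,  # extract-job configuration built by the operator
    project_id=passed_in_project or hook.project_id,
    job_id=job_id,
    nowait=True,
)
```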
### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
