chriscugliotta opened a new issue, #29958:
URL: https://github.com/apache/airflow/issues/29958

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==8.10.0
   
   ### Apache Airflow version
   
   2.3.4
   
   ### Operating System
   
   Ubuntu 18.04.6 LTS
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   Google Cloud Composer 2.1.2
   
   ### What happened
   
   
   [`GCSToBigQueryOperator`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L58) does not respect the BigQuery project ID specified in the [`destination_project_dataset_table`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L74-L77) argument. Instead, it prioritizes the project ID defined in the [Airflow connection](https://i.imgur.com/1tTIlQF.png).
   
   ### What you think should happen instead
   
   The project ID specified via 
[`destination_project_dataset_table`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L74-L77)
 should be respected.
   
   **Use case:** Suppose our Composer environment and service account (SA) live in `project-A`, and we want to transfer data into foreign projects `B`, `C`, and `D`. We don't have credentials (and thus don't have Airflow connections defined) for projects `B`, `C`, and `D`; instead, all transfers are executed by our single SA in `project-A` (assume this SA has cross-project IAM permissions). Thus, we want to use a _single_ SA and _single_ [Airflow connection](https://i.imgur.com/1tTIlQF.png) (i.e. `gcp_conn_id=google_cloud_default`) to send data into 3+ destination projects. I imagine this is a fairly common setup for sending data across GCP projects.
   
   **Root cause:** I've been studying the source code, and I believe the bug is caused by [line 309](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py#L309). Experimentally, I have verified that `hook.project_id` traces back to the [Airflow connection's project ID](https://i.imgur.com/1tTIlQF.png). If no destination project ID is explicitly specified, it makes sense to _fall back_ on the connection's project. However, if the destination project is explicitly provided, the operator should surely honor it. I think this bug can be fixed by amending line 309 as follows:
   
   ```python
   project=passed_in_project or hook.project_id
   ```
   
   This pattern is used successfully in many other areas of the repo:  
[example](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/operators/gcs.py#L154).
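
   For concreteness, here is a minimal sketch of what that could look like inside `_submit_job`. The parsing logic and the exact argument list are my assumptions, not the actual upstream code:

   ```python
   # Illustrative sketch only -- the parsing logic and argument list are
   # assumptions, not the actual upstream implementation.
   def _submit_job(self, hook, job_id):
       # Extract 'project-B' from 'project-B.my_dataset.my_table', if present.
       passed_in_project = None
       if self.destination_project_dataset_table.count(".") == 2:
           passed_in_project = self.destination_project_dataset_table.split(".")[0]

       return hook.insert_job(
           configuration=self.configuration,
           # Proposed change: honor the explicit destination project, and only
           # fall back to the connection's project when none was provided.
           project_id=passed_in_project or hook.project_id,
           job_id=job_id,
       )
   ```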
   
   ### How to reproduce
   
   Admittedly, this bug is difficult to reproduce because it requires two GCP projects: a service account in `project-A`, plus inbound GCS files and a destination BigQuery table in `project-B`. You also need an Airflow server with a `google_cloud_default` connection that points to `project-A`, like [this](https://i.imgur.com/1tTIlQF.png). Assuming all that exists, the bug can be reproduced via the following Airflow DAG:
   
   ```python
   from airflow import DAG
   from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
   from datetime import datetime
   
   GCS_BUCKET = 'my_bucket'
   GCS_PREFIX = 'path/to/*.json'
   BQ_PROJECT = 'project-B'
   BQ_DATASET = 'my_dataset'
   BQ_TABLE = 'my_table'
   SERVICE_ACCOUNT = '[email protected]'
   
   
   with DAG(
           dag_id='my_dag',
           start_date=datetime(2023, 1, 1),
           schedule_interval=None,
       ) as dag:
   
       task = GCSToBigQueryOperator(
           task_id='gcs_to_bigquery',
           bucket=GCS_BUCKET,
           source_objects=GCS_PREFIX,
           source_format='NEWLINE_DELIMITED_JSON',
           destination_project_dataset_table='{}.{}.{}'.format(BQ_PROJECT, BQ_DATASET, BQ_TABLE),
           impersonation_chain=SERVICE_ACCOUNT,
       )
   ```
   
   Stack trace:
   
   ```
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
       ti.run(job_id=ti.job_id, **params)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1797, in run
       self._run_raw_task(
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1464, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1612, in _execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1673, in _execute_task
       result = execute_callable(context=context)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 387, in execute
       job = self._submit_job(self.hook, job_id)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 307, in _submit_job
       return hook.insert_job(
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 468, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1549, in insert_job
       job._begin()
     File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 510, in _begin
       api_response = client._call_api(
     File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 782, in _call_api
       return call()
     File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
       return retry_target(
     File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
       return target()
     File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
       raise exceptions.from_http_response(response)
   google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/{project-A}/jobs?prettyPrint=false: Access Denied: Project {project-A}: User does not have bigquery.jobs.create permission in project {project-A}.
   ```
   
   From the stack trace, notice the operator is (incorrectly) submitting the load job to `project-A` rather than `project-B`.
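
   In the meantime, a possible workaround is to subclass the operator and override the `_submit_job` method seen in the trace, injecting the destination project explicitly. This is an untested sketch that leans on provider internals (`self.configuration`, `hook.insert_job`), so it may break across provider versions:

   ```python
   from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


   class CrossProjectGCSToBigQueryOperator(GCSToBigQueryOperator):
       """Untested workaround: submit the load job to the destination table's project."""

       def _submit_job(self, hook, job_id):
           parts = self.destination_project_dataset_table.split('.')
           # Prefer the project encoded in 'project.dataset.table'; otherwise
           # keep the connection's default project.
           project_id = parts[0] if len(parts) == 3 else hook.project_id
           return hook.insert_job(
               configuration=self.configuration,
               project_id=project_id,
               location=self.location,
               job_id=job_id,
               nowait=True,  # mirror the non-blocking submit seen in the trace
           )
   ```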
   
   ### Anything else
   
   Perhaps out of scope, but the inverse direction suffers from the same problem, i.e. [`BigQueryToGCSOperator`](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py#L38) at [line 192](https://github.com/apache/airflow/blob/3374fdfcbddb630b4fc70ceedd5aed673e6c0a0d/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py#L192).
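
   Presumably the same fallback applies there, i.e. deriving the job's project from `source_project_dataset_table` before falling back to the connection. A tiny self-contained sketch of that resolution logic (the helper name is hypothetical):

   ```python
   # Hypothetical helper illustrating the same fallback for the inverse direction.
   def resolve_project(project_dataset_table: str, connection_project: str) -> str:
       parts = project_dataset_table.split(".")
       # Honor an explicit 'project.dataset.table'; otherwise fall back.
       return parts[0] if len(parts) == 3 else connection_project


   assert resolve_project("project-B.my_dataset.my_table", "project-A") == "project-B"
   assert resolve_project("my_dataset.my_table", "project-A") == "project-A"
   ```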
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

