nathadfield opened a new issue, #39541:
URL: https://github.com/apache/airflow/issues/39541

   ### Apache Airflow version
   
   2.9.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   After upgrading to Airflow 2.9.1, `@task`-decorated functions that use the `BigQueryHook` no longer submit jobs successfully; instead they fail with an error such as the following:
   
   ```
   google.api_core.exceptions.NotFound: 404 GET https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0?location=EU&prettyPrint=false: Not found: Job my-project:EU.airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0
   ```
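
   For context, the 404 comes from the job-status poll: asking BigQuery for a job ID it cannot find in the requested location raises `NotFound`. A minimal sketch of the equivalent call with the raw client (using the placeholder project and the job ID from the error above):

   ```
   from google.cloud import bigquery

   client = bigquery.Client(project="my-project")

   # Polling a job ID that BigQuery cannot find in the given location raises
   # google.api_core.exceptions.NotFound, i.e. the 404 GET shown above.
   client.get_job(
       "airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0",
       location="EU",
   )
   ```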
   
   I have replicated this issue against Airflow 2.9.1 and against `main`, but it **does not** appear to be related to the Google provider, because using either [10.17](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/changelog.html#id1) or [10.16](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/changelog.html#id2) results in the same error.
   
   Using either Google provider version with Airflow 2.8.4 does not cause this error.
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Here's a simple DAG that replicates the issue using Breeze. The `@task`-decorated `bq_test` task will fail, but `bq_insert_job_test`, based on `BigQueryInsertJobOperator` with the same configuration, will succeed.
   
   ```
   breeze --python 3.10 --backend postgres start-airflow --forward-credentials ${HOME}/.config/gcloud
   ```
   
   ```
   from datetime import datetime
   from airflow import models
   
   from airflow.decorators import task
   from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   with models.DAG(
       dag_id='bq_hook_test',
       start_date=datetime(2024, 1, 1),
       catchup=False,
       schedule='0 0 * * *',
   ) as dag:
   
       configuration={
           'query': {
               'query': 'SELECT 1;',
               'useLegacySql': False
           },
       }
   
       @task
       def bq_test():
           # Fails on Airflow 2.9.1: the hook's completion poll raises
           # 404 NotFound (full traceback below).
           hook = BigQueryHook()
           hook.insert_job(
               location='EU',
               configuration=configuration,
           )
   
   
       bq_hook_test = bq_test()
   
       # The same configuration submitted via the operator succeeds.
       test = BigQueryInsertJobOperator(
           task_id='bq_insert_job_test',
           location='EU',
           configuration=configuration,
       )
   ```
   
   <img width="1123" alt="Screenshot 2024-05-10 at 10 38 21" src="https://github.com/apache/airflow/assets/967119/5cf56019-6a8a-4241-a6b9-768279144e2d">
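
   As a diagnostic sketch (untested; `bq_test_nowait` is a hypothetical variant of the task above): since the traceback below shows the failure happening while `insert_job` polls the submitted job for completion, passing `nowait=True` returns the job object without that polling, which helps narrow down whether the submission or the completion poll is what breaks:

   ```
   @task
   def bq_test_nowait():
       hook = BigQueryHook()
       # nowait=True makes insert_job return the job object immediately,
       # skipping the hook's built-in completion polling.
       job = hook.insert_job(
           location='EU',
           configuration=configuration,
           nowait=True,
       )
       # If this poll raises the same 404, the job was created and only the
       # completion check fails.
       job.result()
   ```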
   
   We are using Google default credentials for authentication, with the following environment variables:
   
   ```
   GOOGLE_CLOUD_PROJECT=my-project
   AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT=google-cloud-platform://
   ```
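
   As a sanity check (a diagnostic sketch; `get_credentials_and_project_id` is the standard method on the provider's base Google hook), the default connection can be inspected to confirm which project it resolves to under ADC:

   ```
   from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

   hook = BigQueryHook()  # uses the google_cloud_default connection / ADC
   credentials, project_id = hook.get_credentials_and_project_id()
   print(project_id)  # expected: my-project, from GOOGLE_CLOUD_PROJECT
   ```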
   
   ### Operating System
   
   n/a
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==10.17.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   Full exception log:
   
   ```
    [2024-05-10, 08:36:29 UTC] {taskinstance.py:2910} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/airflow/airflow/models/taskinstance.py", line 478, in _execute_task
        result = _execute_callable(context=context, **execute_callable_kwargs)
      File "/opt/airflow/airflow/models/taskinstance.py", line 441, in _execute_callable
        return ExecutionCallableRunner(
      File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
        return self.func(*args, **kwargs)
      File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
        return func(self, *args, **kwargs)
      File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
        return_value = super().execute(context)
      File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
        return func(self, *args, **kwargs)
      File "/opt/airflow/airflow/operators/python.py", line 238, in execute
        return_value = self.execute_callable()
      File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
        return runner.run(*self.op_args, **self.op_kwargs)
      File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
        return self.func(*args, **kwargs)
      File "/files/dags/test.py", line 17, in bq_test
        hook.insert_job(
      File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 524, in inner_wrapper
        return func(self, *args, **kwargs)
      File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 1681, in insert_job
        job_api_repr.result(timeout=timeout, retry=retry)
      File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1626, in result
        while not is_job_done():
      File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1551, in is_job_done
        if self.done(retry=retry, timeout=timeout):
      File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 938, in done
        self.reload(retry=retry, timeout=timeout)
      File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 828, in reload
        api_response = client._call_api(
      File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 831, in _call_api
        return call()
      File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
        return retry_target(
      File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
        _retry_error_helper(
      File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
        raise final_exc from source_exc
      File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
        result = target()
      File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
        raise exceptions.from_http_response(response)
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

