ying-w opened a new issue, #30731:
URL: https://github.com/apache/airflow/issues/30731

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   google-cloud-bigquery==2.34.4
   
   ### Apache Airflow version
   
   2.5.2+astro.2
   
   ### Operating System
   
   OS X
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using `SQLExecuteQueryOperator()` it is not possible to pass the `priority` argument, which is [important for running jobs on BigQuery](https://cloud.google.com/bigquery/docs/running-queries). By default, the BigQuery operators run queries at 'INTERACTIVE' priority rather than 'BATCH' priority.
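
   For context, this is roughly how priority is set on the underlying google-cloud-bigquery client (a minimal sketch; the project and query are placeholders):
   
   ```py
   from google.cloud import bigquery
   
   # Minimal sketch with the underlying google-cloud-bigquery client;
   # the project and SQL are placeholders.
   client = bigquery.Client(project="my-project")
   job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.BATCH)
   # Without an explicit job_config, the job runs at INTERACTIVE priority.
   job = client.query("SELECT 1", job_config=job_config)
   job.result()
   ```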
   
   The typical way to specify this is to pass a dictionary through `configuration` or `api_resource_configs`. When using `SQLExecuteQueryOperator()` I could not find a satisfactory workaround other than switching to the BigQuery-specific `BigQueryInsertJobOperator()`.
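
   For reference, that workaround looks roughly like this (a sketch; the task_id and SQL are placeholders):
   
   ```py
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   # Workaround via the BigQuery-specific operator; task_id and SQL are placeholders.
   BigQueryInsertJobOperator(
       task_id="batch_query",
       configuration={
           "query": {
               "query": "SELECT 1",
               "useLegacySql": False,
               "priority": "BATCH",  # accepted here without any duplication check
           }
       },
       location="us",
   )
   ```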
   
   ### What you think should happen instead
   
   The most straightforward workaround would be to use the provider-specific query operator; however, I would like to see more standardization around using the common SQL operators.
   
   Other possible solutions are:
   
   1. tweak `_api_resource_configs_duplication_check` to let a `priority` passed in through the config overwrite the argument's default
   2. remove `priority: str = "INTERACTIVE",` from the `run_query` function definition and instead default it (if unset) right before execution in `insert_job`, allowing it to pass the duplication check (see the sketch after this list)
   3. make `configuration` and `priority` first-class parameters throughout the BigQuery hooks/operators (they are missing in places like `get_pandas_df`); this is probably a much longer-term solution
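
   For illustration, a rough sketch of option 2, with the configuration handling simplified to a stand-in for what the hook actually builds (hypothetical, not tested):
   
   ```py
   # Hypothetical sketch of option 2 (illustrative only, not the real hook code):
   # make the default lazy so a priority supplied via api_resource_configs wins.
   def run_query(self, sql, priority=None, **kwargs):
       # Stand-in for the dict run_query already builds from its arguments
       # merged with api_resource_configs.
       configuration = {"query": {"query": sql, **kwargs}}
       if priority is not None:
           # An explicitly passed argument still conflicts with a config value.
           _api_resource_configs_duplication_check("priority", priority, configuration["query"])
           configuration["query"]["priority"] = priority
       else:
           # Default to INTERACTIVE only when the config did not already set one.
           configuration["query"].setdefault("priority", "INTERACTIVE")
       return self.insert_job(configuration=configuration).job_id
   ```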
   
   ### How to reproduce
   
   ```py
   SQLExecuteQueryOperator(
       ...
       hook_params={
           "use_legacy_sql": False,
           "location": "us",
           "api_resource_configs": {"query": {"useQueryCache": False, "priority": "BATCH"}},
       },
   )
   ```
   
   will fail with
   
   ```txt
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 260, in execute
       output = hook.run(
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 349, in run
       self._run_command(cur, sql_statement, parameters)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 380, in _run_command
       cur.execute(sql_statement)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2701, in execute
       self.job_id = self.hook.run_query(sql)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2141, in run_query
       _api_resource_configs_duplication_check(param_name, param, configuration["query"])
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2942, in _api_resource_configs_duplication_check
       raise ValueError(
   ValueError: Values of priority param are duplicated. api_resource_configs contained priority param in `query` config and priority was also provided with arg to run_query() method. Please remove duplicates.
   ```
   
   Although the hook's `run_query` function has a `priority` parameter, the hook's constructor does not accept one, so the following fails:
   
   ```py
   SQLExecuteQueryOperator(
       ...
       hook_params={
           "use_legacy_sql": False,
           "location": "us",
           "priority": "BATCH",
           "api_resource_configs": {"query": {"useQueryCache": False}},
       },
   )
   ```
   
   ```txt
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 255, in execute
       hook = self.get_db_hook()
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 179, in get_db_hook
       return self._hook
     File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
       val = self.func(instance)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 142, in _hook
       hook = conn.get_hook(hook_params=self.hook_params)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/connection.py", line 344, in get_hook
       return hook_class(**{hook.connection_id_attribute_name: self.conn_id}, **hook_params)
   TypeError: __init__() got an unexpected keyword argument 'priority'
   ```
   
   I put in a PR to add `priority` to the hook, but maybe this isn't the right general solution: https://github.com/apache/airflow/pull/30655
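
   For illustration, adding `priority` to the hook might look roughly like this (a hypothetical sketch, not the actual diff from that PR; parameters other than `priority` are assumed to mirror the existing hook signature):
   
   ```py
   from __future__ import annotations
   
   from airflow.providers.common.sql.hooks.sql import DbApiHook
   from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
   
   
   # Hypothetical sketch, not the actual PR diff; parameters other than
   # `priority` are assumed to mirror the existing BigQueryHook signature.
   class BigQueryHook(GoogleBaseHook, DbApiHook):
       def __init__(
           self,
           use_legacy_sql: bool = True,
           location: str | None = None,
           priority: str = "INTERACTIVE",  # new: default priority for query jobs
           api_resource_configs: dict | None = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.use_legacy_sql = use_legacy_sql
           self.location = location
           self.priority = priority  # run_query would fall back to this value
           self.api_resource_configs = api_resource_configs or {}
   ```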
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

