ying-w opened a new issue, #30731: URL: https://github.com/apache/airflow/issues/30731
### Apache Airflow Provider(s)

google

### Versions of Apache Airflow Providers

google-cloud-bigquery==2.34.4

### Apache Airflow version

2.5.2+astro.2

### Operating System

OS X

### Deployment

Astronomer

### Deployment details

_No response_

### What happened

When using `SQLExecuteQueryOperator()` it is not possible to pass in the `priority` argument, which is [important for running jobs on BigQuery](https://cloud.google.com/bigquery/docs/running-queries). By default, BigQuery operators run at `INTERACTIVE` priority rather than `BATCH` priority. The typical way to specify this is to pass a dictionary through `configuration` or `api_resource_configs`. With `SQLExecuteQueryOperator()` I could not find a satisfactory workaround other than switching to the BigQuery-specific `BigQueryInsertJobOperator()`.

### What you think should happen instead

The most straightforward workaround would be to use the provider-specific query operator; however, I would like to see more standardization around using the common SQL operators. Other possible solutions are:

1. Tweak `_api_resource_configs_duplication_check` to allow a `priority` passed in via config to overwrite the default.
2. Remove `priority: str = "INTERACTIVE"` from the `run_query` function definition and instead default it (if unset) right before execution in `insert_job`, allowing it to pass the duplicate check.
3. Make `configuration` and `priority` first-class parameters within the BigQuery hook/operators (`priority` is missing in places like `get_pandas_df`) — this is probably a much longer-term solution.

### How to reproduce

```py
SQLExecuteQueryOperator(
    ...
    hook_params={
        "use_legacy_sql": False,
        "location": "us",
        "api_resource_configs": {"query": {"useQueryCache": False, "priority": "BATCH"}},
    },
)
```

will fail with

```txt
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 260, in execute
    output = hook.run(
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 349, in run
    self._run_command(cur, sql_statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 380, in _run_command
    cur.execute(sql_statement)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2701, in execute
    self.job_id = self.hook.run_query(sql)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2141, in run_query
    _api_resource_configs_duplication_check(param_name, param, configuration["query"])
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2942, in _api_resource_configs_duplication_check
    raise ValueError(
ValueError: Values of priority param are duplicated. api_resource_configs contained priority param in `query` config and priority was also provided with arg to run_query() method. Please remove duplicates.
```

Although the hook's `run_query` method has a `priority` parameter, the hook's constructor does not accept one, so the following also fails:

```py
SQLExecuteQueryOperator(
    ...
    hook_params={
        "use_legacy_sql": False,
        "location": "us",
        "priority": "BATCH",
        "api_resource_configs": {"query": {"useQueryCache": False}},
    },
)
```

```txt
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 255, in execute
    hook = self.get_db_hook()
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 179, in get_db_hook
    return self._hook
  File "/usr/local/lib/python3.9/functools.py", line 993, in __get__
    val = self.func(instance)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 142, in _hook
    hook = conn.get_hook(hook_params=self.hook_params)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/connection.py", line 344, in get_hook
    return hook_class(**{hook.connection_id_attribute_name: self.conn_id}, **hook_params)
TypeError: __init__() got an unexpected keyword argument 'priority'
```

I put in a PR to add `priority` to the hook, but maybe this isn't the right general solution: https://github.com/apache/airflow/pull/30655

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
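Option 1 from the list above could be sketched roughly as follows. This is a hypothetical, standalone illustration, not the actual Airflow code: the function name `duplication_check_with_default` and the `default` parameter are invented for the example. The idea is that the check raises only when an explicitly set argument conflicts with `api_resource_configs`, and silently lets the config value win when the argument is still at its default:

```python
# Hypothetical sketch (not the real _api_resource_configs_duplication_check):
# tolerate the known default instead of raising, so a `priority` supplied via
# api_resource_configs can override the run_query() default "INTERACTIVE".

def duplication_check_with_default(param_name, param, query_config, default=None):
    """Copy `param` into the query config, or raise only on a real conflict.

    - param absent from config: take the argument value.
    - param present and argument equals the default: keep the config value.
    - param present and argument differs from the default: genuine duplicate.
    """
    if param_name not in query_config:
        query_config[param_name] = param
    elif param != default:
        raise ValueError(
            f"Values of {param_name} param are duplicated. Please remove duplicates."
        )
    # else: the argument is just the default; the config value wins.


# The failing example from the traceback above: priority is set only in
# api_resource_configs, while run_query() passes its default "INTERACTIVE".
config = {"useQueryCache": False, "priority": "BATCH"}
duplication_check_with_default("priority", "INTERACTIVE", config, default="INTERACTIVE")
print(config["priority"])  # BATCH -- no ValueError, the config value survives
```

An explicitly passed non-default argument that clashes with the config would still raise, preserving the original safety check for real duplicates.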
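Option 2 could similarly be sketched as a small helper that fills in the priority just before job submission, only when nothing was configured. Again a hypothetical illustration — the helper name `apply_default_priority` is invented and this is not the actual `insert_job` code:

```python
# Hypothetical sketch of option 2: rather than run_query() always passing
# priority="INTERACTIVE" (which then trips the duplication check), the default
# is applied late, and only if the configuration does not already set one.

def apply_default_priority(configuration, default="INTERACTIVE"):
    """Ensure configuration["query"]["priority"] is set, preferring any value
    already supplied via `configuration` / api_resource_configs."""
    query = configuration.setdefault("query", {})
    query.setdefault("priority", default)
    return configuration


# A user-supplied priority is left untouched:
cfg = apply_default_priority({"query": {"useQueryCache": False, "priority": "BATCH"}})
print(cfg["query"]["priority"])  # BATCH

# Nothing configured: the default is filled in just before submission.
cfg = apply_default_priority({"query": {"useQueryCache": False}})
print(cfg["query"]["priority"])  # INTERACTIVE
```

Because the default never enters the duplication check as an explicit argument, both `hook_params` examples from the reproduction section would pass under this scheme.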
