alexcsolomon1 opened a new issue, #39622: URL: https://github.com/apache/airflow/issues/39622
### Apache Airflow Provider(s)

snowflake

### Versions of Apache Airflow Providers

apache-airflow-providers-snowflake==5.5.0

### Apache Airflow version

2.9.1

### Operating System

macOS

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### What happened

When using the [SnowflakeSqlApiOperator](https://github.com/apache/airflow/blob/339ea508e2afa56205cfdaf90574fb42a0101eac/airflow/providers/snowflake/operators/snowflake.py#L391C7-L391C30) and specifying a DB parameter such as `schema` to override what is in the Airflow connection, the DB parameters are ignored. For example, if the schema retrieved via the `snowflake_conn_id` connection is `DEFAULT`, but the SnowflakeSqlApiOperator is instantiated with the argument `schema='OTHER'`, the query still executes against the API with the `DEFAULT` schema. The same is true for `role`, `warehouse`, etc.

### What you think should happen instead

In this case, the query should execute with the schema specified in the arguments of [SnowflakeSqlApiOperator](https://github.com/apache/airflow/blob/339ea508e2afa56205cfdaf90574fb42a0101eac/airflow/providers/snowflake/operators/snowflake.py#L391C7-L391C30), which should take precedence over what is in the connection specified by `snowflake_conn_id`.

### How to reproduce

Assume that in Snowflake you have a schema structure like:

Database:
- DEFAULT schema
  - Table1
- OTHER schema
  - Table2

Then in Airflow, the connection object `db_conn` contains the key `{..., "schema": "DEFAULT", ...}`.

Finally, the DAG code contains:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

default_args = {"owner": "airflow"}

dag = DAG(
    'repro_dag',
    default_args=default_args,
    description='A DAG to demonstrate the SnowflakeSqlApiOperator DB params bug',
    schedule_interval=None,
    start_date=datetime(2024, 5, 14),
)

repro_task = SnowflakeSqlApiOperator(
    task_id="repro_task",
    sql="select * from Table2;",
    snowflake_conn_id="db_conn",
    dag=dag,
    schema="OTHER",
)

repro_task
```

This will fail with:

```
SQL compilation error: Table 'TABLE2' does not exist or not authorized.
```

Further debugging (checking `conn_config['schema']` in the hook's [execute_query](https://github.com/apache/airflow/blob/339ea508e2afa56205cfdaf90574fb42a0101eac/airflow/providers/snowflake/hooks/snowflake_sql_api.py#L162C1-L163C1)) indicates this is because the schema is still `DEFAULT` in the POST to the API.

### Anything else

`hook_params` are set in the SnowflakeSqlApiOperator `__init__`, following a pattern similar to the one used in the [base Airflow SQL operator](https://github.com/apache/airflow/blob/339ea508e2afa56205cfdaf90574fb42a0101eac/airflow/providers/common/sql/operators/sql.py#L151). However, in the case of the SnowflakeSqlApiOperator, these `hook_params` are never used when creating the hook. I think this issue can be solved by modifying the instantiation of the [SnowflakeSqlApiHook](https://github.com/apache/airflow/blob/339ea508e2afa56205cfdaf90574fb42a0101eac/airflow/providers/snowflake/operators/snowflake.py#L503C1-L508C10) to pass the `hook_params` as kwargs:

```python
self._hook = SnowflakeSqlApiHook(
    snowflake_conn_id=self.snowflake_conn_id,
    token_life_time=self.token_life_time,
    token_renewal_delta=self.token_renewal_delta,
    deferrable=self.deferrable,
    **self.hook_params,
)
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
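To make the "set but never used" point concrete, here is a minimal sketch. It assumes only that the operator stores the packed DB params on the `hook_params` attribute inherited from the common SQL base operator, as the linked `__init__` code suggests; the exact keys in the dict may vary by provider version.

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

op = SnowflakeSqlApiOperator(
    task_id="inspect_task",
    sql="select * from Table2;",
    snowflake_conn_id="db_conn",
    schema="OTHER",
)

# Assuming the base SQL operator stores the packed DB params here, this should
# show the 'OTHER' schema override captured at __init__ time -- captured, but
# never forwarded to SnowflakeSqlApiHook in execute().
print(op.hook_params)
```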
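And a hedged, pytest-style regression check for the proposed fix. The patch target is the operator module's import of the hook; the `{"success": ..., "error": ...}` shape of `poll_on_queries` is an assumption based on the operator source at the pinned commit. Without the fix the final assertion fails (which is exactly the bug above); with the fix it passes.

```python
from unittest import mock

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator


# Patch poll_on_queries so the non-deferrable execute path completes without
# talking to Snowflake, and patch the hook where the operator module imports it.
@mock.patch.object(SnowflakeSqlApiOperator, "poll_on_queries")
@mock.patch("airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiHook")
def test_db_params_forwarded_to_hook(mock_hook, mock_poll):
    mock_hook.return_value.execute_query.return_value = ["query-id-1"]
    mock_poll.return_value = {"success": {}, "error": {}}  # assumed shape

    op = SnowflakeSqlApiOperator(
        task_id="repro_task",
        sql="select * from Table2;",
        snowflake_conn_id="db_conn",
        schema="OTHER",
        do_xcom_push=False,  # avoid needing a real TaskInstance in the context
    )
    op.execute(context={})

    # With the fix, hook_params (including the schema override) reach the hook.
    assert mock_hook.call_args.kwargs["schema"] == "OTHER"
```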
