Talvarenga opened a new issue, #32260:
URL: https://github.com/apache/airflow/issues/32260
### Apache Airflow version
2.6.2
### What happened
We are trying to use dynamic task mapping with SQLExecuteQueryOperator on
Trino. Our use case is to expand the `sql` parameter of the operator with paths
to SQL files.
Without dynamic task mapping this works perfectly, but with dynamic task
mapping the operator does not recognize the path and instead tries to execute
the path string itself as the query.
I believe this is related to the `template_searchpath` parameter.
### What you think should happen instead
It should work the same with or without dynamic task mapping.
### How to reproduce
Deploy the following DAG in Airflow:
```
from datetime import datetime

from airflow.models import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

DEFAULT_ARGS = {
    'start_date': datetime(2023, 7, 16),
}

with DAG(
    dag_id='trino_dinamic_map',
    template_searchpath='/opt/airflow',
    description="This is a DAG for the example project",
    schedule=None,
    default_args=DEFAULT_ARGS,
    catchup=False,
) as dag:
    trino_call = SQLExecuteQueryOperator(
        task_id='trino_call',
        conn_id='con_id',
        sql='queries/insert_delta_dp_raw_table1.sql',
        handler=list,
    )

    trino_insert = SQLExecuteQueryOperator.partial(
        task_id="trino_insert_table",
        conn_id='con_id',
        handler=list,
    ).expand_kwargs([
        {'sql': 'queries/insert_delta_dp_raw_table1.sql'},
        {'sql': 'queries/insert_delta_dp_raw_table2.sql'},
        {'sql': 'queries/insert_delta_dp_raw_table3.sql'},
    ])

    trino_call >> trino_insert
```
The SQL file can contain any query; for this test I used a CREATE TABLE
statement. The queries are located in /opt/airflow/queries:
```
CREATE TABLE database_config.data_base_name.TABLE_NAME (
"JOB_NAME" VARCHAR(60) NOT NULL,
"JOB_ID" DECIMAL NOT NULL,
"JOB_STATUS" VARCHAR(10),
"JOB_STARTED_DATE" VARCHAR(10),
"JOB_STARTED_TIME" VARCHAR(10),
"JOB_FINISHED_DATE" VARCHAR(10),
"JOB_FINISHED_TIME" VARCHSAR(10)
)
```
The first task (without dynamic task mapping) completes successfully, while
the second task (with dynamic task mapping) fails.
The error log shows that the query execution failed because the operator
submitted the file path itself rather than the file's contents.
Here is the traceback:
```
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR,
name=SYNTAX_ERROR, message="line 1:1: mismatched input 'queries'. Expecting:
'ALTER', 'ANALYZE', 'CALL', 'COMMENT', 'COMMIT', 'CREATE', 'DEALLOCATE',
'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DROP', 'EXECUTE', 'EXPLAIN', 'GRANT',
'INSERT', 'MERGE', 'PREPARE', 'REFRESH', 'RESET', 'REVOKE', 'ROLLBACK', 'SET',
'SHOW', 'START', 'TRUNCATE', 'UPDATE', 'USE', <query>",
query_id=20230629_114146_04418_qbcnd)
```
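As a possible interim workaround (an untested sketch, not a fix for the underlying templating issue): reading each SQL file's contents before passing it to `expand_kwargs` sidesteps `.sql`-extension template rendering entirely, since the operator then receives query text instead of a path. The `load_sql` helper name and the base-directory default are assumptions for illustration.

```
from pathlib import Path

# Assumption: queries live under the directory given as template_searchpath.
SEARCH_PATH = Path("/opt/airflow")


def load_sql(relative_path: str, base: Path = SEARCH_PATH) -> str:
    """Return the file's SQL text so the operator receives a query, not a path."""
    return (base / relative_path).read_text()


# In the DAG above, the mapped kwargs would then become, e.g.:
# ).expand_kwargs([
#     {'sql': load_sql('queries/insert_delta_dp_raw_table1.sql')},
#     {'sql': load_sql('queries/insert_delta_dp_raw_table2.sql')},
# ])
```

Note this reads the files at DAG-parse time, so it trades template rendering for a small parsing cost.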
### Operating System
Red Hat Enterprise Linux 8.8
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)