williesb opened a new issue, #23431:
URL: https://github.com/apache/airflow/issues/23431

   ### Apache Airflow Provider(s)
   
   trino
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-trino==2.1.2
   
   ### Apache Airflow version
   
   2.2.0
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   You cannot pass a list of statements to ```trinohook.run``` because the 
method expects a string value for the ```sql``` argument and it calls a static 
method to call ```strip()``` and ```rstrip(";")``` to remove trailing spaces 
and semi-colons.  If you pass a list of statements (strings) you get this 
error: ```AttributeError: 'list' object has no attribute 'strip'```
   
   ### What you think should happen instead
   
   The method should be updated to accept a string or a list of strings for the 
argument ```sql``` to match ```dbapi```.  It could still do the trailing spaces 
and semi-colon stripping by checking whether the argument is a scalar or a list 
and looping through the strings when needed.  The ```dbapi.run``` method 
already handles running the statements in a loop which is needed in order to be 
able to run all the statements in a single connection/session and support 
```SET SESSION``` statements before an ```insert``` or ```select``` statement.  
Without this every statement runs in its own session.
   
   ### How to reproduce
   
   Using this operator/plugin
   
   ```
   from airflow.models.baseoperator import BaseOperator
   from airflow.utils.decorators import apply_defaults
   from airflow.providers.trino.hooks.trino import TrinoHook
   import logging
   from typing import Sequence
   
   def handler(cur):
       cur.fetchall()
   
   class SEPRunOperator(BaseOperator):
   
       template_fields: Sequence[str] = ('sql',)
   
       @apply_defaults
       def __init__(self, trino_conn_id: str, sql, **kwargs) -> None:
           super().__init__(**kwargs)
           self.trino_conn_id = trino_conn_id
           self.sql = sql
   
       def execute(self, context):
           task_instance = context['task']
   
           logging.info('Creating Trino connection')
           hook = TrinoHook(trino_conn_id=self.trino_conn_id)
   
           logging.info('Running sql statement(s)')
           return hook.run(sql=self.sql, autocommit=False, parameters=None, 
handler=handler)
   ```
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to