Fabrice Dossin created AIRFLOW-2136:
---------------------------------------

             Summary: MySqlOperator does not comply well with sql scripts with multiple statements with MySQL warnings
                 Key: AIRFLOW-2136
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2136
             Project: Apache Airflow
          Issue Type: Bug
          Components: core
    Affects Versions: 1.10.0
            Reporter: Fabrice Dossin


Hello,

Take a simple SQL script that produces warnings:

 
{code:sql}
CREATE TABLE IF NOT EXISTS test_table  (a BIGINT NOT NULL PRIMARY KEY);
CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);
{code}
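On the second execution with MySqlOperator, it fails because of the warnings: the tables already exist by then, so each CREATE TABLE IF NOT EXISTS statement raises a warning instead of creating a table.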

 

 
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1514, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/dist-packages/airflow/operators/mysql_operator.py", line 55, in execute
    parameters=self.parameters)
  File "/usr/local/lib/python3.6/dist-packages/airflow/hooks/dbapi_hook.py", line 165, in run
    cur.execute(s)
  File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
    self.thing.close()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 84, in close
    while self.nextset():
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 177, in nextset
    self._warning_check()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 148, in _warning_check
    warnings = db.show_warnings()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 381, in show_warnings
    self.query("SHOW WARNINGS")
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 277, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
{noformat}
The underlying problem seems to be that the MySQLdb cursor is meant to execute a list of statements one by one, whereas a SQL file given as input is sent as a single string. My guess is that MySQLdb tries to read the warnings while the results of the remaining statements are still pending. Since we cannot do a fetchall between statements, it fails with the usual "Commands out of sync" error.
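
To illustrate what "a list of statements" means here, below is a minimal sketch of running the script one statement at a time, each on its own cursor. It uses the standard-library sqlite3 module purely as a stand-in for a DB-API connection, so that it runs without a MySQL server. That is an assumption of the sketch: SQLite does not reproduce the MySQL warnings or error 2014. The helper name run_statements is hypothetical and is not part of Airflow or MySQLdb.

```python
import sqlite3
from contextlib import closing

# The script from this report, already split into one statement per item.
STATEMENTS = [
    "CREATE TABLE IF NOT EXISTS test_table  (a BIGINT NOT NULL PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY)",
]


def run_statements(conn, statements):
    """Execute each statement on its own cursor (hypothetical helper)."""
    for statement in statements:
        # The cursor is closed before the next statement starts, so no
        # result set from a previous statement is left pending.
        with closing(conn.cursor()) as cur:
            cur.execute(statement)
    conn.commit()


# Assumption: sqlite3 only stands in for a MySQL connection here.
conn = sqlite3.connect(":memory:")
run_statements(conn, STATEMENTS)
run_statements(conn, STATEMENTS)  # second run: the tables already exist

table_names = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```

With one statement per cursor, the second run goes through in this stand-in setup. Whether the same holds against MySQL depends on the driver behaviour described above.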

 

I fixed this by splitting the statements with the sqlparse library ([https://github.com/andialbrecht/sqlparse]) in a subclass of MySqlOperator:
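{code:python}
import sqlparse
from airflow.operators.mysql_operator import MySqlOperator


class MySqlSplitOperator(MySqlOperator):
    """MySqlOperator that runs each statement of a SQL script separately."""

    def execute(self, context):
        # self.sql may be a single string or a list of strings.
        scripts = [self.sql] if isinstance(self.sql, str) else self.sql
        statements = []
        for script in scripts:
            # sqlparse.split returns one string per SQL statement.
            statements.extend(sqlparse.split(script))
        self.sql = statements
        super().execute(context)
{code}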
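
The normalisation step (a string or a list of strings becomes a flat list of single statements) can also be tried outside Airflow. The sketch below is only an illustration: flatten_statements and naive_split are hypothetical names, and naive_split is a deliberately simple stand-in for sqlparse.split that cuts on every ";" and therefore mishandles semicolons inside string literals or comments.

```python
def naive_split(script):
    """Split on ';' (naive stand-in for sqlparse.split, ignores quoting)."""
    return [part.strip() + ";" for part in script.split(";") if part.strip()]


def flatten_statements(sql, splitter=naive_split):
    """Return a flat list of single statements from a str or a list of str."""
    scripts = [sql] if isinstance(sql, str) else list(sql)
    statements = []
    for script in scripts:
        # Each script may contain several statements; keep them in order.
        statements.extend(splitter(script))
    return statements


script = """
CREATE TABLE IF NOT EXISTS test_table  (a BIGINT NOT NULL PRIMARY KEY);
CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
"""
statements = flatten_statements(script)
```

Passing splitter=sqlparse.split instead of the default should give the behaviour of the operator above, assuming sqlparse is installed.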
I do not know whether you are interested in having this brought into the main code via a PR.

If you are, just let me know and tell me where to integrate it.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
