Fabrice Dossin created AIRFLOW-2136:
---------------------------------------
Summary: MySqlOperator does not comply well with sql scripts with
multiple statements with MySQL warnings
Key: AIRFLOW-2136
URL: https://issues.apache.org/jira/browse/AIRFLOW-2136
Project: Apache Airflow
Issue Type: Bug
Components: core
Affects Versions: 1.10.0
Reporter: Fabrice Dossin
Hello,
Take a simple SQL script that can produce warnings:
{code:sql}
CREATE TABLE IF NOT EXISTS test_table (a BIGINT NOT NULL PRIMARY KEY);
CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);
{code}
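The second time this script is run with MySqlOperator, the tables already exist, so each CREATE TABLE IF NOT EXISTS statement produces a "table already exists" warning, and the task fails: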
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1514, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/dist-packages/airflow/operators/mysql_operator.py", line 55, in execute
    parameters=self.parameters)
  File "/usr/local/lib/python3.6/dist-packages/airflow/hooks/dbapi_hook.py", line 165, in run
    cur.execute(s)
  File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
    self.thing.close()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 84, in close
    while self.nextset():
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 177, in nextset
    self._warning_check()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 148, in _warning_check
    warnings = db.show_warnings()
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 381, in show_warnings
    self.query("SHOW WARNINGS")
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 277, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
{noformat}
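The root cause seems to be that the hook executes a list of statements one {{cur.execute()}} call at a time, whereas a SQL file given as input is passed as a single string, so the whole script is sent in one call. My guess is that MySQLdb then checks the warnings of each result ({{SHOW WARNINGS}}, as seen in the traceback) while the results of the remaining statements are still pending on the connection, and we have no way to fetch them in between, so the query fails with the well-known "Commands out of sync" error.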
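To make the string-versus-list distinction concrete, here is a simplified sketch of how {{DbApiHook.run}} dispatches its {{sql}} argument, modelled on the {{cur.execute(s)}} call in the traceback. {{simplified_run}} and {{RecordingCursor}} are hypothetical names; the cursor only records what it is given, and connection handling, parameters and autocommit are left out.

{code:python}
from typing import List, Union


class RecordingCursor:
    """Hypothetical stand-in for a DB-API cursor: it only records execute() calls."""

    def __init__(self):
        self.executed = []

    def execute(self, sql):
        self.executed.append(sql)


def simplified_run(cur, sql: Union[str, List[str]]) -> None:
    """Simplified sketch of the statement dispatch in DbApiHook.run."""
    # A single string (e.g. a rendered .sql file) is wrapped in a one-element list...
    if isinstance(sql, str):
        sql = [sql]
    # ...and each element is sent to the server in its own execute() call.
    for s in sql:
        cur.execute(s)


script = (
    "CREATE TABLE IF NOT EXISTS test_table (a BIGINT NOT NULL PRIMARY KEY);\n"
    "CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);\n"
    "CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);"
)

as_string = RecordingCursor()
simplified_run(as_string, script)
print(len(as_string.executed))  # 1: the whole multi-statement script in one call

as_list = RecordingCursor()
# Here each line happens to be exactly one statement.
simplified_run(as_list, script.splitlines())
print(len(as_list.executed))  # 3: one call per statement
{code}

Splitting the script before it reaches the hook therefore turns one multi-statement {{execute()}} call into one call per statement.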
I fixed this by splitting the script into individual statements with the sqlparse library ([https://github.com/andialbrecht/sqlparse]) in a subclass of MySqlOperator:
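{code:python}
import sqlparse
from airflow.operators.mysql_operator import MySqlOperator


class MySqlSplitOperator(MySqlOperator):
    """MySqlOperator that runs each statement of a script in its own execute() call."""

    def execute(self, context):
        # self.sql may be a single (already templated) script or a list of scripts.
        if isinstance(self.sql, str):
            self.sql = [self.sql]
        statements = []
        for script in self.sql:
            # sqlparse.split() breaks a script into individual statements;
            # empty fragments are skipped.
            statements.extend(s for s in sqlparse.split(script) if s.strip())
        self.sql = statements
        return super().execute(context)
{code}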
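A plain {{str.split(";")}} might look like a simpler alternative to sqlparse, but it cuts statements at semicolons inside string literals. The sketch below illustrates that design choice; {{naive_split}} and {{quote_aware_split}} are hypothetical helpers, not part of Airflow or sqlparse, and the quote-aware one is only a toy that ignores comments, backtick identifiers, backslash escapes and DELIMITER changes, which is exactly why a real parser such as sqlparse is the safer choice.

{code:python}
from typing import List


def naive_split(script: str) -> List[str]:
    """Split on every ';', ignoring quoting (shown only as a counter-example)."""
    return [s.strip() + ";" for s in script.split(";") if s.strip()]


def quote_aware_split(script: str) -> List[str]:
    """Split on ';' only when it is outside a single- or double-quoted literal.

    Illustrative only: unlike sqlparse, this ignores comments, backtick-quoted
    identifiers, backslash escapes and DELIMITER changes.
    """
    statements: List[str] = []
    current: List[str] = []
    quote = None  # quote character of the literal we are currently inside, if any
    for ch in script:
        current.append(ch)
        if quote:
            if ch == quote:
                quote = None  # end of the string literal
        elif ch in ("'", '"'):
            quote = ch  # start of a string literal
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt != ";":  # skip empty statements such as ";;"
                statements.append(stmt)
            current = []
    tail = "".join(current).strip()
    if tail:  # last statement without a trailing ';'
        statements.append(tail)
    return statements


sample = "INSERT INTO t VALUES ('a;b');\nSELECT 1;"
print(naive_split(sample))
# ["INSERT INTO t VALUES ('a;", "b');", 'SELECT 1;']
print(quote_aware_split(sample))
# ["INSERT INTO t VALUES ('a;b');", 'SELECT 1;']
{code}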
I don't know whether you would be interested in having this brought into the main code base through a PR. If so, just let me know where it should be integrated.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)