[https://issues.apache.org/jira/browse/AIRFLOW-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653216#comment-16653216]
jack commented on AIRFLOW-2136:
-------------------------------
You should submit your code as a PR. If there is any feedback on it, the
committers will raise it during review.
> MySqlOperator does not comply well with sql scripts with multiple statements
> with MySQL warnings
> ------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-2136
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2136
> Project: Apache Airflow
> Issue Type: Bug
> Components: core
> Affects Versions: 1.10.0
> Reporter: Fabrice Dossin
> Priority: Major
>
> Hello,
> Take a simple SQL script that will produce warnings:
>
> {code:sql}
> CREATE TABLE IF NOT EXISTS test_table (a BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);
> {code}
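> On the second execution with MySqlOperator, the task fails because of the
> warnings (the tables already exist, so each CREATE TABLE IF NOT EXISTS
> raises a "table already exists" note).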
>
>
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1514, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/operators/mysql_operator.py", line 55, in execute
>     parameters=self.parameters)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/hooks/dbapi_hook.py", line 165, in run
>     cur.execute(s)
>   File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
>     self.thing.close()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 84, in close
>     while self.nextset():
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 177, in nextset
>     self._warning_check()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 148, in _warning_check
>     warnings = db.show_warnings()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 381, in show_warnings
>     self.query("SHOW WARNINGS")
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 277, in query
>     _mysql.connection.query(self, query)
> _mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
> {noformat}
> The root cause is that the mysqldb cursor is meant to execute one statement
> per call (the hook loops over a list of statements), whereas a SQL file used
> as input is sent as a single string. My guess is that mysqldb reads the
> warnings while the script is still running, and since we cannot do a
> fetchall between statements, it fails with the common "Commands out of sync"
> error.
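For illustration, here is a simplified sketch of the loop the traceback passes
through in dbapi_hook.run. It is not Airflow's actual code: RecordingCursor and
run are hypothetical stand-ins that only record what would be sent to the
server. Because execute() is called once per list item, a whole script passed
as one string reaches the server as a single multi-statement query, while a
pre-split list produces one call per statement.

```python
from typing import List, Union


class RecordingCursor:
    """Hypothetical stand-in for a DB-API cursor: records each execute() call."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def execute(self, sql: str) -> None:
        self.calls.append(sql)


def run(cursor: RecordingCursor, sql: Union[str, List[str]]) -> None:
    """Simplified hook loop: wrap a lone string in a list, then one execute() per item."""
    if isinstance(sql, str):
        sql = [sql]
    for s in sql:
        cursor.execute(s)


script = (
    "CREATE TABLE IF NOT EXISTS test_table (a BIGINT NOT NULL PRIMARY KEY);\n"
    "CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);\n"
    "CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);"
)

# Whole file as one string: a single execute() carrying three statements.
whole = RecordingCursor()
run(whole, script)

# Pre-split list (here simply one statement per line): three execute() calls.
per_statement = RecordingCursor()
run(per_statement, script.splitlines())

print(len(whole.calls), len(per_statement.calls))  # 1 3
```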
>
> I worked around this by splitting the statements with the sqlparse library
> ([https://github.com/andialbrecht/sqlparse]) in a subclass of MySqlOperator:
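> {code:python}
> import sqlparse
> from airflow.operators.mysql_operator import MySqlOperator
>
>
> class MySqlSplitOperator(MySqlOperator):
>     """MySqlOperator that sends each SQL statement in a separate execute() call."""
>
>     def execute(self, context):
>         # Normalise a single SQL string into a list of strings.
>         if isinstance(self.sql, str):
>             self.sql = [self.sql]
>         split_list = []
>         for s in self.sql:
>             # sqlparse.split() returns one string per statement.
>             split_list.extend(sqlparse.split(s))
>         self.sql = split_list
>         super().execute(context)
> {code}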
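If pulling in sqlparse were not an option, the splitting step could in
principle be approximated with the standard library for simple scripts. The
sketch below is hypothetical (split_statements is not part of Airflow or
sqlparse) and deliberately limited: it only skips semicolons inside quotes and
does not understand comments, backslash escapes or MySQL DELIMITER changes,
cases where a real SQL parser such as sqlparse is the safer choice. It mainly
shows why a plain str.split(";") is not enough.

```python
from typing import List, Optional


def split_statements(script: str) -> List[str]:
    """Split a SQL script on semicolons that fall outside quotes.

    Hypothetical helper, far simpler than sqlparse.split(): it does not
    understand comments, backslash escapes or MySQL DELIMITER changes.
    """
    statements: List[str] = []
    current: List[str] = []
    quote: Optional[str] = None  # quote character we are currently inside, if any
    for ch in script:
        current.append(ch)
        if quote is not None:
            # A matching quote closes the string; a doubled '' simply
            # closes and immediately re-opens it.
            if ch == quote:
                quote = None
        elif ch in ("'", '"', "`"):
            quote = ch
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt.rstrip(";").strip():  # drop empty statements such as ";;"
                statements.append(stmt)
            current = []
    tail = "".join(current).strip()
    if tail:  # last statement without a trailing semicolon
        statements.append(tail)
    return statements


if __name__ == "__main__":
    example = "INSERT INTO t VALUES ('a;b');\nSELECT 1;\n"
    for stmt in split_statements(example):
        print(stmt)
```

Each returned string could then be used as one item of the sql list, the same
way the sqlparse-based operator above uses the output of sqlparse.split().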
> I do not know whether you would be interested in having this brought back
> into the main code base through a PR. Just let me know if you are, and tell
> me where to integrate it.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)