potiuk commented on code in PR #27264:
URL: https://github.com/apache/airflow/pull/27264#discussion_r1199761518
##########
airflow/providers/apache/hive/transfers/mysql_to_hive.py:
##########
@@ -131,36 +132,33 @@ def type_map(cls, mysql_type: int) -> str:
def execute(self, context: Context):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id,
auth=self.hive_auth)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
-
self.log.info("Dumping MySQL query results to local file")
- conn = mysql.get_conn()
- cursor = conn.cursor()
- cursor.execute(self.sql)
- with NamedTemporaryFile("wb") as f:
- csv_writer = csv.writer(
- f,
- delimiter=self.delimiter,
- quoting=self.quoting,
- quotechar=self.quotechar,
- escapechar=self.escapechar,
- encoding="utf-8",
- )
- field_dict = OrderedDict()
- if cursor.description is not None:
- for field in cursor.description:
- field_dict[field[0]] = self.type_map(field[1])
- csv_writer.writerows(cursor)
- f.flush()
- cursor.close()
- conn.close() # type: ignore[misc]
- self.log.info("Loading file into Hive")
- hive.load_file(
- f.name,
- self.hive_table,
- field_dict=field_dict,
- create=self.create,
- partition=self.partition,
- delimiter=self.delimiter,
- recreate=self.recreate,
- tblproperties=self.tblproperties,
- )
+ with closing(mysql.get_conn()) as conn:
+ with closing(conn.cursor()) as cursor:
+ cursor.execute(self.sql)
+ with NamedTemporaryFile("wb") as f:
+ csv_writer = csv.writer(
+ f,
+ delimiter=self.delimiter,
+ quoting=self.quoting,
+ quotechar=self.quotechar if self.quoting !=
csv.QUOTE_NONE else None,
Review Comment:
This was the error. The test covering this code (mysql_to_hive.py) is only run with
MySQL, and it passed an empty string as quotechar. One of the small, easy-to-miss
changes in Python 3.11 is that the csv writer checks that quotechar is a
single-character string or None even if quoting is set to
"csv.QUOTE_NONE". So "" is no longer allowed as quotechar, and that is exactly
what was passed in the test.
I changed it so that quotechar is ignored (set to None) when
quoting == csv.QUOTE_NONE.
Additionally, the tests were hanging because, in this failure case, the
cursor and connection were never closed. The test's `finally` block then opened
another cursor to run a cleanup query while the first one was still open, and
that cleanup SQL hung indefinitely.
```
finally:
with closing(hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]