This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8c91fcec50a Fix error file not found. tmp file is deleted before
inserting rows to DB in VerticaToMySQLOperator bulk (#44028)
8c91fcec50a is described below
commit 8c91fcec50ab18ddabaaf97d2461a32f8ce4fb44
Author: bareketamir <[email protected]>
AuthorDate: Tue Nov 19 23:26:59 2024 +0200
Fix error file not found. tmp file is deleted before inserting rows to DB
in VerticaToMySQLOperator bulk (#44028)
* Fix error file not found. tmp file is deleted before inserting rows to DB
in VerticaToMySQLOperator bulk .
* fix the mock_get_conn function , so it will really mock the data and will
return empty results
* fix ruff-format
---------
Co-authored-by: Amir.Ba <[email protected]>
---
.../providers/mysql/transfers/vertica_to_mysql.py | 29 +++++++++++-----------
.../tests/mysql/transfers/test_vertica_to_mysql.py | 26 +++++++++++--------
2 files changed, 30 insertions(+), 25 deletions(-)
diff --git
a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
index ee821b38da4..e13ee7bddad 100644
--- a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
+++ b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
@@ -141,21 +141,20 @@ class VerticaToMySqlOperator(BaseOperator):
count += 1
tmpfile.flush()
- self._run_preoperator(mysql)
- try:
- self.log.info("Bulk inserting rows into MySQL...")
- with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as
cursor:
- cursor.execute(
- f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
- f"INTO TABLE {self.mysql_table} "
- f"LINES TERMINATED BY '\r\n' ({',
'.join(selected_columns)})"
- )
- conn.commit()
- tmpfile.close()
- self.log.info("Inserted rows into MySQL %s", count)
- except (MySQLdb.Error, MySQLdb.Warning):
- self.log.info("Inserted rows into MySQL 0")
- raise
+ self._run_preoperator(mysql)
+ try:
+ self.log.info("Bulk inserting rows into MySQL...")
+ with closing(mysql.get_conn()) as conn,
closing(conn.cursor()) as cursor:
+ cursor.execute(
+ f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+ f"INTO TABLE {self.mysql_table} "
+ f"LINES TERMINATED BY '\r\n' ({',
'.join(selected_columns)})"
+ )
+ conn.commit()
+ self.log.info("Inserted rows into MySQL %s", count)
+ except (MySQLdb.Error, MySQLdb.Warning):
+ self.log.info("Inserted rows into MySQL 0")
+ raise
def _run_preoperator(self, mysql):
if self.mysql_preoperator:
diff --git a/providers/tests/mysql/transfers/test_vertica_to_mysql.py
b/providers/tests/mysql/transfers/test_vertica_to_mysql.py
index 7656a036449..da43d38a74e 100644
--- a/providers/tests/mysql/transfers/test_vertica_to_mysql.py
+++ b/providers/tests/mysql/transfers/test_vertica_to_mysql.py
@@ -31,17 +31,23 @@ except ImportError:
def mock_get_conn():
+ class MockCol:
+ def __init__(self, name):
+ self.name = name
+
+ col_a = MockCol(name="a")
+ col_b = MockCol(name="b")
+ col_c = MockCol(name="c")
+
commit_mock = mock.MagicMock()
- cursor_mock = mock.MagicMock(
- execute=[],
- fetchall=[["1", "2", "3"]],
- description=["a", "b", "c"],
- iterate=[["1", "2", "3"]],
- )
- conn_mock = mock.MagicMock(
- commit=commit_mock,
- cursor=cursor_mock,
- )
+ cursor_mock = mock.MagicMock(description=[col_a, col_b, col_c])
+ cursor_mock.execute.return_value = []
+ cursor_mock.fetchall.return_value = [["1", "2", "3"]]
+ cursor_mock.iterate.return_value = [["1", "2", "3"]]
+ conn_mock = mock.MagicMock()
+ conn_mock.commit.return_value = commit_mock
+ conn_mock.cursor.return_value = cursor_mock
+
return conn_mock