This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c91fcec50a Fix error file not found. tmp file is deleted before 
inserting rows to DB in VerticaToMySQLOperator bulk  (#44028)
8c91fcec50a is described below

commit 8c91fcec50ab18ddabaaf97d2461a32f8ce4fb44
Author: bareketamir <[email protected]>
AuthorDate: Tue Nov 19 23:26:59 2024 +0200

    Fix error file not found. tmp file is deleted before inserting rows to DB 
in VerticaToMySQLOperator bulk  (#44028)
    
    * Fix "file not found" error: the tmp file was deleted before the rows
were inserted into the DB in the VerticaToMySqlOperator bulk-load path.
    
    * Fix the mock_get_conn function so that it really mocks the data and
returns empty results
    
    * fix ruff-format
    
    ---------
    
    Co-authored-by: Amir.Ba <[email protected]>
---
 .../providers/mysql/transfers/vertica_to_mysql.py  | 29 +++++++++++-----------
 .../tests/mysql/transfers/test_vertica_to_mysql.py | 26 +++++++++++--------
 2 files changed, 30 insertions(+), 25 deletions(-)

diff --git 
a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py 
b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
index ee821b38da4..e13ee7bddad 100644
--- a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
+++ b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
@@ -141,21 +141,20 @@ class VerticaToMySqlOperator(BaseOperator):
                     count += 1
 
                 tmpfile.flush()
-        self._run_preoperator(mysql)
-        try:
-            self.log.info("Bulk inserting rows into MySQL...")
-            with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as 
cursor:
-                cursor.execute(
-                    f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
-                    f"INTO TABLE {self.mysql_table} "
-                    f"LINES TERMINATED BY '\r\n' ({', 
'.join(selected_columns)})"
-                )
-                conn.commit()
-            tmpfile.close()
-            self.log.info("Inserted rows into MySQL %s", count)
-        except (MySQLdb.Error, MySQLdb.Warning):
-            self.log.info("Inserted rows into MySQL 0")
-            raise
+                self._run_preoperator(mysql)
+                try:
+                    self.log.info("Bulk inserting rows into MySQL...")
+                    with closing(mysql.get_conn()) as conn, 
closing(conn.cursor()) as cursor:
+                        cursor.execute(
+                            f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+                            f"INTO TABLE {self.mysql_table} "
+                            f"LINES TERMINATED BY '\r\n' ({', 
'.join(selected_columns)})"
+                        )
+                        conn.commit()
+                    self.log.info("Inserted rows into MySQL %s", count)
+                except (MySQLdb.Error, MySQLdb.Warning):
+                    self.log.info("Inserted rows into MySQL 0")
+                    raise
 
     def _run_preoperator(self, mysql):
         if self.mysql_preoperator:
diff --git a/providers/tests/mysql/transfers/test_vertica_to_mysql.py 
b/providers/tests/mysql/transfers/test_vertica_to_mysql.py
index 7656a036449..da43d38a74e 100644
--- a/providers/tests/mysql/transfers/test_vertica_to_mysql.py
+++ b/providers/tests/mysql/transfers/test_vertica_to_mysql.py
@@ -31,17 +31,23 @@ except ImportError:
 
 
 def mock_get_conn():
+    class MockCol:
+        def __init__(self, name):
+            self.name = name
+
+    col_a = MockCol(name="a")
+    col_b = MockCol(name="b")
+    col_c = MockCol(name="c")
+
     commit_mock = mock.MagicMock()
-    cursor_mock = mock.MagicMock(
-        execute=[],
-        fetchall=[["1", "2", "3"]],
-        description=["a", "b", "c"],
-        iterate=[["1", "2", "3"]],
-    )
-    conn_mock = mock.MagicMock(
-        commit=commit_mock,
-        cursor=cursor_mock,
-    )
+    cursor_mock = mock.MagicMock(description=[col_a, col_b, col_c])
+    cursor_mock.execute.return_value = []
+    cursor_mock.fetchall.return_value = [["1", "2", "3"]]
+    cursor_mock.iterate.return_value = [["1", "2", "3"]]
+    conn_mock = mock.MagicMock()
+    conn_mock.commit.return_value = commit_mock
+    conn_mock.cursor.return_value = cursor_mock
+
     return conn_mock
 
 

Reply via email to