This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 67ad95ffcf refactor: Added 2 unit test with rows as iterator instead of a list to assure it still works (#38986)
67ad95ffcf is described below
commit 67ad95ffcfbf6c8783dbc50117556875c2fad7e3
Author: David Blain <[email protected]>
AuthorDate: Sat Apr 13 17:26:38 2024 +0200
refactor: Added 2 unit test with rows as iterator instead of a list to assure it still works (#38986)
Co-authored-by: David Blain <[email protected]>
---
tests/providers/common/sql/hooks/test_dbapi.py | 39 +++++++++++++++++++++++++-
1 file changed, 38 insertions(+), 1 deletion(-)
diff --git a/tests/providers/common/sql/hooks/test_dbapi.py b/tests/providers/common/sql/hooks/test_dbapi.py
index 2c34ee133e..872b7b360b 100644
--- a/tests/providers/common/sql/hooks/test_dbapi.py
+++ b/tests/providers/common/sql/hooks/test_dbapi.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import json
+import logging
from unittest import mock
import pytest
@@ -49,7 +50,7 @@ class TestDbApiHook:
class DbApiHookMock(DbApiHook):
conn_name_attr = "test_conn_id"
- log = mock.MagicMock()
+ log = mock.MagicMock(spec=logging.Logger)
@classmethod
def get_connection(cls, conn_id: str) -> Connection:
@@ -61,6 +62,7 @@ class TestDbApiHook:
self.db_hook = DbApiHookMock(**kwargs)
self.db_hook_no_log_sql = DbApiHookMock(log_sql=False)
self.db_hook_schema_override = DbApiHookMock(schema="schema-override")
+ self.db_hook.supports_executemany = False
def test_get_records(self):
statement = "SQL"
@@ -191,6 +193,41 @@ class TestDbApiHook:
sql = f"UPSERT {table} VALUES (%s) WITH PRIMARY KEY"
self.cur.executemany.assert_any_call(sql, rows)
+ def test_insert_rows_as_generator(self):
+ table = "table"
+ rows = [("What's",), ("up",), ("world",)]
+
+ self.db_hook.insert_rows(table, iter(rows))
+
+ assert self.conn.close.call_count == 1
+ assert self.cur.close.call_count == 1
+ assert self.conn.commit.call_count == 2
+
+ sql = f"INSERT INTO {table} VALUES (%s)"
+
+ self.db_hook.log.debug.assert_called_with("Generated sql: %s", sql)
+ self.db_hook.log.info.assert_called_with("Done loading. Loaded a total of %s rows into %s", 3, table)
+
+ for row in rows:
+ self.cur.execute.assert_any_call(sql, row)
+
+ def test_insert_rows_as_generator_supports_executemany(self):
+ table = "table"
+ rows = [("What's",), ("up",), ("world",)]
+
+ self.db_hook.supports_executemany = True
+ self.db_hook.insert_rows(table, iter(rows))
+
+ assert self.conn.close.call_count == 1
+ assert self.cur.close.call_count == 1
+ assert self.conn.commit.call_count == 2
+
+ sql = f"INSERT INTO {table} VALUES (%s)"
+
+ self.db_hook.log.debug.assert_called_with("Generated sql: %s", sql)
+ self.db_hook.log.info.assert_called_with("Done loading. Loaded a total of %s rows into %s", 3, table)
+ self.cur.executemany.assert_any_call(sql, rows)
+
def test_get_uri_schema_not_none(self):
self.db_hook.get_connection = mock.MagicMock(
return_value=Connection(