This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2c394e3c85d Feature: Added fast_executemany parameter to insert_rows of DbApiHook (#43357)
2c394e3c85d is described below
commit 2c394e3c85d77a3a0331687186dfcee89e286035
Author: David Blain <[email protected]>
AuthorDate: Fri Oct 25 18:41:17 2024 +0200
Feature: Added fast_executemany parameter to insert_rows of DbApiHook (#43357)
* refactor: Added the fast_executemany parameter to the insert_rows method of the DbApiHook
* refactor: Added unit test using the fast_executemany parameter in the DbApiHook
* docs: Put fast_executemany and executemany between single quotes to avoid spelling check
---------
Co-authored-by: David Blain <[email protected]>
---
providers/src/airflow/providers/common/sql/hooks/sql.py | 12 ++++++++++++
providers/tests/common/sql/hooks/test_dbapi.py | 16 ++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/providers/src/airflow/providers/common/sql/hooks/sql.py b/providers/src/airflow/providers/common/sql/hooks/sql.py
index afb66ddd13a..60c659e340f 100644
--- a/providers/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/src/airflow/providers/common/sql/hooks/sql.py
@@ -620,6 +620,7 @@ class DbApiHook(BaseHook):
replace=False,
*,
executemany=False,
+ fast_executemany=False,
autocommit=False,
**kwargs,
):
@@ -638,6 +639,8 @@ class DbApiHook(BaseHook):
:param executemany: If True, all rows are inserted at once in
chunks defined by the commit_every parameter. This only works if all rows
have same number of column names, but leads to better performance.
+ :param fast_executemany: If True, the `fast_executemany` parameter will be set on the
+ cursor used by `executemany` which leads to better performance, if supported by driver.
:param autocommit: What to set the connection's autocommit setting to
before executing the query.
"""
@@ -646,6 +649,15 @@ class DbApiHook(BaseHook):
conn.commit()
with closing(conn.cursor()) as cur:
if self.supports_executemany or executemany:
+ if fast_executemany:
+ with contextlib.suppress(AttributeError):
+ # Try to set the fast_executemany attribute
+ cur.fast_executemany = True
+ self.log.info(
+ "Fast_executemany is enabled for conn_id
'%s'!",
+ self.get_conn_id(),
+ )
+
for chunked_rows in chunked(rows, commit_every):
values = list(
map(
diff --git a/providers/tests/common/sql/hooks/test_dbapi.py b/providers/tests/common/sql/hooks/test_dbapi.py
index d94b817eeeb..a47b2856eb2 100644
--- a/providers/tests/common/sql/hooks/test_dbapi.py
+++ b/providers/tests/common/sql/hooks/test_dbapi.py
@@ -49,6 +49,7 @@ class TestDbApiHook:
def setup_method(self, **kwargs):
self.cur = mock.MagicMock(
rowcount=0,
+ fast_executemany=False,
spec=Cursor,
)
self.conn = mock.MagicMock()
@@ -188,6 +189,21 @@ class TestDbApiHook:
self.db_hook.insert_rows(table, rows, executemany=True)
assert self.conn.close.call_count == 1
+ assert not self.cur.fast_executemany
+ assert self.cur.close.call_count == 1
+ assert self.conn.commit.call_count == 2
+
+ sql = f"INSERT INTO {table} VALUES (%s)"
+ self.cur.executemany.assert_any_call(sql, rows)
+
+ def test_insert_rows_fast_executemany(self):
+ table = "table"
+ rows = [("hello",), ("world",)]
+
+ self.db_hook.insert_rows(table, rows, executemany=True,
fast_executemany=True)
+
+ assert self.conn.close.call_count == 1
+ assert self.cur.fast_executemany
assert self.cur.close.call_count == 1
assert self.conn.commit.call_count == 2