This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c394e3c85d Feature: Added fast_executemany parameter to insert_rows of DbApiHook (#43357)
2c394e3c85d is described below

commit 2c394e3c85d77a3a0331687186dfcee89e286035
Author: David Blain <[email protected]>
AuthorDate: Fri Oct 25 18:41:17 2024 +0200

    Feature: Added fast_executemany parameter to insert_rows of DbApiHook (#43357)
    
    * refactor: Added the fast_executemany parameter to the insert_rows method of the DbApiHook
    
    * refactor: Added unit test using the fast_executemany parameter in the DbApiHook
    
    * docs: Put fast_executemany and executemany between single quotes to avoid spelling check
    
    ---------
    
    Co-authored-by: David Blain <[email protected]>
---
 providers/src/airflow/providers/common/sql/hooks/sql.py | 12 ++++++++++++
 providers/tests/common/sql/hooks/test_dbapi.py          | 16 ++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/providers/src/airflow/providers/common/sql/hooks/sql.py b/providers/src/airflow/providers/common/sql/hooks/sql.py
index afb66ddd13a..60c659e340f 100644
--- a/providers/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/src/airflow/providers/common/sql/hooks/sql.py
@@ -620,6 +620,7 @@ class DbApiHook(BaseHook):
         replace=False,
         *,
         executemany=False,
+        fast_executemany=False,
         autocommit=False,
         **kwargs,
     ):
@@ -638,6 +639,8 @@ class DbApiHook(BaseHook):
         :param executemany: If True, all rows are inserted at once in
             chunks defined by the commit_every parameter. This only works if all rows
             have same number of column names, but leads to better performance.
+        :param fast_executemany: If True, the `fast_executemany` parameter will be set on the
+            cursor used by `executemany` which leads to better performance, if supported by driver.
         :param autocommit: What to set the connection's autocommit setting to
             before executing the query.
         """
@@ -646,6 +649,15 @@ class DbApiHook(BaseHook):
             conn.commit()
             with closing(conn.cursor()) as cur:
                 if self.supports_executemany or executemany:
+                    if fast_executemany:
+                        with contextlib.suppress(AttributeError):
+                            # Try to set the fast_executemany attribute
+                            cur.fast_executemany = True
+                            self.log.info(
+                                "Fast_executemany is enabled for conn_id '%s'!",
+                                self.get_conn_id(),
+                            )
+
                     for chunked_rows in chunked(rows, commit_every):
                         values = list(
                             map(
diff --git a/providers/tests/common/sql/hooks/test_dbapi.py b/providers/tests/common/sql/hooks/test_dbapi.py
index d94b817eeeb..a47b2856eb2 100644
--- a/providers/tests/common/sql/hooks/test_dbapi.py
+++ b/providers/tests/common/sql/hooks/test_dbapi.py
@@ -49,6 +49,7 @@ class TestDbApiHook:
     def setup_method(self, **kwargs):
         self.cur = mock.MagicMock(
             rowcount=0,
+            fast_executemany=False,
             spec=Cursor,
         )
         self.conn = mock.MagicMock()
@@ -188,6 +189,21 @@ class TestDbApiHook:
         self.db_hook.insert_rows(table, rows, executemany=True)
 
         assert self.conn.close.call_count == 1
+        assert not self.cur.fast_executemany
+        assert self.cur.close.call_count == 1
+        assert self.conn.commit.call_count == 2
+
+        sql = f"INSERT INTO {table}  VALUES (%s)"
+        self.cur.executemany.assert_any_call(sql, rows)
+
+    def test_insert_rows_fast_executemany(self):
+        table = "table"
+        rows = [("hello",), ("world",)]
+
+        self.db_hook.insert_rows(table, rows, executemany=True, fast_executemany=True)
+
+        assert self.conn.close.call_count == 1
+        assert self.cur.fast_executemany
         assert self.cur.close.call_count == 1
         assert self.conn.commit.call_count == 2
 

Reply via email to