uranusjr commented on code in PR #38715:
URL: https://github.com/apache/airflow/pull/38715#discussion_r1561554648


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -550,47 +557,48 @@ def insert_rows(
         :param commit_every: The maximum number of rows to insert in one
             transaction. Set to 0 to insert all rows in one transaction.
         :param replace: Whether to replace instead of insert
-        :param executemany: Insert all rows at once in chunks defined by the 
commit_every parameter, only
-            works if all rows have same number of column names but leads to 
better performance
+        :param executemany: (Deprecated) If True, all rows are inserted at 
once in
+            chunks defined by the commit_every parameter. This only works if 
all rows
+            have same number of column names, but leads to better performance.
         """
-        i = 0
-        with closing(self.get_conn()) as conn:
-            if self.supports_autocommit:
-                self.set_autocommit(conn, False)
+        if executemany:
+            warnings.warn(
+                "executemany parameter is deprecated, override 
supports_executemany instead.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
 
+        with self._create_autocommit_connection() as conn:
             conn.commit()
-
             with closing(conn.cursor()) as cur:
-                if executemany:
+                if self.supports_executemany or executemany:
                     for chunked_rows in chunked(rows, commit_every):
                         values = list(
                             map(
-                                lambda row: tuple(map(lambda cell: 
self._serialize_cell(cell, conn), row)),
+                                lambda row: self._serialize_cells(row, conn),
                                 chunked_rows,
                             )
                         )
                         sql = self._generate_insert_sql(table, values[0], 
target_fields, replace, **kwargs)
                         self.log.debug("Generated sql: %s", sql)
-                        cur.fast_executemany = True
                         cur.executemany(sql, values)
                         conn.commit()
                         self.log.info("Loaded %s rows into %s so far", 
len(chunked_rows), table)
                 else:
                     for i, row in enumerate(rows, 1):
-                        lst = []
-                        for cell in row:
-                            lst.append(self._serialize_cell(cell, conn))
-                        values = tuple(lst)
+                        values = self._serialize_cells(row, conn)
                         sql = self._generate_insert_sql(table, values, 
target_fields, replace, **kwargs)
                         self.log.debug("Generated sql: %s", sql)
                         cur.execute(sql, values)
                         if commit_every and i % commit_every == 0:
                             conn.commit()
                             self.log.info("Loaded %s rows into %s so far", i, 
table)
+                    conn.commit()
+        self.log.info("Done loading. Loaded a total of %s rows into %s", 
len(rows), table)
 
-            if not executemany:
-                conn.commit()
-        self.log.info("Done loading. Loaded a total of %s rows into %s", i, 
table)
+    @classmethod
+    def _serialize_cells(cls, row, conn=None):
+        return tuple(map(lambda cell: cls._serialize_cell(cell, conn), row))

Review Comment:
   ```suggestion
           return tuple(cls._serialize_cell(cell, conn) for cell in row)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to