This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c74d58e1b87 refactor: Log generated SQL-statement and passed parameters as ERROR message when an exception occurs during insert_rows (#48932)
c74d58e1b87 is described below

commit c74d58e1b876b70775015c6ee58af89a5c679f39
Author: David Blain <[email protected]>
AuthorDate: Fri Apr 18 14:02:22 2025 +0200

    refactor: Log generated SQL-statement and passed parameters as ERROR message when an exception occurs during insert_rows (#48932)
    
    Co-authored-by: David Blain <[email protected]>
---
 .../src/airflow/providers/common/sql/hooks/sql.py  | 18 +++++++++++++++--
 .../sql/tests/unit/common/sql/hooks/test_dbapi.py  | 23 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
index 5eb1e1420b5..8f0c46d52a2 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
@@ -890,7 +890,14 @@ class DbApiHook(BaseHook):
                         )
                         sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
                         self.log.debug("Generated sql: %s", sql)
-                        cur.executemany(sql, values)
+
+                        try:
+                            cur.executemany(sql, values)
+                        except Exception as e:
+                            self.log.error("Generated sql: %s", sql)
+                            self.log.error("Parameters: %s", values)
+                            raise e
+
                         conn.commit()
                         nb_rows += len(chunked_rows)
                         self.log.info("Loaded %s rows into %s so far", nb_rows, table)
@@ -899,7 +906,14 @@ class DbApiHook(BaseHook):
                         values = self._serialize_cells(row, conn)
                         sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
                         self.log.debug("Generated sql: %s", sql)
-                        cur.execute(sql, values)
+
+                        try:
+                            cur.execute(sql, values)
+                        except Exception as e:
+                            self.log.error("Generated sql: %s", sql)
+                            self.log.error("Parameters: %s", values)
+                            raise e
+
                         if commit_every and i % commit_every == 0:
                             conn.commit()
                             self.log.info("Loaded %s rows into %s so far", i, table)
diff --git a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
index e54fe98e7df..6cccd66b35d 100644
--- a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
+++ b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
@@ -267,6 +267,29 @@ class TestDbApiHook:
 
         self.cur.executemany.assert_any_call(sql, rows)
 
+    def test_insert_rows_logs_generated_sql_on_exception(self, caplog):
+        table = "table"
+        rows = [("What's",), ("up",), ("world",)]
+
+        with caplog.at_level(logging.ERROR):
+            self.cur.executemany.side_effect = Exception("Boom!")
+            self.db_hook.supports_executemany = True
+
+            with pytest.raises(Exception, match="Boom!"):
+                self.db_hook.insert_rows(table, iter(rows))
+
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        assert self.conn.commit.call_count == 1
+
+        sql = f"INSERT INTO {table}  VALUES (%s)"
+
+        assert len(caplog.messages) == 2
+        assert any(f"Generated sql: {sql}" in message for message in caplog.messages)
+        assert any(f"Parameters: {rows}" in message for message in caplog.messages)
+
+        self.cur.executemany.assert_any_call(sql, rows)
+
     def test_get_uri_schema_not_none(self):
         self.db_hook.get_connection = mock.MagicMock(
             return_value=Connection(

Reply via email to