This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c74d58e1b87 refactor: Log generated SQL-statement and passed
parameters as ERROR message when an exception occurs during insert_rows (#48932)
c74d58e1b87 is described below
commit c74d58e1b876b70775015c6ee58af89a5c679f39
Author: David Blain <[email protected]>
AuthorDate: Fri Apr 18 14:02:22 2025 +0200
refactor: Log generated SQL-statement and passed parameters as ERROR
message when an exception occurs during insert_rows (#48932)
Co-authored-by: David Blain <[email protected]>
---
.../src/airflow/providers/common/sql/hooks/sql.py | 18 +++++++++++++++--
.../sql/tests/unit/common/sql/hooks/test_dbapi.py | 23 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
index 5eb1e1420b5..8f0c46d52a2 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
@@ -890,7 +890,14 @@ class DbApiHook(BaseHook):
)
sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
self.log.debug("Generated sql: %s", sql)
- cur.executemany(sql, values)
+
+ try:
+ cur.executemany(sql, values)
+ except Exception as e:
+ self.log.error("Generated sql: %s", sql)
+ self.log.error("Parameters: %s", values)
+ raise e
+
conn.commit()
nb_rows += len(chunked_rows)
self.log.info("Loaded %s rows into %s so far", nb_rows, table)
@@ -899,7 +906,14 @@ class DbApiHook(BaseHook):
values = self._serialize_cells(row, conn)
sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
self.log.debug("Generated sql: %s", sql)
- cur.execute(sql, values)
+
+ try:
+ cur.execute(sql, values)
+ except Exception as e:
+ self.log.error("Generated sql: %s", sql)
+ self.log.error("Parameters: %s", values)
+ raise e
+
if commit_every and i % commit_every == 0:
conn.commit()
self.log.info("Loaded %s rows into %s so far", i, table)
diff --git a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
index e54fe98e7df..6cccd66b35d 100644
--- a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
+++ b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
@@ -267,6 +267,29 @@ class TestDbApiHook:
self.cur.executemany.assert_any_call(sql, rows)
+ def test_insert_rows_logs_generated_sql_on_exception(self, caplog):
+ table = "table"
+ rows = [("What's",), ("up",), ("world",)]
+
+ with caplog.at_level(logging.ERROR):
+ self.cur.executemany.side_effect = Exception("Boom!")
+ self.db_hook.supports_executemany = True
+
+ with pytest.raises(Exception, match="Boom!"):
+ self.db_hook.insert_rows(table, iter(rows))
+
+ assert self.conn.close.call_count == 1
+ assert self.cur.close.call_count == 1
+ assert self.conn.commit.call_count == 1
+
+ sql = f"INSERT INTO {table} VALUES (%s)"
+
+ assert len(caplog.messages) == 2
+ assert any(f"Generated sql: {sql}" in message for message in caplog.messages)
+ assert any(f"Parameters: {rows}" in message for message in caplog.messages)
+
+ self.cur.executemany.assert_any_call(sql, rows)
+
def test_get_uri_schema_not_none(self):
self.db_hook.get_connection = mock.MagicMock(
return_value=Connection(