This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 155124dde2b fix: Improve logging and timeouts in OL helpers (#53139)
155124dde2b is described below

commit 155124dde2b14d00d6d59e3e6ad5d23431bf1ba6
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Jul 10 21:26:59 2025 +0200

    fix: Improve logging and timeouts in OL helpers (#53139)
---
 .../providers/databricks/utils/openlineage.py      | 31 ++++++++++------------
 .../unit/databricks/utils/test_openlineage.py      | 22 ++++++++++-----
 .../providers/snowflake/utils/openlineage.py       | 14 ++++++++--
 .../tests/unit/snowflake/utils/test_openlineage.py | 22 ++++++++-------
 4 files changed, 54 insertions(+), 35 deletions(-)

diff --git a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
index 57c964d9842..50df7efd0e8 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
@@ -124,22 +124,15 @@ def _get_parent_run_facet(task_instance):
 
 def _run_api_call(hook: DatabricksSqlHook | DatabricksHook, query_ids: list[str]) -> list[dict]:
     """Retrieve execution details for specific queries from Databricks's query history API."""
-    try:
-        token = hook._get_token(raise_error=True)
-        # https://docs.databricks.com/api/azure/workspace/queryhistory/list
-        response = requests.get(
-            url=f"https://{hook.host}/api/2.0/sql/history/queries",
-            headers={"Authorization": f"Bearer {token}"},
-            data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
-            timeout=2,
-        )
-        response.raise_for_status()
-    except Exception as e:
-        log.warning(
-            "OpenLineage could not retrieve Databricks queries details. Error received: `%s`.",
-            e,
-        )
-        return []
+    token = hook._get_token(raise_error=True)
+    # https://docs.databricks.com/api/azure/workspace/queryhistory/list
+    response = requests.get(
+        url=f"https://{hook.host}/api/2.0/sql/history/queries",
+        headers={"Authorization": f"Bearer {token}"},
+        data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
+        timeout=3,
+    )
+    response.raise_for_status()
 
     return response.json()["res"]
 
@@ -176,7 +169,11 @@ def _get_queries_details_from_databricks(
             if query_info["query_id"]
         }
     except Exception as e:
-        log.warning("OpenLineage could not retrieve extra metadata from Databricks. Error encountered: %s", e)
+        log.info(
+            "OpenLineage encountered an error while retrieving additional metadata about SQL queries"
+            " from Databricks. The process will continue with default values. Error details: %s",
+            e,
+        )
 
     return query_details
 
diff --git a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
index 6d427e0ba77..24a8f96df25 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
@@ -121,9 +121,8 @@ def test_run_api_call_request_error():
     mock_response.status_code = 200
 
     with mock.patch("requests.get", side_effect=RuntimeError("request error")):
-        result = _run_api_call(mock_hook, ["123"])
-
-    assert result == []
+        with pytest.raises(RuntimeError):
+            _run_api_call(mock_hook, ["123"])
 
 
 def test_run_api_call_token_error():
@@ -135,9 +134,8 @@ def test_run_api_call_token_error():
     mock_response.status_code = 200
 
     with mock.patch("requests.get", return_value=mock_response):
-        result = _run_api_call(mock_hook, ["123"])
-
-    assert result == []
+        with pytest.raises(RuntimeError):
+            _run_api_call(mock_hook, ["123"])
 
 
 def test_process_data_from_api():
@@ -194,6 +192,18 @@ def test_get_queries_details_from_databricks_empty_query_ids():
     assert details == {}
 
 
+@mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
+def test_get_queries_details_from_databricks_error(mock_api_call):
+    mock_api_call.side_effect = RuntimeError("Token error")
+
+    hook = DatabricksSqlHook()
+    query_ids = ["ABC"]
+
+    details = _get_queries_details_from_databricks(hook, query_ids)
+    mock_api_call.assert_called_once_with(hook=hook, query_ids=query_ids)
+    assert details == {}
+
+
 @mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
 def test_get_queries_details_from_databricks(mock_api_call):
     hook = DatabricksSqlHook()
diff --git a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index aa954c0444e..88c5caaa45e 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -199,6 +199,7 @@ def _run_single_query_with_hook(hook: SnowflakeHook, sql: str) -> list[dict]:
     with closing(hook.get_conn()) as conn:
         hook.set_autocommit(conn, False)
         with hook._get_cursor(conn, return_dictionaries=True) as cur:
+            cur.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;")  # only for this session
             cur.execute(sql)
             result = cur.fetchall()
         conn.commit()
@@ -232,11 +233,16 @@ def _get_queries_details_from_snowflake(
     if not query_ids:
         return {}
     query_condition = f"IN {tuple(query_ids)}" if len(query_ids) > 1 else f"= '{query_ids[0]}'"
+    # https://docs.snowflake.com/en/sql-reference/account-usage#differences-between-account-usage-and-information-schema
+    # INFORMATION_SCHEMA.QUERY_HISTORY has no latency, so it's better than ACCOUNT_USAGE.QUERY_HISTORY
+    # https://docs.snowflake.com/en/sql-reference/functions/query_history
+    # SNOWFLAKE.INFORMATION_SCHEMA.QUERY_HISTORY() function seems the most suitable function for the job,
+    # we get history of queries executed by the user, and we're using the same credentials.
     query = (
         "SELECT "
         "QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
         "FROM "
-        "table(information_schema.query_history()) "
+        "table(snowflake.information_schema.query_history()) "
         f"WHERE "
         f"QUERY_ID {query_condition}"
         f";"
@@ -250,7 +256,11 @@ def _get_queries_details_from_snowflake(
         else:
             result = _run_single_query_with_hook(hook=hook, sql=query)
     except Exception as e:
-        log.warning("OpenLineage could not retrieve extra metadata from Snowflake. Error encountered: %s", e)
+        log.info(
+            "OpenLineage encountered an error while retrieving additional metadata about SQL queries"
+            " from Snowflake. The process will continue with default values. Error details: %s",
+            e,
+        )
         result = []
 
     return {row["QUERY_ID"]: row for row in result} if result else {}
diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
index ec83cabc4fb..dd9820de21c 100644
--- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
+++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
@@ -221,7 +221,9 @@ def test_run_single_query_with_hook(mock_get_cursor, mock_set_autocommit, mock_g
     sql_query = "SELECT * FROM test_table;"
     result = _run_single_query_with_hook(hook, sql_query)
 
-    mock_cursor.execute.assert_called_once_with(sql_query)
+    mock_cursor.execute.assert_has_calls(
+        [mock.call("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;"), mock.call(sql_query)]
+    )
     assert result == [{"col1": "value1"}, {"col2": "value2"}]
 
 
@@ -302,7 +304,7 @@ def 
test_get_queries_details_from_snowflake_single_query(mock_run_single_query):
     details = _get_queries_details_from_snowflake(hook, query_ids)
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         "WHERE QUERY_ID = 'ABC';"
     )
     mock_run_single_query.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -330,7 +332,7 @@ def 
test_get_queries_details_from_snowflake_single_query_api_hook(mock_run_singl
 
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         "WHERE QUERY_ID = 'ABC';"
     )
     expected_details = {
@@ -377,7 +379,7 @@ def 
test_get_queries_details_from_snowflake_multiple_queries(mock_run_single_que
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -415,7 +417,7 @@ def 
test_get_queries_details_from_snowflake_multiple_queries_api_hook(mock_run_s
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     expected_details = [
@@ -453,7 +455,7 @@ def 
test_get_queries_details_from_snowflake_no_data_found(mock_run_single_query)
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -471,7 +473,7 @@ def 
test_get_queries_details_from_snowflake_no_data_found_api_hook(mock_run_sing
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query_api.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -489,7 +491,7 @@ def 
test_get_queries_details_from_snowflake_error(mock_run_single_query):
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -507,7 +509,7 @@ def 
test_get_queries_details_from_snowflake_error_api_hook(mock_run_single_query
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query_api.assert_called_once_with(hook=hook, 
sql=expected_query)
@@ -529,7 +531,7 @@ def 
test_get_queries_details_from_snowflake_error_api_hook_process_data(
     expected_query_condition = f"IN {tuple(query_ids)}"
     expected_query = (
         "SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, 
ERROR_CODE, ERROR_MESSAGE "
-        "FROM table(information_schema.query_history()) "
+        "FROM table(snowflake.information_schema.query_history()) "
         f"WHERE QUERY_ID {expected_query_condition};"
     )
     mock_run_single_query_api.assert_called_once_with(hook=hook, 
sql=expected_query)

Reply via email to