This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 155124dde2b fix: Improve logging and timeouts in OL helpers (#53139)
155124dde2b is described below
commit 155124dde2b14d00d6d59e3e6ad5d23431bf1ba6
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Jul 10 21:26:59 2025 +0200
fix: Improve logging and timeouts in OL helpers (#53139)
---
.../providers/databricks/utils/openlineage.py | 31 ++++++++++------------
.../unit/databricks/utils/test_openlineage.py | 22 ++++++++++-----
.../providers/snowflake/utils/openlineage.py | 14 ++++++++--
.../tests/unit/snowflake/utils/test_openlineage.py | 22 ++++++++-------
4 files changed, 54 insertions(+), 35 deletions(-)
diff --git a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
index 57c964d9842..50df7efd0e8 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
@@ -124,22 +124,15 @@ def _get_parent_run_facet(task_instance):
def _run_api_call(hook: DatabricksSqlHook | DatabricksHook, query_ids:
list[str]) -> list[dict]:
"""Retrieve execution details for specific queries from Databricks's query
history API."""
- try:
- token = hook._get_token(raise_error=True)
- # https://docs.databricks.com/api/azure/workspace/queryhistory/list
- response = requests.get(
- url=f"https://{hook.host}/api/2.0/sql/history/queries",
- headers={"Authorization": f"Bearer {token}"},
- data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
- timeout=2,
- )
- response.raise_for_status()
- except Exception as e:
- log.warning(
- "OpenLineage could not retrieve Databricks queries details. Error
received: `%s`.",
- e,
- )
- return []
+ token = hook._get_token(raise_error=True)
+ # https://docs.databricks.com/api/azure/workspace/queryhistory/list
+ response = requests.get(
+ url=f"https://{hook.host}/api/2.0/sql/history/queries",
+ headers={"Authorization": f"Bearer {token}"},
+ data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
+ timeout=3,
+ )
+ response.raise_for_status()
return response.json()["res"]
@@ -176,7 +169,11 @@ def _get_queries_details_from_databricks(
if query_info["query_id"]
}
except Exception as e:
- log.warning("OpenLineage could not retrieve extra metadata from
Databricks. Error encountered: %s", e)
+ log.info(
+ "OpenLineage encountered an error while retrieving additional
metadata about SQL queries"
+ " from Databricks. The process will continue with default values.
Error details: %s",
+ e,
+ )
return query_details
diff --git a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
index 6d427e0ba77..24a8f96df25 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
@@ -121,9 +121,8 @@ def test_run_api_call_request_error():
mock_response.status_code = 200
with mock.patch("requests.get", side_effect=RuntimeError("request error")):
- result = _run_api_call(mock_hook, ["123"])
-
- assert result == []
+ with pytest.raises(RuntimeError):
+ _run_api_call(mock_hook, ["123"])
def test_run_api_call_token_error():
@@ -135,9 +134,8 @@ def test_run_api_call_token_error():
mock_response.status_code = 200
with mock.patch("requests.get", return_value=mock_response):
- result = _run_api_call(mock_hook, ["123"])
-
- assert result == []
+ with pytest.raises(RuntimeError):
+ _run_api_call(mock_hook, ["123"])
def test_process_data_from_api():
@@ -194,6 +192,18 @@ def
test_get_queries_details_from_databricks_empty_query_ids():
assert details == {}
+@mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
+def test_get_queries_details_from_databricks_error(mock_api_call):
+ mock_api_call.side_effect = RuntimeError("Token error")
+
+ hook = DatabricksSqlHook()
+ query_ids = ["ABC"]
+
+ details = _get_queries_details_from_databricks(hook, query_ids)
+ mock_api_call.assert_called_once_with(hook=hook, query_ids=query_ids)
+ assert details == {}
+
+
@mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
def test_get_queries_details_from_databricks(mock_api_call):
hook = DatabricksSqlHook()
diff --git a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index aa954c0444e..88c5caaa45e 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -199,6 +199,7 @@ def _run_single_query_with_hook(hook: SnowflakeHook, sql:
str) -> list[dict]:
with closing(hook.get_conn()) as conn:
hook.set_autocommit(conn, False)
with hook._get_cursor(conn, return_dictionaries=True) as cur:
+ cur.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;")  # only for this session
cur.execute(sql)
result = cur.fetchall()
conn.commit()
@@ -232,11 +233,16 @@ def _get_queries_details_from_snowflake(
if not query_ids:
return {}
query_condition = f"IN {tuple(query_ids)}" if len(query_ids) > 1 else f"=
'{query_ids[0]}'"
+ # https://docs.snowflake.com/en/sql-reference/account-usage#differences-between-account-usage-and-information-schema
+ # INFORMATION_SCHEMA.QUERY_HISTORY has no latency, so it's better than ACCOUNT_USAGE.QUERY_HISTORY
+ # https://docs.snowflake.com/en/sql-reference/functions/query_history
+ # SNOWFLAKE.INFORMATION_SCHEMA.QUERY_HISTORY() function seems the most suitable function for the job,
+ # we get history of queries executed by the user, and we're using the same credentials.
query = (
"SELECT "
"QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
"FROM "
- "table(information_schema.query_history()) "
+ "table(snowflake.information_schema.query_history()) "
f"WHERE "
f"QUERY_ID {query_condition}"
f";"
@@ -250,7 +256,11 @@ def _get_queries_details_from_snowflake(
else:
result = _run_single_query_with_hook(hook=hook, sql=query)
except Exception as e:
- log.warning("OpenLineage could not retrieve extra metadata from
Snowflake. Error encountered: %s", e)
+ log.info(
+ "OpenLineage encountered an error while retrieving additional
metadata about SQL queries"
+ " from Snowflake. The process will continue with default values.
Error details: %s",
+ e,
+ )
result = []
return {row["QUERY_ID"]: row for row in result} if result else {}
diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
index ec83cabc4fb..dd9820de21c 100644
--- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
+++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
@@ -221,7 +221,9 @@ def test_run_single_query_with_hook(mock_get_cursor,
mock_set_autocommit, mock_g
sql_query = "SELECT * FROM test_table;"
result = _run_single_query_with_hook(hook, sql_query)
- mock_cursor.execute.assert_called_once_with(sql_query)
+ mock_cursor.execute.assert_has_calls(
+ [mock.call("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;"),
mock.call(sql_query)]
+ )
assert result == [{"col1": "value1"}, {"col2": "value2"}]
@@ -302,7 +304,7 @@ def
test_get_queries_details_from_snowflake_single_query(mock_run_single_query):
details = _get_queries_details_from_snowflake(hook, query_ids)
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
"WHERE QUERY_ID = 'ABC';"
)
mock_run_single_query.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -330,7 +332,7 @@ def
test_get_queries_details_from_snowflake_single_query_api_hook(mock_run_singl
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
"WHERE QUERY_ID = 'ABC';"
)
expected_details = {
@@ -377,7 +379,7 @@ def
test_get_queries_details_from_snowflake_multiple_queries(mock_run_single_que
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -415,7 +417,7 @@ def
test_get_queries_details_from_snowflake_multiple_queries_api_hook(mock_run_s
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
expected_details = [
@@ -453,7 +455,7 @@ def
test_get_queries_details_from_snowflake_no_data_found(mock_run_single_query)
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -471,7 +473,7 @@ def
test_get_queries_details_from_snowflake_no_data_found_api_hook(mock_run_sing
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -489,7 +491,7 @@ def
test_get_queries_details_from_snowflake_error(mock_run_single_query):
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -507,7 +509,7 @@ def
test_get_queries_details_from_snowflake_error_api_hook(mock_run_single_query
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook,
sql=expected_query)
@@ -529,7 +531,7 @@ def
test_get_queries_details_from_snowflake_error_api_hook_process_data(
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT,
ERROR_CODE, ERROR_MESSAGE "
- "FROM table(information_schema.query_history()) "
+ "FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook,
sql=expected_query)