This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch mobuchowski/cache-connection in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0ae0d6afa754a225c8f2237227d3c4a032917bd7 Author: Maciej Obuchowski <[email protected]> AuthorDate: Tue Apr 7 16:22:16 2026 +0200 OpenLineage: cache connection per conn_id in emit_lineage_from_sql_extras Signed-off-by: Maciej Obuchowski <[email protected]> --- .../providers/openlineage/extractors/manager.py | 20 +++++- .../providers/openlineage/plugins/listener.py | 17 ++++- .../src/airflow/providers/openlineage/sqlparser.py | 27 +++++--- .../openlineage/utils/sql_hook_lineage.py | 62 ++++++++++-------- .../tests/unit/openlineage/test_sqlparser.py | 67 ++++++++++++++++++++ .../openlineage/utils/test_sql_hook_lineage.py | 73 +++++++++------------- 6 files changed, 187 insertions(+), 79 deletions(-) diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py index f587550bef7..276636e129e 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py +++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py @@ -143,9 +143,25 @@ class ExtractorManager(LoggingMixin): task_info, ) self.log.debug("OpenLineage extraction failure details:", exc_info=True) - elif (hook_lineage := self.get_hook_lineage(task_instance, task_instance_state)) is not None: - return hook_lineage else: + # No extractor found — fall back to hook lineage. This call must be wrapped in + # try/except: it runs emit_lineage_from_sql_extras → _create_ol_event_pair which + # is not guarded internally. An uncaught exception here would propagate up to the + # listener's @print_warning decorator, silently suppressing the task-level event. + try: + hook_lineage = self.get_hook_lineage(task_instance, task_instance_state) + except Exception as e: + self.log.warning( + "Failed to extract hook lineage %s: %s. Task event will be emitted without lineage.", + task_info, + e, + ) + self.log.debug("OpenLineage hook lineage failure details:", exc_info=True) + hook_lineage = None + + if hook_lineage is not None: + return hook_lineage + self.log.debug("Unable to find an extractor %s", task_info) # Only include the unknownSourceAttribute facet if there is no extractor diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 6ab1944c393..0f4b0f3ca23 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -855,8 +855,21 @@ class OpenLineageListener: if not AIRFLOW_V_3_0_PLUS: configure_orm(disable_connection_pool=True) self.log.debug("Executing OpenLineage process - %s - pid %s", callable_name, os.getpid()) - callable() - self.log.debug("Process with current pid finishes after %s", callable_name) + try: + callable() + self.log.debug("Process with current pid finishes after %s", callable_name) + except Exception: + self.log.warning( + "OpenLineage %s process failed. This has no impact on actual task execution status.", + callable_name, + exc_info=True, + ) + finally: + # os._exit(0) bypasses Python's atexit/stdio flush. Explicitly shut down + # logging so buffered records (including any warnings above) are flushed + # before the process exits. Without this, the final log lines are silently + # dropped, making failures invisible. + logging.shutdown() os._exit(0) @property diff --git a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py index 3b82300207c..8ce25f342f4 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py +++ b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py @@ -18,7 +18,7 @@ from __future__ import annotations import logging from collections.abc import Callable -from typing import TYPE_CHECKING, TypedDict +from typing import TYPE_CHECKING, Any, TypedDict import sqlparse from attrs import define @@ -473,13 +473,26 @@ class SQLParser(LoggingMixin): def get_openlineage_facets_with_sql( - hook: DbApiHook, sql: str | list[str], conn_id: str, database: str | None, use_connection: bool = True + hook: DbApiHook, + sql: str | list[str], + conn_id: str, + database: str | None, + use_connection: bool = True, + *, + connection: Any | None = None, + database_info: Any | None = None, ) -> OperatorLineage | None: - connection = hook.get_connection(conn_id) - try: - database_info = hook.get_openlineage_database_info(connection) - except AttributeError: - database_info = None + # Accept pre-fetched connection and database_info to avoid redundant hook.get_connection() + # calls when processing multiple SQL extras from the same hook. Each get_connection() call + # hits SecretsManager (miss) then the Airflow API server — passing these in avoids that cost. + if connection is None: + connection = hook.get_connection(conn_id) + + if database_info is None: + try: + database_info = hook.get_openlineage_database_info(connection) + except AttributeError: + database_info = None if database_info is None: log.debug("%s has no database info provided", hook) diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py index ce8331c9a59..065fea24779 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py @@ -19,7 +19,8 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING +from functools import cache +from typing import TYPE_CHECKING, Any from urllib.parse import urlparse from openlineage.client.event_v2 import Dataset, Job, Run, RunEvent, RunState @@ -112,6 +113,34 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful: events: list[RunEvent] = [] query_count = 0 + # Build hook identity -> (hook, conn_id) mapping before iterating. + # Using id(hook) as cache key instead of conn_id ensures distinct hook instances + # with the same conn_id but different params are cached separately. + _hook_info: dict[int, tuple[Any, str | None]] = {} + for e in sql_extras: + hid = id(e.context) + if hid not in _hook_info: + _hook_info[hid] = (e.context, _get_hook_conn_id(e.context)) + + @cache + def _get_connection(hook_id: int): + hook, conn_id = _hook_info[hook_id] + return hook.get_connection(conn_id) + + @cache + def _get_database_info(hook_id: int): + hook, conn_id = _hook_info[hook_id] + try: + return hook.get_openlineage_database_info(_get_connection(hook_id)) + except Exception as e: + log.debug("Failed to get OpenLineage database info for %s: %s", conn_id, e) + return None + + @cache + def _get_namespace(hook_id: int) -> str | None: + db_info = _get_database_info(hook_id) + return SQLParser.create_namespace(db_info) if db_info is not None else None + for extra_info in sql_extras: value = extra_info.value @@ -124,12 +153,13 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful: query_count += 1 hook = extra_info.context - conn_id = _get_hook_conn_id(hook) - namespace = _resolve_namespace(hook, conn_id) + hook_id = id(hook) + conn_id = _hook_info[hook_id][1] # Parse SQL to obtain lineage (inputs, outputs, facets) query_lineage: OperatorLineage | None = None - if sql_text and conn_id: + database_info = _get_database_info(hook_id) if conn_id else None + if sql_text and conn_id and database_info is not None: try: query_lineage = get_openlineage_facets_with_sql( hook=hook, @@ -137,6 +167,8 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful: conn_id=conn_id, database=value.get(SqlJobHookLineageExtra.VALUE__DEFAULT_DB.value), use_connection=False, # Temporary solution before we figure out timeouts for queries + connection=_get_connection(hook_id), + database_info=database_info, ) except Exception as e: log.debug("Failed to parse SQL for query %s: %s", query_count, e) @@ -149,6 +181,7 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful: query_lineage = OperatorLineage(job_facets=job_facets) # Enrich run facets with external query info when available. + namespace = _get_namespace(hook_id) if conn_id else None if job_id and namespace: query_lineage.run_facets.setdefault( "externalQuery", @@ -183,27 +216,6 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful: return None -def _resolve_namespace(hook, conn_id: str | None) -> str | None: - """ - Resolve the OpenLineage namespace from a hook. - - Tries ``hook.get_openlineage_database_info`` to build the namespace. - Returns ``None`` when the hook does not expose this method. - """ - if conn_id: - try: - connection = hook.get_connection(conn_id) - database_info = hook.get_openlineage_database_info(connection) - except Exception as e: - log.debug("Failed to get OpenLineage database info: %s", e) - database_info = None - - if database_info is not None: - return SQLParser.create_namespace(database_info) - - return None - - def _get_hook_conn_id(hook) -> str | None: """ Try to extract the connection ID from a hook instance. diff --git a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py index 07162d10532..a10cf1446ef 100644 --- a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py +++ b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py @@ -460,3 +460,70 @@ class TestGetOpenlineageFacetsWithSql: ) hook.get_sqlalchemy_engine.assert_called_once() + + @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql") + def test_connection_provided_skips_get_connection(self, mock_generate): + """When a pre-fetched connection is passed, hook.get_connection is not called.""" + hook = MagicMock() + db_info = DatabaseInfo(scheme="myscheme", authority="host:port") + hook.get_openlineage_database_info.return_value = db_info + hook.get_openlineage_database_dialect.return_value = "generic" + hook.get_openlineage_default_schema.return_value = "public" + mock_generate.return_value = MagicMock() + + prefetched_connection = MagicMock() + get_openlineage_facets_with_sql( + hook=hook, + sql="SELECT 1", + conn_id="conn", + database=None, + use_connection=False, + connection=prefetched_connection, + ) + + hook.get_connection.assert_not_called() + hook.get_openlineage_database_info.assert_called_once_with(prefetched_connection) + + @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql") + def test_database_info_provided_skips_get_openlineage_database_info(self, mock_generate): + """When pre-fetched database_info is passed, hook.get_openlineage_database_info is not called.""" + hook = MagicMock() + prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port") + hook.get_openlineage_database_dialect.return_value = "generic" + hook.get_openlineage_default_schema.return_value = "public" + mock_generate.return_value = MagicMock() + + get_openlineage_facets_with_sql( + hook=hook, + sql="SELECT 1", + conn_id="conn", + database=None, + use_connection=False, + database_info=prefetched_db_info, + ) + + hook.get_connection.assert_called_once_with("conn") + hook.get_openlineage_database_info.assert_not_called() + + @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql") + def test_both_connection_and_database_info_provided_skips_hook_lookups(self, mock_generate): + """When both are passed, neither hook.get_connection nor get_openlineage_database_info is called.""" + hook = MagicMock() + hook.get_openlineage_database_dialect.return_value = "generic" + hook.get_openlineage_default_schema.return_value = "public" + mock_generate.return_value = MagicMock() + + prefetched_connection = MagicMock() + prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port") + get_openlineage_facets_with_sql( + hook=hook, + sql="SELECT 1", + conn_id="conn", + database=None, + use_connection=False, + connection=prefetched_connection, + database_info=prefetched_db_info, + ) + + hook.get_connection.assert_not_called() + hook.get_openlineage_database_info.assert_not_called() diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py index 835766b3f93..f2e97c568ca 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py @@ -28,7 +28,6 @@ from airflow.providers.openlineage.extractors.base import OperatorLineage from airflow.providers.openlineage.sqlparser import SQLParser from airflow.providers.openlineage.utils.sql_hook_lineage import ( _get_hook_conn_id, - _resolve_namespace, emit_lineage_from_sql_extras, ) @@ -67,51 +66,12 @@ class TestGetHookConnId: assert _get_hook_conn_id(hook) is None -class TestResolveNamespace: - def test_from_ol_database_info(self): - hook = mock.MagicMock() - connection = mock.MagicMock() - hook.get_connection.return_value = connection - database_info = mock.MagicMock() - hook.get_openlineage_database_info.return_value = database_info - - with mock.patch( - "airflow.providers.openlineage.utils.sql_hook_lineage.SQLParser.create_namespace", - return_value="postgres://host:5432/mydb", - ) as mock_create_ns: - result = _resolve_namespace(hook, "my_conn") - - hook.get_connection.assert_called_once_with("my_conn") - hook.get_openlineage_database_info.assert_called_once_with(connection) - mock_create_ns.assert_called_once_with(database_info) - assert result == "postgres://host:5432/mydb" - - def test_returns_none_when_no_namespace_available(self): - hook = mock.MagicMock() - hook.__class__.__name__ = "SomeUnknownHook" - hook.get_connection.side_effect = Exception("no method") - - with mock.patch.dict("sys.modules"): - result = _resolve_namespace(hook, "my_conn") - - assert result is None - - def test_returns_none_when_no_conn_id(self): - hook = mock.MagicMock() - hook.__class__.__name__ = "SomeUnknownHook" - - with mock.patch.dict("sys.modules"): - result = _resolve_namespace(hook, None) - - assert result is None - - class TestEmitLineageFromSqlExtras: @pytest.fixture(autouse=True) def _patch_deps(self): with ( mock.patch(f"{_MODULE}._get_hook_conn_id", return_value="my_conn") as mock_conn_id, - mock.patch(f"{_MODULE}._resolve_namespace") as mock_ns, + mock.patch(f"{_MODULE}.SQLParser.create_namespace") as mock_ns, mock.patch(f"{_MODULE}.get_openlineage_facets_with_sql") as mock_facets_fn, mock.patch(f"{_MODULE}._create_ol_event_pair") as mock_build, mock.patch(f"{_MODULE}.get_openlineage_listener") as mock_listener, @@ -142,6 +102,7 @@ class TestEmitLineageFromSqlExtras: ) assert result is None self.mock_build.assert_not_called() + self.mock_facets_fn.assert_not_called() self.mock_listener.assert_not_called() def test_single_query_delegates_to_create_ol_event_pair(self): @@ -278,7 +239,6 @@ class TestEmitLineageFromSqlExtras: def test_job_id_only_extra_is_processed(self): """An extra with only job_id (no SQL text) still builds and emits an event pair.""" self.mock_conn_id.return_value = None - self.mock_ns.return_value = "ns" self.mock_facets_fn.return_value = None mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id") @@ -287,9 +247,10 @@ class TestEmitLineageFromSqlExtras: sql_extras=[_make_extra(sql="", job_id="external-123")], ) + # conn_id is None → namespace cannot be resolved → no externalQuery facet self.mock_build.assert_called_once() call = self.mock_build.call_args - assert call.kwargs["run_facets"]["externalQuery"].externalQueryId == "external-123" + assert "externalQuery" not in call.kwargs["run_facets"] assert "sql" not in call.kwargs["job_facets"] def test_parser_run_facets_preserved_over_external_query(self): @@ -312,3 +273,29 @@ class TestEmitLineageFromSqlExtras: call = self.mock_build.call_args assert call.kwargs["run_facets"]["externalQuery"] is parser_ext_query + + def test_different_hooks_same_conn_id_get_separate_db_info(self): + """Two hooks sharing a conn_id but returning different database info are cached separately.""" + mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id") + + hook_a = mock.MagicMock() + hook_b = mock.MagicMock() + + db_info_a = mock.MagicMock() + db_info_b = mock.MagicMock() + hook_a.get_openlineage_database_info.return_value = db_info_a + hook_b.get_openlineage_database_info.return_value = db_info_b + + self.mock_conn_id.return_value = "same_conn" + self.mock_ns.side_effect = lambda db_info: f"ns_{id(db_info)}" + self.mock_facets_fn.return_value = OperatorLineage() + + extras = [ + _make_extra(sql="SELECT 1", hook=hook_a), + _make_extra(sql="SELECT 2", hook=hook_b), + ] + emit_lineage_from_sql_extras(task_instance=mock_ti, sql_extras=extras) + + # Both hooks should have had get_openlineage_database_info called + hook_a.get_openlineage_database_info.assert_called_once() + hook_b.get_openlineage_database_info.assert_called_once()
