This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e886a4dac7 cache connections in OpenLineage SQL hook lineage (#64843)
2e886a4dac7 is described below

commit 2e886a4dac7e35c453767e4ef7b8e05fc6adb964
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Fri May 15 14:03:39 2026 +0200

    cache connections in OpenLineage SQL hook lineage (#64843)
    
    * OpenLineage: cache connection per conn_id in emit_lineage_from_sql_extras
    
    emit_lineage_from_sql_extras called hook.get_connection() twice per SQL
    extra: once in _resolve_namespace and once inside
    get_openlineage_facets_with_sql. For N extras from the same hook this
    is N*2 round-trips (SecretsManager miss + Airflow API server hit each
    time) all returning the same object.
    
    Add _conn_cache dict keyed by conn_id so the connection, database_info,
    and namespace are resolved exactly once per unique conn_id. Introduce
    _resolve_connection_info() that returns all three from a single
    get_connection() call. Add optional connection/database_info params to
    get_openlineage_facets_with_sql so callers can pass pre-fetched values
    and skip the internal lookup entirely.
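This is a minimal sketch of the round-trip arithmetic above. It assumes a stand-in `FakeHook` whose `get_connection()` only counts calls. `FakeHook`, `naive_lookups`, and `cached_lookups` are hypothetical names, not provider code. With five extras from one hook sharing one conn_id, the old pattern performs 5*2 = 10 lookups, while a per-conn_id cache performs one.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeHook:
    """Hypothetical stand-in for a DbApiHook that counts get_connection() round-trips."""

    calls: int = 0

    def get_connection(self, conn_id: str) -> dict:
        self.calls += 1  # in Airflow this would be a secrets-backend / API-server lookup
        return {"conn_id": conn_id}


def naive_lookups(hook: FakeHook, conn_ids: list[str]) -> list[dict]:
    # Before: one lookup for the namespace and another inside facet extraction.
    connections = []
    for conn_id in conn_ids:
        hook.get_connection(conn_id)  # namespace resolution
        connections.append(hook.get_connection(conn_id))  # facet extraction
    return connections


def cached_lookups(hook: FakeHook, conn_ids: list[str]) -> list[dict]:
    # After: resolve each unique conn_id once and reuse it for both purposes.
    conn_cache: dict[str, dict] = {}
    connections = []
    for conn_id in conn_ids:
        if conn_id not in conn_cache:
            conn_cache[conn_id] = hook.get_connection(conn_id)
        connections.append(conn_cache[conn_id])
    return connections


extras = ["my_postgres"] * 5  # five SQL extras emitted by the same hook
naive_hook, cached_hook = FakeHook(), FakeHook()
naive_lookups(naive_hook, extras)
cached_lookups(cached_hook, extras)
print(naive_hook.calls, cached_hook.calls)  # 10 1
```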
    
    Also fix two related issues found while investigating:
    
    - extractors/manager.py: wrap get_hook_lineage() in try/except in the
      no-extractor path. An unhandled exception from emit_lineage_from_sql_extras
      -> _create_ol_event_pair propagated to @print_warning on on_success(),
      silently suppressing the task-level COMPLETE event.
    
    - plugins/listener.py: call logging.shutdown() before os._exit(0) in the
      fork child. os._exit bypasses Python's stdio flush so any buffered log
      messages at exit (including failure warnings) were silently dropped,
      making fork failures invisible in task logs.
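The fork-child fix can be sketched as follows. This is a hedged illustration, not the listener's code. `run_child_body` and its injectable `exit_fn` are hypothetical, so the example runs without `os.fork()`. A `logging.handlers.MemoryHandler` stands in for any handler that buffers records. The buffered warning reaches `sink` only because `logging.shutdown()` flushes and closes the handlers before the exit call. A real `os._exit()` would skip the `atexit` hook that normally runs that shutdown.

```python
import io
import logging
import logging.handlers
import os
from collections.abc import Callable

log = logging.getLogger("ol_fork_sketch")


def run_child_body(callable_: Callable[[], None], exit_fn: Callable[[int], None] = os._exit) -> None:
    """Hypothetical body of the forked child; exit_fn is injectable so it can run without forking."""
    try:
        callable_()
    except Exception:
        # Lineage failures are logged but must never affect the task itself.
        log.warning("OpenLineage process failed.", exc_info=True)
    finally:
        # os._exit() skips atexit handlers and stdio flushing, so flush and close
        # every registered logging handler before leaving the process.
        logging.shutdown()
    exit_fn(0)


# Demo: a MemoryHandler holds records in memory until it is flushed or closed.
sink = io.StringIO()
buffered = logging.handlers.MemoryHandler(
    capacity=100, flushLevel=logging.CRITICAL, target=logging.StreamHandler(sink)
)
log.addHandler(buffered)
log.propagate = False


def failing_emit() -> None:
    raise RuntimeError("lineage backend unreachable")


exit_codes: list[int] = []
run_child_body(failing_emit, exit_fn=exit_codes.append)
print(exit_codes)  # [0]
print("OpenLineage process failed." in sink.getvalue())  # True
```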
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * refactor: use @cache local closures instead of manual cache dict
    
    Replace the _conn_cache dict and _resolve_connection_info tuple helper
    with three @functools.cache-decorated local functions keyed by conn_id:
    
      _get_connection     - one hook.get_connection() call per conn_id
      _get_database_info  - derived from _get_connection, cached separately
      _get_namespace      - derived from _get_database_info, cached separately
    
    Each concern is cached independently. No tuple packing/unpacking.
    _resolve_connection_info removed; _resolve_namespace restored to a
    simple single-lookup implementation for external callers.
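A minimal sketch of the local-closure pattern follows. It assumes a hypothetical `FakeHook` that counts lookups, and it uses a placeholder string in place of `SQLParser.create_namespace()`. The three `@functools.cache` functions are defined inside `namespaces_for`, so their caches are discarded when the call returns and a connection is never reused across separate invocations.

```python
from __future__ import annotations

from functools import cache


class FakeHook:
    """Hypothetical hook that counts how often each lookup actually runs."""

    def __init__(self) -> None:
        self.connection_calls = 0
        self.db_info_calls = 0

    def get_connection(self, conn_id: str) -> str:
        self.connection_calls += 1
        return f"conn:{conn_id}"

    def get_openlineage_database_info(self, connection: str) -> str:
        self.db_info_calls += 1
        return f"dbinfo:{connection}"


def namespaces_for(hook: FakeHook, conn_ids: list[str]) -> list[str | None]:
    # Fresh closures (and therefore fresh caches) are created on every call.
    @cache
    def _get_connection(conn_id: str) -> str:
        return hook.get_connection(conn_id)

    @cache
    def _get_database_info(conn_id: str) -> str | None:
        try:
            return hook.get_openlineage_database_info(_get_connection(conn_id))
        except Exception:
            return None

    @cache
    def _get_namespace(conn_id: str) -> str | None:
        db_info = _get_database_info(conn_id)
        # Placeholder for SQLParser.create_namespace(db_info).
        return f"ns[{db_info}]" if db_info is not None else None

    return [_get_namespace(conn_id) for conn_id in conn_ids]


hook = FakeHook()
result = namespaces_for(hook, ["pg", "pg", "sf", "pg"])
print(hook.connection_calls, hook.db_info_calls)  # 2 2
print(result[0])  # ns[dbinfo:conn:pg]
```

Keeping the caches local also bounds memory. A module-level `@cache` would keep every connection it had seen for the lifetime of the process.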
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * remove unused _resolve_namespace
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * fix review: make hook lineage error messages state-agnostic
    
    The comment and log message incorrectly referenced "COMPLETE event"
    but extract_metadata is called for all task states (RUNNING, SUCCESS,
    FAILED, SKIPPED).
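The guarded fallback from the manager.py bullet, combined with the state-agnostic message from this review fix, might look roughly like this. `lineage_or_fallback` and its callables are hypothetical; the real logic lives inline in `ExtractorManager`. Any exception from the hook-lineage call is logged and treated as "no hook lineage", so the caller still gets a result to emit.

```python
from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Any

log = logging.getLogger("hook_lineage_sketch")


def lineage_or_fallback(
    get_hook_lineage: Callable[[], Any | None],
    fallback: Callable[[], Any],
    task_info: str,
) -> Any:
    """Hypothetical reduction of the no-extractor branch in ExtractorManager."""
    try:
        hook_lineage = get_hook_lineage()
    except Exception as e:
        # The wording names no task state: this branch runs for RUNNING,
        # SUCCESS, FAILED and SKIPPED events alike.
        log.warning(
            "Failed to extract hook lineage %s: %s. Task event will be emitted without lineage.",
            task_info,
            e,
        )
        log.debug("OpenLineage hook lineage failure details:", exc_info=True)
        hook_lineage = None

    if hook_lineage is not None:
        return hook_lineage
    return fallback()  # e.g. the "Unable to find an extractor" path


def broken_hook_lineage() -> None:
    raise ValueError("malformed SQL extra")


print(lineage_or_fallback(broken_hook_lineage, lambda: "empty lineage", "task=t1"))  # empty lineage
print(lineage_or_fallback(lambda: "hook lineage", lambda: "empty lineage", "task=t1"))  # hook lineage
```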
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * fix tests: update for _resolve_namespace removal
    
    - Remove TestResolveNamespace class (function no longer exists)
    - Replace _resolve_namespace mock with SQLParser.create_namespace mock
    - Fix test_job_id_only_extra_emits_events: conn_id=None means no namespace
    - Remove over-specific mock assertions on _get_hook_conn_id calls
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * fix review: cache by hook identity instead of conn_id
    
    Use id(hook) as the @cache key instead of conn_id to ensure distinct
    hook instances sharing the same conn_id but with different params
    get separate cached database info results.
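This sketch illustrates why `id(hook)` is a safer cache key than `conn_id`. It uses a hypothetical `FakeHook` whose database info depends on an instance parameter as well as on the conn_id. Keying on conn_id returns the first hook's result for both hooks; keying on `id(hook)` keeps them apart.

```python
from __future__ import annotations

from functools import cache


class FakeHook:
    """Hypothetical hook: the conn_id is shared, but a per-instance param changes the result."""

    def __init__(self, conn_id: str, database: str) -> None:
        self.conn_id = conn_id
        self.database = database

    def get_openlineage_database_info(self) -> str:
        return f"{self.conn_id}/{self.database}"


def db_infos_keyed_by_conn_id(hooks: list[FakeHook]) -> list[str]:
    @cache
    def _get_database_info(conn_id: str) -> str:
        # The first hook with this conn_id decides the cached value for all of them.
        hook = next(h for h in hooks if h.conn_id == conn_id)
        return hook.get_openlineage_database_info()

    return [_get_database_info(h.conn_id) for h in hooks]


def db_infos_keyed_by_hook_id(hooks: list[FakeHook]) -> list[str]:
    # Keep a strong reference to every hook for the whole call: id() values are
    # only unique among objects that are alive at the same time.
    hook_info = {id(h): h for h in hooks}

    @cache
    def _get_database_info(hook_id: int) -> str:
        return hook_info[hook_id].get_openlineage_database_info()

    return [_get_database_info(id(h)) for h in hooks]


hooks = [FakeHook("pg", "sales"), FakeHook("pg", "finance")]
print(db_infos_keyed_by_conn_id(hooks))  # ['pg/sales', 'pg/sales']
print(db_infos_keyed_by_hook_id(hooks))  # ['pg/sales', 'pg/finance']
```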
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * fix review: address Copilot feedback on connection/database_info overrides
    
    - Split conditional in get_openlineage_facets_with_sql so hook.get_connection
      is only called when connection is None (previously re-fetched even when
      caller passed pre-fetched connection, defeating the cache).
    - Make connection and database_info keyword-only with Any | None types for
      clarity and to prevent positional misuse.
    - Reword the explanatory comment for grammatical clarity.
    - Add unit tests covering all four override combinations.
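The following is a reduced sketch of the split conditional. It assumes a simplified `resolve_lookups` that keeps only the lookup logic of `get_openlineage_facets_with_sql`; the name and the trimmed signature are hypothetical. The example exercises the four override combinations with `unittest.mock.MagicMock` and records how many times each hook method is called.

```python
from __future__ import annotations

from typing import Any
from unittest.mock import MagicMock


def resolve_lookups(
    hook: Any,
    conn_id: str,
    *,
    connection: Any | None = None,
    database_info: Any | None = None,
) -> Any | None:
    """Hypothetical reduction of the lookup logic in get_openlineage_facets_with_sql."""
    # Fetch only what the caller did not already supply.
    if connection is None:
        connection = hook.get_connection(conn_id)
    if database_info is None:
        try:
            database_info = hook.get_openlineage_database_info(connection)
        except AttributeError:
            database_info = None
    return database_info  # returned only so the example can inspect it


calls = {}
for pass_conn in (False, True):
    for pass_info in (False, True):
        hook = MagicMock()
        overrides = {}
        if pass_conn:
            overrides["connection"] = "prefetched-connection"
        if pass_info:
            overrides["database_info"] = "prefetched-db-info"
        resolve_lookups(hook, "my_conn", **overrides)
        calls[(pass_conn, pass_info)] = (
            hook.get_connection.call_count,
            hook.get_openlineage_database_info.call_count,
        )

print(calls)
# {(False, False): (1, 1), (False, True): (1, 0), (True, False): (0, 1), (True, True): (0, 0)}
```

Because the overrides follow a bare `*`, passing a connection positionally raises `TypeError` instead of silently binding it to another parameter.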
    
    Addresses review comments on PR #64843.
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    * fix mypy: type _hook_info value as Any so attribute access type-checks
    
    mypy doesn't know hook.get_connection / get_openlineage_database_info
    exist on object. Type the value side of the (hook, conn_id) tuple as
    Any (matching how DbApiHook is treated elsewhere in this module) so the
    mypy providers check passes.
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    
    ---------
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    Signed-off-by: Maciej Obuchowski <[email protected]>
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../providers/openlineage/extractors/manager.py    | 20 +++++-
 .../providers/openlineage/plugins/listener.py      | 17 ++++-
 .../src/airflow/providers/openlineage/sqlparser.py | 27 +++++---
 .../openlineage/utils/sql_hook_lineage.py          | 62 ++++++++++--------
 .../tests/unit/openlineage/test_sqlparser.py       | 67 ++++++++++++++++++++
 .../openlineage/utils/test_sql_hook_lineage.py     | 73 +++++++++-------------
 6 files changed, 187 insertions(+), 79 deletions(-)

diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
index f587550bef7..276636e129e 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
@@ -143,9 +143,25 @@ class ExtractorManager(LoggingMixin):
                     task_info,
                 )
                 self.log.debug("OpenLineage extraction failure details:", exc_info=True)
-        elif (hook_lineage := self.get_hook_lineage(task_instance, task_instance_state)) is not None:
-            return hook_lineage
         else:
+            # No extractor found — fall back to hook lineage. This call must be wrapped in
+            # try/except: it runs emit_lineage_from_sql_extras → _create_ol_event_pair which
+            # is not guarded internally. An uncaught exception here would propagate up to the
+            # listener's @print_warning decorator, silently suppressing the task-level event.
+            try:
+                hook_lineage = self.get_hook_lineage(task_instance, task_instance_state)
+            except Exception as e:
+                self.log.warning(
+                    "Failed to extract hook lineage %s: %s. Task event will be emitted without lineage.",
+                    task_info,
+                    e,
+                )
+                self.log.debug("OpenLineage hook lineage failure details:", exc_info=True)
+                hook_lineage = None
+
+            if hook_lineage is not None:
+                return hook_lineage
+
             self.log.debug("Unable to find an extractor %s", task_info)
 
             # Only include the unknownSourceAttribute facet if there is no extractor
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 6ab1944c393..0f4b0f3ca23 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -855,8 +855,21 @@ class OpenLineageListener:
             if not AIRFLOW_V_3_0_PLUS:
                 configure_orm(disable_connection_pool=True)
             self.log.debug("Executing OpenLineage process - %s - pid %s", callable_name, os.getpid())
-            callable()
-            self.log.debug("Process with current pid finishes after %s", callable_name)
+            try:
+                callable()
+                self.log.debug("Process with current pid finishes after %s", callable_name)
+            except Exception:
+                self.log.warning(
+                    "OpenLineage %s process failed. This has no impact on actual task execution status.",
+                    callable_name,
+                    exc_info=True,
+                )
+            finally:
+                # os._exit(0) bypasses Python's atexit/stdio flush. Explicitly shut down
+                # logging so buffered records (including any warnings above) are flushed
+                # before the process exits. Without this, the final log lines are silently
+                # dropped, making failures invisible.
+                logging.shutdown()
             os._exit(0)
 
     @property
diff --git a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
index 3b82300207c..8ce25f342f4 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import logging
 from collections.abc import Callable
-from typing import TYPE_CHECKING, TypedDict
+from typing import TYPE_CHECKING, Any, TypedDict
 
 import sqlparse
 from attrs import define
@@ -473,13 +473,26 @@ class SQLParser(LoggingMixin):
 
 
 def get_openlineage_facets_with_sql(
-    hook: DbApiHook, sql: str | list[str], conn_id: str, database: str | None, use_connection: bool = True
+    hook: DbApiHook,
+    sql: str | list[str],
+    conn_id: str,
+    database: str | None,
+    use_connection: bool = True,
+    *,
+    connection: Any | None = None,
+    database_info: Any | None = None,
 ) -> OperatorLineage | None:
-    connection = hook.get_connection(conn_id)
-    try:
-        database_info = hook.get_openlineage_database_info(connection)
-    except AttributeError:
-        database_info = None
+    # Accept pre-fetched connection and database_info to avoid redundant hook.get_connection()
+    # calls when processing multiple SQL extras from the same hook. Each get_connection() call
+    # hits SecretsManager (miss) then the Airflow API server — passing these in avoids that cost.
+    if connection is None:
+        connection = hook.get_connection(conn_id)
+
+    if database_info is None:
+        try:
+            database_info = hook.get_openlineage_database_info(connection)
+        except AttributeError:
+            database_info = None
 
     if database_info is None:
         log.debug("%s has no database info provided", hook)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
index ce8331c9a59..065fea24779 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
@@ -19,7 +19,8 @@
 from __future__ import annotations
 
 import logging
-from typing import TYPE_CHECKING
+from functools import cache
+from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
 
 from openlineage.client.event_v2 import Dataset, Job, Run, RunEvent, RunState
@@ -112,6 +113,34 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
     events: list[RunEvent] = []
     query_count = 0
 
+    # Build hook identity -> (hook, conn_id) mapping before iterating.
+    # Using id(hook) as cache key instead of conn_id ensures distinct hook instances
+    # with the same conn_id but different params are cached separately.
+    _hook_info: dict[int, tuple[Any, str | None]] = {}
+    for e in sql_extras:
+        hid = id(e.context)
+        if hid not in _hook_info:
+            _hook_info[hid] = (e.context, _get_hook_conn_id(e.context))
+
+    @cache
+    def _get_connection(hook_id: int):
+        hook, conn_id = _hook_info[hook_id]
+        return hook.get_connection(conn_id)
+
+    @cache
+    def _get_database_info(hook_id: int):
+        hook, conn_id = _hook_info[hook_id]
+        try:
+            return hook.get_openlineage_database_info(_get_connection(hook_id))
+        except Exception as e:
+            log.debug("Failed to get OpenLineage database info for %s: %s", conn_id, e)
+            return None
+
+    @cache
+    def _get_namespace(hook_id: int) -> str | None:
+        db_info = _get_database_info(hook_id)
+        return SQLParser.create_namespace(db_info) if db_info is not None else None
+
     for extra_info in sql_extras:
         value = extra_info.value
 
@@ -124,12 +153,13 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
         query_count += 1
 
         hook = extra_info.context
-        conn_id = _get_hook_conn_id(hook)
-        namespace = _resolve_namespace(hook, conn_id)
+        hook_id = id(hook)
+        conn_id = _hook_info[hook_id][1]
 
         # Parse SQL to obtain lineage (inputs, outputs, facets)
         query_lineage: OperatorLineage | None = None
-        if sql_text and conn_id:
+        database_info = _get_database_info(hook_id) if conn_id else None
+        if sql_text and conn_id and database_info is not None:
             try:
                 query_lineage = get_openlineage_facets_with_sql(
                     hook=hook,
@@ -137,6 +167,8 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
                     conn_id=conn_id,
                     database=value.get(SqlJobHookLineageExtra.VALUE__DEFAULT_DB.value),
                     use_connection=False,  # Temporary solution before we figure out timeouts for queries
+                    connection=_get_connection(hook_id),
+                    database_info=database_info,
                 )
             except Exception as e:
                 log.debug("Failed to parse SQL for query %s: %s", query_count, e)
@@ -149,6 +181,7 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
             query_lineage = OperatorLineage(job_facets=job_facets)
 
         # Enrich run facets with external query info when available.
+        namespace = _get_namespace(hook_id) if conn_id else None
         if job_id and namespace:
             query_lineage.run_facets.setdefault(
                 "externalQuery",
@@ -183,27 +216,6 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
     return None
 
 
-def _resolve_namespace(hook, conn_id: str | None) -> str | None:
-    """
-    Resolve the OpenLineage namespace from a hook.
-
-    Tries ``hook.get_openlineage_database_info`` to build the namespace.
-    Returns ``None`` when the hook does not expose this method.
-    """
-    if conn_id:
-        try:
-            connection = hook.get_connection(conn_id)
-            database_info = hook.get_openlineage_database_info(connection)
-        except Exception as e:
-            log.debug("Failed to get OpenLineage database info: %s", e)
-            database_info = None
-
-        if database_info is not None:
-            return SQLParser.create_namespace(database_info)
-
-    return None
-
-
 def _get_hook_conn_id(hook) -> str | None:
     """
     Try to extract the connection ID from a hook instance.
diff --git a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
index 07162d10532..a10cf1446ef 100644
--- a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
+++ b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
@@ -460,3 +460,70 @@ class TestGetOpenlineageFacetsWithSql:
         )
 
         hook.get_sqlalchemy_engine.assert_called_once()
+
+    @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+    def test_connection_provided_skips_get_connection(self, mock_generate):
+        """When a pre-fetched connection is passed, hook.get_connection is not called."""
+        hook = MagicMock()
+        db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+        hook.get_openlineage_database_info.return_value = db_info
+        hook.get_openlineage_database_dialect.return_value = "generic"
+        hook.get_openlineage_default_schema.return_value = "public"
+        mock_generate.return_value = MagicMock()
+
+        prefetched_connection = MagicMock()
+        get_openlineage_facets_with_sql(
+            hook=hook,
+            sql="SELECT 1",
+            conn_id="conn",
+            database=None,
+            use_connection=False,
+            connection=prefetched_connection,
+        )
+
+        hook.get_connection.assert_not_called()
+        hook.get_openlineage_database_info.assert_called_once_with(prefetched_connection)
+
+    @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+    def test_database_info_provided_skips_get_openlineage_database_info(self, mock_generate):
+        """When pre-fetched database_info is passed, hook.get_openlineage_database_info is not called."""
+        hook = MagicMock()
+        prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+        hook.get_openlineage_database_dialect.return_value = "generic"
+        hook.get_openlineage_default_schema.return_value = "public"
+        mock_generate.return_value = MagicMock()
+
+        get_openlineage_facets_with_sql(
+            hook=hook,
+            sql="SELECT 1",
+            conn_id="conn",
+            database=None,
+            use_connection=False,
+            database_info=prefetched_db_info,
+        )
+
+        hook.get_connection.assert_called_once_with("conn")
+        hook.get_openlineage_database_info.assert_not_called()
+
+    @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+    def test_both_connection_and_database_info_provided_skips_hook_lookups(self, mock_generate):
+        """When both are passed, neither hook.get_connection nor get_openlineage_database_info is called."""
+        hook = MagicMock()
+        hook.get_openlineage_database_dialect.return_value = "generic"
+        hook.get_openlineage_default_schema.return_value = "public"
+        mock_generate.return_value = MagicMock()
+
+        prefetched_connection = MagicMock()
+        prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+        get_openlineage_facets_with_sql(
+            hook=hook,
+            sql="SELECT 1",
+            conn_id="conn",
+            database=None,
+            use_connection=False,
+            connection=prefetched_connection,
+            database_info=prefetched_db_info,
+        )
+
+        hook.get_connection.assert_not_called()
+        hook.get_openlineage_database_info.assert_not_called()
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
index 835766b3f93..f2e97c568ca 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
@@ -28,7 +28,6 @@ from airflow.providers.openlineage.extractors.base import OperatorLineage
 from airflow.providers.openlineage.sqlparser import SQLParser
 from airflow.providers.openlineage.utils.sql_hook_lineage import (
     _get_hook_conn_id,
-    _resolve_namespace,
     emit_lineage_from_sql_extras,
 )
 
@@ -67,51 +66,12 @@ class TestGetHookConnId:
         assert _get_hook_conn_id(hook) is None
 
 
-class TestResolveNamespace:
-    def test_from_ol_database_info(self):
-        hook = mock.MagicMock()
-        connection = mock.MagicMock()
-        hook.get_connection.return_value = connection
-        database_info = mock.MagicMock()
-        hook.get_openlineage_database_info.return_value = database_info
-
-        with mock.patch(
-            "airflow.providers.openlineage.utils.sql_hook_lineage.SQLParser.create_namespace",
-            return_value="postgres://host:5432/mydb",
-        ) as mock_create_ns:
-            result = _resolve_namespace(hook, "my_conn")
-
-        hook.get_connection.assert_called_once_with("my_conn")
-        hook.get_openlineage_database_info.assert_called_once_with(connection)
-        mock_create_ns.assert_called_once_with(database_info)
-        assert result == "postgres://host:5432/mydb"
-
-    def test_returns_none_when_no_namespace_available(self):
-        hook = mock.MagicMock()
-        hook.__class__.__name__ = "SomeUnknownHook"
-        hook.get_connection.side_effect = Exception("no method")
-
-        with mock.patch.dict("sys.modules"):
-            result = _resolve_namespace(hook, "my_conn")
-
-        assert result is None
-
-    def test_returns_none_when_no_conn_id(self):
-        hook = mock.MagicMock()
-        hook.__class__.__name__ = "SomeUnknownHook"
-
-        with mock.patch.dict("sys.modules"):
-            result = _resolve_namespace(hook, None)
-
-        assert result is None
-
-
 class TestEmitLineageFromSqlExtras:
     @pytest.fixture(autouse=True)
     def _patch_deps(self):
         with (
             mock.patch(f"{_MODULE}._get_hook_conn_id", return_value="my_conn") as mock_conn_id,
-            mock.patch(f"{_MODULE}._resolve_namespace") as mock_ns,
+            mock.patch(f"{_MODULE}.SQLParser.create_namespace") as mock_ns,
             mock.patch(f"{_MODULE}.get_openlineage_facets_with_sql") as mock_facets_fn,
             mock.patch(f"{_MODULE}._create_ol_event_pair") as mock_build,
             mock.patch(f"{_MODULE}.get_openlineage_listener") as mock_listener,
@@ -142,6 +102,7 @@ class TestEmitLineageFromSqlExtras:
         )
         assert result is None
         self.mock_build.assert_not_called()
+        self.mock_facets_fn.assert_not_called()
         self.mock_listener.assert_not_called()
 
     def test_single_query_delegates_to_create_ol_event_pair(self):
@@ -278,7 +239,6 @@ class TestEmitLineageFromSqlExtras:
     def test_job_id_only_extra_is_processed(self):
         """An extra with only job_id (no SQL text) still builds and emits an event pair."""
         self.mock_conn_id.return_value = None
-        self.mock_ns.return_value = "ns"
         self.mock_facets_fn.return_value = None
         mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id")
 
@@ -287,9 +247,10 @@ class TestEmitLineageFromSqlExtras:
             sql_extras=[_make_extra(sql="", job_id="external-123")],
         )
 
+        # conn_id is None → namespace cannot be resolved → no externalQuery facet
         self.mock_build.assert_called_once()
         call = self.mock_build.call_args
-        assert call.kwargs["run_facets"]["externalQuery"].externalQueryId == "external-123"
+        assert "externalQuery" not in call.kwargs["run_facets"]
         assert "sql" not in call.kwargs["job_facets"]
 
     def test_parser_run_facets_preserved_over_external_query(self):
@@ -312,3 +273,29 @@ class TestEmitLineageFromSqlExtras:
 
         call = self.mock_build.call_args
         assert call.kwargs["run_facets"]["externalQuery"] is parser_ext_query
+
+    def test_different_hooks_same_conn_id_get_separate_db_info(self):
+        """Two hooks sharing a conn_id but returning different database info are cached separately."""
+        mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id")
+
+        hook_a = mock.MagicMock()
+        hook_b = mock.MagicMock()
+
+        db_info_a = mock.MagicMock()
+        db_info_b = mock.MagicMock()
+        hook_a.get_openlineage_database_info.return_value = db_info_a
+        hook_b.get_openlineage_database_info.return_value = db_info_b
+
+        self.mock_conn_id.return_value = "same_conn"
+        self.mock_ns.side_effect = lambda db_info: f"ns_{id(db_info)}"
+        self.mock_facets_fn.return_value = OperatorLineage()
+
+        extras = [
+            _make_extra(sql="SELECT 1", hook=hook_a),
+            _make_extra(sql="SELECT 2", hook=hook_b),
+        ]
+        emit_lineage_from_sql_extras(task_instance=mock_ti, sql_extras=extras)
+
+        # Both hooks should have had get_openlineage_database_info called
+        hook_a.get_openlineage_database_info.assert_called_once()
+        hook_b.get_openlineage_database_info.assert_called_once()
