This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e871af6270e Move lineage from airflow core to task sdk (#60968)
e871af6270e is described below

commit e871af6270e97cbdf6be3fd1626d655773770f5d
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jan 27 22:19:03 2026 +0530

    Move lineage from airflow core to task sdk (#60968)
    
    Lineage collection happens exclusively during task execution and is only 
used by worker processes. Server components such as the scheduler and API 
server do not use it. Move the lineage module from airflow-core to the task SDK 
to better align with the ongoing client–server separation.
---
 .github/boring-cyborg.yml                          |  4 ++--
 airflow-core/src/airflow/lineage/__init__.py       | 11 +++++++++
 airflow-core/src/airflow/plugins_manager.py        | 12 ----------
 airflow-core/tests/unit/lineage/__init__.py        | 17 --------------
 .../tests/test_pytest_args_for_test_types.py       |  1 -
 devel-common/src/tests_common/pytest_plugin.py     |  2 +-
 .../src/tests_common/test_utils/mock_plugins.py    | 14 ++++++++++-
 .../providers/common/compat/lineage/hook.py        |  6 ++---
 .../src/airflow/providers/common/compat/sdk.py     | 27 +++++++++++++++++++++-
 .../tests/unit/common/compat/lineage/test_hook.py  |  4 ++--
 .../providers/openlineage/plugins/openlineage.py   |  2 +-
 .../unit/openlineage/extractors/test_manager.py    | 20 ++++++++++------
 task-sdk/src/airflow/sdk/io/path.py                |  6 ++---
 .../hook.py => task-sdk/src/airflow/sdk/lineage.py | 12 ++++++----
 task-sdk/src/airflow/sdk/plugins_manager.py        | 12 ++++++++++
 task-sdk/tests/task_sdk/docs/test_public_api.py    |  1 +
 .../tests/task_sdk/test_lineage.py                 | 15 +++++-------
 17 files changed, 102 insertions(+), 64 deletions(-)

diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index 9f42441f1c3..f484a482e74 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -442,8 +442,8 @@ labelPRBasedOnFilePath:
     - airflow-core/docs/howto/usage-cli.rst
 
   area:Lineage:
-    - airflow-core/src/airflow/lineage/**/*
-    - airflow-core/tests/unit/lineage/**/*
+    - task-sdk/src/airflow/sdk/lineage.py
+    - task-sdk/tests/task_sdk/test_lineage.py
     - airflow-core/docs/administration-and-deployment/lineage.rst
 
   area:Logging:
diff --git a/airflow-core/src/airflow/lineage/__init__.py 
b/airflow-core/src/airflow/lineage/__init__.py
index 217e5db9607..15afa1a75de 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,3 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+add_deprecated_classes(
+    {
+        __name__: {"hook": "airflow.sdk.lineage"},
+        "hook": {"*": "airflow.sdk.lineage"},
+    },
+    package=__name__,
+)
diff --git a/airflow-core/src/airflow/plugins_manager.py 
b/airflow-core/src/airflow/plugins_manager.py
index af05f1aacd2..f26878fa4ee 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -38,7 +38,6 @@ from airflow._shared.plugins_manager import (
 from airflow.configuration import conf
 
 if TYPE_CHECKING:
-    from airflow.lineage.hook import HookLineageReader
     from airflow.listeners.listener import ListenerManager
     from airflow.partition_mapper.base import PartitionMapper
     from airflow.task.priority_strategy import PriorityWeightStrategy
@@ -283,17 +282,6 @@ def get_partition_mapper_plugins() -> dict[str, 
type[PartitionMapper]]:
     }
 
 
-@cache
-def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
-    """Collect and get hook lineage reader classes registered by plugins."""
-    log.debug("Initialize hook lineage readers plugins")
-    result: list[type[HookLineageReader]] = []
-
-    for plugin in _get_plugins()[0]:
-        result.extend(plugin.hook_lineage_readers)
-    return result
-
-
 @cache
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
diff --git a/airflow-core/tests/unit/lineage/__init__.py 
b/airflow-core/tests/unit/lineage/__init__.py
deleted file mode 100644
index 217e5db9607..00000000000
--- a/airflow-core/tests/unit/lineage/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py 
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 982dc4e37b1..7e70fe6162a 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -163,7 +163,6 @@ def _find_all_integration_folders() -> list[str]:
                 "airflow-core/tests/unit/decorators",
                 "airflow-core/tests/unit/hooks",
                 "airflow-core/tests/unit/io",
-                "airflow-core/tests/unit/lineage",
                 "airflow-core/tests/unit/listeners",
                 "airflow-core/tests/unit/logging",
                 "airflow-core/tests/unit/macros",
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index 258d3345706..cdb5ffe183a 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1980,7 +1980,7 @@ def _mock_plugins(request: pytest.FixtureRequest):
 
 @pytest.fixture
 def hook_lineage_collector():
-    from airflow.lineage.hook import HookLineageCollector
+    from airflow.providers.common.compat.sdk import HookLineageCollector
 
     hlc = HookLineageCollector()
     with mock.patch(
diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py 
b/devel-common/src/tests_common/test_utils/mock_plugins.py
index dfc654d15ee..d4bc391418f 100644
--- a/devel-common/src/tests_common/test_utils/mock_plugins.py
+++ b/devel-common/src/tests_common/test_utils/mock_plugins.py
@@ -85,6 +85,7 @@ def mock_plugin_manager(plugins=None, **kwargs):
         if AIRFLOW_V_3_2_PLUS:
             # Always start the block with an non-initialized plugins, so 
ensure_plugins_loaded runs.
             from airflow import plugins_manager
+            from airflow.sdk import plugins_manager as sdk_plugins_manager
 
             plugins_manager._get_plugins.cache_clear()
             plugins_manager._get_ui_plugins.cache_clear()
@@ -92,10 +93,12 @@ def mock_plugin_manager(plugins=None, **kwargs):
             plugins_manager.get_fastapi_plugins.cache_clear()
             plugins_manager._get_extra_operators_links_plugins.cache_clear()
             plugins_manager.get_timetables_plugins.cache_clear()
-            plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
             plugins_manager.integrate_macros_plugins.cache_clear()
             plugins_manager.get_priority_weight_strategy_plugins.cache_clear()
 
+            sdk_plugins_manager.integrate_macros_plugins.cache_clear()
+            sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
+
             if plugins is not None or "import_errors" in kwargs:
                 exit_stack.enter_context(
                     mock.patch(
@@ -106,6 +109,15 @@ def mock_plugin_manager(plugins=None, **kwargs):
                         ),
                     )
                 )
+                exit_stack.enter_context(
+                    mock.patch(
+                        "airflow.sdk.plugins_manager._get_plugins",
+                        return_value=(
+                            plugins or [],
+                            kwargs.get("import_errors", {}),
+                        ),
+                    )
+                )
             elif kwargs:
                 raise NotImplementedError(
                     "mock_plugin_manager does not support patching other 
attributes in Airflow 3.2+"
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 2eb07f446f3..37b352b2cc5 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     from typing import Any
 
-    from airflow.lineage.hook import LineageContext
+    from airflow.sdk.lineage import LineageContext
 
 
 def _lacks_asset_methods(collector):
@@ -62,7 +62,7 @@ def _add_extra_polyfill(collector):
 
     import attr
 
-    from airflow.lineage.hook import HookLineage as _BaseHookLineage
+    from airflow.providers.common.compat.sdk import HookLineage as 
_BaseHookLineage
 
     # Add `extra` to HookLineage returned by `collected_assets` property
     @attr.define
@@ -229,7 +229,7 @@ def get_hook_lineage_collector():
     Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - 
apply single layer.
     Airflow 3.2+: Collector has asset-based methods and `add_extra` support - 
no action required.
     """
-    from airflow.lineage.hook import get_hook_lineage_collector as 
get_global_collector
+    from airflow.providers.common.compat.sdk import get_hook_lineage_collector 
as get_global_collector
 
     global_collector = get_global_collector()
 
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py 
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index ccb4e715abf..0cfa54dbd35 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -95,6 +95,13 @@ if TYPE_CHECKING:
         TaskDeferred as TaskDeferred,
         XComNotFound as XComNotFound,
     )
+    from airflow.sdk.lineage import (
+        HookLineage as HookLineage,
+        HookLineageCollector as HookLineageCollector,
+        HookLineageReader as HookLineageReader,
+        NoOpCollector as NoOpCollector,
+        get_hook_lineage_collector as get_hook_lineage_collector,
+    )
     from airflow.sdk.listener import get_listener_manager as 
get_listener_manager
     from airflow.sdk.log import redact as redact
     from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin
@@ -126,6 +133,10 @@ _RENAME_MAP: dict[str, tuple[str, str, str]] = {
     "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
 }
 
+# Airflow 3-only renames (not available in Airflow 2)
+_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {}
+
+
 # Import map for classes/functions/constants
 # Format: class_name -> module_path(s)
 # - str: single module path (no fallback)
@@ -235,6 +246,15 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
     # 
============================================================================
     "XCOM_RETURN_KEY": "airflow.models.xcom",
     # 
============================================================================
+    # Lineage
+    # 
============================================================================
+    "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+    "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+    "get_hook_lineage_collector": ("airflow.sdk.lineage", 
"airflow.lineage.hook"),
+    "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+    # Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> 
AssetLineageInfo)
+    "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+    # 
============================================================================
     # Exceptions (deprecated in airflow.exceptions, prefer SDK)
     # 
============================================================================
     # Note: AirflowException and AirflowNotFoundException are not deprecated, 
but exposing them
@@ -279,9 +299,14 @@ _AIRFLOW_3_ONLY_EXCEPTIONS: dict[str, tuple[str, ...]] = {
     "DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"),
 }
 
-# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+
+# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 
3+
 if AIRFLOW_V_3_0_PLUS:
     _IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS)
+    _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES)
+    # AssetLineageInfo exists in 3.0+ but location changed in 3.2
+    # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo
+    # 3.2+: airflow.sdk.lineage.AssetLineageInfo
+    _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", 
"airflow.lineage.hook")
 
 # Module map: module_name -> module_path(s)
 # For entire modules that have been moved (e.g., timezone)
diff --git 
a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py 
b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
index 9045512531e..c4d948e569b 100644
--- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
+++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
@@ -27,7 +27,7 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_3_0_PLUS
 
 @pytest.fixture
 def collector():
-    from airflow.lineage.hook import HookLineageCollector
+    from airflow.providers.common.compat.sdk import HookLineageCollector
 
     # Patch the "inner" function that the compat version will call
     with mock.patch(
@@ -41,7 +41,7 @@ def collector():
 
 @pytest.fixture
 def noop_collector():
-    from airflow.lineage.hook import NoOpCollector
+    from airflow.providers.common.compat.sdk import NoOpCollector
 
     # Patch the "inner" function that the compat version will call
     with mock.patch(
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 374d8b2f06b..375b0bbf2f1 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -21,7 +21,7 @@ from airflow.providers.openlineage import conf
 
 # Conditional imports - only load expensive dependencies when plugin is enabled
 if not conf.is_disabled():
-    from airflow.lineage.hook import HookLineageReader
+    from airflow.providers.common.compat.sdk import HookLineageReader
     from airflow.providers.openlineage.plugins.listener import 
get_openlineage_listener
     from airflow.providers.openlineage.plugins.macros import (
         lineage_job_name,
diff --git 
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py 
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 3989144009e..9e2b1782b81 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -50,19 +50,25 @@ if TYPE_CHECKING:
 
 @pytest.fixture
 def hook_lineage_collector():
-    from airflow.lineage import hook
-    from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+    from airflow.providers.common.compat.sdk import HookLineageCollector
 
-    hlc = hook.HookLineageCollector()
+    from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
+
+    hlc = HookLineageCollector()
+    patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+    if AIRFLOW_V_3_2_PLUS:
+        patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
     if AIRFLOW_V_3_0_PLUS:
         from unittest import mock
 
-        with mock.patch(
-            "airflow.lineage.hook.get_hook_lineage_collector",
-            return_value=hlc,
-        ):
+        with mock.patch(patch_target, return_value=hlc):
+            from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+
             yield get_hook_lineage_collector()
     else:
+        from airflow.lineage import hook
+        from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+
         hook._hook_lineage_collector = hlc
 
         yield get_hook_lineage_collector()
diff --git a/task-sdk/src/airflow/sdk/io/path.py 
b/task-sdk/src/airflow/sdk/io/path.py
index 611dfe2c8df..3f87e6a1d95 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -44,7 +44,7 @@ class _TrackingFileWrapper:
         self._obj = obj
 
     def __getattr__(self, name):
-        from airflow.lineage.hook import get_hook_lineage_collector
+        from airflow.sdk.lineage import get_hook_lineage_collector
 
         if not callable(attr := getattr(self._obj, name)):
             return attr
@@ -312,7 +312,7 @@ class ObjectStoragePath(CloudPath):
 
         kwargs: Additional keyword arguments to be passed to the underlying 
implementation.
         """
-        from airflow.lineage.hook import get_hook_lineage_collector
+        from airflow.sdk.lineage import get_hook_lineage_collector
 
         if isinstance(dst, str):
             dst = ObjectStoragePath(dst)
@@ -380,7 +380,7 @@ class ObjectStoragePath(CloudPath):
 
         kwargs: Additional keyword arguments to be passed to the underlying 
implementation.
         """
-        from airflow.lineage.hook import get_hook_lineage_collector
+        from airflow.sdk.lineage import get_hook_lineage_collector
 
         if isinstance(path, str):
             path = ObjectStoragePath(path)
diff --git a/airflow-core/src/airflow/lineage/hook.py 
b/task-sdk/src/airflow/sdk/lineage.py
similarity index 97%
rename from airflow-core/src/airflow/lineage/hook.py
rename to task-sdk/src/airflow/sdk/lineage.py
index 7c22a367006..acdf284a596 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/task-sdk/src/airflow/sdk/lineage.py
@@ -24,10 +24,11 @@ from functools import cache
 from typing import TYPE_CHECKING, Any, TypeAlias
 
 import attr
+import structlog
 
-from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
 from airflow.sdk.definitions.asset import Asset
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
 
 if TYPE_CHECKING:
     from pydantic.types import JsonValue
@@ -38,6 +39,9 @@ if TYPE_CHECKING:
     LineageContext: TypeAlias = BaseHook | ObjectStoragePath
 
 
+log = structlog.getLogger(__name__)
+
+
 # Maximum number of assets input or output that can be collected in a single 
hook execution.
 # Input assets and output assets are collected separately.
 MAX_COLLECTED_ASSETS = 100
@@ -106,7 +110,7 @@ class HookLineageCollector(LoggingMixin):
         self._outputs: dict[str, tuple[Asset, LineageContext]] = {}
         self._input_counts: dict[str, int] = defaultdict(int)
         self._output_counts: dict[str, int] = defaultdict(int)
-        self._asset_factories = ProvidersManager().asset_factories
+        self._asset_factories = ProvidersManagerTaskRuntime().asset_factories
         self._extra_counts: dict[str, int] = defaultdict(int)
         self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
 
@@ -335,7 +339,7 @@ class HookLineageReader(LoggingMixin):
 @cache
 def get_hook_lineage_collector() -> HookLineageCollector:
     """Get singleton lineage collector."""
-    from airflow import plugins_manager
+    from airflow.sdk import plugins_manager
 
     if plugins_manager.get_hook_lineage_readers_plugins():
         return HookLineageCollector()
diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py 
b/task-sdk/src/airflow/sdk/plugins_manager.py
index 2cbdf1a44a4..4f4abbec68b 100644
--- a/task-sdk/src/airflow/sdk/plugins_manager.py
+++ b/task-sdk/src/airflow/sdk/plugins_manager.py
@@ -39,6 +39,7 @@ from airflow.sdk.providers_manager_runtime import 
ProvidersManagerTaskRuntime
 
 if TYPE_CHECKING:
     from airflow.sdk._shared.listeners.listener import ListenerManager
+    from airflow.sdk.lineage import HookLineageReader
 
 log = logging.getLogger(__name__)
 
@@ -131,3 +132,14 @@ def integrate_listener_plugins(listener_manager: 
ListenerManager) -> None:
     """Add listeners from plugins."""
     plugins, _ = _get_plugins()
     _integrate_listener_plugins(listener_manager, plugins=plugins)
+
+
+@cache
+def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
+    """Collect and get hook lineage reader classes registered by plugins."""
+    log.debug("Initialize hook lineage readers plugins")
+    result: list[type[HookLineageReader]] = []
+
+    for plugin in _get_plugins()[0]:
+        result.extend(plugin.hook_lineage_readers)
+    return result
diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py 
b/task-sdk/tests/task_sdk/docs/test_public_api.py
index f53887ec5c9..e8d7dd68e9e 100644
--- a/task-sdk/tests/task_sdk/docs/test_public_api.py
+++ b/task-sdk/tests/task_sdk/docs/test_public_api.py
@@ -63,6 +63,7 @@ def test_airflow_sdk_no_unexpected_exports():
         "listener",
         "crypto",
         "providers_manager_runtime",
+        "lineage",
     }
     unexpected = actual - public - ignore
     assert not unexpected, f"Unexpected exports in airflow.sdk: 
{sorted(unexpected)}"
diff --git a/airflow-core/tests/unit/lineage/test_hook.py 
b/task-sdk/tests/task_sdk/test_lineage.py
similarity index 98%
rename from airflow-core/tests/unit/lineage/test_hook.py
rename to task-sdk/tests/task_sdk/test_lineage.py
index f5403159247..3bf706cabb4 100644
--- a/airflow-core/tests/unit/lineage/test_hook.py
+++ b/task-sdk/tests/task_sdk/test_lineage.py
@@ -21,9 +21,8 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from airflow import plugins_manager
-from airflow.lineage import hook
-from airflow.lineage.hook import (
+from airflow.sdk import Asset, BaseHook, plugins_manager
+from airflow.sdk.lineage import (
     AssetLineageInfo,
     HookLineage,
     HookLineageCollector,
@@ -31,8 +30,6 @@ from airflow.lineage.hook import (
     NoOpCollector,
     get_hook_lineage_collector,
 )
-from airflow.sdk import BaseHook
-from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
@@ -137,7 +134,7 @@ class TestHookLineageCollector:
             ],
         )
 
-    @patch("airflow.lineage.hook.Asset")
+    @patch("airflow.sdk.lineage.Asset")
     def test_add_input_asset(self, mock_asset, collector):
         asset = MagicMock(spec=Asset, extra={})
         mock_asset.return_value = asset
@@ -196,7 +193,7 @@ class TestHookLineageCollector:
             uri="myscheme://value_1/value_2", name="asset-value_1", 
group="test", extra={"key": "value"}
         )
 
-    @patch("airflow.lineage.hook.ProvidersManager")
+    @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
     def test_create_asset_no_factory(self, mock_providers_manager, collector):
         test_scheme = "myscheme"
         mock_providers_manager.return_value.asset_factories = {}
@@ -215,7 +212,7 @@ class TestHookLineageCollector:
             is None
         )
 
-    @patch("airflow.lineage.hook.ProvidersManager")
+    @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
     def test_create_asset_factory_exception(self, mock_providers_manager, 
collector):
         def create_asset(extra=None, **kwargs):
             raise RuntimeError("Factory error")
@@ -873,7 +870,7 @@ class FakePlugin(plugins_manager.AirflowPlugin):
 )
 def test_get_hook_lineage_collector(has_readers, expected_class):
     # reset cached instance
-    hook.get_hook_lineage_collector.cache_clear()
+    get_hook_lineage_collector.cache_clear()
     plugins = [FakePlugin()] if has_readers else []
     with mock_plugin_manager(plugins=plugins):
         assert isinstance(get_hook_lineage_collector(), expected_class)

Reply via email to