This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch revert-60968-lineage-to-sdk
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 33ff3e6dec41d1e80aaf105dcc4a3e319f38ef0d
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Jan 27 22:19:11 2026 +0100

    Revert "Move lineage from airflow core to task sdk (#60968)"
    
    This reverts commit e871af6270e97cbdf6be3fd1626d655773770f5d.
---
 .github/boring-cyborg.yml                          |  4 ++--
 airflow-core/src/airflow/lineage/__init__.py       | 11 ---------
 .../src/airflow/lineage/hook.py                    | 12 ++++------
 airflow-core/src/airflow/plugins_manager.py        | 12 ++++++++++
 .../airflow => tests/unit}/lineage/__init__.py     | 11 ---------
 .../tests/unit/lineage/test_hook.py                | 15 +++++++-----
 .../tests/test_pytest_args_for_test_types.py       |  1 +
 devel-common/src/tests_common/pytest_plugin.py     |  2 +-
 .../src/tests_common/test_utils/mock_plugins.py    | 14 +----------
 .../providers/common/compat/lineage/hook.py        |  6 ++---
 .../src/airflow/providers/common/compat/sdk.py     | 27 +---------------------
 .../tests/unit/common/compat/lineage/test_hook.py  |  4 ++--
 .../providers/openlineage/plugins/openlineage.py   |  2 +-
 .../unit/openlineage/extractors/test_manager.py    | 20 ++++++----------
 task-sdk/src/airflow/sdk/io/path.py                |  6 ++---
 task-sdk/src/airflow/sdk/plugins_manager.py        | 12 ----------
 task-sdk/tests/task_sdk/docs/test_public_api.py    |  1 -
 17 files changed, 47 insertions(+), 113 deletions(-)

diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index f484a482e74..9f42441f1c3 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -442,8 +442,8 @@ labelPRBasedOnFilePath:
     - airflow-core/docs/howto/usage-cli.rst
 
   area:Lineage:
-    - task-sdk/src/airflow/sdk/lineage.py
-    - task-sdk/tests/task_sdk/test_lineage.py
+    - airflow-core/src/airflow/lineage/**/*
+    - airflow-core/tests/unit/lineage/**/*
     - airflow-core/docs/administration-and-deployment/lineage.rst
 
   area:Logging:
diff --git a/airflow-core/src/airflow/lineage/__init__.py 
b/airflow-core/src/airflow/lineage/__init__.py
index 15afa1a75de..217e5db9607 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,14 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-from airflow.utils.deprecation_tools import add_deprecated_classes
-
-add_deprecated_classes(
-    {
-        __name__: {"hook": "airflow.sdk.lineage"},
-        "hook": {"*": "airflow.sdk.lineage"},
-    },
-    package=__name__,
-)
diff --git a/task-sdk/src/airflow/sdk/lineage.py 
b/airflow-core/src/airflow/lineage/hook.py
similarity index 97%
rename from task-sdk/src/airflow/sdk/lineage.py
rename to airflow-core/src/airflow/lineage/hook.py
index acdf284a596..7c22a367006 100644
--- a/task-sdk/src/airflow/sdk/lineage.py
+++ b/airflow-core/src/airflow/lineage/hook.py
@@ -24,11 +24,10 @@ from functools import cache
 from typing import TYPE_CHECKING, Any, TypeAlias
 
 import attr
-import structlog
 
-from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
+from airflow.providers_manager import ProvidersManager
 from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
     from pydantic.types import JsonValue
@@ -39,9 +38,6 @@ if TYPE_CHECKING:
     LineageContext: TypeAlias = BaseHook | ObjectStoragePath
 
 
-log = structlog.getLogger(__name__)
-
-
 # Maximum number of assets input or output that can be collected in a single 
hook execution.
 # Input assets and output assets are collected separately.
 MAX_COLLECTED_ASSETS = 100
@@ -110,7 +106,7 @@ class HookLineageCollector(LoggingMixin):
         self._outputs: dict[str, tuple[Asset, LineageContext]] = {}
         self._input_counts: dict[str, int] = defaultdict(int)
         self._output_counts: dict[str, int] = defaultdict(int)
-        self._asset_factories = ProvidersManagerTaskRuntime().asset_factories
+        self._asset_factories = ProvidersManager().asset_factories
         self._extra_counts: dict[str, int] = defaultdict(int)
         self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
 
@@ -339,7 +335,7 @@ class HookLineageReader(LoggingMixin):
 @cache
 def get_hook_lineage_collector() -> HookLineageCollector:
     """Get singleton lineage collector."""
-    from airflow.sdk import plugins_manager
+    from airflow import plugins_manager
 
     if plugins_manager.get_hook_lineage_readers_plugins():
         return HookLineageCollector()
diff --git a/airflow-core/src/airflow/plugins_manager.py 
b/airflow-core/src/airflow/plugins_manager.py
index f26878fa4ee..af05f1aacd2 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -38,6 +38,7 @@ from airflow._shared.plugins_manager import (
 from airflow.configuration import conf
 
 if TYPE_CHECKING:
+    from airflow.lineage.hook import HookLineageReader
     from airflow.listeners.listener import ListenerManager
     from airflow.partition_mapper.base import PartitionMapper
     from airflow.task.priority_strategy import PriorityWeightStrategy
@@ -282,6 +283,17 @@ def get_partition_mapper_plugins() -> dict[str, 
type[PartitionMapper]]:
     }
 
 
+@cache
+def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
+    """Collect and get hook lineage reader classes registered by plugins."""
+    log.debug("Initialize hook lineage readers plugins")
+    result: list[type[HookLineageReader]] = []
+
+    for plugin in _get_plugins()[0]:
+        result.extend(plugin.hook_lineage_readers)
+    return result
+
+
 @cache
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
diff --git a/airflow-core/src/airflow/lineage/__init__.py 
b/airflow-core/tests/unit/lineage/__init__.py
similarity index 75%
copy from airflow-core/src/airflow/lineage/__init__.py
copy to airflow-core/tests/unit/lineage/__init__.py
index 15afa1a75de..217e5db9607 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/tests/unit/lineage/__init__.py
@@ -15,14 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-from airflow.utils.deprecation_tools import add_deprecated_classes
-
-add_deprecated_classes(
-    {
-        __name__: {"hook": "airflow.sdk.lineage"},
-        "hook": {"*": "airflow.sdk.lineage"},
-    },
-    package=__name__,
-)
diff --git a/task-sdk/tests/task_sdk/test_lineage.py 
b/airflow-core/tests/unit/lineage/test_hook.py
similarity index 98%
rename from task-sdk/tests/task_sdk/test_lineage.py
rename to airflow-core/tests/unit/lineage/test_hook.py
index 3bf706cabb4..f5403159247 100644
--- a/task-sdk/tests/task_sdk/test_lineage.py
+++ b/airflow-core/tests/unit/lineage/test_hook.py
@@ -21,8 +21,9 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from airflow.sdk import Asset, BaseHook, plugins_manager
-from airflow.sdk.lineage import (
+from airflow import plugins_manager
+from airflow.lineage import hook
+from airflow.lineage.hook import (
     AssetLineageInfo,
     HookLineage,
     HookLineageCollector,
@@ -30,6 +31,8 @@ from airflow.sdk.lineage import (
     NoOpCollector,
     get_hook_lineage_collector,
 )
+from airflow.sdk import BaseHook
+from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
@@ -134,7 +137,7 @@ class TestHookLineageCollector:
             ],
         )
 
-    @patch("airflow.sdk.lineage.Asset")
+    @patch("airflow.lineage.hook.Asset")
     def test_add_input_asset(self, mock_asset, collector):
         asset = MagicMock(spec=Asset, extra={})
         mock_asset.return_value = asset
@@ -193,7 +196,7 @@ class TestHookLineageCollector:
             uri="myscheme://value_1/value_2", name="asset-value_1", 
group="test", extra={"key": "value"}
         )
 
-    @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
+    @patch("airflow.lineage.hook.ProvidersManager")
     def test_create_asset_no_factory(self, mock_providers_manager, collector):
         test_scheme = "myscheme"
         mock_providers_manager.return_value.asset_factories = {}
@@ -212,7 +215,7 @@ class TestHookLineageCollector:
             is None
         )
 
-    @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
+    @patch("airflow.lineage.hook.ProvidersManager")
     def test_create_asset_factory_exception(self, mock_providers_manager, 
collector):
         def create_asset(extra=None, **kwargs):
             raise RuntimeError("Factory error")
@@ -870,7 +873,7 @@ class FakePlugin(plugins_manager.AirflowPlugin):
 )
 def test_get_hook_lineage_collector(has_readers, expected_class):
     # reset cached instance
-    get_hook_lineage_collector.cache_clear()
+    hook.get_hook_lineage_collector.cache_clear()
     plugins = [FakePlugin()] if has_readers else []
     with mock_plugin_manager(plugins=plugins):
         assert isinstance(get_hook_lineage_collector(), expected_class)
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py 
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 7e70fe6162a..982dc4e37b1 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -163,6 +163,7 @@ def _find_all_integration_folders() -> list[str]:
                 "airflow-core/tests/unit/decorators",
                 "airflow-core/tests/unit/hooks",
                 "airflow-core/tests/unit/io",
+                "airflow-core/tests/unit/lineage",
                 "airflow-core/tests/unit/listeners",
                 "airflow-core/tests/unit/logging",
                 "airflow-core/tests/unit/macros",
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index cdb5ffe183a..258d3345706 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1980,7 +1980,7 @@ def _mock_plugins(request: pytest.FixtureRequest):
 
 @pytest.fixture
 def hook_lineage_collector():
-    from airflow.providers.common.compat.sdk import HookLineageCollector
+    from airflow.lineage.hook import HookLineageCollector
 
     hlc = HookLineageCollector()
     with mock.patch(
diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py 
b/devel-common/src/tests_common/test_utils/mock_plugins.py
index d4bc391418f..dfc654d15ee 100644
--- a/devel-common/src/tests_common/test_utils/mock_plugins.py
+++ b/devel-common/src/tests_common/test_utils/mock_plugins.py
@@ -85,7 +85,6 @@ def mock_plugin_manager(plugins=None, **kwargs):
         if AIRFLOW_V_3_2_PLUS:
             # Always start the block with an non-initialized plugins, so 
ensure_plugins_loaded runs.
             from airflow import plugins_manager
-            from airflow.sdk import plugins_manager as sdk_plugins_manager
 
             plugins_manager._get_plugins.cache_clear()
             plugins_manager._get_ui_plugins.cache_clear()
@@ -93,12 +92,10 @@ def mock_plugin_manager(plugins=None, **kwargs):
             plugins_manager.get_fastapi_plugins.cache_clear()
             plugins_manager._get_extra_operators_links_plugins.cache_clear()
             plugins_manager.get_timetables_plugins.cache_clear()
+            plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
             plugins_manager.integrate_macros_plugins.cache_clear()
             plugins_manager.get_priority_weight_strategy_plugins.cache_clear()
 
-            sdk_plugins_manager.integrate_macros_plugins.cache_clear()
-            sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
-
             if plugins is not None or "import_errors" in kwargs:
                 exit_stack.enter_context(
                     mock.patch(
@@ -109,15 +106,6 @@ def mock_plugin_manager(plugins=None, **kwargs):
                         ),
                     )
                 )
-                exit_stack.enter_context(
-                    mock.patch(
-                        "airflow.sdk.plugins_manager._get_plugins",
-                        return_value=(
-                            plugins or [],
-                            kwargs.get("import_errors", {}),
-                        ),
-                    )
-                )
             elif kwargs:
                 raise NotImplementedError(
                     "mock_plugin_manager does not support patching other 
attributes in Airflow 3.2+"
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 37b352b2cc5..2eb07f446f3 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     from typing import Any
 
-    from airflow.sdk.lineage import LineageContext
+    from airflow.lineage.hook import LineageContext
 
 
 def _lacks_asset_methods(collector):
@@ -62,7 +62,7 @@ def _add_extra_polyfill(collector):
 
     import attr
 
-    from airflow.providers.common.compat.sdk import HookLineage as 
_BaseHookLineage
+    from airflow.lineage.hook import HookLineage as _BaseHookLineage
 
     # Add `extra` to HookLineage returned by `collected_assets` property
     @attr.define
@@ -229,7 +229,7 @@ def get_hook_lineage_collector():
     Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - 
apply single layer.
     Airflow 3.2+: Collector has asset-based methods and `add_extra` support - 
no action required.
     """
-    from airflow.providers.common.compat.sdk import get_hook_lineage_collector 
as get_global_collector
+    from airflow.lineage.hook import get_hook_lineage_collector as 
get_global_collector
 
     global_collector = get_global_collector()
 
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py 
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index 0cfa54dbd35..ccb4e715abf 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -95,13 +95,6 @@ if TYPE_CHECKING:
         TaskDeferred as TaskDeferred,
         XComNotFound as XComNotFound,
     )
-    from airflow.sdk.lineage import (
-        HookLineage as HookLineage,
-        HookLineageCollector as HookLineageCollector,
-        HookLineageReader as HookLineageReader,
-        NoOpCollector as NoOpCollector,
-        get_hook_lineage_collector as get_hook_lineage_collector,
-    )
     from airflow.sdk.listener import get_listener_manager as 
get_listener_manager
     from airflow.sdk.log import redact as redact
     from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin
@@ -133,10 +126,6 @@ _RENAME_MAP: dict[str, tuple[str, str, str]] = {
     "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
 }
 
-# Airflow 3-only renames (not available in Airflow 2)
-_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {}
-
-
 # Import map for classes/functions/constants
 # Format: class_name -> module_path(s)
 # - str: single module path (no fallback)
@@ -246,15 +235,6 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
     # 
============================================================================
     "XCOM_RETURN_KEY": "airflow.models.xcom",
     # 
============================================================================
-    # Lineage
-    # 
============================================================================
-    "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
-    "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"),
-    "get_hook_lineage_collector": ("airflow.sdk.lineage", 
"airflow.lineage.hook"),
-    "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"),
-    # Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> 
AssetLineageInfo)
-    "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
-    # 
============================================================================
     # Exceptions (deprecated in airflow.exceptions, prefer SDK)
     # 
============================================================================
     # Note: AirflowException and AirflowNotFoundException are not deprecated, 
but exposing them
@@ -299,14 +279,9 @@ _AIRFLOW_3_ONLY_EXCEPTIONS: dict[str, tuple[str, ...]] = {
     "DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"),
 }
 
-# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 
3+
+# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+
 if AIRFLOW_V_3_0_PLUS:
     _IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS)
-    _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES)
-    # AssetLineageInfo exists in 3.0+ but location changed in 3.2
-    # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo
-    # 3.2+: airflow.sdk.lineage.AssetLineageInfo
-    _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", 
"airflow.lineage.hook")
 
 # Module map: module_name -> module_path(s)
 # For entire modules that have been moved (e.g., timezone)
diff --git 
a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py 
b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
index c4d948e569b..9045512531e 100644
--- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
+++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
@@ -27,7 +27,7 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_3_0_PLUS
 
 @pytest.fixture
 def collector():
-    from airflow.providers.common.compat.sdk import HookLineageCollector
+    from airflow.lineage.hook import HookLineageCollector
 
     # Patch the "inner" function that the compat version will call
     with mock.patch(
@@ -41,7 +41,7 @@ def collector():
 
 @pytest.fixture
 def noop_collector():
-    from airflow.providers.common.compat.sdk import NoOpCollector
+    from airflow.lineage.hook import NoOpCollector
 
     # Patch the "inner" function that the compat version will call
     with mock.patch(
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 375b0bbf2f1..374d8b2f06b 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -21,7 +21,7 @@ from airflow.providers.openlineage import conf
 
 # Conditional imports - only load expensive dependencies when plugin is enabled
 if not conf.is_disabled():
-    from airflow.providers.common.compat.sdk import HookLineageReader
+    from airflow.lineage.hook import HookLineageReader
     from airflow.providers.openlineage.plugins.listener import 
get_openlineage_listener
     from airflow.providers.openlineage.plugins.macros import (
         lineage_job_name,
diff --git 
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py 
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 9e2b1782b81..3989144009e 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -50,25 +50,19 @@ if TYPE_CHECKING:
 
 @pytest.fixture
 def hook_lineage_collector():
-    from airflow.providers.common.compat.sdk import HookLineageCollector
+    from airflow.lineage import hook
+    from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
 
-    from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
-
-    hlc = HookLineageCollector()
-    patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
-    if AIRFLOW_V_3_2_PLUS:
-        patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
+    hlc = hook.HookLineageCollector()
     if AIRFLOW_V_3_0_PLUS:
         from unittest import mock
 
-        with mock.patch(patch_target, return_value=hlc):
-            from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
-
+        with mock.patch(
+            "airflow.lineage.hook.get_hook_lineage_collector",
+            return_value=hlc,
+        ):
             yield get_hook_lineage_collector()
     else:
-        from airflow.lineage import hook
-        from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
-
         hook._hook_lineage_collector = hlc
 
         yield get_hook_lineage_collector()
diff --git a/task-sdk/src/airflow/sdk/io/path.py 
b/task-sdk/src/airflow/sdk/io/path.py
index 3f87e6a1d95..611dfe2c8df 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -44,7 +44,7 @@ class _TrackingFileWrapper:
         self._obj = obj
 
     def __getattr__(self, name):
-        from airflow.sdk.lineage import get_hook_lineage_collector
+        from airflow.lineage.hook import get_hook_lineage_collector
 
         if not callable(attr := getattr(self._obj, name)):
             return attr
@@ -312,7 +312,7 @@ class ObjectStoragePath(CloudPath):
 
         kwargs: Additional keyword arguments to be passed to the underlying 
implementation.
         """
-        from airflow.sdk.lineage import get_hook_lineage_collector
+        from airflow.lineage.hook import get_hook_lineage_collector
 
         if isinstance(dst, str):
             dst = ObjectStoragePath(dst)
@@ -380,7 +380,7 @@ class ObjectStoragePath(CloudPath):
 
         kwargs: Additional keyword arguments to be passed to the underlying 
implementation.
         """
-        from airflow.sdk.lineage import get_hook_lineage_collector
+        from airflow.lineage.hook import get_hook_lineage_collector
 
         if isinstance(path, str):
             path = ObjectStoragePath(path)
diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py 
b/task-sdk/src/airflow/sdk/plugins_manager.py
index 4f4abbec68b..2cbdf1a44a4 100644
--- a/task-sdk/src/airflow/sdk/plugins_manager.py
+++ b/task-sdk/src/airflow/sdk/plugins_manager.py
@@ -39,7 +39,6 @@ from airflow.sdk.providers_manager_runtime import 
ProvidersManagerTaskRuntime
 
 if TYPE_CHECKING:
     from airflow.sdk._shared.listeners.listener import ListenerManager
-    from airflow.sdk.lineage import HookLineageReader
 
 log = logging.getLogger(__name__)
 
@@ -132,14 +131,3 @@ def integrate_listener_plugins(listener_manager: 
ListenerManager) -> None:
     """Add listeners from plugins."""
     plugins, _ = _get_plugins()
     _integrate_listener_plugins(listener_manager, plugins=plugins)
-
-
-@cache
-def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
-    """Collect and get hook lineage reader classes registered by plugins."""
-    log.debug("Initialize hook lineage readers plugins")
-    result: list[type[HookLineageReader]] = []
-
-    for plugin in _get_plugins()[0]:
-        result.extend(plugin.hook_lineage_readers)
-    return result
diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py 
b/task-sdk/tests/task_sdk/docs/test_public_api.py
index e8d7dd68e9e..f53887ec5c9 100644
--- a/task-sdk/tests/task_sdk/docs/test_public_api.py
+++ b/task-sdk/tests/task_sdk/docs/test_public_api.py
@@ -63,7 +63,6 @@ def test_airflow_sdk_no_unexpected_exports():
         "listener",
         "crypto",
         "providers_manager_runtime",
-        "lineage",
     }
     unexpected = actual - public - ignore
     assert not unexpected, f"Unexpected exports in airflow.sdk: 
{sorted(unexpected)}"

Reply via email to