This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e871af6270e Move lineage from airflow core to task sdk (#60968)
e871af6270e is described below
commit e871af6270e97cbdf6be3fd1626d655773770f5d
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jan 27 22:19:03 2026 +0530
Move lineage from airflow core to task sdk (#60968)
Lineage collection happens exclusively during task execution and is only
used by worker processes. Server components such as the scheduler and API
server do not use it. Move the lineage module from airflow-core to the task SDK
to better align with the ongoing client–server separation.
---
.github/boring-cyborg.yml | 4 ++--
airflow-core/src/airflow/lineage/__init__.py | 11 +++++++++
airflow-core/src/airflow/plugins_manager.py | 12 ----------
airflow-core/tests/unit/lineage/__init__.py | 17 --------------
.../tests/test_pytest_args_for_test_types.py | 1 -
devel-common/src/tests_common/pytest_plugin.py | 2 +-
.../src/tests_common/test_utils/mock_plugins.py | 14 ++++++++++-
.../providers/common/compat/lineage/hook.py | 6 ++---
.../src/airflow/providers/common/compat/sdk.py | 27 +++++++++++++++++++++-
.../tests/unit/common/compat/lineage/test_hook.py | 4 ++--
.../providers/openlineage/plugins/openlineage.py | 2 +-
.../unit/openlineage/extractors/test_manager.py | 20 ++++++++++------
task-sdk/src/airflow/sdk/io/path.py | 6 ++---
.../hook.py => task-sdk/src/airflow/sdk/lineage.py | 12 ++++++----
task-sdk/src/airflow/sdk/plugins_manager.py | 12 ++++++++++
task-sdk/tests/task_sdk/docs/test_public_api.py | 1 +
.../tests/task_sdk/test_lineage.py | 15 +++++-------
17 files changed, 102 insertions(+), 64 deletions(-)
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index 9f42441f1c3..f484a482e74 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -442,8 +442,8 @@ labelPRBasedOnFilePath:
- airflow-core/docs/howto/usage-cli.rst
area:Lineage:
- - airflow-core/src/airflow/lineage/**/*
- - airflow-core/tests/unit/lineage/**/*
+ - task-sdk/src/airflow/sdk/lineage.py
+ - task-sdk/tests/task_sdk/test_lineage.py
- airflow-core/docs/administration-and-deployment/lineage.rst
area:Logging:
diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/src/airflow/lineage/__init__.py
index 217e5db9607..15afa1a75de 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,3 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+add_deprecated_classes(
+ {
+ __name__: {"hook": "airflow.sdk.lineage"},
+ "hook": {"*": "airflow.sdk.lineage"},
+ },
+ package=__name__,
+)
diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py
index af05f1aacd2..f26878fa4ee 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -38,7 +38,6 @@ from airflow._shared.plugins_manager import (
from airflow.configuration import conf
if TYPE_CHECKING:
- from airflow.lineage.hook import HookLineageReader
from airflow.listeners.listener import ListenerManager
from airflow.partition_mapper.base import PartitionMapper
from airflow.task.priority_strategy import PriorityWeightStrategy
@@ -283,17 +282,6 @@ def get_partition_mapper_plugins() -> dict[str, type[PartitionMapper]]:
}
-@cache
-def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
- """Collect and get hook lineage reader classes registered by plugins."""
- log.debug("Initialize hook lineage readers plugins")
- result: list[type[HookLineageReader]] = []
-
- for plugin in _get_plugins()[0]:
- result.extend(plugin.hook_lineage_readers)
- return result
-
-
@cache
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
diff --git a/airflow-core/tests/unit/lineage/__init__.py b/airflow-core/tests/unit/lineage/__init__.py
deleted file mode 100644
index 217e5db9607..00000000000
--- a/airflow-core/tests/unit/lineage/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 982dc4e37b1..7e70fe6162a 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -163,7 +163,6 @@ def _find_all_integration_folders() -> list[str]:
"airflow-core/tests/unit/decorators",
"airflow-core/tests/unit/hooks",
"airflow-core/tests/unit/io",
- "airflow-core/tests/unit/lineage",
"airflow-core/tests/unit/listeners",
"airflow-core/tests/unit/logging",
"airflow-core/tests/unit/macros",
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index 258d3345706..cdb5ffe183a 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1980,7 +1980,7 @@ def _mock_plugins(request: pytest.FixtureRequest):
@pytest.fixture
def hook_lineage_collector():
- from airflow.lineage.hook import HookLineageCollector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
hlc = HookLineageCollector()
with mock.patch(
diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py b/devel-common/src/tests_common/test_utils/mock_plugins.py
index dfc654d15ee..d4bc391418f 100644
--- a/devel-common/src/tests_common/test_utils/mock_plugins.py
+++ b/devel-common/src/tests_common/test_utils/mock_plugins.py
@@ -85,6 +85,7 @@ def mock_plugin_manager(plugins=None, **kwargs):
if AIRFLOW_V_3_2_PLUS:
# Always start the block with an non-initialized plugins, so ensure_plugins_loaded runs.
from airflow import plugins_manager
+ from airflow.sdk import plugins_manager as sdk_plugins_manager
plugins_manager._get_plugins.cache_clear()
plugins_manager._get_ui_plugins.cache_clear()
@@ -92,10 +93,12 @@ def mock_plugin_manager(plugins=None, **kwargs):
plugins_manager.get_fastapi_plugins.cache_clear()
plugins_manager._get_extra_operators_links_plugins.cache_clear()
plugins_manager.get_timetables_plugins.cache_clear()
- plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
plugins_manager.integrate_macros_plugins.cache_clear()
plugins_manager.get_priority_weight_strategy_plugins.cache_clear()
+ sdk_plugins_manager.integrate_macros_plugins.cache_clear()
+ sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
+
if plugins is not None or "import_errors" in kwargs:
exit_stack.enter_context(
mock.patch(
@@ -106,6 +109,15 @@ def mock_plugin_manager(plugins=None, **kwargs):
),
)
)
+ exit_stack.enter_context(
+ mock.patch(
+ "airflow.sdk.plugins_manager._get_plugins",
+ return_value=(
+ plugins or [],
+ kwargs.get("import_errors", {}),
+ ),
+ )
+ )
elif kwargs:
raise NotImplementedError(
"mock_plugin_manager does not support patching other attributes in Airflow 3.2+"
diff --git a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 2eb07f446f3..37b352b2cc5 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
- from airflow.lineage.hook import LineageContext
+ from airflow.sdk.lineage import LineageContext
def _lacks_asset_methods(collector):
@@ -62,7 +62,7 @@ def _add_extra_polyfill(collector):
import attr
- from airflow.lineage.hook import HookLineage as _BaseHookLineage
+ from airflow.providers.common.compat.sdk import HookLineage as _BaseHookLineage
# Add `extra` to HookLineage returned by `collected_assets` property
@attr.define
@@ -229,7 +229,7 @@ def get_hook_lineage_collector():
Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - apply single layer.
Airflow 3.2+: Collector has asset-based methods and `add_extra` support - no action required.
"""
- from airflow.lineage.hook import get_hook_lineage_collector as get_global_collector
+ from airflow.providers.common.compat.sdk import get_hook_lineage_collector as get_global_collector
global_collector = get_global_collector()
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index ccb4e715abf..0cfa54dbd35 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -95,6 +95,13 @@ if TYPE_CHECKING:
TaskDeferred as TaskDeferred,
XComNotFound as XComNotFound,
)
+ from airflow.sdk.lineage import (
+ HookLineage as HookLineage,
+ HookLineageCollector as HookLineageCollector,
+ HookLineageReader as HookLineageReader,
+ NoOpCollector as NoOpCollector,
+ get_hook_lineage_collector as get_hook_lineage_collector,
+ )
from airflow.sdk.listener import get_listener_manager as get_listener_manager
from airflow.sdk.log import redact as redact
from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin
@@ -126,6 +133,10 @@ _RENAME_MAP: dict[str, tuple[str, str, str]] = {
"AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
}
+# Airflow 3-only renames (not available in Airflow 2)
+_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {}
+
+
# Import map for classes/functions/constants
# Format: class_name -> module_path(s)
# - str: single module path (no fallback)
@@ -235,6 +246,15 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
# ============================================================================
"XCOM_RETURN_KEY": "airflow.models.xcom",
# ============================================================================
+ # Lineage
+ # ============================================================================
+ "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ "get_hook_lineage_collector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ # Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> AssetLineageInfo)
+ "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ # ============================================================================
# Exceptions (deprecated in airflow.exceptions, prefer SDK)
# ============================================================================
# Note: AirflowException and AirflowNotFoundException are not deprecated, but exposing them
@@ -279,9 +299,14 @@ _AIRFLOW_3_ONLY_EXCEPTIONS: dict[str, tuple[str, ...]] = {
"DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"),
}
-# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+
+# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 3+
if AIRFLOW_V_3_0_PLUS:
_IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS)
+ _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES)
+ # AssetLineageInfo exists in 3.0+ but location changed in 3.2
+ # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo
+ # 3.2+: airflow.sdk.lineage.AssetLineageInfo
+ _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", "airflow.lineage.hook")
# Module map: module_name -> module_path(s)
# For entire modules that have been moved (e.g., timezone)
diff --git a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
index 9045512531e..c4d948e569b 100644
--- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
+++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
@@ -27,7 +27,7 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@pytest.fixture
def collector():
- from airflow.lineage.hook import HookLineageCollector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
# Patch the "inner" function that the compat version will call
with mock.patch(
@@ -41,7 +41,7 @@ def collector():
@pytest.fixture
def noop_collector():
- from airflow.lineage.hook import NoOpCollector
+ from airflow.providers.common.compat.sdk import NoOpCollector
# Patch the "inner" function that the compat version will call
with mock.patch(
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 374d8b2f06b..375b0bbf2f1 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -21,7 +21,7 @@ from airflow.providers.openlineage import conf
# Conditional imports - only load expensive dependencies when plugin is enabled
if not conf.is_disabled():
- from airflow.lineage.hook import HookLineageReader
+ from airflow.providers.common.compat.sdk import HookLineageReader
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 3989144009e..9e2b1782b81 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -50,19 +50,25 @@ if TYPE_CHECKING:
@pytest.fixture
def hook_lineage_collector():
- from airflow.lineage import hook
- from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
- hlc = hook.HookLineageCollector()
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+
+ hlc = HookLineageCollector()
+ patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+ if AIRFLOW_V_3_2_PLUS:
+ patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
if AIRFLOW_V_3_0_PLUS:
from unittest import mock
- with mock.patch(
- "airflow.lineage.hook.get_hook_lineage_collector",
- return_value=hlc,
- ):
+ with mock.patch(patch_target, return_value=hlc):
+ from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
+
yield get_hook_lineage_collector()
else:
+ from airflow.lineage import hook
+ from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
+
hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py
index 611dfe2c8df..3f87e6a1d95 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -44,7 +44,7 @@ class _TrackingFileWrapper:
self._obj = obj
def __getattr__(self, name):
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if not callable(attr := getattr(self._obj, name)):
return attr
@@ -312,7 +312,7 @@ class ObjectStoragePath(CloudPath):
kwargs: Additional keyword arguments to be passed to the underlying implementation.
"""
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if isinstance(dst, str):
dst = ObjectStoragePath(dst)
@@ -380,7 +380,7 @@ class ObjectStoragePath(CloudPath):
kwargs: Additional keyword arguments to be passed to the underlying implementation.
"""
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if isinstance(path, str):
path = ObjectStoragePath(path)
diff --git a/airflow-core/src/airflow/lineage/hook.py b/task-sdk/src/airflow/sdk/lineage.py
similarity index 97%
rename from airflow-core/src/airflow/lineage/hook.py
rename to task-sdk/src/airflow/sdk/lineage.py
index 7c22a367006..acdf284a596 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/task-sdk/src/airflow/sdk/lineage.py
@@ -24,10 +24,11 @@ from functools import cache
from typing import TYPE_CHECKING, Any, TypeAlias
import attr
+import structlog
-from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
from airflow.sdk.definitions.asset import Asset
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
if TYPE_CHECKING:
from pydantic.types import JsonValue
@@ -38,6 +39,9 @@ if TYPE_CHECKING:
LineageContext: TypeAlias = BaseHook | ObjectStoragePath
+log = structlog.getLogger(__name__)
+
+
# Maximum number of assets input or output that can be collected in a single hook execution.
# Input assets and output assets are collected separately.
MAX_COLLECTED_ASSETS = 100
@@ -106,7 +110,7 @@ class HookLineageCollector(LoggingMixin):
self._outputs: dict[str, tuple[Asset, LineageContext]] = {}
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
- self._asset_factories = ProvidersManager().asset_factories
+ self._asset_factories = ProvidersManagerTaskRuntime().asset_factories
self._extra_counts: dict[str, int] = defaultdict(int)
self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
@@ -335,7 +339,7 @@ class HookLineageReader(LoggingMixin):
@cache
def get_hook_lineage_collector() -> HookLineageCollector:
"""Get singleton lineage collector."""
- from airflow import plugins_manager
+ from airflow.sdk import plugins_manager
if plugins_manager.get_hook_lineage_readers_plugins():
return HookLineageCollector()
diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py b/task-sdk/src/airflow/sdk/plugins_manager.py
index 2cbdf1a44a4..4f4abbec68b 100644
--- a/task-sdk/src/airflow/sdk/plugins_manager.py
+++ b/task-sdk/src/airflow/sdk/plugins_manager.py
@@ -39,6 +39,7 @@ from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
if TYPE_CHECKING:
from airflow.sdk._shared.listeners.listener import ListenerManager
+ from airflow.sdk.lineage import HookLineageReader
log = logging.getLogger(__name__)
@@ -131,3 +132,14 @@ def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
"""Add listeners from plugins."""
plugins, _ = _get_plugins()
_integrate_listener_plugins(listener_manager, plugins=plugins)
+
+
+@cache
+def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
+ """Collect and get hook lineage reader classes registered by plugins."""
+ log.debug("Initialize hook lineage readers plugins")
+ result: list[type[HookLineageReader]] = []
+
+ for plugin in _get_plugins()[0]:
+ result.extend(plugin.hook_lineage_readers)
+ return result
diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py b/task-sdk/tests/task_sdk/docs/test_public_api.py
index f53887ec5c9..e8d7dd68e9e 100644
--- a/task-sdk/tests/task_sdk/docs/test_public_api.py
+++ b/task-sdk/tests/task_sdk/docs/test_public_api.py
@@ -63,6 +63,7 @@ def test_airflow_sdk_no_unexpected_exports():
"listener",
"crypto",
"providers_manager_runtime",
+ "lineage",
}
unexpected = actual - public - ignore
assert not unexpected, f"Unexpected exports in airflow.sdk: {sorted(unexpected)}"
diff --git a/airflow-core/tests/unit/lineage/test_hook.py b/task-sdk/tests/task_sdk/test_lineage.py
similarity index 98%
rename from airflow-core/tests/unit/lineage/test_hook.py
rename to task-sdk/tests/task_sdk/test_lineage.py
index f5403159247..3bf706cabb4 100644
--- a/airflow-core/tests/unit/lineage/test_hook.py
+++ b/task-sdk/tests/task_sdk/test_lineage.py
@@ -21,9 +21,8 @@ from unittest.mock import MagicMock, patch
import pytest
-from airflow import plugins_manager
-from airflow.lineage import hook
-from airflow.lineage.hook import (
+from airflow.sdk import Asset, BaseHook, plugins_manager
+from airflow.sdk.lineage import (
AssetLineageInfo,
HookLineage,
HookLineageCollector,
@@ -31,8 +30,6 @@ from airflow.lineage.hook import (
NoOpCollector,
get_hook_lineage_collector,
)
-from airflow.sdk import BaseHook
-from airflow.sdk.definitions.asset import Asset
from tests_common.test_utils.mock_plugins import mock_plugin_manager
@@ -137,7 +134,7 @@ class TestHookLineageCollector:
],
)
- @patch("airflow.lineage.hook.Asset")
+ @patch("airflow.sdk.lineage.Asset")
def test_add_input_asset(self, mock_asset, collector):
asset = MagicMock(spec=Asset, extra={})
mock_asset.return_value = asset
@@ -196,7 +193,7 @@ class TestHookLineageCollector:
uri="myscheme://value_1/value_2", name="asset-value_1", group="test", extra={"key": "value"}
)
- @patch("airflow.lineage.hook.ProvidersManager")
+ @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
def test_create_asset_no_factory(self, mock_providers_manager, collector):
test_scheme = "myscheme"
mock_providers_manager.return_value.asset_factories = {}
@@ -215,7 +212,7 @@ class TestHookLineageCollector:
is None
)
- @patch("airflow.lineage.hook.ProvidersManager")
+ @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
def test_create_asset_factory_exception(self, mock_providers_manager, collector):
def create_asset(extra=None, **kwargs):
raise RuntimeError("Factory error")
@@ -873,7 +870,7 @@ class FakePlugin(plugins_manager.AirflowPlugin):
)
def test_get_hook_lineage_collector(has_readers, expected_class):
# reset cached instance
- hook.get_hook_lineage_collector.cache_clear()
+ get_hook_lineage_collector.cache_clear()
plugins = [FakePlugin()] if has_readers else []
with mock_plugin_manager(plugins=plugins):
assert isinstance(get_hook_lineage_collector(), expected_class)