This is an automated email from the ASF dual-hosted git repository. jscheffl pushed a commit to branch revert-60968-lineage-to-sdk in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 33ff3e6dec41d1e80aaf105dcc4a3e319f38ef0d Author: Jens Scheffler <[email protected]> AuthorDate: Tue Jan 27 22:19:11 2026 +0100 Revert "Move lineage from airflow core to task sdk (#60968)" This reverts commit e871af6270e97cbdf6be3fd1626d655773770f5d. --- .github/boring-cyborg.yml | 4 ++-- airflow-core/src/airflow/lineage/__init__.py | 11 --------- .../src/airflow/lineage/hook.py | 12 ++++------ airflow-core/src/airflow/plugins_manager.py | 12 ++++++++++ .../airflow => tests/unit}/lineage/__init__.py | 11 --------- .../tests/unit/lineage/test_hook.py | 15 +++++++----- .../tests/test_pytest_args_for_test_types.py | 1 + devel-common/src/tests_common/pytest_plugin.py | 2 +- .../src/tests_common/test_utils/mock_plugins.py | 14 +---------- .../providers/common/compat/lineage/hook.py | 6 ++--- .../src/airflow/providers/common/compat/sdk.py | 27 +--------------------- .../tests/unit/common/compat/lineage/test_hook.py | 4 ++-- .../providers/openlineage/plugins/openlineage.py | 2 +- .../unit/openlineage/extractors/test_manager.py | 20 ++++++---------- task-sdk/src/airflow/sdk/io/path.py | 6 ++--- task-sdk/src/airflow/sdk/plugins_manager.py | 12 ---------- task-sdk/tests/task_sdk/docs/test_public_api.py | 1 - 17 files changed, 47 insertions(+), 113 deletions(-) diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index f484a482e74..9f42441f1c3 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -442,8 +442,8 @@ labelPRBasedOnFilePath: - airflow-core/docs/howto/usage-cli.rst area:Lineage: - - task-sdk/src/airflow/sdk/lineage.py - - task-sdk/tests/task_sdk/test_lineage.py + - airflow-core/src/airflow/lineage/**/* + - airflow-core/tests/unit/lineage/**/* - airflow-core/docs/administration-and-deployment/lineage.rst area:Logging: diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/src/airflow/lineage/__init__.py index 15afa1a75de..217e5db9607 100644 --- a/airflow-core/src/airflow/lineage/__init__.py +++ b/airflow-core/src/airflow/lineage/__init__.py @@ -15,14 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from __future__ import annotations - -from airflow.utils.deprecation_tools import add_deprecated_classes - -add_deprecated_classes( - { - __name__: {"hook": "airflow.sdk.lineage"}, - "hook": {"*": "airflow.sdk.lineage"}, - }, - package=__name__, -) diff --git a/task-sdk/src/airflow/sdk/lineage.py b/airflow-core/src/airflow/lineage/hook.py similarity index 97% rename from task-sdk/src/airflow/sdk/lineage.py rename to airflow-core/src/airflow/lineage/hook.py index acdf284a596..7c22a367006 100644 --- a/task-sdk/src/airflow/sdk/lineage.py +++ b/airflow-core/src/airflow/lineage/hook.py @@ -24,11 +24,10 @@ from functools import cache from typing import TYPE_CHECKING, Any, TypeAlias import attr -import structlog -from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin +from airflow.providers_manager import ProvidersManager from airflow.sdk.definitions.asset import Asset -from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime +from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from pydantic.types import JsonValue @@ -39,9 +38,6 @@ if TYPE_CHECKING: LineageContext: TypeAlias = BaseHook | ObjectStoragePath -log = structlog.getLogger(__name__) - - # Maximum number of assets input or output that can be collected in a single hook execution. # Input assets and output assets are collected separately. MAX_COLLECTED_ASSETS = 100 @@ -110,7 +106,7 @@ class HookLineageCollector(LoggingMixin): self._outputs: dict[str, tuple[Asset, LineageContext]] = {} self._input_counts: dict[str, int] = defaultdict(int) self._output_counts: dict[str, int] = defaultdict(int) - self._asset_factories = ProvidersManagerTaskRuntime().asset_factories + self._asset_factories = ProvidersManager().asset_factories self._extra_counts: dict[str, int] = defaultdict(int) self._extra: dict[str, tuple[str, Any, LineageContext]] = {} @@ -339,7 +335,7 @@ class HookLineageReader(LoggingMixin): @cache def get_hook_lineage_collector() -> HookLineageCollector: """Get singleton lineage collector.""" - from airflow.sdk import plugins_manager + from airflow import plugins_manager if plugins_manager.get_hook_lineage_readers_plugins(): return HookLineageCollector() diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py index f26878fa4ee..af05f1aacd2 100644 --- a/airflow-core/src/airflow/plugins_manager.py +++ b/airflow-core/src/airflow/plugins_manager.py @@ -38,6 +38,7 @@ from airflow._shared.plugins_manager import ( from airflow.configuration import conf if TYPE_CHECKING: + from airflow.lineage.hook import HookLineageReader from airflow.listeners.listener import ListenerManager from airflow.partition_mapper.base import PartitionMapper from airflow.task.priority_strategy import PriorityWeightStrategy @@ -282,6 +283,17 @@ def get_partition_mapper_plugins() -> dict[str, type[PartitionMapper]]: } +@cache +def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]: + """Collect and get hook lineage reader classes registered by plugins.""" + log.debug("Initialize hook lineage readers plugins") + result: list[type[HookLineageReader]] = [] + + for plugin in _get_plugins()[0]: + result.extend(plugin.hook_lineage_readers) + return result + + @cache def integrate_macros_plugins() -> None: """Integrates macro plugins.""" diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/tests/unit/lineage/__init__.py similarity index 75% copy from airflow-core/src/airflow/lineage/__init__.py copy to airflow-core/tests/unit/lineage/__init__.py index 15afa1a75de..217e5db9607 100644 --- a/airflow-core/src/airflow/lineage/__init__.py +++ b/airflow-core/tests/unit/lineage/__init__.py @@ -15,14 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from __future__ import annotations - -from airflow.utils.deprecation_tools import add_deprecated_classes - -add_deprecated_classes( - { - __name__: {"hook": "airflow.sdk.lineage"}, - "hook": {"*": "airflow.sdk.lineage"}, - }, - package=__name__, -) diff --git a/task-sdk/tests/task_sdk/test_lineage.py b/airflow-core/tests/unit/lineage/test_hook.py similarity index 98% rename from task-sdk/tests/task_sdk/test_lineage.py rename to airflow-core/tests/unit/lineage/test_hook.py index 3bf706cabb4..f5403159247 100644 --- a/task-sdk/tests/task_sdk/test_lineage.py +++ b/airflow-core/tests/unit/lineage/test_hook.py @@ -21,8 +21,9 @@ from unittest.mock import MagicMock, patch import pytest -from airflow.sdk import Asset, BaseHook, plugins_manager -from airflow.sdk.lineage import ( +from airflow import plugins_manager +from airflow.lineage import hook +from airflow.lineage.hook import ( AssetLineageInfo, HookLineage, HookLineageCollector, @@ -30,6 +31,8 @@ from airflow.sdk.lineage import ( NoOpCollector, get_hook_lineage_collector, ) +from airflow.sdk import BaseHook +from airflow.sdk.definitions.asset import Asset from tests_common.test_utils.mock_plugins import mock_plugin_manager @@ -134,7 +137,7 @@ class TestHookLineageCollector: ], ) - @patch("airflow.sdk.lineage.Asset") + @patch("airflow.lineage.hook.Asset") def test_add_input_asset(self, mock_asset, collector): asset = MagicMock(spec=Asset, extra={}) mock_asset.return_value = asset @@ -193,7 +196,7 @@ class TestHookLineageCollector: uri="myscheme://value_1/value_2", name="asset-value_1", group="test", extra={"key": "value"} ) - @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime") + @patch("airflow.lineage.hook.ProvidersManager") def test_create_asset_no_factory(self, mock_providers_manager, collector): test_scheme = "myscheme" mock_providers_manager.return_value.asset_factories = {} @@ -212,7 +215,7 @@ class TestHookLineageCollector: is None ) - @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime") + @patch("airflow.lineage.hook.ProvidersManager") def test_create_asset_factory_exception(self, mock_providers_manager, collector): def create_asset(extra=None, **kwargs): raise RuntimeError("Factory error") @@ -870,7 +873,7 @@ class FakePlugin(plugins_manager.AirflowPlugin): ) def test_get_hook_lineage_collector(has_readers, expected_class): # reset cached instance - get_hook_lineage_collector.cache_clear() + hook.get_hook_lineage_collector.cache_clear() plugins = [FakePlugin()] if has_readers else [] with mock_plugin_manager(plugins=plugins): assert isinstance(get_hook_lineage_collector(), expected_class) diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index 7e70fe6162a..982dc4e37b1 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -163,6 +163,7 @@ def _find_all_integration_folders() -> list[str]: "airflow-core/tests/unit/decorators", "airflow-core/tests/unit/hooks", "airflow-core/tests/unit/io", + "airflow-core/tests/unit/lineage", "airflow-core/tests/unit/listeners", "airflow-core/tests/unit/logging", "airflow-core/tests/unit/macros", diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py index cdb5ffe183a..258d3345706 100644 --- a/devel-common/src/tests_common/pytest_plugin.py +++ b/devel-common/src/tests_common/pytest_plugin.py @@ -1980,7 +1980,7 @@ def _mock_plugins(request: pytest.FixtureRequest): @pytest.fixture def hook_lineage_collector(): - from airflow.providers.common.compat.sdk import HookLineageCollector + from airflow.lineage.hook import HookLineageCollector hlc = HookLineageCollector() with mock.patch( diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py b/devel-common/src/tests_common/test_utils/mock_plugins.py index d4bc391418f..dfc654d15ee 100644 --- a/devel-common/src/tests_common/test_utils/mock_plugins.py +++ b/devel-common/src/tests_common/test_utils/mock_plugins.py @@ -85,7 +85,6 @@ def mock_plugin_manager(plugins=None, **kwargs): if AIRFLOW_V_3_2_PLUS: # Always start the block with an non-initialized plugins, so ensure_plugins_loaded runs. from airflow import plugins_manager - from airflow.sdk import plugins_manager as sdk_plugins_manager plugins_manager._get_plugins.cache_clear() plugins_manager._get_ui_plugins.cache_clear() @@ -93,12 +92,10 @@ def mock_plugin_manager(plugins=None, **kwargs): plugins_manager.get_fastapi_plugins.cache_clear() plugins_manager._get_extra_operators_links_plugins.cache_clear() plugins_manager.get_timetables_plugins.cache_clear() + plugins_manager.get_hook_lineage_readers_plugins.cache_clear() plugins_manager.integrate_macros_plugins.cache_clear() plugins_manager.get_priority_weight_strategy_plugins.cache_clear() - sdk_plugins_manager.integrate_macros_plugins.cache_clear() - sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear() - if plugins is not None or "import_errors" in kwargs: exit_stack.enter_context( mock.patch( @@ -109,15 +106,6 @@ def mock_plugin_manager(plugins=None, **kwargs): ), ) ) - exit_stack.enter_context( - mock.patch( - "airflow.sdk.plugins_manager._get_plugins", - return_value=( - plugins or [], - kwargs.get("import_errors", {}), - ), - ) - ) elif kwargs: raise NotImplementedError( "mock_plugin_manager does not support patching other attributes in Airflow 3.2+" diff --git a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py index 37b352b2cc5..2eb07f446f3 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py +++ b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any - from airflow.sdk.lineage import LineageContext + from airflow.lineage.hook import LineageContext def _lacks_asset_methods(collector): @@ -62,7 +62,7 @@ def _add_extra_polyfill(collector): import attr - from airflow.providers.common.compat.sdk import HookLineage as _BaseHookLineage + from airflow.lineage.hook import HookLineage as _BaseHookLineage # Add `extra` to HookLineage returned by `collected_assets` property @attr.define @@ -229,7 +229,7 @@ def get_hook_lineage_collector(): Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - apply single layer. Airflow 3.2+: Collector has asset-based methods and `add_extra` support - no action required. """ - from airflow.providers.common.compat.sdk import get_hook_lineage_collector as get_global_collector + from airflow.lineage.hook import get_hook_lineage_collector as get_global_collector global_collector = get_global_collector() diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py index 0cfa54dbd35..ccb4e715abf 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py +++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py @@ -95,13 +95,6 @@ if TYPE_CHECKING: TaskDeferred as TaskDeferred, XComNotFound as XComNotFound, ) - from airflow.sdk.lineage import ( - HookLineage as HookLineage, - HookLineageCollector as HookLineageCollector, - HookLineageReader as HookLineageReader, - NoOpCollector as NoOpCollector, - get_hook_lineage_collector as get_hook_lineage_collector, - ) from airflow.sdk.listener import get_listener_manager as get_listener_manager from airflow.sdk.log import redact as redact from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin @@ -133,10 +126,6 @@ _RENAME_MAP: dict[str, tuple[str, str, str]] = { "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"), } -# Airflow 3-only renames (not available in Airflow 2) -_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {} - - # Import map for classes/functions/constants # Format: class_name -> module_path(s) # - str: single module path (no fallback) @@ -246,15 +235,6 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = { # ============================================================================ "XCOM_RETURN_KEY": "airflow.models.xcom", # ============================================================================ - # Lineage - # ============================================================================ - "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"), - "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"), - "get_hook_lineage_collector": ("airflow.sdk.lineage", "airflow.lineage.hook"), - "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"), - # Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> AssetLineageInfo) - "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"), - # ============================================================================ # Exceptions (deprecated in airflow.exceptions, prefer SDK) # ============================================================================ # Note: AirflowException and AirflowNotFoundException are not deprecated, but exposing them @@ -299,14 +279,9 @@ _AIRFLOW_3_ONLY_EXCEPTIONS: dict[str, tuple[str, ...]] = { "DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"), } -# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 3+ +# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+ if AIRFLOW_V_3_0_PLUS: _IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS) - _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES) - # AssetLineageInfo exists in 3.0+ but location changed in 3.2 - # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo - # 3.2+: airflow.sdk.lineage.AssetLineageInfo - _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", "airflow.lineage.hook") # Module map: module_name -> module_path(s) # For entire modules that have been moved (e.g., timezone) diff --git a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py index c4d948e569b..9045512531e 100644 --- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py +++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py @@ -27,7 +27,7 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @pytest.fixture def collector(): - from airflow.providers.common.compat.sdk import HookLineageCollector + from airflow.lineage.hook import HookLineageCollector # Patch the "inner" function that the compat version will call with mock.patch( @@ -41,7 +41,7 @@ def collector(): @pytest.fixture def noop_collector(): - from airflow.providers.common.compat.sdk import NoOpCollector + from airflow.lineage.hook import NoOpCollector # Patch the "inner" function that the compat version will call with mock.patch( diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py index 375b0bbf2f1..374d8b2f06b 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py @@ -21,7 +21,7 @@ from airflow.providers.openlineage import conf # Conditional imports - only load expensive dependencies when plugin is enabled if not conf.is_disabled(): - from airflow.providers.common.compat.sdk import HookLineageReader + from airflow.lineage.hook import HookLineageReader from airflow.providers.openlineage.plugins.listener import get_openlineage_listener from airflow.providers.openlineage.plugins.macros import ( lineage_job_name, diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py index 9e2b1782b81..3989144009e 100644 --- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py +++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py @@ -50,25 +50,19 @@ if TYPE_CHECKING: @pytest.fixture def hook_lineage_collector(): - from airflow.providers.common.compat.sdk import HookLineageCollector + from airflow.lineage import hook + from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector - from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS - - hlc = HookLineageCollector() - patch_target = "airflow.lineage.hook.get_hook_lineage_collector" - if AIRFLOW_V_3_2_PLUS: - patch_target = "airflow.sdk.lineage.get_hook_lineage_collector" + hlc = hook.HookLineageCollector() if AIRFLOW_V_3_0_PLUS: from unittest import mock - with mock.patch(patch_target, return_value=hlc): - from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector - + with mock.patch( + "airflow.lineage.hook.get_hook_lineage_collector", + return_value=hlc, + ): yield get_hook_lineage_collector() else: - from airflow.lineage import hook - from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector - hook._hook_lineage_collector = hlc yield get_hook_lineage_collector() diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py index 3f87e6a1d95..611dfe2c8df 100644 --- a/task-sdk/src/airflow/sdk/io/path.py +++ b/task-sdk/src/airflow/sdk/io/path.py @@ -44,7 +44,7 @@ class _TrackingFileWrapper: self._obj = obj def __getattr__(self, name): - from airflow.sdk.lineage import get_hook_lineage_collector + from airflow.lineage.hook import get_hook_lineage_collector if not callable(attr := getattr(self._obj, name)): return attr @@ -312,7 +312,7 @@ class ObjectStoragePath(CloudPath): kwargs: Additional keyword arguments to be passed to the underlying implementation. """ - from airflow.sdk.lineage import get_hook_lineage_collector + from airflow.lineage.hook import get_hook_lineage_collector if isinstance(dst, str): dst = ObjectStoragePath(dst) @@ -380,7 +380,7 @@ class ObjectStoragePath(CloudPath): kwargs: Additional keyword arguments to be passed to the underlying implementation. """ - from airflow.sdk.lineage import get_hook_lineage_collector + from airflow.lineage.hook import get_hook_lineage_collector if isinstance(path, str): path = ObjectStoragePath(path) diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py b/task-sdk/src/airflow/sdk/plugins_manager.py index 4f4abbec68b..2cbdf1a44a4 100644 --- a/task-sdk/src/airflow/sdk/plugins_manager.py +++ b/task-sdk/src/airflow/sdk/plugins_manager.py @@ -39,7 +39,6 @@ from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime if TYPE_CHECKING: from airflow.sdk._shared.listeners.listener import ListenerManager - from airflow.sdk.lineage import HookLineageReader log = logging.getLogger(__name__) @@ -132,14 +131,3 @@ def integrate_listener_plugins(listener_manager: ListenerManager) -> None: """Add listeners from plugins.""" plugins, _ = _get_plugins() _integrate_listener_plugins(listener_manager, plugins=plugins) - - -@cache -def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]: - """Collect and get hook lineage reader classes registered by plugins.""" - log.debug("Initialize hook lineage readers plugins") - result: list[type[HookLineageReader]] = [] - - for plugin in _get_plugins()[0]: - result.extend(plugin.hook_lineage_readers) - return result diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py b/task-sdk/tests/task_sdk/docs/test_public_api.py index e8d7dd68e9e..f53887ec5c9 100644 --- a/task-sdk/tests/task_sdk/docs/test_public_api.py +++ b/task-sdk/tests/task_sdk/docs/test_public_api.py @@ -63,7 +63,6 @@ def test_airflow_sdk_no_unexpected_exports(): "listener", "crypto", "providers_manager_runtime", - "lineage", } unexpected = actual - public - ignore assert not unexpected, f"Unexpected exports in airflow.sdk: {sorted(unexpected)}"
