This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new a2f302de1c4 Issue deprecation warning for plugins registering
`ti_deps` (#45742)
a2f302de1c4 is described below
commit a2f302de1c41af74261bbaebc0159789f6020d0a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Jan 29 11:03:16 2025 +0000
Issue deprecation warning for plugins registering `ti_deps` (#45742)
This is removed in Airflow3 via #45713
---
airflow/plugins_manager.py | 12 ++++++++++++
tests/plugins/test_plugins_manager.py | 17 +++++++++++++++++
tests/serialization/test_dag_serialization.py | 20 ++++++++++++++++++++
3 files changed, 49 insertions(+)
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 8ccdef2c639..bb90d80ec5b 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -27,6 +27,7 @@ import logging
import os
import sys
import types
+import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable
@@ -431,6 +432,17 @@ def initialize_ti_deps_plugins():
registered_ti_dep_classes = {}
for plugin in plugins:
+ if not plugin.ti_deps:
+ continue
+
+ from airflow.exceptions import RemovedInAirflow3Warning
+
+ warnings.warn(
+ "Using custom `ti_deps` on operators has been removed in Airflow
3.0",
+ RemovedInAirflow3Warning,
+ stacklevel=1,
+ )
+
registered_ti_dep_classes.update(
{qualname(ti_dep.__class__): ti_dep.__class__ for ti_dep in
plugin.ti_deps}
)
diff --git a/tests/plugins/test_plugins_manager.py
b/tests/plugins/test_plugins_manager.py
index 2426352fc85..2e7ddd9bac8 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -28,6 +28,7 @@ from unittest import mock
import pytest
+from airflow.exceptions import RemovedInAirflow3Warning
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import get_listener_manager
from airflow.plugins_manager import AirflowPlugin
@@ -174,6 +175,11 @@ class TestPluginsManager:
plugins_manager.loaded_plugins = set()
plugins_manager.plugins = []
+ yield
+ plugins_manager.loaded_plugins = set()
+
+ plugins_manager.registered_ti_dep_classes = None
+ plugins_manager.plugins = None
def test_no_log_when_no_plugins(self, caplog):
with mock_plugin_manager(plugins=[]):
@@ -270,6 +276,17 @@ class TestPluginsManager:
),
]
+ def test_deprecate_ti_deps(self):
+ class DeprecatedTIDeps(AirflowPlugin):
+ name = "ti_deps"
+
+ ti_deps = [mock.MagicMock()]
+
+ with mock_plugin_manager(plugins=[DeprecatedTIDeps()]),
pytest.warns(RemovedInAirflow3Warning):
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_ti_deps_plugins()
+
def test_should_not_warning_about_fab_plugins(self, caplog):
class AirflowAdminViewsPlugin(AirflowPlugin):
name = "test_admin_views_plugin"
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index d7f09c20ff9..58f16d80f8c 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -408,6 +408,21 @@ def timetable_plugin(monkeypatch):
)
[email protected]
+def custom_ti_dep(monkeypatch):
+ """Patch plugins manager to always and only return our custom timetable."""
+ from test_plugin import CustomTestTriggerRule
+
+ from airflow import plugins_manager
+
+ monkeypatch.setattr(plugins_manager, "initialize_ti_deps_plugins", lambda:
None)
+ monkeypatch.setattr(
+ plugins_manager,
+ "registered_ti_dep_classes",
+ {"test_plugin.CustomTestTriggerRule": CustomTestTriggerRule},
+ )
+
+
# TODO: (potiuk) - AIP-44 - check why this test hangs
@pytest.mark.skip_if_database_isolation_mode
class TestStringifiedDAGs:
@@ -430,6 +445,7 @@ class TestStringifiedDAGs:
)
@pytest.mark.db_test
+
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialization(self):
"""Serialization and deserialization should work for every DAG and
Operator."""
dags = collect_dags()
@@ -539,6 +555,7 @@ class TestStringifiedDAGs:
return actual, expected
@pytest.mark.db_test
+
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_deserialization_across_process(self):
"""A serialized DAG can be deserialized in another process."""
@@ -1596,6 +1613,7 @@ class TestStringifiedDAGs:
"airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep",
]
+
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_error_on_unregistered_ti_dep_serialization(self):
# trigger rule not registered through the plugin system will not be
serialized
class DummyTriggerRule(BaseTIDep):
@@ -1634,6 +1652,8 @@ class TestStringifiedDAGs:
SerializedBaseOperator.deserialize_operator(serialize_op)
@pytest.mark.db_test
+ @pytest.mark.usefixtures("custom_ti_dep")
+
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialize_and_deserialize_custom_ti_deps(self):
from test_plugin import CustomTestTriggerRule