This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new a2f302de1c4 Issue deprecation warning for plugins registering `ti_deps` (#45742)
a2f302de1c4 is described below

commit a2f302de1c41af74261bbaebc0159789f6020d0a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Jan 29 11:03:16 2025 +0000

    Issue deprecation warning for plugins registering `ti_deps` (#45742)
    
    This is removed in Airflow3 via #45713
---
 airflow/plugins_manager.py                    | 12 ++++++++++++
 tests/plugins/test_plugins_manager.py         | 17 +++++++++++++++++
 tests/serialization/test_dag_serialization.py | 20 ++++++++++++++++++++
 3 files changed, 49 insertions(+)

diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 8ccdef2c639..bb90d80ec5b 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -27,6 +27,7 @@ import logging
 import os
 import sys
 import types
+import warnings
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Iterable
 
@@ -431,6 +432,17 @@ def initialize_ti_deps_plugins():
     registered_ti_dep_classes = {}
 
     for plugin in plugins:
+        if not plugin.ti_deps:
+            continue
+
+        from airflow.exceptions import RemovedInAirflow3Warning
+
+        warnings.warn(
+            "Using custom `ti_deps` on operators has been removed in Airflow 3.0",
+            RemovedInAirflow3Warning,
+            stacklevel=1,
+        )
+
         registered_ti_dep_classes.update(
             {qualname(ti_dep.__class__): ti_dep.__class__ for ti_dep in plugin.ti_deps}
         )
diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py
index 2426352fc85..2e7ddd9bac8 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -28,6 +28,7 @@ from unittest import mock
 
 import pytest
 
+from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.hooks.base import BaseHook
 from airflow.listeners.listener import get_listener_manager
 from airflow.plugins_manager import AirflowPlugin
@@ -174,6 +175,11 @@ class TestPluginsManager:
 
         plugins_manager.loaded_plugins = set()
         plugins_manager.plugins = []
+        yield
+        plugins_manager.loaded_plugins = set()
+
+        plugins_manager.registered_ti_dep_classes = None
+        plugins_manager.plugins = None
 
     def test_no_log_when_no_plugins(self, caplog):
         with mock_plugin_manager(plugins=[]):
@@ -270,6 +276,17 @@ class TestPluginsManager:
             ),
         ]
 
+    def test_deprecate_ti_deps(self):
+        class DeprecatedTIDeps(AirflowPlugin):
+            name = "ti_deps"
+
+            ti_deps = [mock.MagicMock()]
+
+        with mock_plugin_manager(plugins=[DeprecatedTIDeps()]), pytest.warns(RemovedInAirflow3Warning):
+            from airflow import plugins_manager
+
+            plugins_manager.initialize_ti_deps_plugins()
+
     def test_should_not_warning_about_fab_plugins(self, caplog):
         class AirflowAdminViewsPlugin(AirflowPlugin):
             name = "test_admin_views_plugin"
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d7f09c20ff9..58f16d80f8c 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -408,6 +408,21 @@ def timetable_plugin(monkeypatch):
     )
 
 
+@pytest.fixture
+def custom_ti_dep(monkeypatch):
+    """Patch plugins manager to always and only return our custom timetable."""
+    from test_plugin import CustomTestTriggerRule
+
+    from airflow import plugins_manager
+
+    monkeypatch.setattr(plugins_manager, "initialize_ti_deps_plugins", lambda: None)
+    monkeypatch.setattr(
+        plugins_manager,
+        "registered_ti_dep_classes",
+        {"test_plugin.CustomTestTriggerRule": CustomTestTriggerRule},
+    )
+
+
 # TODO: (potiuk) - AIP-44 - check why this test hangs
 @pytest.mark.skip_if_database_isolation_mode
 class TestStringifiedDAGs:
@@ -430,6 +445,7 @@ class TestStringifiedDAGs:
             )
 
     @pytest.mark.db_test
+    @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
     def test_serialization(self):
         """Serialization and deserialization should work for every DAG and 
Operator."""
         dags = collect_dags()
@@ -539,6 +555,7 @@ class TestStringifiedDAGs:
         return actual, expected
 
     @pytest.mark.db_test
+    @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
     def test_deserialization_across_process(self):
         """A serialized DAG can be deserialized in another process."""
 
@@ -1596,6 +1613,7 @@ class TestStringifiedDAGs:
             "airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep",
         ]
 
+    @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
     def test_error_on_unregistered_ti_dep_serialization(self):
         # trigger rule not registered through the plugin system will not be serialized
         class DummyTriggerRule(BaseTIDep):
@@ -1634,6 +1652,8 @@ class TestStringifiedDAGs:
             SerializedBaseOperator.deserialize_operator(serialize_op)
 
     @pytest.mark.db_test
+    @pytest.mark.usefixtures("custom_ti_dep")
+    @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
     def test_serialize_and_deserialize_custom_ti_deps(self):
         from test_plugin import CustomTestTriggerRule
 

Reply via email to