This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2a85aca306ebdc549fe2fa0251b7ae2abe9d18c9
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Jan 20 13:23:08 2026 +0000

    Fix unnecessary DAG version churn when DAG file paths change (#60799)
    
    (cherry picked from commit fbb200e4151a5ea842a88f333f6b337d47ee431f)
---
 .../airflow/serialization/serialized_objects.py    | 14 +++--
 airflow-core/src/airflow/utils/module_loading.py   | 23 ++++++-
 .../unit/serialization/test_dag_serialization.py   | 41 +++++++++++--
 .../tests/unit/utils/test_module_loading.py        | 70 +++++++++++++++++++++-
 4 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 62259d131f0..47226751e5b 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1471,11 +1471,13 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
                     continue
                 serialized_op["partial_kwargs"].update({k: cls.serialize(v)})
 
-        # we want to store python_callable_name, not python_callable
+        # Store python_callable_name instead of python_callable.
+        # exclude_module=True ensures stable names across bundle version changes.
         python_callable = op.partial_kwargs.get("python_callable", None)
         if python_callable:
-            callable_name = qualname(python_callable)
-            serialized_op["partial_kwargs"]["python_callable_name"] = callable_name
+            serialized_op["partial_kwargs"]["python_callable_name"] = qualname(
+                python_callable, exclude_module=True
+            )
             del serialized_op["partial_kwargs"]["python_callable"]
 
         serialized_op["_is_mapped"] = True
@@ -1496,11 +1498,11 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
                 if attr in serialize_op:
                     del serialize_op[attr]
 
-        # Detect if there's a change in python callable name
+        # Store python_callable_name for change detection.
+        # exclude_module=True ensures stable names across bundle version changes.
         python_callable = getattr(op, "python_callable", None)
         if python_callable:
-            callable_name = qualname(python_callable)
-            serialize_op["python_callable_name"] = callable_name
+            serialize_op["python_callable_name"] = qualname(python_callable, exclude_module=True)
 
         serialize_op["task_type"] = getattr(op, "task_type", type(op).__name__)
         serialize_op["_task_module"] = getattr(op, "_task_module", type(op).__module__)
diff --git a/airflow-core/src/airflow/utils/module_loading.py b/airflow-core/src/airflow/utils/module_loading.py
index 50ae287c36d..71fc8126167 100644
--- a/airflow-core/src/airflow/utils/module_loading.py
+++ b/airflow-core/src/airflow/utils/module_loading.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import functools
 import pkgutil
 from collections.abc import Callable
 from importlib import import_module
@@ -46,9 +47,24 @@ def import_string(dotted_path: str):
         raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
 
 
-def qualname(o: object | Callable, use_qualname: bool = False) -> str:
-    """Convert an attribute/class/callable to a string importable by ``import_string``."""
+def qualname(o: object | Callable, use_qualname: bool = False, exclude_module: bool = False) -> str:
+    """
+    Convert an attribute/class/callable to a string.
+
+    By default, returns a string importable by ``import_string`` (includes module path).
+    With exclude_module=True, returns only the qualified name without module prefix,
+    useful for stable identification across deployments where module paths may vary.
+    """
     if callable(o) and hasattr(o, "__module__"):
+        if exclude_module:
+            if hasattr(o, "__qualname__"):
+                return o.__qualname__
+            if hasattr(o, "__name__"):
+                return o.__name__
+            # Handle functools.partial objects specifically (not just any object with 'func' attr)
+            if isinstance(o, functools.partial):
+                return qualname(o.func, exclude_module=True)
+            return type(o).__qualname__
         if use_qualname and hasattr(o, "__qualname__"):
             return f"{o.__module__}.{o.__qualname__}"
         if hasattr(o, "__name__"):
@@ -62,6 +78,9 @@ def qualname(o: object | Callable, use_qualname: bool = False) -> str:
     name = cls.__qualname__
     module = cls.__module__
 
+    if exclude_module:
+        return name
+
     if module and module != "__builtin__":
         return f"{module}.{name}"
 
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 24c0bb46df6..cae004ac8bf 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 import contextlib
 import copy
 import dataclasses
+import functools
 import importlib
 import importlib.util
 import json
@@ -80,7 +81,6 @@ from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy, _Dow
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.timetables.simple import NullTimetable, OnceTimetable
 from airflow.triggers.base import StartTriggerArgs
-from airflow.utils.module_loading import qualname
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker, skip_if_not_on_main
@@ -2885,7 +2885,7 @@ def test_taskflow_expand_serde():
         },
         "_disallow_kwargs_override": False,
         "_expand_input_attr": "op_kwargs_expand_input",
-        "python_callable_name": qualname(x),
+        "python_callable_name": "test_taskflow_expand_serde.<locals>.x",
     }
 
     deserialized = BaseSerialization.deserialize(serialized)
@@ -2952,7 +2952,7 @@ def test_taskflow_expand_kwargs_serde(strict):
         "_task_module": "airflow.providers.standard.decorators.python",
         "task_type": "_PythonDecoratedOperator",
         "_operator_name": "@task",
-        "python_callable_name": qualname(x),
+        "python_callable_name": "test_taskflow_expand_kwargs_serde.<locals>.x",
         "partial_kwargs": {
             "op_args": [],
             "op_kwargs": {
@@ -3123,11 +3123,42 @@ def test_python_callable_in_partial_kwargs():
 
     serialized = SerializedBaseOperator.serialize_mapped_operator(operator)
     assert "python_callable" not in serialized["partial_kwargs"]
-    assert serialized["partial_kwargs"]["python_callable_name"] == qualname(empty_function)
+    assert serialized["partial_kwargs"]["python_callable_name"] == "empty_function"
 
     deserialized = SerializedBaseOperator.deserialize_operator(serialized)
     assert "python_callable" not in deserialized.partial_kwargs
-    assert deserialized.partial_kwargs["python_callable_name"] == qualname(empty_function)
+    assert deserialized.partial_kwargs["python_callable_name"] == "empty_function"
+
+
+def test_python_callable_name_uses_qualname_exclude_module():
+    """Test python_callable_name is stable across bundle version changes."""
+    from airflow.providers.standard.operators.python import PythonOperator
+
+    # Module-level function
+    op1 = PythonOperator(task_id="task1", python_callable=empty_function)
+    serialized1 = SerializedBaseOperator.serialize_operator(op1)
+    assert serialized1["python_callable_name"] == "empty_function"
+
+    # Nested function
+    def outer():
+        def inner():
+            pass
+
+        return inner
+
+    inner_func = outer()
+    op2 = PythonOperator(task_id="task2", python_callable=inner_func)
+    serialized2 = SerializedBaseOperator.serialize_operator(op2)
+    assert (
+        serialized2["python_callable_name"]
+        == "test_python_callable_name_uses_qualname_exclude_module.<locals>.outer.<locals>.inner"
+    )
+
+    # functools.partial
+    partial_func = functools.partial(empty_function, x=1)
+    op3 = PythonOperator(task_id="task3", python_callable=partial_func)
+    serialized3 = SerializedBaseOperator.serialize_operator(op3)
+    assert serialized3["python_callable_name"] == "empty_function"
 
 
 def test_handle_v1_serdag():
diff --git a/airflow-core/tests/unit/utils/test_module_loading.py b/airflow-core/tests/unit/utils/test_module_loading.py
index 1f92a004b8f..c59c5f0c639 100644
--- a/airflow-core/tests/unit/utils/test_module_loading.py
+++ b/airflow-core/tests/unit/utils/test_module_loading.py
@@ -17,9 +17,15 @@
 # under the License.
 from __future__ import annotations
 
+import functools
+
 import pytest
 
-from airflow.utils.module_loading import import_string
+from airflow.utils.module_loading import import_string, qualname
+
+
+def _sample_function():
+    pass
 
 
 class TestModuleImport:
@@ -33,3 +39,65 @@ class TestModuleImport:
         msg = 'Module "airflow.utils" does not define a "nonexistent" attribute'
         with pytest.raises(ImportError, match=msg):
             import_string("airflow.utils.nonexistent")
+
+
+class TestQualname:
+    def test_qualname_default_includes_module(self):
+        """Test that qualname() by default includes the module path."""
+        result = qualname(_sample_function)
+        assert result == "module_loading.test_module_loading._sample_function"
+
+    def test_qualname_exclude_module_simple_function(self):
+        """Test that exclude_module=True returns only the function name."""
+        result = qualname(_sample_function, exclude_module=True)
+        assert result == "_sample_function"
+
+    def test_qualname_exclude_module_nested_function(self):
+        """Test that exclude_module=True works with nested functions."""
+
+        def outer():
+            def inner():
+                pass
+
+            return inner
+
+        inner_func = outer()
+        result = qualname(inner_func, exclude_module=True)
+        assert (
+            result
+            == "TestQualname.test_qualname_exclude_module_nested_function.<locals>.outer.<locals>.inner"
+        )
+
+    def test_qualname_exclude_module_functools_partial(self):
+        """Test that exclude_module=True handles functools.partial correctly."""
+
+        def base_func(x, y):
+            pass
+
+        partial_func = functools.partial(base_func, x=1)
+        result = qualname(partial_func, exclude_module=True)
+        assert result == "TestQualname.test_qualname_exclude_module_functools_partial.<locals>.base_func"
+
+    def test_qualname_exclude_module_class(self):
+        """Test that exclude_module=True works with classes."""
+
+        class MyClass:
+            pass
+
+        result = qualname(MyClass, exclude_module=True)
+        assert result == "TestQualname.test_qualname_exclude_module_class.<locals>.MyClass"
+
+    def test_qualname_exclude_module_instance(self):
+        """Test that exclude_module=True works with class instances."""
+
+        class MyClass:
+            pass
+
+        instance = MyClass()
+        result = qualname(instance, exclude_module=True)
+        assert result == "TestQualname.test_qualname_exclude_module_instance.<locals>.MyClass"
+
+    def test_qualname_use_qualname_still_includes_module(self):
+        """Test that use_qualname=True still includes module prefix."""
+        result = qualname(_sample_function, use_qualname=True)
+        assert result == "module_loading.test_module_loading._sample_function"

Reply via email to