This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2a85aca306ebdc549fe2fa0251b7ae2abe9d18c9 Author: Kaxil Naik <[email protected]> AuthorDate: Tue Jan 20 13:23:08 2026 +0000 Fix unnecessary DAG version churn when DAG file paths change (#60799) (cherry picked from commit fbb200e4151a5ea842a88f333f6b337d47ee431f) --- .../airflow/serialization/serialized_objects.py | 14 +++-- airflow-core/src/airflow/utils/module_loading.py | 23 ++++++- .../unit/serialization/test_dag_serialization.py | 41 +++++++++++-- .../tests/unit/utils/test_module_loading.py | 70 +++++++++++++++++++++- 4 files changed, 134 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 62259d131f0..47226751e5b 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1471,11 +1471,13 @@ class SerializedBaseOperator(DAGNode, BaseSerialization): continue serialized_op["partial_kwargs"].update({k: cls.serialize(v)}) - # we want to store python_callable_name, not python_callable + # Store python_callable_name instead of python_callable. + # exclude_module=True ensures stable names across bundle version changes. python_callable = op.partial_kwargs.get("python_callable", None) if python_callable: - callable_name = qualname(python_callable) - serialized_op["partial_kwargs"]["python_callable_name"] = callable_name + serialized_op["partial_kwargs"]["python_callable_name"] = qualname( + python_callable, exclude_module=True + ) del serialized_op["partial_kwargs"]["python_callable"] serialized_op["_is_mapped"] = True @@ -1496,11 +1498,11 @@ class SerializedBaseOperator(DAGNode, BaseSerialization): if attr in serialize_op: del serialize_op[attr] - # Detect if there's a change in python callable name + # Store python_callable_name for change detection. + # exclude_module=True ensures stable names across bundle version changes. python_callable = getattr(op, "python_callable", None) if python_callable: - callable_name = qualname(python_callable) - serialize_op["python_callable_name"] = callable_name + serialize_op["python_callable_name"] = qualname(python_callable, exclude_module=True) serialize_op["task_type"] = getattr(op, "task_type", type(op).__name__) serialize_op["_task_module"] = getattr(op, "_task_module", type(op).__module__) diff --git a/airflow-core/src/airflow/utils/module_loading.py b/airflow-core/src/airflow/utils/module_loading.py index 50ae287c36d..71fc8126167 100644 --- a/airflow-core/src/airflow/utils/module_loading.py +++ b/airflow-core/src/airflow/utils/module_loading.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import functools import pkgutil from collections.abc import Callable from importlib import import_module @@ -46,9 +47,24 @@ def import_string(dotted_path: str): raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class') -def qualname(o: object | Callable, use_qualname: bool = False) -> str: - """Convert an attribute/class/callable to a string importable by ``import_string``.""" +def qualname(o: object | Callable, use_qualname: bool = False, exclude_module: bool = False) -> str: + """ + Convert an attribute/class/callable to a string. + + By default, returns a string importable by ``import_string`` (includes module path). + With exclude_module=True, returns only the qualified name without module prefix, + useful for stable identification across deployments where module paths may vary. + """ if callable(o) and hasattr(o, "__module__"): + if exclude_module: + if hasattr(o, "__qualname__"): + return o.__qualname__ + if hasattr(o, "__name__"): + return o.__name__ + # Handle functools.partial objects specifically (not just any object with 'func' attr) + if isinstance(o, functools.partial): + return qualname(o.func, exclude_module=True) + return type(o).__qualname__ if use_qualname and hasattr(o, "__qualname__"): return f"{o.__module__}.{o.__qualname__}" if hasattr(o, "__name__"): @@ -62,6 +78,9 @@ def qualname(o: object | Callable, use_qualname: bool = False) -> str: name = cls.__qualname__ module = cls.__module__ + if exclude_module: + return name + if module and module != "__builtin__": return f"{module}.{name}" diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 24c0bb46df6..cae004ac8bf 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -22,6 +22,7 @@ from __future__ import annotations import contextlib import copy import dataclasses +import functools import importlib import importlib.util import json @@ -80,7 +81,6 @@ from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy, _Dow from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep from airflow.timetables.simple import NullTimetable, OnceTimetable from airflow.triggers.base import StartTriggerArgs -from airflow.utils.module_loading import qualname from tests_common.test_utils.config import conf_vars from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker, skip_if_not_on_main @@ -2885,7 +2885,7 @@ def test_taskflow_expand_serde(): }, "_disallow_kwargs_override": False, "_expand_input_attr": "op_kwargs_expand_input", - "python_callable_name": qualname(x), + "python_callable_name": "test_taskflow_expand_serde.<locals>.x", } deserialized = BaseSerialization.deserialize(serialized) @@ -2952,7 +2952,7 @@ def test_taskflow_expand_kwargs_serde(strict): "_task_module": "airflow.providers.standard.decorators.python", "task_type": "_PythonDecoratedOperator", "_operator_name": "@task", - "python_callable_name": qualname(x), + "python_callable_name": "test_taskflow_expand_kwargs_serde.<locals>.x", "partial_kwargs": { "op_args": [], "op_kwargs": { @@ -3123,11 +3123,42 @@ def test_python_callable_in_partial_kwargs(): serialized = SerializedBaseOperator.serialize_mapped_operator(operator) assert "python_callable" not in serialized["partial_kwargs"] - assert serialized["partial_kwargs"]["python_callable_name"] == qualname(empty_function) + assert serialized["partial_kwargs"]["python_callable_name"] == "empty_function" deserialized = SerializedBaseOperator.deserialize_operator(serialized) assert "python_callable" not in deserialized.partial_kwargs - assert deserialized.partial_kwargs["python_callable_name"] == qualname(empty_function) + assert deserialized.partial_kwargs["python_callable_name"] == "empty_function" + + +def test_python_callable_name_uses_qualname_exclude_module(): + """Test python_callable_name is stable across bundle version changes.""" + from airflow.providers.standard.operators.python import PythonOperator + + # Module-level function + op1 = PythonOperator(task_id="task1", python_callable=empty_function) + serialized1 = SerializedBaseOperator.serialize_operator(op1) + assert serialized1["python_callable_name"] == "empty_function" + + # Nested function + def outer(): + def inner(): + pass + + return inner + + inner_func = outer() + op2 = PythonOperator(task_id="task2", python_callable=inner_func) + serialized2 = SerializedBaseOperator.serialize_operator(op2) + assert ( + serialized2["python_callable_name"] + == "test_python_callable_name_uses_qualname_exclude_module.<locals>.outer.<locals>.inner" + ) + + # functools.partial + partial_func = functools.partial(empty_function, x=1) + op3 = PythonOperator(task_id="task3", python_callable=partial_func) + serialized3 = SerializedBaseOperator.serialize_operator(op3) + assert serialized3["python_callable_name"] == "empty_function" def test_handle_v1_serdag(): diff --git a/airflow-core/tests/unit/utils/test_module_loading.py b/airflow-core/tests/unit/utils/test_module_loading.py index 1f92a004b8f..c59c5f0c639 100644 --- a/airflow-core/tests/unit/utils/test_module_loading.py +++ b/airflow-core/tests/unit/utils/test_module_loading.py @@ -17,9 +17,15 @@ # under the License. from __future__ import annotations +import functools + import pytest -from airflow.utils.module_loading import import_string +from airflow.utils.module_loading import import_string, qualname + + +def _sample_function(): + pass class TestModuleImport: @@ -33,3 +39,65 @@ class TestModuleImport: msg = 'Module "airflow.utils" does not define a "nonexistent" attribute' with pytest.raises(ImportError, match=msg): import_string("airflow.utils.nonexistent") + + +class TestQualname: + def test_qualname_default_includes_module(self): + """Test that qualname() by default includes the module path.""" + result = qualname(_sample_function) + assert result == "module_loading.test_module_loading._sample_function" + + def test_qualname_exclude_module_simple_function(self): + """Test that exclude_module=True returns only the function name.""" + result = qualname(_sample_function, exclude_module=True) + assert result == "_sample_function" + + def test_qualname_exclude_module_nested_function(self): + """Test that exclude_module=True works with nested functions.""" + + def outer(): + def inner(): + pass + + return inner + + inner_func = outer() + result = qualname(inner_func, exclude_module=True) + assert ( + result + == "TestQualname.test_qualname_exclude_module_nested_function.<locals>.outer.<locals>.inner" + ) + + def test_qualname_exclude_module_functools_partial(self): + """Test that exclude_module=True handles functools.partial correctly.""" + + def base_func(x, y): + pass + + partial_func = functools.partial(base_func, x=1) + result = qualname(partial_func, exclude_module=True) + assert result == "TestQualname.test_qualname_exclude_module_functools_partial.<locals>.base_func" + + def test_qualname_exclude_module_class(self): + """Test that exclude_module=True works with classes.""" + + class MyClass: + pass + + result = qualname(MyClass, exclude_module=True) + assert result == "TestQualname.test_qualname_exclude_module_class.<locals>.MyClass" + + def test_qualname_exclude_module_instance(self): + """Test that exclude_module=True works with class instances.""" + + class MyClass: + pass + + instance = MyClass() + result = qualname(instance, exclude_module=True) + assert result == "TestQualname.test_qualname_exclude_module_instance.<locals>.MyClass" + + def test_qualname_use_qualname_still_includes_module(self): + """Test that use_qualname=True still includes module prefix.""" + result = qualname(_sample_function, use_qualname=True) + assert result == "module_loading.test_module_loading._sample_function"
