This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new e4d791b7fcf [v3-1-test] Operator template fields via callable
serialization causes unstable DAG serialization (#60065) (#60221)
e4d791b7fcf is described below
commit e4d791b7fcfa7753109c98de5511d2147ddc1d7d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 7 18:55:47 2026 +0100
[v3-1-test] Operator template fields via callable serialization causes
unstable DAG serialization (#60065) (#60221)
* [v3-1-test] Operator template fields via callable serialization causes
unstable DAG serialization (#60065)
* op template field via callable serialization fix
* removed duplicate MockOperator import
* template var serialization: use md5 fingerprint
* doc for template field callable serialization logic
* template var callable serialization: use md5 from
airflow.utils.hashlib_wrapper
* template var callable serialization: use callable fully qualified name
* Apply suggestions from code review
---------
(cherry picked from commit ca21ef63ab9221b92f696673475215bcc5ec2578)
Co-authored-by: Andrei Leib <[email protected]>
Co-authored-by: AndreiLeib <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
* update import path for qualname
* Use correct class for serialization
---------
Co-authored-by: Andrei Leib <[email protected]>
Co-authored-by: AndreiLeib <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
---
airflow-core/src/airflow/serialization/helpers.py | 10 ++++++++-
airflow-core/src/airflow/utils/module_loading.py | 11 ++++++----
.../unit/serialization/test_dag_serialization.py | 25 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/helpers.py
b/airflow-core/src/airflow/serialization/helpers.py
index 5f564df6188..f315ff12f6a 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -23,12 +23,16 @@ from typing import Any
from airflow._shared.secrets_masker import redact
from airflow.configuration import conf
from airflow.settings import json
+from airflow.utils.module_loading import qualname
def serialize_template_field(template_field: Any, name: str) -> str | dict |
list | int | float:
"""
Return a serializable representation of the templated field.
+ If ``templated_field`` is provided via a callable, compute MD5 hash of
source
+ and return following serialized value: ``<callable fingerprint(MD5)
hash_value``
+
If ``templated_field`` contains a class or instance that requires recursive
templating, store them as strings. Otherwise simply return the field as-is.
"""
@@ -67,7 +71,11 @@ def serialize_template_field(template_field: Any, name: str)
-> str | dict | lis
try:
serialized = template_field.serialize()
except AttributeError:
- serialized = str(template_field)
+ if callable(template_field):
+ full_qualified_name = qualname(template_field, True)
+ serialized = f"<callable {full_qualified_name}>"
+ else:
+ serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
return (
diff --git a/airflow-core/src/airflow/utils/module_loading.py
b/airflow-core/src/airflow/utils/module_loading.py
index e0ec74bcb1f..50ae287c36d 100644
--- a/airflow-core/src/airflow/utils/module_loading.py
+++ b/airflow-core/src/airflow/utils/module_loading.py
@@ -46,10 +46,13 @@ def import_string(dotted_path: str):
raise ImportError(f'Module "{module_path}" does not define a
"{class_name}" attribute/class')
-def qualname(o: object | Callable) -> str:
- """Convert an attribute/class/function to a string importable by
``import_string``."""
- if callable(o) and hasattr(o, "__module__") and hasattr(o, "__name__"):
- return f"{o.__module__}.{o.__name__}"
+def qualname(o: object | Callable, use_qualname: bool = False) -> str:
+ """Convert an attribute/class/callable to a string importable by
``import_string``."""
+ if callable(o) and hasattr(o, "__module__"):
+ if use_qualname and hasattr(o, "__qualname__"):
+ return f"{o.__module__}.{o.__qualname__}"
+ if hasattr(o, "__name__"):
+ return f"{o.__module__}.{o.__name__}"
cls = o
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 337d5cd7254..24c0bb46df6 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1593,6 +1593,31 @@ class TestStringifiedDAGs:
assert deserialized_task.resources == task.resources
assert isinstance(deserialized_task.resources, Resources)
+ def test_template_field_via_callable_serialization(self):
+ """
+ Test operator template fields serialization when provided as a
callable.
+ """
+
+ def fn_template_field_callable(context, jinja_env):
+ pass
+
+ def fn_returns_callable():
+ def get_arg(context, jinja_env):
+ pass
+
+ return get_arg
+
+ task = MockOperator(task_id="task1", arg1=fn_template_field_callable,
arg2=fn_returns_callable())
+ serialized_task = SerializedBaseOperator.serialize_operator(task)
+ assert (
+ serialized_task.get("arg1")
+ == "<callable
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_template_field_callable>"
+ )
+ assert (
+ serialized_task.get("arg2")
+ == "<callable
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_returns_callable.<locals>.get_arg>"
+ )
+
def test_task_group_serialization(self):
"""
Test TaskGroup serialization/deserialization.