This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca21ef63ab9 Operator template fields via callable serialization causes 
unstable DAG serialization (#60065)
ca21ef63ab9 is described below

commit ca21ef63ab9221b92f696673475215bcc5ec2578
Author: Andrei Leib <[email protected]>
AuthorDate: Wed Jan 7 09:54:46 2026 -0500

    Operator template fields via callable serialization causes unstable DAG 
serialization (#60065)
    
    * op template field via callable serialization fix
    
    * removed duplicate MockOperator import
    
    * template var serialization: use md5 fingerprint
    
    * doc for template field callable serialization logic
    
    * template var callable serialization: use md5 from 
airflow.utils.hashlib_wrapper
    
    * template var callable serialization: use callable fully qualified name
    
    * Apply suggestions from code review
    
    ---------
    
    Co-authored-by: AndreiLeib <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow-core/src/airflow/serialization/helpers.py  | 10 ++++++++-
 .../unit/serialization/test_dag_serialization.py   | 25 ++++++++++++++++++++++
 .../src/airflow_shared/module_loading/__init__.py  | 11 ++++++----
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/helpers.py 
b/airflow-core/src/airflow/serialization/helpers.py
index 723c113709a..5367865673f 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 import contextlib
 from typing import TYPE_CHECKING, Any
 
+from airflow._shared.module_loading import qualname
 from airflow._shared.secrets_masker import redact
 from airflow.configuration import conf
 from airflow.settings import json
@@ -33,6 +34,9 @@ def serialize_template_field(template_field: Any, name: str) 
-> str | dict | lis
     """
     Return a serializable representation of the templated field.
 
+    If ``templated_field`` is provided via a callable, compute MD5 hash of 
source
+    and return following serialized value: ``<callable fingerprint(MD5) 
hash_value``
+
     If ``templated_field`` contains a class or instance that requires recursive
     templating, store them as strings. Otherwise simply return the field as-is.
     """
@@ -71,7 +75,11 @@ def serialize_template_field(template_field: Any, name: str) 
-> str | dict | lis
         try:
             serialized = template_field.serialize()
         except AttributeError:
-            serialized = str(template_field)
+            if callable(template_field):
+                full_qualified_name = qualname(template_field, True)
+                serialized = f"<callable {full_qualified_name}>"
+            else:
+                serialized = str(template_field)
         if len(serialized) > max_length:
             rendered = redact(serialized, name)
             return (
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index bc9cd305a24..5e054ea2114 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1639,6 +1639,31 @@ class TestStringifiedDAGs:
         assert deserialized_task.resources == task.resources
         assert isinstance(deserialized_task.resources, Resources)
 
+    def test_template_field_via_callable_serialization(self):
+        """
+        Test operator template fields serialization when provided as a 
callable.
+        """
+
+        def fn_template_field_callable(context, jinja_env):
+            pass
+
+        def fn_returns_callable():
+            def get_arg(context, jinja_env):
+                pass
+
+            return get_arg
+
+        task = MockOperator(task_id="task1", arg1=fn_template_field_callable, 
arg2=fn_returns_callable())
+        serialized_task = OperatorSerialization.serialize_operator(task)
+        assert (
+            serialized_task.get("arg1")
+            == "<callable 
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_template_field_callable>"
+        )
+        assert (
+            serialized_task.get("arg2")
+            == "<callable 
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_returns_callable.<locals>.get_arg>"
+        )
+
     def test_task_group_serialization(self):
         """
         Test TaskGroup serialization/deserialization.
diff --git 
a/shared/module_loading/src/airflow_shared/module_loading/__init__.py 
b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
index 3268dc3ddc2..8868cc704ab 100644
--- a/shared/module_loading/src/airflow_shared/module_loading/__init__.py
+++ b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
@@ -63,10 +63,13 @@ def import_string(dotted_path: str):
         raise ImportError(f'Module "{module_path}" does not define a 
"{class_name}" attribute/class')
 
 
-def qualname(o: object | Callable) -> str:
-    """Convert an attribute/class/function to a string importable by 
``import_string``."""
-    if callable(o) and hasattr(o, "__module__") and hasattr(o, "__name__"):
-        return f"{o.__module__}.{o.__name__}"
+def qualname(o: object | Callable, use_qualname: bool = False) -> str:
+    """Convert an attribute/class/callable to a string importable by 
``import_string``."""
+    if callable(o) and hasattr(o, "__module__"):
+        if use_qualname and hasattr(o, "__qualname__"):
+            return f"{o.__module__}.{o.__qualname__}"
+        if hasattr(o, "__name__"):
+            return f"{o.__module__}.{o.__name__}"
 
     cls = o
 

Reply via email to