This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new e4d791b7fcf [v3-1-test] Operator template fields via callable 
serialization causes unstable DAG serialization (#60065) (#60221)
e4d791b7fcf is described below

commit e4d791b7fcfa7753109c98de5511d2147ddc1d7d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 7 18:55:47 2026 +0100

    [v3-1-test] Operator template fields via callable serialization causes 
unstable DAG serialization (#60065) (#60221)
    
    * [v3-1-test] Operator template fields via callable serialization causes 
unstable DAG serialization (#60065)
    
    * op template field via callable serialization fix
    
    * removed duplicate MockOperator import
    
    * template var serialization: use md5 fingerprint
    
    * doc for template field callable serialization logic
    
    * template var callable serialization: use md5 from 
airflow.utils.hashlib_wrapper
    
    * template var callable serialization: use callable fully qualified name
    
    * Apply suggestions from code review
    
    ---------
    (cherry picked from commit ca21ef63ab9221b92f696673475215bcc5ec2578)
    
    Co-authored-by: Andrei Leib <[email protected]>
    Co-authored-by: AndreiLeib <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    * update import path for qualname
    
    * Use correct class for serialization
    
    ---------
    
    Co-authored-by: Andrei Leib <[email protected]>
    Co-authored-by: AndreiLeib <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow-core/src/airflow/serialization/helpers.py  | 10 ++++++++-
 airflow-core/src/airflow/utils/module_loading.py   | 11 ++++++----
 .../unit/serialization/test_dag_serialization.py   | 25 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/helpers.py 
b/airflow-core/src/airflow/serialization/helpers.py
index 5f564df6188..f315ff12f6a 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -23,12 +23,16 @@ from typing import Any
 from airflow._shared.secrets_masker import redact
 from airflow.configuration import conf
 from airflow.settings import json
+from airflow.utils.module_loading import qualname
 
 
 def serialize_template_field(template_field: Any, name: str) -> str | dict | 
list | int | float:
     """
     Return a serializable representation of the templated field.
 
+    If ``templated_field`` is provided via a callable, return its fully
+    qualified name as the serialized value: ``<callable module.qualname>``
+
     If ``templated_field`` contains a class or instance that requires recursive
     templating, store them as strings. Otherwise simply return the field as-is.
     """
@@ -67,7 +71,11 @@ def serialize_template_field(template_field: Any, name: str) 
-> str | dict | lis
         try:
             serialized = template_field.serialize()
         except AttributeError:
-            serialized = str(template_field)
+            if callable(template_field):
+                full_qualified_name = qualname(template_field, True)
+                serialized = f"<callable {full_qualified_name}>"
+            else:
+                serialized = str(template_field)
         if len(serialized) > max_length:
             rendered = redact(serialized, name)
             return (
diff --git a/airflow-core/src/airflow/utils/module_loading.py 
b/airflow-core/src/airflow/utils/module_loading.py
index e0ec74bcb1f..50ae287c36d 100644
--- a/airflow-core/src/airflow/utils/module_loading.py
+++ b/airflow-core/src/airflow/utils/module_loading.py
@@ -46,10 +46,13 @@ def import_string(dotted_path: str):
         raise ImportError(f'Module "{module_path}" does not define a 
"{class_name}" attribute/class')
 
 
-def qualname(o: object | Callable) -> str:
-    """Convert an attribute/class/function to a string importable by 
``import_string``."""
-    if callable(o) and hasattr(o, "__module__") and hasattr(o, "__name__"):
-        return f"{o.__module__}.{o.__name__}"
+def qualname(o: object | Callable, use_qualname: bool = False) -> str:
+    """Convert an attribute/class/callable to a string importable by 
``import_string``."""
+    if callable(o) and hasattr(o, "__module__"):
+        if use_qualname and hasattr(o, "__qualname__"):
+            return f"{o.__module__}.{o.__qualname__}"
+        if hasattr(o, "__name__"):
+            return f"{o.__module__}.{o.__name__}"
 
     cls = o
 
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 337d5cd7254..24c0bb46df6 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1593,6 +1593,31 @@ class TestStringifiedDAGs:
         assert deserialized_task.resources == task.resources
         assert isinstance(deserialized_task.resources, Resources)
 
+    def test_template_field_via_callable_serialization(self):
+        """
+        Test operator template fields serialization when provided as a 
callable.
+        """
+
+        def fn_template_field_callable(context, jinja_env):
+            pass
+
+        def fn_returns_callable():
+            def get_arg(context, jinja_env):
+                pass
+
+            return get_arg
+
+        task = MockOperator(task_id="task1", arg1=fn_template_field_callable, 
arg2=fn_returns_callable())
+        serialized_task = SerializedBaseOperator.serialize_operator(task)
+        assert (
+            serialized_task.get("arg1")
+            == "<callable 
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_template_field_callable>"
+        )
+        assert (
+            serialized_task.get("arg2")
+            == "<callable 
unit.serialization.test_dag_serialization.TestStringifiedDAGs.test_template_field_via_callable_serialization.<locals>.fn_returns_callable.<locals>.get_arg>"
+        )
+
     def test_task_group_serialization(self):
         """
         Test TaskGroup serialization/deserialization.

Reply via email to