This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9236ee424f3a08e7086d0e6b58a0bca30f6f1e56
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Dec 7 23:18:26 2025 +0100

    [v3-1-test] fix: Rendered Templates not showing dictionary items in AF3 (#58071) (#59176)
    
    * Rendered Templates not showing dictionary items in AF3
    
    * add a unit test
    
    * fix test error
    (cherry picked from commit 4408d68a93be5cd9785f65c3be6ee77647150879)
    
    Co-authored-by: GUAN-HAO HUANG <[email protected]>
---
 .../src/airflow/models/renderedtifields.py         | 43 ++++++++++++-
 .../tests/unit/models/test_renderedtifields.py     | 70 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/renderedtifields.py b/airflow-core/src/airflow/models/renderedtifields.py
index b19b7f1f6ed..100da273154 100644
--- a/airflow-core/src/airflow/models/renderedtifields.py
+++ b/airflow-core/src/airflow/models/renderedtifields.py
@@ -20,7 +20,7 @@
 from __future__ import annotations
 
 import os
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import sqlalchemy_jsonfield
 from sqlalchemy import (
@@ -51,6 +51,28 @@ if TYPE_CHECKING:
     from airflow.serialization.serialized_objects import SerializedBaseOperator
 
 
+def _get_nested_value(obj: Any, path: str) -> Any:
+    """
+    Get a nested value from an object using a dot-separated path.
+
+    :param obj: The object to extract the value from
+    :param path: A dot-separated path (e.g., "configuration.query.sql")
+    :return: The value at the nested path, or None if the path doesn't exist
+    """
+    keys = path.split(".")
+    current = obj
+    for key in keys:
+        if isinstance(current, dict):
+            current = current.get(key)
+        elif hasattr(current, key):
+            current = getattr(current, key)
+        else:
+            return None
+        if current is None:
+            return None
+    return current
+
+
 def get_serialized_template_fields(task: SerializedBaseOperator):
     """
     Get and serialize the template fields for a task.
@@ -61,7 +83,24 @@ def get_serialized_template_fields(task: SerializedBaseOperator):
 
     :meta private:
     """
-    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}
+    rendered_fields = {}
+
+    for field in task.template_fields:
+        rendered_fields[field] = serialize_template_field(getattr(task, field), field)
+
+    renderers = getattr(task, "template_fields_renderers", {})
+    for renderer_path in renderers:
+        if "." in renderer_path:
+            base_field = renderer_path.split(".", 1)[0]
+
+            if base_field in task.template_fields:
+                base_value = getattr(task, base_field)
+                nested_value = _get_nested_value(base_value, renderer_path[len(base_field) + 1 :])
+
+                if nested_value is not None:
+                    rendered_fields[renderer_path] = serialize_template_field(nested_value, renderer_path)
+
+    return rendered_fields
 
 
 class RenderedTaskInstanceFields(TaskInstanceDependencies):
diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py b/airflow-core/tests/unit/models/test_renderedtifields.py
index 47aa8829aa8..ee9cc3b9b12 100644
--- a/airflow-core/tests/unit/models/test_renderedtifields.py
+++ b/airflow-core/tests/unit/models/test_renderedtifields.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 
 import os
 from collections import Counter
+from collections.abc import Sequence
 from datetime import date, timedelta
 from typing import TYPE_CHECKING
 from unittest import mock
@@ -33,6 +34,7 @@ from airflow import settings
 from airflow._shared.timezones.timezone import datetime
 from airflow.configuration import conf
 from airflow.models import DagRun, Variable
+from airflow.models.baseoperator import BaseOperator
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
 from airflow.models.taskmap import TaskMap
 from airflow.providers.standard.operators.bash import BashOperator
@@ -448,3 +450,71 @@ class TestRenderedTaskInstanceFields:
         # rerun the old run. this will shouldn't fail
         ti.task = task
         ti.run()
+
+    def test_nested_dictionary_template_field_rendering(self, dag_maker):
+        """
+        Test that nested dictionary items in template fields are properly rendered
+        when using template_fields_renderers with dot-separated paths.
+
+        This test verifies the fix for rendering dictionary items in templates.
+        Before the fix, nested dictionary items specified in template_fields_renderers
+        (e.g., "configuration.query.sql") would not be rendered. After the fix,
+        these nested items are properly extracted and rendered.
+        """
+
+        # Create a custom operator with a dictionary template field
+        class MyConfigOperator(BaseOperator):
+            template_fields: Sequence[str] = ("configuration",)
+            template_fields_renderers = {
+                "configuration": "json",
+                "configuration.query.sql": "sql",
+            }
+
+            def __init__(self, configuration: dict, **kwargs):
+                super().__init__(**kwargs)
+                self.configuration = configuration
+
+        # Create a configuration dictionary with nested structure
+        configuration = {
+            "query": {
+                "job_id": "123",
+                "sql": "select * from my_table where date = '{{ ds }}'",
+            }
+        }
+
+        with dag_maker("test_nested_dict_rendering"):
+            task = MyConfigOperator(task_id="test_config", configuration=configuration)
+        dr = dag_maker.create_dagrun()
+
+        session = dag_maker.session
+        ti = dr.task_instances[0]
+        ti.task = task
+        rtif = RTIF(ti=ti)
+
+        # Verify that the base configuration field is rendered
+        assert "configuration" in rtif.rendered_fields
+        rendered_config = rtif.rendered_fields["configuration"]
+        assert isinstance(rendered_config, dict)
+        assert rendered_config["query"]["job_id"] == "123"
+        # The SQL should be templated (ds should be replaced with actual date)
+        assert "select * from my_table where date = '" in rendered_config["query"]["sql"]
+        assert rendered_config["query"]["sql"] != configuration["query"]["sql"]
+
+        # Verify that the nested dictionary item is also rendered
+        # This is the key test - before the fix, this would not exist
+        assert "configuration.query.sql" in rtif.rendered_fields
+        rendered_sql = rtif.rendered_fields["configuration.query.sql"]
+        assert isinstance(rendered_sql, str)
+        assert "select * from my_table where date = '" in rendered_sql
+        # The template should be rendered (ds should be replaced)
+        assert "{{ ds }}" not in rendered_sql
+
+        # Store in database and verify retrieval
+        session.add(rtif)
+        session.flush()
+
+        retrieved_fields = RTIF.get_templated_fields(ti=ti, session=session)
+        assert retrieved_fields is not None
+        assert "configuration" in retrieved_fields
+        assert "configuration.query.sql" in retrieved_fields
+        assert retrieved_fields["configuration.query.sql"] == rendered_sql

Reply via email to