This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9236ee424f3a08e7086d0e6b58a0bca30f6f1e56 Author: Jarek Potiuk <[email protected]> AuthorDate: Sun Dec 7 23:18:26 2025 +0100 [v3-1-test] fix: Rendered Templates not showing dictionary items in AF3 (#58071) (#59176) * Rendered Templates not showing dictionary items in AF3 * add a unit test * fix test error (cherry picked from commit 4408d68a93be5cd9785f65c3be6ee77647150879) Co-authored-by: GUAN-HAO HUANG <[email protected]> --- .../src/airflow/models/renderedtifields.py | 43 ++++++++++++- .../tests/unit/models/test_renderedtifields.py | 70 ++++++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/renderedtifields.py b/airflow-core/src/airflow/models/renderedtifields.py index b19b7f1f6ed..100da273154 100644 --- a/airflow-core/src/airflow/models/renderedtifields.py +++ b/airflow-core/src/airflow/models/renderedtifields.py @@ -20,7 +20,7 @@ from __future__ import annotations import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import sqlalchemy_jsonfield from sqlalchemy import ( @@ -51,6 +51,28 @@ if TYPE_CHECKING: from airflow.serialization.serialized_objects import SerializedBaseOperator +def _get_nested_value(obj: Any, path: str) -> Any: + """ + Get a nested value from an object using a dot-separated path. + + :param obj: The object to extract the value from + :param path: A dot-separated path (e.g., "configuration.query.sql") + :return: The value at the nested path, or None if the path doesn't exist + """ + keys = path.split(".") + current = obj + for key in keys: + if isinstance(current, dict): + current = current.get(key) + elif hasattr(current, key): + current = getattr(current, key) + else: + return None + if current is None: + return None + return current + + def get_serialized_template_fields(task: SerializedBaseOperator): """ Get and serialize the template fields for a task. @@ -61,7 +83,24 @@ def get_serialized_template_fields(task: SerializedBaseOperator): :meta private: """ - return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} + rendered_fields = {} + + for field in task.template_fields: + rendered_fields[field] = serialize_template_field(getattr(task, field), field) + + renderers = getattr(task, "template_fields_renderers", {}) + for renderer_path in renderers: + if "." in renderer_path: + base_field = renderer_path.split(".", 1)[0] + + if base_field in task.template_fields: + base_value = getattr(task, base_field) + nested_value = _get_nested_value(base_value, renderer_path[len(base_field) + 1 :]) + + if nested_value is not None: + rendered_fields[renderer_path] = serialize_template_field(nested_value, renderer_path) + + return rendered_fields class RenderedTaskInstanceFields(TaskInstanceDependencies): diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py b/airflow-core/tests/unit/models/test_renderedtifields.py index 47aa8829aa8..ee9cc3b9b12 100644 --- a/airflow-core/tests/unit/models/test_renderedtifields.py +++ b/airflow-core/tests/unit/models/test_renderedtifields.py @@ -21,6 +21,7 @@ from __future__ import annotations import os from collections import Counter +from collections.abc import Sequence from datetime import date, timedelta from typing import TYPE_CHECKING from unittest import mock @@ -33,6 +34,7 @@ from airflow import settings from airflow._shared.timezones.timezone import datetime from airflow.configuration import conf from airflow.models import DagRun, Variable +from airflow.models.baseoperator import BaseOperator from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.taskmap import TaskMap from airflow.providers.standard.operators.bash import BashOperator @@ -448,3 +450,71 @@ class TestRenderedTaskInstanceFields: # rerun the old run. this will shouldn't fail ti.task = task ti.run() + + def test_nested_dictionary_template_field_rendering(self, dag_maker): + """ + Test that nested dictionary items in template fields are properly rendered + when using template_fields_renderers with dot-separated paths. + + This test verifies the fix for rendering dictionary items in templates. + Before the fix, nested dictionary items specified in template_fields_renderers + (e.g., "configuration.query.sql") would not be rendered. After the fix, + these nested items are properly extracted and rendered. + """ + + # Create a custom operator with a dictionary template field + class MyConfigOperator(BaseOperator): + template_fields: Sequence[str] = ("configuration",) + template_fields_renderers = { + "configuration": "json", + "configuration.query.sql": "sql", + } + + def __init__(self, configuration: dict, **kwargs): + super().__init__(**kwargs) + self.configuration = configuration + + # Create a configuration dictionary with nested structure + configuration = { + "query": { + "job_id": "123", + "sql": "select * from my_table where date = '{{ ds }}'", + } + } + + with dag_maker("test_nested_dict_rendering"): + task = MyConfigOperator(task_id="test_config", configuration=configuration) + dr = dag_maker.create_dagrun() + + session = dag_maker.session + ti = dr.task_instances[0] + ti.task = task + rtif = RTIF(ti=ti) + + # Verify that the base configuration field is rendered + assert "configuration" in rtif.rendered_fields + rendered_config = rtif.rendered_fields["configuration"] + assert isinstance(rendered_config, dict) + assert rendered_config["query"]["job_id"] == "123" + # The SQL should be templated (ds should be replaced with actual date) + assert "select * from my_table where date = '" in rendered_config["query"]["sql"] + assert rendered_config["query"]["sql"] != configuration["query"]["sql"] + + # Verify that the nested dictionary item is also rendered + # This is the key test - before the fix, this would not exist + assert "configuration.query.sql" in rtif.rendered_fields + rendered_sql = rtif.rendered_fields["configuration.query.sql"] + assert isinstance(rendered_sql, str) + assert "select * from my_table where date = '" in rendered_sql + # The template should be rendered (ds should be replaced) + assert "{{ ds }}" not in rendered_sql + + # Store in database and verify retrieval + session.add(rtif) + session.flush() + + retrieved_fields = RTIF.get_templated_fields(ti=ti, session=session) + assert retrieved_fields is not None + assert "configuration" in retrieved_fields + assert "configuration.query.sql" in retrieved_fields + assert retrieved_fields["configuration.query.sql"] == rendered_sql
