This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 2c9067b315d [v3-1-test] fix: Rendered Templates not showing dictionary
items in AF3 (#58071) (#59176)
2c9067b315d is described below
commit 2c9067b315d440eb8f6e39cba9e118ddcd3b4eb4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Dec 7 23:18:26 2025 +0100
[v3-1-test] fix: Rendered Templates not showing dictionary items in AF3
(#58071) (#59176)
* Rendered Templates not showing dictionary items in AF3
* add a unit test
* fix test error
(cherry picked from commit 4408d68a93be5cd9785f65c3be6ee77647150879)
Co-authored-by: GUAN-HAO HUANG <[email protected]>
---
.../src/airflow/models/renderedtifields.py | 43 ++++++++++++-
.../tests/unit/models/test_renderedtifields.py | 70 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/renderedtifields.py b/airflow-core/src/airflow/models/renderedtifields.py
index b19b7f1f6ed..100da273154 100644
--- a/airflow-core/src/airflow/models/renderedtifields.py
+++ b/airflow-core/src/airflow/models/renderedtifields.py
@@ -20,7 +20,7 @@
from __future__ import annotations
import os
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import sqlalchemy_jsonfield
from sqlalchemy import (
@@ -51,6 +51,28 @@ if TYPE_CHECKING:
from airflow.serialization.serialized_objects import SerializedBaseOperator
+def _get_nested_value(obj: Any, path: str) -> Any:
+ """
+ Get a nested value from an object using a dot-separated path.
+
+ :param obj: The object to extract the value from
+ :param path: A dot-separated path (e.g., "configuration.query.sql")
+ :return: The value at the nested path, or None if the path doesn't exist
+ """
+ keys = path.split(".")
+ current = obj
+ for key in keys:
+ if isinstance(current, dict):
+ current = current.get(key)
+ elif hasattr(current, key):
+ current = getattr(current, key)
+ else:
+ return None
+ if current is None:
+ return None
+ return current
+
+
def get_serialized_template_fields(task: SerializedBaseOperator):
"""
Get and serialize the template fields for a task.
@@ -61,7 +83,24 @@ def get_serialized_template_fields(task: SerializedBaseOperator):
:meta private:
"""
- return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}
+ rendered_fields = {}
+
+ for field in task.template_fields:
+ rendered_fields[field] = serialize_template_field(getattr(task, field), field)
+
+ renderers = getattr(task, "template_fields_renderers", {})
+ for renderer_path in renderers:
+ if "." in renderer_path:
+ base_field = renderer_path.split(".", 1)[0]
+
+ if base_field in task.template_fields:
+ base_value = getattr(task, base_field)
+ nested_value = _get_nested_value(base_value, renderer_path[len(base_field) + 1 :])
+
+ if nested_value is not None:
+ rendered_fields[renderer_path] = serialize_template_field(nested_value, renderer_path)
+
+ return rendered_fields
class RenderedTaskInstanceFields(TaskInstanceDependencies):
diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py b/airflow-core/tests/unit/models/test_renderedtifields.py
index 47aa8829aa8..ee9cc3b9b12 100644
--- a/airflow-core/tests/unit/models/test_renderedtifields.py
+++ b/airflow-core/tests/unit/models/test_renderedtifields.py
@@ -21,6 +21,7 @@ from __future__ import annotations
import os
from collections import Counter
+from collections.abc import Sequence
from datetime import date, timedelta
from typing import TYPE_CHECKING
from unittest import mock
@@ -33,6 +34,7 @@ from airflow import settings
from airflow._shared.timezones.timezone import datetime
from airflow.configuration import conf
from airflow.models import DagRun, Variable
+from airflow.models.baseoperator import BaseOperator
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskmap import TaskMap
from airflow.providers.standard.operators.bash import BashOperator
@@ -448,3 +450,71 @@ class TestRenderedTaskInstanceFields:
# rerun the old run. this will shouldn't fail
ti.task = task
ti.run()
+
+ def test_nested_dictionary_template_field_rendering(self, dag_maker):
+ """
+ Test that nested dictionary items in template fields are properly rendered
+ when using template_fields_renderers with dot-separated paths.
+
+ This test verifies the fix for rendering dictionary items in templates.
+ Before the fix, nested dictionary items specified in template_fields_renderers
+ (e.g., "configuration.query.sql") would not be rendered. After the fix,
+ these nested items are properly extracted and rendered.
+ """
+
+ # Create a custom operator with a dictionary template field
+ class MyConfigOperator(BaseOperator):
+ template_fields: Sequence[str] = ("configuration",)
+ template_fields_renderers = {
+ "configuration": "json",
+ "configuration.query.sql": "sql",
+ }
+
+ def __init__(self, configuration: dict, **kwargs):
+ super().__init__(**kwargs)
+ self.configuration = configuration
+
+ # Create a configuration dictionary with nested structure
+ configuration = {
+ "query": {
+ "job_id": "123",
+ "sql": "select * from my_table where date = '{{ ds }}'",
+ }
+ }
+
+ with dag_maker("test_nested_dict_rendering"):
+ task = MyConfigOperator(task_id="test_config", configuration=configuration)
+ dr = dag_maker.create_dagrun()
+
+ session = dag_maker.session
+ ti = dr.task_instances[0]
+ ti.task = task
+ rtif = RTIF(ti=ti)
+
+ # Verify that the base configuration field is rendered
+ assert "configuration" in rtif.rendered_fields
+ rendered_config = rtif.rendered_fields["configuration"]
+ assert isinstance(rendered_config, dict)
+ assert rendered_config["query"]["job_id"] == "123"
+ # The SQL should be templated (ds should be replaced with actual date)
+ assert "select * from my_table where date = '" in rendered_config["query"]["sql"]
+ assert rendered_config["query"]["sql"] != configuration["query"]["sql"]
+
+ # Verify that the nested dictionary item is also rendered
+ # This is the key test - before the fix, this would not exist
+ assert "configuration.query.sql" in rtif.rendered_fields
+ rendered_sql = rtif.rendered_fields["configuration.query.sql"]
+ assert isinstance(rendered_sql, str)
+ assert "select * from my_table where date = '" in rendered_sql
+ # The template should be rendered (ds should be replaced)
+ assert "{{ ds }}" not in rendered_sql
+
+ # Store in database and verify retrieval
+ session.add(rtif)
+ session.flush()
+
+ retrieved_fields = RTIF.get_templated_fields(ti=ti, session=session)
+ assert retrieved_fields is not None
+ assert "configuration" in retrieved_fields
+ assert "configuration.query.sql" in retrieved_fields
+ assert retrieved_fields["configuration.query.sql"] == rendered_sql