This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ce2200d04c add operator-level render_template_as_native_obj override (#60619)
9ce2200d04c is described below

commit 9ce2200d04ca16541a8189e65c8bf7e79e354eaa
Author: Deji Ibrahim <[email protected]>
AuthorDate: Fri Jan 23 11:02:22 2026 +0100

    add operator-level render_template_as_native_obj override (#60619)
    
    * add operator-level render_template_as_native_obj override
    
    * fix dag serialization
    
    * fix mypy
    
    * update serialized fields
    
    * add significant newsfragment
---
 airflow-core/newsfragments/60619.significant.rst   | 22 +++++++++++
 .../serialization/definitions/baseoperator.py      |  2 +
 airflow-core/src/airflow/serialization/schema.json |  1 +
 .../unit/serialization/test_dag_serialization.py   |  1 +
 task-sdk/src/airflow/sdk/bases/operator.py         |  8 ++++
 .../sdk/definitions/_internal/abstractoperator.py  | 21 +++++++++++
 .../airflow/sdk/definitions/_internal/templater.py | 43 +++++++++++++++++++++-
 task-sdk/src/airflow/sdk/definitions/dag.py        | 34 +++++------------
 .../src/airflow/sdk/definitions/mappedoperator.py  |  8 ++++
 task-sdk/tests/task_sdk/bases/test_operator.py     | 35 ++++++++++++++++++
 .../task_sdk/definitions/test_mappedoperator.py    |  1 +
 11 files changed, 151 insertions(+), 25 deletions(-)

diff --git a/airflow-core/newsfragments/60619.significant.rst b/airflow-core/newsfragments/60619.significant.rst
new file mode 100644
index 00000000000..2d975f624bd
--- /dev/null
+++ b/airflow-core/newsfragments/60619.significant.rst
@@ -0,0 +1,22 @@
+Add operator-level ``render_template_as_native_obj`` override
+
+Operators can now override the DAG-level ``render_template_as_native_obj`` setting,
+enabling fine-grained control over whether templates are rendered as native Python
+types or strings on a per-task basis. Set ``render_template_as_native_obj=True`` or
+``False`` on any operator to override the DAG setting, or leave as ``None`` (default)
+to inherit from the DAG.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
+
+* Migration rules needed
+
+  * None - this is a new optional feature with backwards-compatible defaults
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 20cac69435f..8efbc36a9e5 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -121,6 +121,7 @@ class SerializedBaseOperator(DAGNode):
     priority_weight: int = 1
     queue: str = "default"
 
+    render_template_as_native_obj: bool | None = None
     resources: dict[str, Any] | None = None
     retries: int = 0
     retry_delay: datetime.timedelta = datetime.timedelta(seconds=300)
@@ -212,6 +213,7 @@ class SerializedBaseOperator(DAGNode):
                 "pool_slots",
                 "priority_weight",
                 "queue",
+                "render_template_as_native_obj",
                 "resources",
                 "retries",
                 "retry_delay",
diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json
index 1ea2509b1df..6058275f35c 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -322,6 +322,7 @@
         "_expand_input_attr": { "type": "string" },
         "map_index_template": { "type": "string" },
         "allow_nested_operators": { "type": "boolean", "default": true },
+        "render_template_as_native_obj": { "anyOf": [{"type": "boolean"}, 
{"type": "null"}], "default": null },
         "inlets": {"type":  "array", "default": []},
         "outlets": {"type":  "array", "default": []},
         "has_on_execute_callback": {"type": "boolean", "default": false},
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 4312a80a6dc..41c74efa2c9 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1569,6 +1569,7 @@ class TestStringifiedDAGs:
             "pool_slots": 1,
             "priority_weight": 1,
             "queue": "default",
+            "render_template_as_native_obj": None,
             "resources": None,
             "retries": 0,
             "retry_delay": timedelta(0, 300),
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 5f07a188362..4b6c9c3f90f 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -842,6 +842,10 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
                     dag=dag,
                 )
                 hello_world_task.execute(context)
+    :param render_template_as_native_obj: If True, uses a Jinja 
``NativeEnvironment``
+        to render templates as native Python types. If False, a Jinja
+        ``Environment`` is used to render templates as string values.
+        If None (default), inherits from the DAG setting.
     """
 
     task_id: str
@@ -898,6 +902,7 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
     _task_display_name: str | None = None
     logger_name: str | None = None
     allow_nested_operators: bool = True
+    render_template_as_native_obj: bool | None = None
 
     is_setup: bool = False
     is_teardown: bool = False
@@ -1053,6 +1058,7 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
         task_display_name: str | None = None,
         logger_name: str | None = None,
         allow_nested_operators: bool = True,
+        render_template_as_native_obj: bool | None = None,
         **kwargs: Any,
     ):
         # Note: Metaclass handles passing in the Dag/TaskGroup from active 
context manager, if any
@@ -1180,6 +1186,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 
         self.allow_nested_operators = allow_nested_operators
 
+        self.render_template_as_native_obj = render_template_as_native_obj
+
         self._logger_name = logger_name
 
         # Lineage
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
index e7e5ebe8b9a..e32bd377f01 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
@@ -255,8 +255,29 @@ class AbstractOperator(Templater, DAGNode):
     #   _render
     def get_template_env(self, dag: DAG | None = None) -> jinja2.Environment:
         """Get the template environment for rendering templates."""
+        from airflow.sdk.definitions._internal.templater import 
create_template_env
+
         if dag is None:
             dag = self.get_dag()
+        # Check if the operator has an explicit native rendering preference
+        render_op_template_as_native_obj = getattr(self, 
"render_template_as_native_obj", None)
+        if render_op_template_as_native_obj is not None:
+            if dag:
+                # Use dag's template settings (searchpath, macros, filters, 
etc.)
+                searchpath = [dag.folder]
+                if dag.template_searchpath:
+                    searchpath += dag.template_searchpath
+                return create_template_env(
+                    native=render_op_template_as_native_obj,
+                    searchpath=searchpath,
+                    template_undefined=dag.template_undefined,
+                    jinja_environment_kwargs=dag.jinja_environment_kwargs,
+                    user_defined_macros=dag.user_defined_macros,
+                    user_defined_filters=dag.user_defined_filters,
+                )
+            # No dag context available, use minimal template env
+            return create_template_env(native=render_op_template_as_native_obj)
+        # No operator-level override, delegate to parent class
         return super().get_template_env(dag=dag)
 
     def _render(self, template, context, dag: DAG | None = None):
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
index 62821eb4de7..faf97bdd3f8 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
@@ -108,6 +108,14 @@ class Templater:
                                 log.exception("Failed to get source %s", item)
         self.prepare_template()
 
+    def _should_render_native(self, dag: DAG | None = None) -> bool:
+        # Operator explicitly set? Use that value, otherwise inherit from DAG
+        render_op_template_as_native_obj = getattr(self, 
"render_template_as_native_obj", None)
+        if render_op_template_as_native_obj is not None:
+            return render_op_template_as_native_obj
+
+        return dag.render_template_as_native_obj if dag else False
+
     def _do_render_template_fields(
         self,
         parent: Any,
@@ -128,7 +136,7 @@ class Templater:
                 setattr(parent, attr_name, rendered_content)
 
     def _render(self, template, context, dag=None) -> Any:
-        if dag and dag.render_template_as_native_obj:
+        if self._should_render_native(dag):
             return render_template_as_native(template, context)
         return render_template_to_string(template, context)
 
@@ -289,3 +297,36 @@ FILTERS = {
     "ts_nodash": ts_nodash_filter,
     "ts_nodash_with_tz": ts_nodash_with_tz_filter,
 }
+
+
+def create_template_env(
+    *,
+    native: bool = False,
+    searchpath: list[str] | None = None,
+    template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
+    jinja_environment_kwargs: dict | None = None,
+    user_defined_macros: dict | None = None,
+    user_defined_filters: dict | None = None,
+) -> jinja2.Environment:
+    """Create a Jinja2 environment with the given settings."""
+    # Default values (for backward compatibility)
+    jinja_env_options = {
+        "undefined": template_undefined,
+        "extensions": ["jinja2.ext.do"],
+        "cache_size": 0,
+    }
+    if searchpath:
+        jinja_env_options["loader"] = jinja2.FileSystemLoader(searchpath)
+    if jinja_environment_kwargs:
+        jinja_env_options.update(jinja_environment_kwargs)
+
+    env = NativeEnvironment(**jinja_env_options) if native else 
SandboxedEnvironment(**jinja_env_options)
+
+    # Add any user defined items. Safe to edit globals as long as no templates 
are rendered yet.
+    # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
+    if user_defined_macros:
+        env.globals.update(user_defined_macros)
+    if user_defined_filters:
+        env.filters.update(user_defined_filters)
+
+    return env
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index c01af7c691d..61a74729909 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -775,36 +775,22 @@ class DAG:
 
     def get_template_env(self, *, force_sandboxed: bool = False) -> 
jinja2.Environment:
         """Build a Jinja2 environment."""
-        from airflow.sdk.definitions._internal.templater import 
NativeEnvironment, SandboxedEnvironment
+        from airflow.sdk.definitions._internal.templater import 
create_template_env
 
         # Collect directories to search for template files
         searchpath = [self.folder]
         if self.template_searchpath:
             searchpath += self.template_searchpath
 
-        # Default values (for backward compatibility)
-        jinja_env_options = {
-            "loader": jinja2.FileSystemLoader(searchpath),
-            "undefined": self.template_undefined,
-            "extensions": ["jinja2.ext.do"],
-            "cache_size": 0,
-        }
-        if self.jinja_environment_kwargs:
-            jinja_env_options.update(self.jinja_environment_kwargs)
-        env: jinja2.Environment
-        if self.render_template_as_native_obj and not force_sandboxed:
-            env = NativeEnvironment(**jinja_env_options)
-        else:
-            env = SandboxedEnvironment(**jinja_env_options)
-
-        # Add any user defined items. Safe to edit globals as long as no 
templates are rendered yet.
-        # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
-        if self.user_defined_macros:
-            env.globals.update(self.user_defined_macros)
-        if self.user_defined_filters:
-            env.filters.update(self.user_defined_filters)
-
-        return env
+        use_native = self.render_template_as_native_obj and not force_sandboxed
+        return create_template_env(
+            native=use_native,
+            searchpath=searchpath,
+            template_undefined=self.template_undefined,
+            jinja_environment_kwargs=self.jinja_environment_kwargs,
+            user_defined_macros=self.user_defined_macros,
+            user_defined_filters=self.user_defined_filters,
+        )
 
     def set_dependency(self, upstream_task_id, downstream_task_id):
         """Set dependency between two tasks that already have been added to 
the Dag using add_task()."""
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index c4d39f2ffc1..e77d462e6a0 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -694,6 +694,14 @@ class MappedOperator(AbstractOperator):
     def allow_nested_operators(self) -> bool:
         return bool(self.partial_kwargs.get("allow_nested_operators"))
 
+    @property
+    def render_template_as_native_obj(self) -> bool | None:
+        return self.partial_kwargs.get("render_template_as_native_obj")
+
+    @render_template_as_native_obj.setter
+    def render_template_as_native_obj(self, value: bool | None) -> None:
+        self.partial_kwargs["render_template_as_native_obj"] = value
+
     def get_dag(self) -> DAG | None:
         """Implement Operator."""
         return self.dag
diff --git a/task-sdk/tests/task_sdk/bases/test_operator.py b/task-sdk/tests/task_sdk/bases/test_operator.py
index 85b127dfce6..9e6db88d5cf 100644
--- a/task-sdk/tests/task_sdk/bases/test_operator.py
+++ b/task-sdk/tests/task_sdk/bases/test_operator.py
@@ -672,6 +672,41 @@ class TestBaseOperator:
         result = task.render_template(content, context)
         assert result == expected_output
 
+    @pytest.mark.parametrize(
+        ("dag_native", "op_native", "content", "context", "expected_result", 
"expected_type"),
+        [
+            # Operator overrides DAG
+            (False, True, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1", 
"bar2"], list),
+            (True, False, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1', 
'bar2']", str),
+            # Operator inherits from DAG (None = inherit)
+            (True, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1", 
"bar2"], list),
+            (False, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1', 
'bar2']", str),
+            # No DAG context
+            (None, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1', 
'bar2']", str),
+            (None, True, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1", 
"bar2"], list),
+            # Native rendering preserves various types
+            (None, True, "{{ foo }}", {"foo": 42}, 42, int),
+            (None, True, "{{ foo }}", {"foo": {"key": "value"}}, {"key": 
"value"}, dict),
+            (None, True, "{{ foo }}", {"foo": True}, True, bool),
+            (None, True, "{{ ds }}", {"ds": date(2018, 12, 6)}, date(2018, 12, 
6), date),
+        ],
+    )
+    def test_operator_render_template_as_native_obj(
+        self, dag_native, op_native, content, context, expected_result, 
expected_type
+    ):
+        """Test operator render_template_as_native_obj overrides DAG settings 
and preserves types."""
+        if dag_native is not None:
+            with DAG(
+                "test-dag", schedule=None, start_date=DEFAULT_DATE, 
render_template_as_native_obj=dag_native
+            ):
+                task = BaseOperator(task_id="op1", 
render_template_as_native_obj=op_native)
+        else:
+            task = BaseOperator(task_id="op1", 
render_template_as_native_obj=op_native)
+
+        result = task.render_template(content, context)
+        assert result == expected_result
+        assert isinstance(result, expected_type)
+
     def test_render_template_fields(self):
         """Verify if operator attributes are correctly templated."""
         task = MockOperator(task_id="op1", arg1="{{ foo }}", arg2="{{ bar }}")
diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
index 85b6e9c0b27..2b34fac6ea0 100644
--- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
+++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
@@ -780,6 +780,7 @@ def test_mapped_xcom_push_skipped_tasks(create_runtime_ti, 
mock_supervisor_comms
         ("on_skipped_callback", [], [id]),
         ("inlets", ["a"], ["b"]),
         ("outlets", ["a"], ["b"]),
+        ("render_template_as_native_obj", True, False),
     ],
 )
 def test_setters(setter_name: str, old_value: object, new_value: object) -> 
None:

Reply via email to