This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ce2200d04c add operator-level render_template_as_native_obj override
(#60619)
9ce2200d04c is described below
commit 9ce2200d04ca16541a8189e65c8bf7e79e354eaa
Author: Deji Ibrahim <[email protected]>
AuthorDate: Fri Jan 23 11:02:22 2026 +0100
add operator-level render_template_as_native_obj override (#60619)
* add operator-level render_template_as_native_obj override
* fix dag serialization
* fix mypy
* update serialized fields
* add significant newsfragement
---
airflow-core/newsfragments/60619.significant.rst | 22 +++++++++++
.../serialization/definitions/baseoperator.py | 2 +
airflow-core/src/airflow/serialization/schema.json | 1 +
.../unit/serialization/test_dag_serialization.py | 1 +
task-sdk/src/airflow/sdk/bases/operator.py | 8 ++++
.../sdk/definitions/_internal/abstractoperator.py | 21 +++++++++++
.../airflow/sdk/definitions/_internal/templater.py | 43 +++++++++++++++++++++-
task-sdk/src/airflow/sdk/definitions/dag.py | 34 +++++------------
.../src/airflow/sdk/definitions/mappedoperator.py | 8 ++++
task-sdk/tests/task_sdk/bases/test_operator.py | 35 ++++++++++++++++++
.../task_sdk/definitions/test_mappedoperator.py | 1 +
11 files changed, 151 insertions(+), 25 deletions(-)
diff --git a/airflow-core/newsfragments/60619.significant.rst
b/airflow-core/newsfragments/60619.significant.rst
new file mode 100644
index 00000000000..2d975f624bd
--- /dev/null
+++ b/airflow-core/newsfragments/60619.significant.rst
@@ -0,0 +1,22 @@
+Add operator-level ``render_template_as_native_obj`` override
+
+Operators can now override the DAG-level ``render_template_as_native_obj``
setting,
+enabling fine-grained control over whether templates are rendered as native
Python
+types or strings on a per-task basis. Set
``render_template_as_native_obj=True`` or
+``False`` on any operator to override the DAG setting, or leave as ``None``
(default)
+to inherit from the DAG.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
+
+* Migration rules needed
+
+ * None - this is a new optional feature with backwards-compatible defaults
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 20cac69435f..8efbc36a9e5 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -121,6 +121,7 @@ class SerializedBaseOperator(DAGNode):
priority_weight: int = 1
queue: str = "default"
+ render_template_as_native_obj: bool | None = None
resources: dict[str, Any] | None = None
retries: int = 0
retry_delay: datetime.timedelta = datetime.timedelta(seconds=300)
@@ -212,6 +213,7 @@ class SerializedBaseOperator(DAGNode):
"pool_slots",
"priority_weight",
"queue",
+ "render_template_as_native_obj",
"resources",
"retries",
"retry_delay",
diff --git a/airflow-core/src/airflow/serialization/schema.json
b/airflow-core/src/airflow/serialization/schema.json
index 1ea2509b1df..6058275f35c 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -322,6 +322,7 @@
"_expand_input_attr": { "type": "string" },
"map_index_template": { "type": "string" },
"allow_nested_operators": { "type": "boolean", "default": true },
+ "render_template_as_native_obj": { "anyOf": [{"type": "boolean"},
{"type": "null"}], "default": null },
"inlets": {"type": "array", "default": []},
"outlets": {"type": "array", "default": []},
"has_on_execute_callback": {"type": "boolean", "default": false},
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 4312a80a6dc..41c74efa2c9 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1569,6 +1569,7 @@ class TestStringifiedDAGs:
"pool_slots": 1,
"priority_weight": 1,
"queue": "default",
+ "render_template_as_native_obj": None,
"resources": None,
"retries": 0,
"retry_delay": timedelta(0, 300),
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py
b/task-sdk/src/airflow/sdk/bases/operator.py
index 5f07a188362..4b6c9c3f90f 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -842,6 +842,10 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
dag=dag,
)
hello_world_task.execute(context)
+ :param render_template_as_native_obj: If True, uses a Jinja
``NativeEnvironment``
+ to render templates as native Python types. If False, a Jinja
+ ``Environment`` is used to render templates as string values.
+ If None (default), inherits from the DAG setting.
"""
task_id: str
@@ -898,6 +902,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
_task_display_name: str | None = None
logger_name: str | None = None
allow_nested_operators: bool = True
+ render_template_as_native_obj: bool | None = None
is_setup: bool = False
is_teardown: bool = False
@@ -1053,6 +1058,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
task_display_name: str | None = None,
logger_name: str | None = None,
allow_nested_operators: bool = True,
+ render_template_as_native_obj: bool | None = None,
**kwargs: Any,
):
# Note: Metaclass handles passing in the Dag/TaskGroup from active
context manager, if any
@@ -1180,6 +1186,8 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
self.allow_nested_operators = allow_nested_operators
+ self.render_template_as_native_obj = render_template_as_native_obj
+
self._logger_name = logger_name
# Lineage
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
index e7e5ebe8b9a..e32bd377f01 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
@@ -255,8 +255,29 @@ class AbstractOperator(Templater, DAGNode):
# _render
def get_template_env(self, dag: DAG | None = None) -> jinja2.Environment:
"""Get the template environment for rendering templates."""
+ from airflow.sdk.definitions._internal.templater import
create_template_env
+
if dag is None:
dag = self.get_dag()
+ # Check if the operator has an explicit native rendering preference
+ render_op_template_as_native_obj = getattr(self,
"render_template_as_native_obj", None)
+ if render_op_template_as_native_obj is not None:
+ if dag:
+ # Use dag's template settings (searchpath, macros, filters,
etc.)
+ searchpath = [dag.folder]
+ if dag.template_searchpath:
+ searchpath += dag.template_searchpath
+ return create_template_env(
+ native=render_op_template_as_native_obj,
+ searchpath=searchpath,
+ template_undefined=dag.template_undefined,
+ jinja_environment_kwargs=dag.jinja_environment_kwargs,
+ user_defined_macros=dag.user_defined_macros,
+ user_defined_filters=dag.user_defined_filters,
+ )
+ # No dag context available, use minimal template env
+ return create_template_env(native=render_op_template_as_native_obj)
+ # No operator-level override, delegate to parent class
return super().get_template_env(dag=dag)
def _render(self, template, context, dag: DAG | None = None):
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
index 62821eb4de7..faf97bdd3f8 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py
@@ -108,6 +108,14 @@ class Templater:
log.exception("Failed to get source %s", item)
self.prepare_template()
+ def _should_render_native(self, dag: DAG | None = None) -> bool:
+ # Operator explicitly set? Use that value, otherwise inherit from DAG
+ render_op_template_as_native_obj = getattr(self,
"render_template_as_native_obj", None)
+ if render_op_template_as_native_obj is not None:
+ return render_op_template_as_native_obj
+
+ return dag.render_template_as_native_obj if dag else False
+
def _do_render_template_fields(
self,
parent: Any,
@@ -128,7 +136,7 @@ class Templater:
setattr(parent, attr_name, rendered_content)
def _render(self, template, context, dag=None) -> Any:
- if dag and dag.render_template_as_native_obj:
+ if self._should_render_native(dag):
return render_template_as_native(template, context)
return render_template_to_string(template, context)
@@ -289,3 +297,36 @@ FILTERS = {
"ts_nodash": ts_nodash_filter,
"ts_nodash_with_tz": ts_nodash_with_tz_filter,
}
+
+
+def create_template_env(
+ *,
+ native: bool = False,
+ searchpath: list[str] | None = None,
+ template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
+ jinja_environment_kwargs: dict | None = None,
+ user_defined_macros: dict | None = None,
+ user_defined_filters: dict | None = None,
+) -> jinja2.Environment:
+ """Create a Jinja2 environment with the given settings."""
+ # Default values (for backward compatibility)
+ jinja_env_options = {
+ "undefined": template_undefined,
+ "extensions": ["jinja2.ext.do"],
+ "cache_size": 0,
+ }
+ if searchpath:
+ jinja_env_options["loader"] = jinja2.FileSystemLoader(searchpath)
+ if jinja_environment_kwargs:
+ jinja_env_options.update(jinja_environment_kwargs)
+
+ env = NativeEnvironment(**jinja_env_options) if native else
SandboxedEnvironment(**jinja_env_options)
+
+ # Add any user defined items. Safe to edit globals as long as no templates
are rendered yet.
+ # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
+ if user_defined_macros:
+ env.globals.update(user_defined_macros)
+ if user_defined_filters:
+ env.filters.update(user_defined_filters)
+
+ return env
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index c01af7c691d..61a74729909 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -775,36 +775,22 @@ class DAG:
def get_template_env(self, *, force_sandboxed: bool = False) ->
jinja2.Environment:
"""Build a Jinja2 environment."""
- from airflow.sdk.definitions._internal.templater import
NativeEnvironment, SandboxedEnvironment
+ from airflow.sdk.definitions._internal.templater import
create_template_env
# Collect directories to search for template files
searchpath = [self.folder]
if self.template_searchpath:
searchpath += self.template_searchpath
- # Default values (for backward compatibility)
- jinja_env_options = {
- "loader": jinja2.FileSystemLoader(searchpath),
- "undefined": self.template_undefined,
- "extensions": ["jinja2.ext.do"],
- "cache_size": 0,
- }
- if self.jinja_environment_kwargs:
- jinja_env_options.update(self.jinja_environment_kwargs)
- env: jinja2.Environment
- if self.render_template_as_native_obj and not force_sandboxed:
- env = NativeEnvironment(**jinja_env_options)
- else:
- env = SandboxedEnvironment(**jinja_env_options)
-
- # Add any user defined items. Safe to edit globals as long as no
templates are rendered yet.
- # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
- if self.user_defined_macros:
- env.globals.update(self.user_defined_macros)
- if self.user_defined_filters:
- env.filters.update(self.user_defined_filters)
-
- return env
+ use_native = self.render_template_as_native_obj and not force_sandboxed
+ return create_template_env(
+ native=use_native,
+ searchpath=searchpath,
+ template_undefined=self.template_undefined,
+ jinja_environment_kwargs=self.jinja_environment_kwargs,
+ user_defined_macros=self.user_defined_macros,
+ user_defined_filters=self.user_defined_filters,
+ )
def set_dependency(self, upstream_task_id, downstream_task_id):
"""Set dependency between two tasks that already have been added to
the Dag using add_task()."""
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index c4d39f2ffc1..e77d462e6a0 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -694,6 +694,14 @@ class MappedOperator(AbstractOperator):
def allow_nested_operators(self) -> bool:
return bool(self.partial_kwargs.get("allow_nested_operators"))
+ @property
+ def render_template_as_native_obj(self) -> bool | None:
+ return self.partial_kwargs.get("render_template_as_native_obj")
+
+ @render_template_as_native_obj.setter
+ def render_template_as_native_obj(self, value: bool | None) -> None:
+ self.partial_kwargs["render_template_as_native_obj"] = value
+
def get_dag(self) -> DAG | None:
"""Implement Operator."""
return self.dag
diff --git a/task-sdk/tests/task_sdk/bases/test_operator.py
b/task-sdk/tests/task_sdk/bases/test_operator.py
index 85b127dfce6..9e6db88d5cf 100644
--- a/task-sdk/tests/task_sdk/bases/test_operator.py
+++ b/task-sdk/tests/task_sdk/bases/test_operator.py
@@ -672,6 +672,41 @@ class TestBaseOperator:
result = task.render_template(content, context)
assert result == expected_output
+ @pytest.mark.parametrize(
+ ("dag_native", "op_native", "content", "context", "expected_result",
"expected_type"),
+ [
+ # Operator overrides DAG
+ (False, True, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1",
"bar2"], list),
+ (True, False, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1',
'bar2']", str),
+ # Operator inherits from DAG (None = inherit)
+ (True, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1",
"bar2"], list),
+ (False, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1',
'bar2']", str),
+ # No DAG context
+ (None, None, "{{ foo }}", {"foo": ["bar1", "bar2"]}, "['bar1',
'bar2']", str),
+ (None, True, "{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1",
"bar2"], list),
+ # Native rendering preserves various types
+ (None, True, "{{ foo }}", {"foo": 42}, 42, int),
+ (None, True, "{{ foo }}", {"foo": {"key": "value"}}, {"key":
"value"}, dict),
+ (None, True, "{{ foo }}", {"foo": True}, True, bool),
+ (None, True, "{{ ds }}", {"ds": date(2018, 12, 6)}, date(2018, 12,
6), date),
+ ],
+ )
+ def test_operator_render_template_as_native_obj(
+ self, dag_native, op_native, content, context, expected_result,
expected_type
+ ):
+ """Test operator render_template_as_native_obj overrides DAG settings
and preserves types."""
+ if dag_native is not None:
+ with DAG(
+ "test-dag", schedule=None, start_date=DEFAULT_DATE,
render_template_as_native_obj=dag_native
+ ):
+ task = BaseOperator(task_id="op1",
render_template_as_native_obj=op_native)
+ else:
+ task = BaseOperator(task_id="op1",
render_template_as_native_obj=op_native)
+
+ result = task.render_template(content, context)
+ assert result == expected_result
+ assert isinstance(result, expected_type)
+
def test_render_template_fields(self):
"""Verify if operator attributes are correctly templated."""
task = MockOperator(task_id="op1", arg1="{{ foo }}", arg2="{{ bar }}")
diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
index 85b6e9c0b27..2b34fac6ea0 100644
--- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
+++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
@@ -780,6 +780,7 @@ def test_mapped_xcom_push_skipped_tasks(create_runtime_ti,
mock_supervisor_comms
("on_skipped_callback", [], [id]),
("inlets", ["a"], ["b"]),
("outlets", ["a"], ["b"]),
+ ("render_template_as_native_obj", True, False),
],
)
def test_setters(setter_name: str, old_value: object, new_value: object) ->
None: