This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8defa759f9f Redact secrets in rendered templates properly when
truncating it (#59566)
8defa759f9f is described below
commit 8defa759f9f569d1fe8bc2d6de9c0bb957ec505b
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Dec 22 12:32:36 2025 +0530
Redact secrets in rendered templates properly when truncating it (#59566)
---
.../src/airflow/sdk/execution_time/task_runner.py | 80 ++++++++++++++++++++--
.../task_sdk/execution_time/test_task_runner.py | 44 ++++++++++++
2 files changed, 119 insertions(+), 5 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 08c7d7da6d5..8f1e8c37da9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -784,17 +784,87 @@ def startup() -> tuple[RuntimeTaskInstance, Context,
Logger]:
return ti, ti.get_template_context(), log
+def _serialize_template_field(template_field: Any, name: str) -> str | dict |
list | int | float:
+ """
+ Return a serializable representation of the templated field.
+
+ If ``templated_field`` contains a class or instance that requires recursive
+ templating, store them as strings. Otherwise simply return the field as-is.
+
+ Used sdk secrets masker to redact secrets in the serialized output.
+ """
+ import json
+
+ from airflow.sdk._shared.secrets_masker import redact
+
+ def is_jsonable(x):
+ try:
+ json.dumps(x)
+ except (TypeError, OverflowError):
+ return False
+ else:
+ return True
+
+ def translate_tuples_to_lists(obj: Any):
+ """Recursively convert tuples to lists."""
+ if isinstance(obj, tuple):
+ return [translate_tuples_to_lists(item) for item in obj]
+ if isinstance(obj, list):
+ return [translate_tuples_to_lists(item) for item in obj]
+ if isinstance(obj, dict):
+ return {key: translate_tuples_to_lists(value) for key, value in
obj.items()}
+ return obj
+
+ def sort_dict_recursively(obj: Any) -> Any:
+ """Recursively sort dictionaries to ensure consistent ordering."""
+ if isinstance(obj, dict):
+ return {k: sort_dict_recursively(v) for k, v in
sorted(obj.items())}
+ if isinstance(obj, list):
+ return [sort_dict_recursively(item) for item in obj]
+ if isinstance(obj, tuple):
+ return tuple(sort_dict_recursively(item) for item in obj)
+ return obj
+
+ max_length = conf.getint("core", "max_templated_field_length")
+
+ if not is_jsonable(template_field):
+ try:
+ serialized = template_field.serialize()
+ except AttributeError:
+ serialized = str(template_field)
+ if len(serialized) > max_length:
+ rendered = redact(serialized, name)
+ return (
+ "Truncated. You can change this behaviour in
[core]max_templated_field_length. "
+ f"{rendered[: max_length - 79]!r}... "
+ )
+ return serialized
+ if not template_field and not isinstance(template_field, tuple):
+ # Avoid unnecessary serialization steps for empty fields unless they
are tuples
+ # and need to be converted to lists
+ return template_field
+ template_field = translate_tuples_to_lists(template_field)
+ # Sort dictionaries recursively to ensure consistent string representation
+ # This prevents hash inconsistencies when dict ordering varies
+ if isinstance(template_field, dict):
+ template_field = sort_dict_recursively(template_field)
+ serialized = str(template_field)
+ if len(serialized) > max_length:
+ rendered = redact(serialized, name)
+ return (
+ "Truncated. You can change this behaviour in
[core]max_templated_field_length. "
+ f"{rendered[: max_length - 79]!r}... "
+ )
+ return template_field
+
+
def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]:
- # TODO: Port one of the following to Task SDK
- # airflow.serialization.helpers.serialize_template_field or
- # airflow.models.renderedtifields.get_serialized_template_fields
from airflow.sdk._shared.secrets_masker import redact
- from airflow.serialization.helpers import serialize_template_field
rendered_fields = {}
for field in task.template_fields:
value = getattr(task, field)
- serialized = serialize_template_field(value, field)
+ serialized = _serialize_template_field(value, field)
# Redact secrets in the task process itself before sending to API
server
# This ensures that the secrets those are registered via mask_secret()
on workers / dag processor are properly masked
# on the UI.
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 54458097cc6..9914f68e265 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2286,6 +2286,50 @@ class TestRuntimeTaskInstance:
pulled = runtime_ti.xcom_pull(key="key/slash", task_ids="dict_task")
assert pulled == "Some Value"
+ @pytest.mark.enable_redact
+ def test_rendered_templates_mask_secrets_with_truncation(self,
create_runtime_ti, mock_supervisor_comms):
+ """Test that secrets are masked before truncation when rendered fields
exceed max_templated_field_length."""
+ from airflow.sdk._shared.secrets_masker import _secrets_masker
+
+ secret_url =
"postgresql+psycopg2://username:[email protected]/testdb"
+ _secrets_masker().add_mask(secret_url, None)
+
+ class CustomOperator(BaseOperator):
+ template_fields = ("env_vars", "region")
+
+ def __init__(self, env_vars, region, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.env_vars = env_vars
+ self.region = region
+
+ def execute(self, context):
+ pass
+
+ # generate 50 env_vars to exceed default char limit of 4096 (50 * 87
chars ≈ 4350 chars)
+ env_vars = {f"TEST_URL_{i}": secret_url for i in range(50)}
+
+ task = CustomOperator(
+ task_id="test_truncation_masking",
+ env_vars=env_vars,
+ region="us-west-2",
+ )
+
+ runtime_ti = create_runtime_ti(task=task,
dag_id="test_truncation_masking_dag")
+ run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
+
+ assert (
+ call(
+ msg=SetRenderedFields(
+ rendered_fields={
+ "env_vars": "Truncated. You can change this behaviour
in [core]max_templated_field_length. \"{'TEST_URL_0': '***', 'TEST_URL_1':
'***', 'TEST_URL_10': '***', 'TEST_URL_11': '***', 'TEST_URL_12': '***',
'TEST_URL_13': '***', 'TEST_URL_14': '***', 'TEST_URL_15': '***',
'TEST_URL_16': '***', 'TEST_URL_17': '***', 'TEST_URL_18': '***',
'TEST_URL_19': '***', 'TEST_URL_2': '***', 'TEST_URL_20': '***', 'TEST_URL_21':
'***', 'TEST_URL_22': '***', 'TEST_URL_23': '***', 'TE [...]
+ "region": "us-west-2",
+ },
+ type="SetRenderedFields",
+ )
+ )
+ in mock_supervisor_comms.send.mock_calls
+ )
+
class TestXComAfterTaskExecution:
@pytest.mark.parametrize(