ephraimbuddy commented on code in PR #63871:
URL: https://github.com/apache/airflow/pull/63871#discussion_r3241012344
##########
airflow-core/src/airflow/serialization/helpers.py:
##########
@@ -19,87 +19,96 @@
from __future__ import annotations
import contextlib
+import inspect
from typing import TYPE_CHECKING, Any
from airflow._shared.module_loading import qualname
from airflow._shared.secrets_masker import redact
from airflow._shared.template_rendering import truncate_rendered_value
from airflow.configuration import conf
-from airflow.settings import json
if TYPE_CHECKING:
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import Timetable as CoreTimetable
-def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float:
+def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool | None:
"""
Return a serializable representation of the templated field.
- If ``templated_field`` is provided via a callable then
- return the following serialized value: ``<callable full_qualified_name>``
+ The walk has two responsibilities:
- If ``templated_field`` contains a class or instance that requires recursive
- templating, store them as strings. Otherwise simply return the field as-is.
+ 1. **Make the template_field JSON-encodable** — every container is rebuilt
+ with primitive leaves (str/int/float/bool/None), tuples and sets are
+ flattened to lists, and unsupported objects fall through to ``str()``
+ so ``json.dumps`` never raises on the result.
+ 2. **Keep the output deterministic across parses** — callables are replaced
+ with their qualified name (never the default ``<function ... at 0x...>``
+ repr), dicts are key-sorted, and (frozen)sets are sorted by element so
+ the same input always produces the same string.
"""
- def is_jsonable(x):
- try:
- json.dumps(x)
- except (TypeError, OverflowError):
- return False
- else:
- return True
-
- def translate_tuples_to_lists(obj: Any):
- """Recursively convert tuples to lists."""
- if isinstance(obj, tuple):
- return [translate_tuples_to_lists(item) for item in obj]
- if isinstance(obj, list):
- return [translate_tuples_to_lists(item) for item in obj]
- if isinstance(obj, dict):
- return {key: translate_tuples_to_lists(value) for key, value in
obj.items()}
- return obj
+ def normalize_dict_key(key) -> str:
+ """Normalize a dict key to a serialized string type."""
+ # Serialized template_field keys must all be strings, not a mix of types, so that
+ # downstream json.dumps(..., sort_keys=True) does not raise on mixed-type keys.
+ return str(serialize_object(key))
+
+ def serialize_object(obj):
+ """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable
structure."""
+ if obj is None or isinstance(obj, (str, int, float, bool)):
+ return obj
- def sort_dict_recursively(obj: Any) -> Any:
- """Recursively sort dictionaries to ensure consistent ordering."""
if isinstance(obj, dict):
- return {k: sort_dict_recursively(v) for k, v in
sorted(obj.items())}
- if isinstance(obj, list):
- return [sort_dict_recursively(item) for item in obj]
- if isinstance(obj, tuple):
- return tuple(sort_dict_recursively(item) for item in obj)
- return obj
+ # Serialize keys/values first so each key is a string and the
output is hash-stable,
+ # then sort by the serialized key to prevent hash inconsistencies
when dict ordering varies.
+ serialized_pairs = [(normalize_dict_key(k), serialize_object(v))
for k, v in obj.items()]
+ return dict(sorted(serialized_pairs, key=lambda kv: kv[0]))
+
+ if isinstance(obj, (list, tuple)):
+ return [serialize_object(item) for item in obj]
+
+ if isinstance(obj, (set, frozenset)):
+ # JSON has no set type → flatten to a list with deterministic
ordering
+ # so hash randomization on element types cannot shift
cross-process iteration order.
+ serialized_set = [serialize_object(e) for e in obj]
+ return sorted(serialized_set, key=lambda x: (type(x).__name__,
str(x)))
+
+ # Use inspect.getattr_static to bypass any custom __getattr__ /
metaclass magic
+ if callable(inspect.getattr_static(obj, "serialize", None)):
+ return serialize_object(obj.serialize())
+
+ # Kubernetes client objects (V1Pod, V1Container, ...) expose their content via to_dict().
+ # Scope the branch to the kubernetes namespace so unrelated user classes that happen to
+ # define a to_dict() method fall through to str() instead of being treated as K8s payloads.
+ if getattr(type(obj), "__module__", "").startswith("kubernetes.") and callable(
Review Comment:
```suggestion
if getattr(type(obj), "__module__", "").startswith(("kubernetes.", "kubernetes_asyncio.")) and callable(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org