dimberman commented on code in PR #28454:
URL: https://github.com/apache/airflow/pull/28454#discussion_r1055774132
##########
airflow/utils/sqlalchemy.py:
##########
@@ -153,6 +156,75 @@ def process_result_value(self, value, dialect):
return BaseSerialization.deserialize(value)
+def sanitize_for_serialization(obj: V1Pod):
+ """
+ Convert pod to dict.... but *safely*.
+
+ When pod objects created with one k8s version are unpickled in a python
+ env with a more recent k8s version (in which the object attrs may have
+ changed) the unpickled obj may throw an error because the attr
+ expected on new obj may not be there on the unpickled obj.
+
+ This function still converts the pod to a dict; the only difference is
+ it populates missing attrs with None.
+
+ If obj is None, return None.
+ If obj is str, int, long, float, bool, return directly.
+ If obj is datetime.datetime, datetime.date
+ convert to string in iso8601 format.
+ If obj is list, sanitize each element in the list.
+ If obj is dict, return the dict.
+ If obj is OpenAPI model, return the properties dict.
+
+ :param obj: The data to serialize.
+ :return: The serialized form of data.
+
+ :meta private:
+ """
+ if obj is None:
+ return None
+ elif isinstance(obj, (float, bool, bytes, str, int)):
+ return obj
+ elif isinstance(obj, list):
+ return [sanitize_for_serialization(sub_obj) for sub_obj in obj]
+ elif isinstance(obj, tuple):
+ return tuple(sanitize_for_serialization(sub_obj) for sub_obj in obj)
+ elif isinstance(obj, (datetime.datetime, datetime.date)):
+ return obj.isoformat()
+
+ if isinstance(obj, dict):
+ obj_dict = obj
+ else:
+ obj_dict = {
+ obj.attribute_map[attr]: getattr(obj, attr)
+ for attr, _ in obj.openapi_types.items()
+ if getattr(obj, attr, None) is not None
+ }
+
+ return {key: sanitize_for_serialization(val) for key, val in
obj_dict.items()}
+
+
+def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
+ """
+ Convert pod to json and back so that pod is safe.
Review Comment:
Can you explain what you mean by "safe" in this docstring?
##########
airflow/utils/sqlalchemy.py:
##########
@@ -153,6 +156,75 @@ def process_result_value(self, value, dialect):
return BaseSerialization.deserialize(value)
+def sanitize_for_serialization(obj: V1Pod):
+ """
+ Convert pod to dict.... but *safely*.
+
+ When pod objects created with one k8s version are unpickled in a python
+ env with a more recent k8s version (in which the object attrs may have
+ changed) the unpickled obj may throw an error because the attr
+ expected on new obj may not be there on the unpickled obj.
+
+ This function still converts the pod to a dict; the only difference is
+ it populates missing attrs with None.
+
+ If obj is None, return None.
+ If obj is str, int, long, float, bool, return directly.
+ If obj is datetime.datetime, datetime.date
+ convert to string in iso8601 format.
+ If obj is list, sanitize each element in the list.
+ If obj is dict, return the dict.
+ If obj is OpenAPI model, return the properties dict.
+
+ :param obj: The data to serialize.
+ :return: The serialized form of data.
+
+ :meta private:
+ """
+ if obj is None:
+ return None
+ elif isinstance(obj, (float, bool, bytes, str, int)):
+ return obj
+ elif isinstance(obj, list):
+ return [sanitize_for_serialization(sub_obj) for sub_obj in obj]
+ elif isinstance(obj, tuple):
+ return tuple(sanitize_for_serialization(sub_obj) for sub_obj in obj)
+ elif isinstance(obj, (datetime.datetime, datetime.date)):
+ return obj.isoformat()
+
+ if isinstance(obj, dict):
+ obj_dict = obj
+ else:
+ obj_dict = {
+ obj.attribute_map[attr]: getattr(obj, attr)
+ for attr, _ in obj.openapi_types.items()
+ if getattr(obj, attr, None) is not None
Review Comment:
Can you separate this out into a function? Might make it easier to
test/read in the future.
##########
airflow/utils/sqlalchemy.py:
##########
@@ -188,9 +260,20 @@ def process(value):
if isinstance(value, dict) and "pod_override" in value:
pod_override = value["pod_override"]
- # If pod_override was serialized with Airflow's
BaseSerialization, deserialize it
if isinstance(pod_override, dict) and
pod_override.get(Encoding.TYPE):
+ # If pod_override was serialized with Airflow's
BaseSerialization, deserialize it
value["pod_override"] =
BaseSerialization.deserialize(pod_override)
+ else:
+ # backcompat path
+ # pod_override is a V1Pod object. re-serialize it to
ensure it is
+ # not going to blow up (objects created with one k8s
version, when
+ # unpickled in an env with upgraded k8s version, may blow
up when
+ # `to_dict` is called, because openapi client code gen
calls
+ # getattr on all attrs in openapi_types for each object,
and when
+ # new attrs are added to that list, getattr will fail.
Review Comment:
Oh that is terrifying.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]