uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698670417
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
return _OPERATOR_EXTRA_LINKS
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+ encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_")
and v}
+ if var.weekday and var.weekday.n:
+ # Every n'th Friday for example
+ encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+ elif var.weekday:
+ encoded['weekday'] = [var.weekday.weekday]
+ return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+ if 'weekday' in var:
+ var['weekday'] = relativedelta.weekday(*var['weekday']) # type: ignore
+ return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+ """Encode a Pendulum Timezone for serialization.
+
+ Airflow only supports timezone objects that implements Pendulum's Timezone
+ interface. We try to keep as much information as possible to make
conversion
+ round-tripping possible (see ``decode_timezone``). We need to special-case
+ UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+ 0 without the special case), but passing 0 into ``pendulum.timezone`` does
+ not give us UTC (but ``+00:00``).
+ """
+ if isinstance(var, FixedTimezone):
+ if var.offset == 0:
+ return "UTC"
+ return var.offset
+ if isinstance(var, Timezone):
+ return var.name
+ raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not
{var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+ """Decode a previously serialized Pendulum Timezone."""
+ return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+ """Encode a timetable instance.
+
+ This delegates most of the serialization work to the type, so the behavior
+ can be completely controlled by a custom subclass.
+ """
+ return {"type": as_importable_string(type(var)), "value": var.serialize()}
Review comment:
I’m assuming we’re not yet set on the serialisation method because of
the class registration stuff (and I’m not sure the current method will always
work if the timetable class is declared in a DAG file), so let’s not think too
hard on this for now?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]