ferruzzi commented on code in PR #61118:
URL: https://github.com/apache/airflow/pull/61118#discussion_r2734113801
##########
airflow-core/src/airflow/serialization/decoders.py:
##########
@@ -131,6 +135,32 @@ def decode_asset_like(var: dict[str, Any]) -> SerializedAssetBase:
raise ValueError(f"deserialization not implemented for DAT {data_type!r}")
+def decode_deadline_alert(encoded_data: dict):
+ """
+ Decode a previously serialized deadline alert.
+
+ :meta private:
+ """
+ from datetime import timedelta
+
+ from airflow.sdk.serde import deserialize
+ from airflow.serialization.definitions.deadline import SerializedDeadlineAlert
Review Comment:
Here and elsewhere: Why are we introducing local imports all over the
place? Isn't the project standard to use top-level imports unless we have a
reason not to (circular imports, etc)?
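To illustrate the convention being asked about, here is a minimal sketch using only the standard library. The function names `decode_interval` and `decode_payload` are hypothetical and are not part of the PR; the point is only that a function-level import usually carries a comment stating why it cannot live at module level.

```python
# Top-level import: the default, visible to readers and tooling at a glance.
from datetime import timedelta


def decode_interval(seconds: float) -> timedelta:
    """Hypothetical helper that relies on the top-level import."""
    return timedelta(seconds=seconds)


def decode_payload(raw: str) -> dict:
    """Hypothetical helper that defers its import to call time."""
    # Local import: normally kept only when there is a stated reason,
    # such as breaking a circular import or avoiding a heavy optional
    # dependency at module load. (json is used here purely as a stand-in.)
    import json

    return json.loads(raw)
```

If no such reason applies, moving the import to the top of the module keeps the dependency list in one place.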
##########
airflow-core/src/airflow/serialization/serialized_objects.py:
##########
@@ -512,8 +513,10 @@ def serialize(
)
elif isinstance(var, DAG):
return cls._encode(DagSerialization.serialize_dag(var), type_=DAT.DAG)
- elif isinstance(var, DeadlineAlert):
- return cls._encode(DeadlineAlert.serialize_deadline_alert(var), type_=DAT.DEADLINE_ALERT)
+ elif isinstance(var, (DeadlineAlert, SerializedDeadlineAlert)):
Review Comment:
I'm not sure I follow why it can be either one. If you are attempting to
serialize, then shouldn't it be the unserialized version coming in? Or is this
just defensive in case we try to re-serialize? I don't see this pattern in the
other elif clauses, so it seems odd enough to ask.
##########
airflow-core/tests/unit/serialization/test_serialized_objects.py:
##########
@@ -485,14 +485,16 @@ def test_serialize_deserialize_deadline_alert(reference):
callback=AsyncCallback(empty_callback_for_deadline, kwargs=TEST_CALLBACK_KWARGS),
)
- serialized = original.serialize_deadline_alert()
+ # Use BaseSerialization like assets do
+ serialized = BaseSerialization.serialize(original)
assert serialized[Encoding.TYPE] == DAT.DEADLINE_ALERT
assert set(serialized[Encoding.VAR].keys()) == public_deadline_alert_fields
- deserialized = DeadlineAlert.deserialize_deadline_alert(serialized)
+ deserialized = BaseSerialization.deserialize(serialized)
assert deserialized.reference.serialize_reference() == reference.serialize_reference()
assert deserialized.interval == original.interval
- assert deserialized.callback == original.callback
+ # Callback is deserialized as SerializedDeadlineAlert, which may have different callback representation
Review Comment:
Shouldn't serializing and then deserializing return the original? Or at least
"a new object containing equal values", even if not the literal same object.
##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -28,3 +46,235 @@ class DeadlineAlertFields:
REFERENCE = "reference"
INTERVAL = "interval"
CALLBACK = "callback"
+
+
Review Comment:
For the most part this is all copypasta from the old ReferenceModels, other
than renaming some of the classes and type hints? Just double checking,
let me know if there are specific changes I should take a closer look at
in this file.