ashb commented on a change in pull request #17989:
URL: https://github.com/apache/airflow/pull/17989#discussion_r701078245



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -128,22 +122,39 @@ def decode_timezone(var: Union[str, int]) -> Timezone:
     return pendulum.timezone(var)
 
 
-def encode_timetable(var: Timetable) -> Dict[str, Any]:
+def _get_registered_timetable(importable_string: str) -> Optional[Type[Timetable]]:
+    from airflow import plugins_manager
+
+    if importable_string.startswith("airflow.timetables."):
+        return import_string(importable_string)
+    plugins_manager.initialize_timetables_plugins()
+    return plugins_manager.timetable_classes.get(importable_string)
+
+
+def _encode_timetable(var: Timetable) -> Dict[str, Any]:
     """Encode a timetable instance.
 
     This delegates most of the serialization work to the type, so the behavior
     can be completely controlled by a custom subclass.
     """
-    return {"type": as_importable_string(type(var)), "value": var.serialize()}
+    timetable_class = type(var)
+    importable_string = as_importable_string(timetable_class)
+    if _get_registered_timetable(importable_string) != timetable_class:
+        raise AirflowException(f"Cannot encode unregistered timetable class {importable_string!r}")

Review comment:
       This _might_ be the error that shows up as the DAG import error if you
try to use an unregistered timetable in a DAG, so we should probably change the
error message to be more helpful to users.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to