ashb commented on code in PR #22698:
URL: https://github.com/apache/airflow/pull/22698#discussion_r844959173


##########
airflow/serialization/serialized_objects.py:
##########
@@ -670,6 +652,29 @@ def _serialize_node(cls, op: Union[BaseOperator, 
MappedOperator], include_deps:
 
         return serialize_op
 
+    @classmethod
+    def _serialize_deps(cls, op_deps: Iterable["BaseTIDep"]) -> List[str]:
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_ti_deps_plugins()
+
+        deps = []
+        for dep in op_deps:
+            klass = type(dep)
+            module_name = klass.__module__
+            qualname = f'{module_name}.{klass.__name__}'
+            if (
+                not qualname.startswith("airflow.ti_deps.deps.")
+                and not qualname in plugins_manager.registered_ti_dep_classes
+            ):
+                log.warning("Custom dep class %r not serialized, please 
register it through plugins.", qualname)

Review Comment:
   This should throw an error (which would show up as a DAG Import Error in the 
UI) as otherwise the DAG behaviour is essentially silently changed, and 
erroring here only affects the one dag file, not the whole scheduler so is 
"safe"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to