This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5b65c1737c2d4f62107809eb41bae19c3f62a320 Author: Kaxil Naik <[email protected]> AuthorDate: Fri Oct 30 10:39:31 2020 +0000 Log instead of raise an Error for unregistered OperatorLinks (#11959) Currently, if someone uses OperatorLinks that are not registered, it will break the UI when someone clicks on that DAG. This commit will instead log an error in the Webserver logs so that someone can still see the DAG in different Views (graph, tree, etc). (cherry picked from commit 44f6e6fca59596a5cdf27ca0910e86a9d8150a63) --- airflow/serialization/serialized_objects.py | 3 ++- tests/serialization/test_dag_serialization.py | 37 ++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index d959e92..c527ddf 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -477,7 +477,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization): elif _operator_link_class_path in registered_operator_link_classes: single_op_link_class = registered_operator_link_classes[_operator_link_class_path] else: - raise KeyError("Operator Link class %r not registered" % _operator_link_class_path) + log.error("Operator Link class %r not registered", _operator_link_class_path) + return {} op_predefined_extra_link = cattr.structure( data, single_op_link_class) # type: BaseOperatorLink diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 6b714a8..d999cb0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -33,7 +33,7 @@ from dateutil.relativedelta import relativedelta, FR from airflow.hooks.base_hook import BaseHook from airflow.models import DAG, Connection, DagBag, TaskInstance -from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink from airflow.operators.bash_operator import BashOperator from airflow.serialization.json_schema import load_dag_schema_dict from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG @@ -560,6 +560,41 @@ class TestStringifiedDAGs(unittest.TestCase): google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name) self.assertEqual("https://www.google.com", google_link_from_plugin) + @unittest.skipIf(six.PY2, 'self.assertLogs not available for Python 2') + def test_extra_operator_links_logs_error_for_non_registered_extra_links(self): + """ + Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link, + it can still deserialize the DAG (does not error) but just logs an error + """ + + class TaskStateLink(BaseOperatorLink): + """OperatorLink not registered via Plugins nor a built-in OperatorLink""" + name = 'My Link' + + def get_link(self, operator, dttm): + return 'https://www.google.com' + + class MyOperator(BaseOperator): + """Just a DummyOperator using above defined Extra Operator Link""" + operator_extra_links = [TaskStateLink()] + + def execute(self, context): + pass + + with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1)) as dag: + MyOperator(task_id='blah') + + serialized_dag = SerializedDAG.to_dict(dag) + + with self.assertLogs("airflow.serialization.serialized_objects", level="ERROR") as log_output: + SerializedDAG.from_dict(serialized_dag) + received_logs = log_output.output[0] + expected_err_msg = ( + "Operator Link class 'tests.serialization.test_dag_serialization.TaskStateLink' " + "not registered" + ) + assert expected_err_msg in received_logs + def test_extra_serialized_field_and_multiple_operator_links(self): """ Assert extra field exists & OperatorLinks defined in Plugins and inbuilt Operator Links.
