This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 354863690f Add listeners for Dag import errors (#39739)
354863690f is described below
commit 354863690f2593d02af33c44829a99cd875f2f22
Author: pushpendu91 <[email protected]>
AuthorDate: Mon Jun 3 17:37:15 2024 +0530
Add listeners for Dag import errors (#39739)
* Adding a new feature to get notifications in case of import errors
---------
Co-authored-by: PUSHPENDU DHARA <[email protected]>
---
airflow/dag_processing/processor.py | 7 +
airflow/listeners/listener.py | 3 +-
airflow/listeners/spec/importerrors.py | 32 ++++
.../administration-and-deployment/listeners.rst | 11 +-
tests/listeners/dag_import_error_listener.py | 44 ++++++
tests/listeners/test_dag_import_error_listener.py | 166 +++++++++++++++++++++
6 files changed, 261 insertions(+), 2 deletions(-)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index f813a1beb2..8df64c9f1e 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -39,6 +39,7 @@ from airflow.callbacks.callback_requests import (
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
+from airflow.listeners.listener import get_listener_manager
from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
@@ -629,6 +630,10 @@ class DagFileProcessor(LoggingMixin):
{"filename": filename, "timestamp": timezone.utcnow(),
"stacktrace": stacktrace},
synchronize_session="fetch",
)
+ # sending notification when an existing dag import error occurs
+ get_listener_manager().hook.on_existing_dag_import_error(
+ filename=filename, stacktrace=stacktrace
+ )
else:
session.add(
ParseImportError(
@@ -638,6 +643,8 @@ class DagFileProcessor(LoggingMixin):
processor_subdir=processor_subdir,
)
)
+ # sending notification when a new dag import error occurs
+ get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
(
session.query(DagModel)
.filter(DagModel.fileloc == filename)
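These hooks are no-ops unless a listener is registered. As a sketch of how a
deployment would wire one up through the plugin mechanism (the plugin and
module names below are hypothetical, not part of this commit):

    # my_listener_plugin.py -- hypothetical plugin that registers a listener
    # module so the on_*_dag_import_error hooks above actually fire.
    from airflow.plugins_manager import AirflowPlugin

    import my_import_error_listener  # hypothetical module with @hookimpl functions


    class ImportErrorListenerPlugin(AirflowPlugin):
        name = "import_error_listener_plugin"
        listeners = [my_import_error_listener]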
diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py
index d7944aa4eb..cdabfebb75 100644
--- a/airflow/listeners/listener.py
+++ b/airflow/listeners/listener.py
@@ -37,13 +37,14 @@ class ListenerManager:
"""Manage listener registration and provides hook property for calling
them."""
def __init__(self):
- from airflow.listeners.spec import dagrun, dataset, lifecycle, taskinstance
+ from airflow.listeners.spec import dagrun, dataset, importerrors, lifecycle, taskinstance
self.pm = pluggy.PluginManager("airflow")
self.pm.add_hookspecs(lifecycle)
self.pm.add_hookspecs(dagrun)
self.pm.add_hookspecs(dataset)
self.pm.add_hookspecs(taskinstance)
+ self.pm.add_hookspecs(importerrors)
@property
def has_listeners(self) -> bool:
diff --git a/airflow/listeners/spec/importerrors.py b/airflow/listeners/spec/importerrors.py
new file mode 100644
index 0000000000..2cb2b4e454
--- /dev/null
+++ b/airflow/listeners/spec/importerrors.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pluggy import HookspecMarker
+
+hookspec = HookspecMarker("airflow")
+
+
+@hookspec
+def on_new_dag_import_error(filename, stacktrace):
+ """Execute when new dag import error appears."""
+
+
+@hookspec
+def on_existing_dag_import_error(filename, stacktrace):
+ """Execute when existing dag import error appears."""
diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst b/docs/apache-airflow/administration-and-deployment/listeners.rst
index 550278e37f..a8dbda4c5d 100644
--- a/docs/apache-airflow/administration-and-deployment/listeners.rst
+++ b/docs/apache-airflow/administration-and-deployment/listeners.rst
@@ -94,6 +94,16 @@ Dataset Events
Dataset events occur when Dataset management operations are run.
+
+Dag Import Error Events
+-----------------------
+
+- ``on_new_dag_import_error``
+- ``on_existing_dag_import_error``
+
+Dag import error events occur when the DAG processor finds an import error in the DAG code and updates the metadata database table.
+
+
|experimental|
@@ -165,7 +175,6 @@ For example if you want to implement a listener that uses the ``error`` field in
# Handle no error case here
pass
-
List of changes in the listener interfaces since 2.8.0 when they were introduced:
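To make the new docs section concrete, a minimal listener module that a
deployment might register for these events could look like the following
(the module name and logging behavior are assumptions, not part of this
commit):

    # import_error_alerts.py -- hypothetical listener for the new events.
    import logging

    from airflow.listeners import hookimpl

    log = logging.getLogger(__name__)


    @hookimpl
    def on_new_dag_import_error(filename, stacktrace):
        # A real deployment might page on-call or post to chat instead.
        log.error("New DAG import error in %s:\n%s", filename, stacktrace)


    @hookimpl
    def on_existing_dag_import_error(filename, stacktrace):
        log.warning("DAG import error persists in %s:\n%s", filename, stacktrace)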
diff --git a/tests/listeners/dag_import_error_listener.py b/tests/listeners/dag_import_error_listener.py
new file mode 100644
index 0000000000..a4426c1324
--- /dev/null
+++ b/tests/listeners/dag_import_error_listener.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.listeners import hookimpl
+
+new = {}
+existing = {}
+
+
+@hookimpl
+def on_new_dag_import_error(filename, stacktrace):
+ """Execute when new dag import error appears"""
+ new["filename"] = stacktrace
+ print("new error>> filename:" + str(filename))
+ print("new error>> stacktrace:" + str(stacktrace))
+
+
+@hookimpl
+def on_existing_dag_import_error(filename, stacktrace):
+ """Execute when existing dag import error appears"""
+ existing["filename"] = stacktrace
+ print("existing error>> filename:" + str(filename))
+ print("existing error>> stacktrace:" + str(stacktrace))
+
+
+def clear():
+ global new, existing
+ new, existing = {}, {}
diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
new file mode 100644
index 0000000000..0417ae24f5
--- /dev/null
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import sys
+from unittest import mock
+
+import pytest
+
+from airflow import settings
+from airflow.configuration import TEST_DAGS_FOLDER
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import DagModel
+from airflow.models.errors import ParseImportError
+from airflow.utils import timezone
+from tests.listeners import dag_import_error_listener
+from tests.test_utils.config import conf_vars, env_vars
+from tests.test_utils.db import (
+ clear_db_dags,
+ clear_db_import_errors,
+ clear_db_jobs,
+ clear_db_pools,
+ clear_db_runs,
+ clear_db_serialized_dags,
+ clear_db_sla_miss,
+)
+from tests.test_utils.mock_executor import MockExecutor
+
+pytestmark = pytest.mark.db_test
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+PY311 = sys.version_info >= (3, 11)
+
+# Include the words "airflow" and "dag" in the file contents, tricking airflow into thinking these
+# files contain a DAG (otherwise Airflow will skip them)
+PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
+UNPARSEABLE_DAG_FILE_CONTENTS = "airflow DAG"
+INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = "def something():\n    return airflow_DAG\nsomething()"
+
+# Filename to be used for dags that are created in an ad-hoc manner and can be removed/
+# created at runtime
+TEMP_DAG_FILENAME = "temp_dag.py"
+
+
[email protected](scope="class")
+def disable_load_example():
+ with conf_vars({("core", "load_examples"): "false"}):
+ with env_vars({"AIRFLOW__CORE__LOAD_EXAMPLES": "false"}):
+ yield
+
+
[email protected]("disable_load_example")
+class TestDagFileProcessor:
+ @staticmethod
+ def clean_db():
+ clear_db_runs()
+ clear_db_pools()
+ clear_db_dags()
+ clear_db_sla_miss()
+ clear_db_import_errors()
+ clear_db_jobs()
+ clear_db_serialized_dags()
+
+ def setup_class(self):
+ self.clean_db()
+
+ def setup_method(self):
+ # Speed up some tests by not running the tasks, just look at what we
+ # enqueue!
+ self.null_exec = MockExecutor()
+ self.scheduler_job = None
+
+ def teardown_method(self) -> None:
+ if self.scheduler_job and self.scheduler_job.job_runner.processor_agent:
+ self.scheduler_job.job_runner.processor_agent.end()
+ self.scheduler_job = None
+ self.clean_db()
+
+ def _process_file(self, file_path, dag_directory, session):
+ dag_file_processor = DagFileProcessor(
+ dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
+ )
+
+ dag_file_processor.process_file(file_path, [], False, session)
+
+ def test_newly_added_import_error(self, tmp_path, session):
+ dag_import_error_listener.clear()
+ get_listener_manager().add_listener(dag_import_error_listener)
+
+ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
+ temp_dagfile = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
+ with open(dag_file) as main_dag, open(temp_dagfile, "w") as next_dag:
+ for line in main_dag:
+ next_dag.write(line)
+ # first we parse the dag
+ self._process_file(temp_dagfile, dag_directory=tmp_path, session=session)
+ # assert DagModel.has_import_errors is false
+ dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+ assert not dm.has_import_errors
+ # corrupt the file
+ with open(temp_dagfile, "a") as file:
+ file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+ self._process_file(temp_dagfile, dag_directory=tmp_path, session=session)
+ import_errors = session.query(ParseImportError).all()
+
+ assert len(import_errors) == 1
+ import_error = import_errors[0]
+ assert import_error.filename == temp_dagfile
+ assert import_error.stacktrace
+ dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+ assert dm.has_import_errors
+
+ # Ensure the listener was notified
+ assert len(dag_import_error_listener.new) == 1
+ assert dag_import_error_listener.new["filename"] ==
import_error.stacktrace
+
+ def test_already_existing_import_error(self, tmp_path):
+ dag_import_error_listener.clear()
+ get_listener_manager().add_listener(dag_import_error_listener)
+
+ filename_to_parse = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
+ # Generate original import error
+ with open(filename_to_parse, "w") as file_to_parse:
+ file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+ session = settings.Session()
+ self._process_file(filename_to_parse, dag_directory=tmp_path, session=session)
+
+ import_error_1 = (
+ session.query(ParseImportError).filter(ParseImportError.filename == filename_to_parse).one()
+ )
+
+ # process the file multiple times
+ for _ in range(10):
+ self._process_file(filename_to_parse, dag_directory=tmp_path, session=session)
+
+ import_error_2 = (
+ session.query(ParseImportError).filter(ParseImportError.filename == filename_to_parse).one()
+ )
+
+ # assert that the ID of the import error did not change
+ assert import_error_1.id == import_error_2.id
+
+ # Ensure the listener was notified
+ assert len(dag_import_error_listener.existing) == 1
+ assert dag_import_error_listener.existing["filename"] == import_error_1.stacktrace
+ assert dag_import_error_listener.existing["filename"] == import_error_2.stacktrace