This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 354863690f Add listeners for Dag import errors (#39739)
354863690f is described below

commit 354863690f2593d02af33c44829a99cd875f2f22
Author: pushpendu91 <[email protected]>
AuthorDate: Mon Jun 3 17:37:15 2024 +0530

    Add listeners for Dag import errors (#39739)
    
    * Adding new feature to get notification in case of import errors
    
    ---------
    
    Co-authored-by: PUSHPENDU DHARA 
<[email protected]>
---
 airflow/dag_processing/processor.py                |   7 +
 airflow/listeners/listener.py                      |   3 +-
 airflow/listeners/spec/importerrors.py             |  32 ++++
 .../administration-and-deployment/listeners.rst    |  11 +-
 tests/listeners/dag_import_error_listener.py       |  44 ++++++
 tests/listeners/test_dag_import_error_listener.py  | 166 +++++++++++++++++++++
 6 files changed, 261 insertions(+), 2 deletions(-)

diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index f813a1beb2..8df64c9f1e 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -39,6 +39,7 @@ from airflow.callbacks.callback_requests import (
 )
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, TaskNotFound
+from airflow.listeners.listener import get_listener_manager
 from airflow.models import SlaMiss
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
@@ -629,6 +630,10 @@ class DagFileProcessor(LoggingMixin):
                     {"filename": filename, "timestamp": timezone.utcnow(), 
"stacktrace": stacktrace},
                     synchronize_session="fetch",
                 )
+                # sending notification when an existing dag import error occurs
+                get_listener_manager().hook.on_existing_dag_import_error(
+                    filename=filename, stacktrace=stacktrace
+                )
             else:
                 session.add(
                     ParseImportError(
@@ -638,6 +643,8 @@ class DagFileProcessor(LoggingMixin):
                         processor_subdir=processor_subdir,
                     )
                 )
+                # sending notification when a new dag import error occurs
+                
get_listener_manager().hook.on_new_dag_import_error(filename=filename, 
stacktrace=stacktrace)
             (
                 session.query(DagModel)
                 .filter(DagModel.fileloc == filename)
diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py
index d7944aa4eb..cdabfebb75 100644
--- a/airflow/listeners/listener.py
+++ b/airflow/listeners/listener.py
@@ -37,13 +37,14 @@ class ListenerManager:
     """Manage listener registration and provides hook property for calling 
them."""
 
     def __init__(self):
-        from airflow.listeners.spec import dagrun, dataset, lifecycle, 
taskinstance
+        from airflow.listeners.spec import dagrun, dataset, importerrors, 
lifecycle, taskinstance
 
         self.pm = pluggy.PluginManager("airflow")
         self.pm.add_hookspecs(lifecycle)
         self.pm.add_hookspecs(dagrun)
         self.pm.add_hookspecs(dataset)
         self.pm.add_hookspecs(taskinstance)
+        self.pm.add_hookspecs(importerrors)
 
     @property
     def has_listeners(self) -> bool:
diff --git a/airflow/listeners/spec/importerrors.py 
b/airflow/listeners/spec/importerrors.py
new file mode 100644
index 0000000000..2cb2b4e454
--- /dev/null
+++ b/airflow/listeners/spec/importerrors.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pluggy import HookspecMarker
+
+hookspec = HookspecMarker("airflow")
+
+
+@hookspec
+def on_new_dag_import_error(filename, stacktrace):
+    """Execute when new dag import error appears."""
+
+
+@hookspec
+def on_existing_dag_import_error(filename, stacktrace):
+    """Execute when existing dag import error appears."""
diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst 
b/docs/apache-airflow/administration-and-deployment/listeners.rst
index 550278e37f..a8dbda4c5d 100644
--- a/docs/apache-airflow/administration-and-deployment/listeners.rst
+++ b/docs/apache-airflow/administration-and-deployment/listeners.rst
@@ -94,6 +94,16 @@ Dataset Events
 
 Dataset events occur when Dataset management operations are run.
 
+
+Dag Import Error Events
+-----------------------
+
+- ``on_new_dag_import_error``
+- ``on_existing_dag_import_error``
+
+Dag import error events occur when the dag processor finds an import error in the Dag 
code and updates the metadata database table.
+
+
 |experimental|
 
 
@@ -165,7 +175,6 @@ For example if you want to implement a listener that uses 
the ``error`` field in
                 # Handle no error case here
                 pass
 
-
 List of changes in the listener interfaces since 2.8.0 when they were 
introduced:
 
 
diff --git a/tests/listeners/dag_import_error_listener.py 
b/tests/listeners/dag_import_error_listener.py
new file mode 100644
index 0000000000..a4426c1324
--- /dev/null
+++ b/tests/listeners/dag_import_error_listener.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.listeners import hookimpl
+
+new = {}
+existing = {}
+
+
+@hookimpl
+def on_new_dag_import_error(filename, stacktrace):
+    """Execute when new dag import error appears"""
+    new["filename"] = stacktrace
+    print("new error>> filename:" + str(filename))
+    print("new error>> stacktrace:" + str(stacktrace))
+
+
+@hookimpl
+def on_existing_dag_import_error(filename, stacktrace):
+    """Execute when existing dag import error appears"""
+    existing["filename"] = stacktrace
+    print("existing error>> filename:" + str(filename))
+    print("existing error>> stacktrace:" + str(stacktrace))
+
+
+def clear():
+    global new, existing
+    new, existing = {}, {}
diff --git a/tests/listeners/test_dag_import_error_listener.py 
b/tests/listeners/test_dag_import_error_listener.py
new file mode 100644
index 0000000000..0417ae24f5
--- /dev/null
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import sys
+from unittest import mock
+
+import pytest
+
+from airflow import settings
+from airflow.configuration import TEST_DAGS_FOLDER
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import DagModel
+from airflow.models.errors import ParseImportError
+from airflow.utils import timezone
+from tests.listeners import dag_import_error_listener
+from tests.test_utils.config import conf_vars, env_vars
+from tests.test_utils.db import (
+    clear_db_dags,
+    clear_db_import_errors,
+    clear_db_jobs,
+    clear_db_pools,
+    clear_db_runs,
+    clear_db_serialized_dags,
+    clear_db_sla_miss,
+)
+from tests.test_utils.mock_executor import MockExecutor
+
+pytestmark = pytest.mark.db_test
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+PY311 = sys.version_info >= (3, 11)
+
+# Include the words "airflow" and "dag" in the file contents,
+# tricking airflow into thinking these
+# files contain a DAG (otherwise Airflow will skip them)
+PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
+UNPARSEABLE_DAG_FILE_CONTENTS = "airflow DAG"
+INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = "def something():\n    return 
airflow_DAG\nsomething()"
+
+# Filename to be used for dags that are created in an ad-hoc manner and can be 
removed/
+# created at runtime
+TEMP_DAG_FILENAME = "temp_dag.py"
+
+
[email protected](scope="class")
+def disable_load_example():
+    with conf_vars({("core", "load_examples"): "false"}):
+        with env_vars({"AIRFLOW__CORE__LOAD_EXAMPLES": "false"}):
+            yield
+
+
[email protected]("disable_load_example")
+class TestDagFileProcessor:
+    @staticmethod
+    def clean_db():
+        clear_db_runs()
+        clear_db_pools()
+        clear_db_dags()
+        clear_db_sla_miss()
+        clear_db_import_errors()
+        clear_db_jobs()
+        clear_db_serialized_dags()
+
+    def setup_class(self):
+        self.clean_db()
+
+    def setup_method(self):
+        # Speed up some tests by not running the tasks, just look at what we
+        # enqueue!
+        self.null_exec = MockExecutor()
+        self.scheduler_job = None
+
+    def teardown_method(self) -> None:
+        if self.scheduler_job and 
self.scheduler_job.job_runner.processor_agent:
+            self.scheduler_job.job_runner.processor_agent.end()
+            self.scheduler_job = None
+        self.clean_db()
+
+    def _process_file(self, file_path, dag_directory, session):
+        dag_file_processor = DagFileProcessor(
+            dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
+        )
+
+        dag_file_processor.process_file(file_path, [], False, session)
+
+    def test_newly_added_import_error(self, tmp_path, session):
+        dag_import_error_listener.clear()
+        get_listener_manager().add_listener(dag_import_error_listener)
+
+        dag_file = os.path.join(TEST_DAGS_FOLDER, 
"test_example_bash_operator.py")
+        temp_dagfile = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
+        with open(dag_file) as main_dag, open(temp_dagfile, "w") as next_dag:
+            for line in main_dag:
+                next_dag.write(line)
+        # first we parse the dag
+        self._process_file(temp_dagfile, dag_directory=tmp_path, 
session=session)
+        # assert DagModel.has_import_errors is false
+        dm = session.query(DagModel).filter(DagModel.fileloc == 
temp_dagfile).first()
+        assert not dm.has_import_errors
+        # corrupt the file
+        with open(temp_dagfile, "a") as file:
+            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+        self._process_file(temp_dagfile, dag_directory=tmp_path, 
session=session)
+        import_errors = session.query(ParseImportError).all()
+
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == temp_dagfile
+        assert import_error.stacktrace
+        dm = session.query(DagModel).filter(DagModel.fileloc == 
temp_dagfile).first()
+        assert dm.has_import_errors
+
+        # Ensure the listener was notified
+        assert len(dag_import_error_listener.new) == 1
+        assert dag_import_error_listener.new["filename"] == 
import_error.stacktrace
+
+    def test_already_existing_import_error(self, tmp_path):
+        dag_import_error_listener.clear()
+        get_listener_manager().add_listener(dag_import_error_listener)
+
+        filename_to_parse = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
+        # Generate original import error
+        with open(filename_to_parse, "w") as file_to_parse:
+            file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+        session = settings.Session()
+        self._process_file(filename_to_parse, dag_directory=tmp_path, 
session=session)
+
+        import_error_1 = (
+            session.query(ParseImportError).filter(ParseImportError.filename 
== filename_to_parse).one()
+        )
+
+        # process the file multiple times
+        for _ in range(10):
+            self._process_file(filename_to_parse, dag_directory=tmp_path, 
session=session)
+
+        import_error_2 = (
+            session.query(ParseImportError).filter(ParseImportError.filename 
== filename_to_parse).one()
+        )
+
+        # assert that the ID of the import error did not change
+        assert import_error_1.id == import_error_2.id
+
+        # Ensure the listener was notified
+        assert len(dag_import_error_listener.existing) == 1
+        assert dag_import_error_listener.existing["filename"] == 
import_error_1.stacktrace
+        assert dag_import_error_listener.existing["filename"] == 
import_error_2.stacktrace

Reply via email to