This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ae101f59ec Skip SLA miss callback when in DB isolation mode (#41028)
ae101f59ec is described below

commit ae101f59ecc17413a7586e9a5a699bdb63e35f24
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Jul 25 17:53:12 2024 +0200

    Skip SLA miss callback when in DB isolation mode (#41028)
    
    This PR disables SLA miss callback for DB isolation mode because it
    would require quite a heavy refactoring (or rather rewriting from
    scratch). Currently SLA miss calculation heavily depends on the fact
    that both - DB is available and DAG code is parsed
    - basically that we have a DAG object in memory that has the logic
    that allows calculating SLA misses while querying the Airflow DB multiple
    times. It's likely possible to untangle it and make the logic separate
    from the DAG object, but it's not really a good idea to do it now. It's
    better to disable it.
---
 airflow/dag_processing/processor.py    |  9 +++++++--
 tests/conftest.py                      |  8 ++++++++
 tests/dag_processing/test_processor.py | 23 +++++++++++++++++++++--
 3 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index 3cc2fe142b..68854dce1a 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -31,7 +31,7 @@ from setproctitle import setproctitle
 from sqlalchemy import delete, event, func, or_, select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
+from airflow.api_internal.internal_api_call import InternalApiConfig, 
internal_api_call
 from airflow.callbacks.callback_requests import (
     DagCallbackRequest,
     SlaCallbackRequest,
@@ -749,7 +749,12 @@ class DagFileProcessor(LoggingMixin):
                     if isinstance(request, TaskCallbackRequest):
                         cls._execute_task_callbacks(dagbag, request, 
unit_test_mode, session=session)
                     elif isinstance(request, SlaCallbackRequest):
-                        DagFileProcessor.manage_slas(dagbag.dag_folder, 
request.dag_id, session=session)
+                        if InternalApiConfig.get_use_internal_api():
+                            cls.logger().warning(
+                                "SlaCallbacks are not supported when the 
Internal API is enabled"
+                            )
+                        else:
+                            DagFileProcessor.manage_slas(dagbag.dag_folder, 
request.dag_id, session=session)
                     elif isinstance(request, DagCallbackRequest):
                         cls._execute_dag_callbacks(dagbag, request, 
session=session)
                 except Exception:
diff --git a/tests/conftest.py b/tests/conftest.py
index b794d378f0..7acb731d00 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -235,6 +235,12 @@ def set_db_isolation_mode():
         InternalApiConfig.set_use_internal_api("tests", 
allow_tests_to_use_db=True)
 
 
+def skip_if_database_isolation_mode(item):
+    if os.environ.get("RUN_TESTS_WITH_DATABASE_ISOLATION", "false").lower() == 
"true":
+        for _ in item.iter_markers(name="skip_if_database_isolation_mode"):
+            pytest.skip("This test is skipped because it is not allowed in 
database isolation mode.")
+
+
 def pytest_addoption(parser: pytest.Parser):
     """Add options parser for custom plugins."""
     group = parser.getgroup("airflow")
@@ -458,6 +464,7 @@ def pytest_configure(config: pytest.Config) -> None:
         "external_python_operator: external python operator tests are 'long', 
we should run them separately",
     )
     config.addinivalue_line("markers", "enable_redact: do not mock redact 
secret masker")
+    config.addinivalue_line("markers", "skip_if_database_isolation_mode: skip 
if DB isolation is enabled")
 
     os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
 
@@ -697,6 +704,7 @@ def pytest_runtest_setup(item):
         skip_if_platform_doesnt_match(marker)
     for marker in item.iter_markers(name="backend"):
         skip_if_wrong_backend(marker, item)
+    skip_if_database_isolation_mode(item)
     selected_backend = item.config.option.backend
     if selected_backend:
         skip_if_not_marked_with_backend(selected_backend, item)
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index 2cc0067cc6..0893072d0b 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -113,6 +113,7 @@ class TestDagFileProcessor:
 
         dag_file_processor.process_file(file_path, [], False)
 
+    @pytest.mark.skip_if_database_isolation_mode
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, 
create_dummy_dag, get_test_dag):
         """
@@ -143,11 +144,13 @@ class TestDagFileProcessor:
         mock_dagbag = mock.Mock()
         mock_dagbag.get_dag.return_value = dag
         mock_get_dagbag.return_value = mock_dagbag
+        session.commit()
 
         DagFileProcessor.manage_slas(dag_folder=dag.fileloc, 
dag_id="test_sla_miss", session=session)
 
         assert sla_callback.called
 
+    @pytest.mark.skip_if_database_isolation_mode
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_callback_invalid_sla(self, 
mock_get_dagbag, create_dummy_dag):
         """
@@ -180,6 +183,7 @@ class TestDagFileProcessor:
         DagFileProcessor.manage_slas(dag_folder=dag.fileloc, 
dag_id="test_sla_miss", session=session)
         sla_callback.assert_not_called()
 
+    @pytest.mark.skip_if_database_isolation_mode
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_callback_sent_notification(self, 
mock_get_dagbag, create_dummy_dag):
         """
@@ -225,6 +229,7 @@ class TestDagFileProcessor:
 
         sla_callback.assert_not_called()
 
+    @pytest.mark.skip_if_database_isolation_mode
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(
@@ -272,6 +277,7 @@ class TestDagFileProcessor:
         # ti is successful thereby trying to insert a duplicate record.
         DagFileProcessor.manage_slas(dag_folder=dag.fileloc, 
dag_id="test_sla_miss", session=session)
 
+    @pytest.mark.skip_if_database_isolation_mode
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def 
test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla(
@@ -318,11 +324,16 @@ class TestDagFileProcessor:
         assert sla_miss_count == 2
         mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": 
"test_sla_miss", "task_id": "dummy"})
 
+    @pytest.mark.skip_if_database_isolation_mode
     @patch.object(DagFileProcessor, "logger")
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_callback_exception(
-        self, mock_get_dagbag, mock_stats_incr, mock_get_log, create_dummy_dag
+        self,
+        mock_get_dagbag,
+        mock_stats_incr,
+        mock_get_log,
+        create_dummy_dag,
     ):
         """
         Test that the dag file processor gracefully logs an exception if there 
is a problem
@@ -372,6 +383,7 @@ class TestDagFileProcessor:
                 tags={"dag_id": f"test_sla_miss_{i}", "func_name": 
sla_callback.__name__},
             )
 
+    @pytest.mark.skip_if_database_isolation_mode
     @mock.patch("airflow.dag_processing.processor.send_email")
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
@@ -407,12 +419,18 @@ class TestDagFileProcessor:
         assert email1 in send_email_to
         assert email2 not in send_email_to
 
+    @pytest.mark.skip_if_database_isolation_mode
     @patch.object(DagFileProcessor, "logger")
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
     @mock.patch("airflow.utils.email.send_email")
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_email_exception(
-        self, mock_get_dagbag, mock_send_email, mock_stats_incr, mock_get_log, 
create_dummy_dag
+        self,
+        mock_get_dagbag,
+        mock_send_email,
+        mock_stats_incr,
+        mock_get_log,
+        create_dummy_dag,
     ):
         """
         Test that the dag file processor gracefully logs an exception if there 
is a problem
@@ -453,6 +471,7 @@ class TestDagFileProcessor:
         )
         
mock_stats_incr.assert_called_once_with("sla_email_notification_failure", 
tags={"dag_id": dag_id})
 
+    @pytest.mark.skip_if_database_isolation_mode
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, 
create_dummy_dag):
         """

Reply via email to