This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae101f59ec Skip SLA miss callback when in DB isolation mode (#41028)
ae101f59ec is described below
commit ae101f59ecc17413a7586e9a5a699bdb63e35f24
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Jul 25 17:53:12 2024 +0200
Skip SLA miss callback when in DB isolation mode (#41028)
This PR disables the SLA miss callback in DB isolation mode because
supporting it would require quite heavy refactoring (or rather rewriting
from scratch). Currently the SLA miss calculation heavily depends on the
fact that both the DB is available and the DAG code is parsed -
basically, that we have a DAG object in memory that has the logic
that allows calculating SLA misses while querying the Airflow DB multiple
times. It's likely possible to disentangle it and make the logic separate
from the DAG object, but it's not really a good idea to do it now. It's
better to disable it.
---
airflow/dag_processing/processor.py | 9 +++++++--
tests/conftest.py | 8 ++++++++
tests/dag_processing/test_processor.py | 23 +++++++++++++++++++++--
3 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 3cc2fe142b..68854dce1a 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -31,7 +31,7 @@ from setproctitle import setproctitle
from sqlalchemy import delete, event, func, or_, select
from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
+from airflow.api_internal.internal_api_call import InternalApiConfig,
internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
SlaCallbackRequest,
@@ -749,7 +749,12 @@ class DagFileProcessor(LoggingMixin):
if isinstance(request, TaskCallbackRequest):
cls._execute_task_callbacks(dagbag, request,
unit_test_mode, session=session)
elif isinstance(request, SlaCallbackRequest):
- DagFileProcessor.manage_slas(dagbag.dag_folder,
request.dag_id, session=session)
+ if InternalApiConfig.get_use_internal_api():
+ cls.logger().warning(
+ "SlaCallbacks are not supported when the
Internal API is enabled"
+ )
+ else:
+ DagFileProcessor.manage_slas(dagbag.dag_folder,
request.dag_id, session=session)
elif isinstance(request, DagCallbackRequest):
cls._execute_dag_callbacks(dagbag, request,
session=session)
except Exception:
diff --git a/tests/conftest.py b/tests/conftest.py
index b794d378f0..7acb731d00 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -235,6 +235,12 @@ def set_db_isolation_mode():
InternalApiConfig.set_use_internal_api("tests",
allow_tests_to_use_db=True)
+def skip_if_database_isolation_mode(item):
+ if os.environ.get("RUN_TESTS_WITH_DATABASE_ISOLATION", "false").lower() ==
"true":
+ for _ in item.iter_markers(name="skip_if_database_isolation_mode"):
+ pytest.skip("This test is skipped because it is not allowed in
database isolation mode.")
+
+
def pytest_addoption(parser: pytest.Parser):
"""Add options parser for custom plugins."""
group = parser.getgroup("airflow")
@@ -458,6 +464,7 @@ def pytest_configure(config: pytest.Config) -> None:
"external_python_operator: external python operator tests are 'long',
we should run them separately",
)
config.addinivalue_line("markers", "enable_redact: do not mock redact
secret masker")
+ config.addinivalue_line("markers", "skip_if_database_isolation_mode: skip
if DB isolation is enabled")
os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
@@ -697,6 +704,7 @@ def pytest_runtest_setup(item):
skip_if_platform_doesnt_match(marker)
for marker in item.iter_markers(name="backend"):
skip_if_wrong_backend(marker, item)
+ skip_if_database_isolation_mode(item)
selected_backend = item.config.option.backend
if selected_backend:
skip_if_not_marked_with_backend(selected_backend, item)
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index 2cc0067cc6..0893072d0b 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -113,6 +113,7 @@ class TestDagFileProcessor:
dag_file_processor.process_file(file_path, [], False)
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag,
create_dummy_dag, get_test_dag):
"""
@@ -143,11 +144,13 @@ class TestDagFileProcessor:
mock_dagbag = mock.Mock()
mock_dagbag.get_dag.return_value = dag
mock_get_dagbag.return_value = mock_dagbag
+ session.commit()
DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
assert sla_callback.called
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_callback_invalid_sla(self,
mock_get_dagbag, create_dummy_dag):
"""
@@ -180,6 +183,7 @@ class TestDagFileProcessor:
DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
sla_callback.assert_not_called()
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_callback_sent_notification(self,
mock_get_dagbag, create_dummy_dag):
"""
@@ -225,6 +229,7 @@ class TestDagFileProcessor:
sla_callback.assert_not_called()
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.Stats.incr")
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(
@@ -272,6 +277,7 @@ class TestDagFileProcessor:
# ti is successful thereby trying to insert a duplicate record.
DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.Stats.incr")
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def
test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla(
@@ -318,11 +324,16 @@ class TestDagFileProcessor:
assert sla_miss_count == 2
mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id":
"test_sla_miss", "task_id": "dummy"})
+ @pytest.mark.skip_if_database_isolation_mode
@patch.object(DagFileProcessor, "logger")
@mock.patch("airflow.dag_processing.processor.Stats.incr")
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_callback_exception(
- self, mock_get_dagbag, mock_stats_incr, mock_get_log, create_dummy_dag
+ self,
+ mock_get_dagbag,
+ mock_stats_incr,
+ mock_get_log,
+ create_dummy_dag,
):
"""
Test that the dag file processor gracefully logs an exception if there
is a problem
@@ -372,6 +383,7 @@ class TestDagFileProcessor:
tags={"dag_id": f"test_sla_miss_{i}", "func_name":
sla_callback.__name__},
)
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.send_email")
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
@@ -407,12 +419,18 @@ class TestDagFileProcessor:
assert email1 in send_email_to
assert email2 not in send_email_to
+ @pytest.mark.skip_if_database_isolation_mode
@patch.object(DagFileProcessor, "logger")
@mock.patch("airflow.dag_processing.processor.Stats.incr")
@mock.patch("airflow.utils.email.send_email")
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_email_exception(
- self, mock_get_dagbag, mock_send_email, mock_stats_incr, mock_get_log,
create_dummy_dag
+ self,
+ mock_get_dagbag,
+ mock_send_email,
+ mock_stats_incr,
+ mock_get_log,
+ create_dummy_dag,
):
"""
Test that the dag file processor gracefully logs an exception if there
is a problem
@@ -453,6 +471,7 @@ class TestDagFileProcessor:
)
mock_stats_incr.assert_called_once_with("sla_email_notification_failure",
tags={"dag_id": dag_id})
+ @pytest.mark.skip_if_database_isolation_mode
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag,
create_dummy_dag):
"""